Blink 有何特别之处?菜鸟供应链场景最佳实践

阿里妹导读:菜鸟供应链业务链路长、节点多、实体多,使得技术团队在建设供应链实时数仓的过程当中,面临着诸多挑战,如:如何实现实时变Key统计?如何实现实时超时统计?如何进行有效地资源优化?如何提高多实时流关联效率?如何提高实时做业的开发效率? 而 Blink 可否解决这些问题?下面一块儿来深刻了解。

背景

菜鸟从2017年4月开始探索 Blink(即 Apache Flink 的阿里内部版本),2017年7月开始在线上环境使用 Blink,做为咱们的主流实时计算引擎。git

为何短短几个月的探索以后,咱们就选择Blink做为咱们主要的实时计算引擎呢?github

在效率上,Blink 提供 DataStream、TableAPI、SQL 三种开发模式,强大的 SQL 模式已经知足大部分业务场景,配合半智能资源优化、智能倾斜优化、智能做业压测等功能,能够极大地提高实时做业的开发效率;在性能上,诸如MiniBatch&MicroBatch、维表 Async&Cache、利用 Niagara 进行本地状态管理等内部优化方案,能够极大地提高实时做业的性能;在保障上,Blink 自带的 Failover 恢复机制,可以实现线程级的恢复,能够作到分钟级恢复,配合 Kmonitor 监控平台、烽火台预警平台,能够有效地实现实时做业的数据保障。算法

接下来,我将结合供应链业务的一些业务场景,简要说明,Blink 如何解决咱们遇到的一些实际问题。数据库

回撤机制

订单履行是供应链业务中最多见的物流场景。什么是订单履行呢?当商家 ERP 推单给菜鸟以后,菜鸟履行系统会实时计算出每笔订单的出库、揽收、签收等节点的预计时间,配送公司须要按照各节点的预计时间进行订单的配送。为了保证订单的准点履约,咱们常常须要统计每家配送公司天天各个节点的预计单量,便于配送公司提早准备产能。缓存

看似很简单的实时统计加工,咱们在开发过程当中遇到了什么问题呢?履行重算!当物流订单的上游某个节点延迟时,履行系统会自动重算该笔订单下游全部节点的预计时间。好比某个物流订单出库晚点后,其后的预计揽收时间、预计签收时间都会重算。而对于大部分的实时计算引擎来讲,并不能很友好的支持这种变 Key 统计的问题。之前,数据量没那么大的时候,还能够经过 OLAP 数据库来解决这类场景,当量上来后, OLAP 方案的成本、性能都是很大的问题。网络

除了 OLAP 方案,咱们提倡采用 Blink 已经内置的 Retraction 机制,来解决这类变 Key 统计的问题,这也是咱们在2017年初就开始尝试 Blink 的重要缘由。Blink 的Retraction 机制,使用 State 在内存或者外部存储设备中对数据进行统计处理,当上游数据源对某些汇总 Key 的数据作更新时,Blink 会主动给下游下发一个删除消息从而“撤回”以前的那条消息,并用最新下发的消息对表作更新操做。并发

下面是一个简化后的案例,供了解Blink Retraction的内部计算过程:框架

对于上述案例,能够经过 Blink 提供的强大的、灵活的、简易的 SQL 开发模式来实现,只须要几行 SQL 便可完成。异步

select   plan_tms_sign_time
       ,sum(1) as plan_tms_sign_lgtord_cnt
from
       (select   lg_order_code
                ,last_value(plan_tms_sign_time) as plan_tms_sign_time
        from     dwd_csn_whc_lgt_fl_ord_ri
        group by lg_order_code
        ) ss
group by plan_tms_sign_time
;

维表关联

供应链业务的实体角色很是多(仓、配、分拨、站点、小件员、货主、行业、地区等),实体繁多,这意味着咱们在建设实时明细中间层的时候,会使用大量的维表关联,这对 Blink 在维表关联的性能上提出了更高的要求——如何提高大量的大小维表的关联性能?Blink 历来没让用户失望,Blink SQL 模式在维表关联的性能上,也作了大量的优化:ide

优化1:Async IO,有一些实时计算引擎,维表关联是采用同步访问的方式,即来一条数据,去数据库查询一次,等待返回后输出关联结果。这种方式,能够发现网络等待时间极大地阻碍了吞吐和延迟。而 Blink 采用了异步访问的模式,能够并发地处理多个请求和回复,从而连续地请求之间不须要阻塞等待,吞吐量大大提高。

优化2:缓存,维表关联涉及到大量的维表查询请求,其中可能存在大量相同 Key 的重复请求。Blink SQL 模式提供了缓存的机制,并提供 LRU 和 ALLCache 两种缓存方案。

用户能够经过配置 Cache='LRU' 参数,开启 LRU 缓存优化。开启后,Blink 会为每一个 JoinTable 节点建立一个 LRU 本地缓存。当每一个查询进来的时候,先去缓存中查询,若是存在则直接关联输出,减小了一次 IO 请求。若是不存在,再发起数据库查询请求,请求返回的结果会先存入缓存中以备下次查询。

若是维表数据不大,用户能够经过配置 Cache='ALL' 参数,对维表进行全量缓存。这样,全部对该维表的查询操做,都会直接走本地缓存模式,几乎没有 IO,关联的性能很是好。

优化3:缓存无效 Key,若是维表很大,没法采用 ALLCache 的方案,而在使用 LRU 缓存时,会存在很多维表中不存在的 Key 。因为命中不了缓存,致使缓存的收益较低,仍然会有大量请求发送到数据库,而且LRU模式下缓存里的key不会永久保留,能够经过调整参数,设置保留时间。

优化4:Distribute By 提升缓存命中率,默认状况下,维表关联的节点与上游节点之间是 Chain 在一块儿,不通过网络。这在缓存大小有限、Key 总量大、热点不明显的状况下, 缓存的收益可能较低。这种状况下能够将上游节点与维表关联节点的数据传输改为按 Key 分区。这样一般能够缩小单个节点的 Key 个数,提升缓存的命中率。

除了上述几点优化,Blink SQL 模式还在尝试引入 SideInput、Partitioned ALL Cache 等优化方案,相信在随后开源的 Blink 版本中,维表关联的性能会愈来愈好。

下面是一张来自 Flink Committer 云邪 异步查询的流程图,供理解与同步请求的差别。

数据倾斜

无数据不倾斜,咱们在实时数仓建设过程当中,也固然会遇到数据倾斜问题。在统计卖家的单量时,有些卖家单量大,有些卖家单量小,单量超大的卖家,就会产生数据倾斜;在统计行业的单量时,有些行业单量大,有些行业单量小,单量超大的行业,就会产生数据倾斜;在统计货品的库存流水状况时,有些货品库存流水频繁,一些货品库存流水较少,库存流水超频繁的货品就会产生数据倾斜……

咱们应该如何处理数据倾斜问题呢?以统计卖家的单量为例,之前咱们会先把订单这个 Key 做 Hash,先针对 Hash 以后的值作一次去重的聚合操做,再在此基础上,再作一次针对原 Key 去重的聚合操做。两次相似的聚合操做,致使代码写起来比较复杂,体力劳动比较多。

2017年,咱们的实时数据开始全面切换到 Blink 上,Blink 在数据倾斜这块,又给咱们提供了什么的方案呢?Blink 给出的答案是:MiniBatch/MicroBatch+LocalGlobal+PartialFinal。

MiniBatch/MicroBatch,能够实现微批处理,进而减小对 State 的访问,提高吞吐。由于微批处理会致使必定的延迟,最好结合 Blink 提供的容许延迟的相关参数来使用。

LocalGlobal,分为 Local 和 Global 两个阶段,有点相似 MapReduce 中的Combine 和 Reduce 两个阶段。LocalGlobal 能够很好地处理非去重类的聚合操做,但对 Count Distinct 的优化效果通常,由于在 Local 阶段,可能 Distinct Key的去重率并不会很高,进而致使后续的 Global 阶段,仍然会有热点。

PartialFinal,能够很好地解决 Count Distinct 带来的数据倾斜问题。PartialFinal 能够将 Distinct Key 自动打散,先聚合一次,在此基础上,再聚合一次,从而实现打散热点的做用。PartialFinal 跟手动 Hash 再聚合两次的效果一致,经过 Blink 提供的 PartialFinal 参数,能够自动实现,再也不须要人为手工编写 Hash 再聚合两次的代码。

由上能够看出,Blink 在数据倾斜的处理上,已经实现了自动化,之前人为编写的打散热点方案,如今几个参数就能所有搞定,大大提高了代码的编写效率。

下面是相关参数,用户能够直接在 Blink 的做业参数中进行配置。

miniBatch/microBatch攒批的间隔时间

blink.miniBatch.allowLatencyMs=5000
blink.microBatch.allowLatencyMs=5000

防止OOM,每一个批次最多缓存多少条数据

blink.miniBatch.size=20000

开启LocalGlobal

blink.localAgg.enabled=true

开启PartialFinal

blink.partialAgg.enabled=true

超时统计

上架是仓储业务的重要组成部分。上架,顾名思义,就是要把到仓的货品,上到仓库的存储货架上。上架通常分为采购上架、销退上架、调拨上架等。及时上架是对仓库的重要考核项之一,不管哪种类型的上架,咱们常常须要针对到货后超过 x 小时未上架的订单进行预警。

可是,Blink 的计算是消息机制,须要上游发送消息才能触发下游计算,而上述的场景中,未上架就说明不会有上架的消息流入 Blink,进而没法完成下游的计算。

对于这种实时超时统计的问题,应该如何来解呢?咱们尝试了几种方案,供参考:

方案1:针对部分 Source Connector,Blink 提供了"延时下发"的功能,用户能够经过指定 DataDeliveryDelayMs 参数,实现消息延迟下发。正常的消息正常流入,正常消息也能够经过配置该参数,使其按照本身的需求延时流入。这样,经过正常流入的消息关联延时流入的消息,能够触发 Blink 在消息正常流入时计算一次,在延时消息流入时再触发计算一次。这种方案,能够实现咱们的业务需求,可是这种方案会把全部消息从新发送一遍,而不只仅是到货后超过x小时未上架的消息,这样会形成计算资源的浪费,咱们不建议在数据量很大的场景下使用该方案。

方案2:若是有第三方的消息中间件,而这个消息中间件又能支持配置超时下发的规则,这将是一个比较好的方案。据了解,Kafka 的最新版本已经可以根据业务需求,配置消息超时下发的规则。咱们只须要在 Blink 中,经过正常流入的消息流关联关键Kafka 超时下发的消息流,就能够触发 Blink 进行超时消息的统计。这样,除了Blink,咱们须要同时保障 Kafka 的稳定性。Kafka的超时消息订阅,能够参见:[1]。

方案3:咱们可以很天然的想到 CEP,而 Blink 也已经提供了 CEP 的功能,且已经SQL化。用户能够经过 Blink CEP 完成上述业务需求的统计。在实操过程当中,咱们发现,经过 Blink CEP 统计的结果,每每与真实结果(明细汇总统计)有必定的出入。什么缘由呢?原来到货时间,被回传了屡次,有可能开始回传的是9点,可是后面发现回传错了,改为了8点,而 CEP 的 Watermark 是全局地向前走的,对于这种场景,没法很好的适配。

方案4:Flink 的 ProcessFunction,是一个 Low-Level 的流处理操做。经过改写其中的 ProcessElement 方法,能够告诉 Blink的State 里面存什么,以及如何更新State;经过改写 OnTimer 方法,能够告诉 State 什么时候下发超时消息。经过对上述几种方案的原理对比及性能压测,咱们最终选择的也是这套方案。因为超时场景,在供应链业务中很是常见,咱们已经将该方案沉淀下来,一样的场景,经过 1min 配置下相关参数,便可完成相似场景超时消息的下发。

下面是方案4简化后的实现框架图,供了解相关实现及优点。

零点起跳

每次大促,大屏上零点时刻双十一的零点时刻一直是你们关注的焦点,为了在零点一过就让各项指标尽快在大屏上展示出来,咱们进行了一些端到端的优化,供参考。

优化1:合理调整 Blink 读取上游消息源的 FlushInterval 。咱们知道 Blink 是以Block 的形式传输数据,若是 Block 一直积攒不满,Block 可能一直等待没法下发。这种状况,咱们能够经过调整 FlushInterval 参数,直接控制多长时间往下游 sink 一次。这样,Block 积满或间隔达到知足其中一个条件,Block 就会往下流。

优化2:合理调整 MiniBatch/MicroBatch的size 和 AllowLatency 参数。前文提到,MiniBatch/MicroBatch 是微批处理模式,都会带来必定的延迟,能够经过合理控制 Size 和 AllowLatency 参数,来控制该模式带来的延迟。与优化1同样,二者知足其一,就会往下继续执行。

优化3:合理控制写 Checkpoint 的方式以及 Checkpoint 的大小。利用 Checkpoint 实现 Exactly Once 的容错方式一直是 Flink 做为流引擎的一个亮点。可是过于复杂的运算和网络环境有可能致使 checkpoint 的对齐时间过长,从而致使整个 Job 的延迟变长。同时,Exactly Once 模式下作 Checkpoint 的时间间隔与整个任务中数据流的延迟也是一个 Trade Off。所以咱们在处理特别复杂的 Job 时也将这个因素考虑了进去,并无使用默认的 Exactly Once 方式,而是依旧实际需求采用了 At Least Once 。同时,将 Checkpoint 的周期设置为了60s,尽量的保证了任务在延迟较小的状况下,在 Failover 的情形下仍然能作到快速恢复。

优化4:除了 Blink 端,在数据服务端,大屏上的实时数据,咱们建议采用查询性能优异的 Hbase 做为存储引擎,能够保证零点一过,三秒内便能实现大屏数据的跳动。
……

将来展望

Blink 在不断快速地发展,不只仅是流处理,当前也开始支持批处理,用户只须要写一套代码就能够同时实现批和流的数据开发,当前在日志型的数据场景上,咱们也正在探索利用 Blink 直接实现批流混合模式;不只仅是半智能资源调优,当前开始内测智能资源调优,Blink 能够根据吞吐量、算子复杂度等因素,对线上做业的资源配置进行全智能自适应调优,不再用在大促前手动更改资源配置;不只仅是 Java,更指望有 Python 等多语言生态,来描述计算逻辑,相信开发效率又会上一个新的台阶;不只仅是 ETL,更指望有更广阔的大数据算法集成,能够实现复杂的大数据AI场景……将来已来,咱们相信,Blink 已经作好了迎接将来的准备。

参考资料:

[1]https://ketao1989.github.io/2016/01/02/delayed-message-consume-service-use-kafka/



本文做者: 晨笙、缘桥 

阅读原文

本文来自云栖社区合做伙伴“阿里技术”,如需转载请联系原做者。

相关文章
相关标签/搜索