基于Streaming构建统一的数据处理引擎的挑战与实践

做者:杨克特/伍翀 整理:徐前进(Apache Flink Contributor) 校对:杨克特/伍翀html

本文整理自12月20日在北京举行的Flink-forward-china-2018大会, 分享嘉宾杨克特:花名鲁尼,Apache Flink Committer,2011年硕士毕业加入阿里,参与多个计算平台核心项目的设计和研发,包括搜索引擎、调度系统、监控分析等。目前负责Blink SQL的研发和优化。 分享嘉宾伍翀:花名云邪,Apache Flink Committer,从Flink v1.0就参与贡献,从事Flink Table & SQL相关工做已有三年,目前在阿里Blink SQL项目组。数据库

文章概述:基于Flink以流为本的计算引擎去构建一个流与批统一的解决方案 本文主要从如下5个方面来介绍基于Flink Streaming构建统一的数据处理引擎的挑战和实践。apache

  • 为何要统一批和流数组

  • 什么是统一的SQL处理引擎缓存

  • 如何统一批和流性能优化

  • 性能表现网络

  • 将来计划数据结构

1、为何要统一批和流

通常公司里面都会有一个比较传统的批处理系统天天去算一些报表,随着愈来愈多更实时的需求,你们也许会采用 Storm、Flink、Spark 来作流计算,同时又会在边上跑一个批处理,以小时或天的粒度去计算一个结果,来实现两边的校验。这个就是经典的 Lambda 架构。架构

可是这个架构有不少问题,好比用户须要学习两套引擎的开发方式,运维须要同时维护两套系统。更重要的问题是,咱们须要维护两套流程,一套增量流程,一套全量流程,同时这两套流程之间必需要有必定的自洽性,它们必需要保证一致。当业务变得愈来愈复杂的时候,这种一致性自己也会成为一个挑战。框架

这也是 Flink SQL 但愿解决的问题,但愿经过 Flink SQL,无论是开发人员仍是运维人员,只须要学习一套引擎,就能解决各类大数据的问题。也不只限于批处理或者说流计算,甚至能够更多。例如:支持高时效的批处理达到 OLAP 的效果,直接用SQL语法去作复琐事件处理(CEP),使用 Table API 或 SQL 来支持机器学习,等等。在 SQL 上还有很是多的想象空间。

2、什么是统一的SQL处理引擎

那么什么是统一的 SQL 处理引擎呢?统一的 SQL 引擎道路上有哪些挑战呢?

从用户的角度来讲,一句话来描述就是“一份代码,一个结果”。也就是只须要写一份代码,流和批跑出来的最终结果是同样。这个的好处是,用户不再用去保证增量流程和全量流程之间的一致性了,这个一致性将由 Flink 来保证。

从开发的角度来看,其实更加关注底层架构的统一,好比说一些技术模块是否是足够通用,流和批模式下是否能作到尽量地复用。精心设计的高效数据结构是否是能够普遍地应用在引擎的不少模块中。

用户角度

首先咱们有一张用户的分数表 USER_SCORES,里面有用户的名字、得分和得分时间。经过这张表来作一个很是简单的统计,统计每一个用户的总分,以及获得这个分数的最近时间是多少? 从流计算跟批处理的角度,无论是作一张离线报表,仍是实时不断地产出计算结果,它们的SQL是彻底如出一辙的,就是一个简单的GroupBy分组,求和,求max。

如上图有一张源数据表,有名字,分数,事件时间。

对于批处理,经过这样一个SQL能够直接拿到最终的结果,结果只会显示一次,由于在数据消费完以后,才会输出结果。

对于流处理,SQL 也是如出一辙的。假设流任务是从 12:01 开始运行的,这时候尚未收到任何消息,因此它什么结果都没输出。随着时间推移,收到了第一条 Julie 的得分消息,此时会输出 Julie 7 12:01。当到达12:04分时,输出的结果会更新成 Julie 8 12:03,由于又收到了一条 Julie 的得分消息。这个结果对于最终的结果来说多是不对的,可是至少在 12:04 这一刻,它是一个正确的结果。当时间推动到当前时间(假设是 12:08 分),全部已产生的消息都已消费完了,能够看到这时候的输出结果和批处理的结果是如出一辙的。

这就是“一份代码,一个结果”。其实从用户的角度来说,流计算跟批处理在结果正确性上并无区别,只是在结果的时效性上有一些区别。

开发角度

对于开发人员来讲,引擎的统一又是意味着什么?这张图是目前Flink的架构图,最上层 Table API 与 SQL。在执行以前,会根据执行环境翻译成 DataSet 或 DataStream 的 API。这两个 API 之间仍是有比较大的不一样,咱们能够放大以后看看。好比 DataSet API 是Flink批处理的API,它本身有一个优化器。 可是在DataStream API下,就是一些比较直观简单的翻译。而后在运行时,他们也是依托于不一样的task。在 DataStream 这边,主要是运行Stream Task,同时在里面会运行各类各样的operator。 在批处理这边主要是运行 Batch Task 和 Driver。这个主要是执行模式的区别。

在代码上能有多少能共用的呢?好比说要实现一个 INNER JOIN,以如今的代码距离,若是要在流上实现 INNER JOIN,首先会把两路输入变成两个DataStream,而后把两个输入 connect 起来,再进行 keyBy join key,最后实现一个 KeyedCoProcessOperator 来完成 join 的操做。可是在 DataSet 这边,你会发现 API 就不太同样了,缘由是 DataSet 底下是有个优化器的。换句话说,DataSet 的 API 有些是声明式的,DataStream 的 API 是命令式的。从这个例子上来看,对于咱们开发人员来讲,在流计算或者是在批处理下实现 JOIN 所面对的 API 实际上是比较不同的,因此这也很难让咱们去复用一些代码,甚至是设计上的复用。这个是API 上的区别,到了Runtime以后,比较大的不一样是 Stream Task 跟 Batch Task 的区别。

如上图主要的区别是 Batch Task 是以 pull 模式执行的,而 Stream Task 是以 push 模式执行的。先简单介绍一下这两个模式。假如说咱们须要从T表扫出数据,在A字段上作简单的过滤,而后对B字段进行求和。这个模型很经典,这里咱们须要关注的是控制流和数据流。

在一个经典的pull的模式下,首先会有执行器开始向执行的,能够理解为程序入口,它会向最后一个节点请求最终的结果。最后一个节点(求和节点)会向前一个节点(过滤节点)请求数据,而后再向前一个节点请求数据直到源节点。源节点就会本身去把数据读出来,而后一层层往下传递,最终最后一个节点计算完求和后返回给程序入口。这和函数调用栈很是相像。

在 push 模式下,在程序开始执行的时候,咱们会直接找到 DAG 的头节点,全部的数据和控制消息都由这个头节点往下发送,控制流会跟数据流一块儿,至关于它同时作一个函数调用,而且把数据发送给下一个算子,最终达到求和的算子。

经过这个简单例子,你们能够体会一下,这两个在执行模式上有很大的不一样,这会在 runtime 统一上带来不少问题,但其实他们完成的功能是相似的。

3、如何统一流和批

咱们在深度统一流和批的道路上遇到了这么多挑战,那么是如何作到统一的呢?

01 动态表

首先,你们已经愈来愈认同 SQL 是大数据处理的通用语言,不只仅是由于 SQL 是一个很是易于表达的语言,还由于 SQL 是一个很是适合于流批统一的语言。可是在传统的 SQL 里面,SQL 是一直做用在“表”上的,不是做用在“流”上的。怎么样让 SQL 可以做用在流上,并且让流式 SQL 的语义、结果和批同样呢?这是咱们遇到的第一个问题。为了解决这个问题,咱们和社区提出了“流表对偶性”还有“动态表”的这两个理论基础。在这里,这个理论基础咱们这里就不展开了,感兴趣的同窗能够去官网上阅读下这篇文章。你们只须要知道只有在基于这两个理论的基础上,流式SQL的语义才可以保证和批的语义是同样的,结果是同样的。

02 架构改进

如图是对架构上的一些改进。架构的改进主要集中在中间两层,在 Runtime 层咱们加强了现有的 Operator 框架,使得能支持批算子。在 Runtime 之上,咱们提出了一个 Query Processor 层,包括查询优化和查询执行,Table API & SQL 再也不翻译到 DataStream 和 DataSet ,而是架在 Query Processor 之上。

统一的 Operator 框架

在 Runtime 层,首先实现的是 Runtime DAG 层的统一,基于统一的DAG层之上,再去构建流的算子和批的算子。为了统一流和批的最底层的API,引入了一个统一的Operator层抽象。批算子的实现再也不基于 DataSet 底层 Deriver 接口实现,都基于 StreamOperator 接口来实现了。这样流和批都使用了统一的 Operator API 来实现。

除此以外,针对批的场景,咱们对 Operator 框架作了些扩展使得批能得到额外的优化。

第一点是 Operator 能够自主的选择输入边,例如hashjoin,批的hashjoin通常先会把build端处理完,把哈希表先build起来,而后再去处理另一边的probe端。

第二点是更加灵活的 Chaining 的策略。StreamOperator 的默认 Chaining 策略只能将单输入的 operator chain在一块儿。可是在批的一些场景,咱们但愿可以对多输入的Operator也进行Chaining。好比说两个 Join Operator,咱们但愿也可以 Chaining 在一块儿,这样这两个 Join Operator 之间的数据shuffle 就能够省掉。

关于统一的 Operator 框架,咱们已经在社区里面展开了讨论,感兴趣的同窗能够关注一下这个讨论连接

统一的查询处理

而后讲一下统一的Query Processor 层,无论是流计算仍是批处理的SQL,他们都将沿着相同的解析和优化的路径往前走。在解析层,也就是将 SQL 和 Table API 代码解析成逻辑计划,这里流和批完彻底全复用了同样的代码。而后在优化层流和批也使用了相同的优化器来实现,在优化器里面,全部的优化的规则都是可插拔的。流和批共用了绝大部分的优化的规则,只有少部分的规则是流特有的,或者是批特有的。而后在优化以后,获得了一个物理计划,物理的计划会通过翻译成最终的Execution DAG,也就是咱们刚刚讲的Stream Operator算子,最后会分布式地运行起来。

03 优化器的统一

在优化器这一层,很符合二八定律,也就是80%的优化规则都是流和批是共享的,好比说列裁剪、分区裁剪、条件下推等等这些都是共享的。还有20%的优化规则是流批特有的,通过咱们的研究发现比较有意思的一个规律。

批这边优化规则,不少都是跟sort相关相关的,由于流如今不支持sort,因此 sort 能够理解是批特有的,好比说一些sort merge join 的优化、sort agg的优化。

而流这边所特有的一些规则,都是跟state相关的。为何呢?由于目前流做业在生产中跑一个 SQL 的做业,通常会选择使用 RocksDB 的 StateBackend。RocksDB StateBackend,它有一个很大的瓶颈,就是你每一次的操做,都会涉及到序列化和反序列化,因此说 State 操做会成为一个流做业的瓶颈。因此如何去优化一个流做业,不少时候是思考如何节省State操做,如何减少State的size,如何避免存储重复的State数据。这些都是目前流计算任务优化的立竿见影的方向。

这里介绍一个流和批共用的高级一点的优化规则。你们能够先看一下上图左边这个query,这是一个通过简化以后的TBCH13的query,有一张用户表customer,还有一张订单表 orders,customer 表会根据 custkey 去 join 上 orders 这张表,而后 join 以后,再根据 custkey 来进行分组,统计出现过的订单的数量。

梳理一下就是要统计每一个客户下的订单数,可是订单的数据是存在orders表里面的,因此就须要去join这个orders表。这个query通过解析以后,获得的逻辑计划就是中间这个图。能够看到customer表和orders表进行了join,join以后作了一个agg。可是这里有一个问题,就是customer表和orders表都是两张很是大的表,都是上亿级别的。在批处理下,为这个join去build哈希表的时候,要用到大量的buffer,甚至还须要落盘,这就可能致使这个join性能比较差。 在流处理下也是相似的,须要把customer表和orders表全部的数据都存到state里面去。state越大,流处理性能也就越差。

因此说怎样去节省和避免数据量是这个查询优化的方向。咱们注意到customer它自己就带了一个主键就是custkey,最后的agg也是针对 custkey 进行聚合统计的。那么其实咱们能够先对orders表作一个聚合统计,先统计出每一个用户每一个custkey它下一个多少的订单,而后再和customer表作一个join,也就是说把agg进行下推,下推到了join以前,这样子,orders表就从原来的15亿的数据量压缩到了一个亿,而后再进行join。这个对于流和批都是巨大的性能优化。咱们在流场景下测试发现从原先耗时六个小时提高到了14分钟。

讲这个例子目的是想说明 SQL 已经发展了几十年了,有很是多的牛人在这个领域耕耘多年,已经有了很是多成熟的优化。这些优化,基于流批统一的模型,不少事能够直接拿过来给流用的。咱们不须要再为流在开发、研究一套优化规则,作到事半功倍的效果。

04 基本数据结构的统一

原先在 Flink 中无论流仍是批,具体干活的算子之间传递的都是一种叫 Row 的数据结构。 可是Row有这么几个比较典型的问题。

  1. Row结构很简单,里面就存了一个Object数组,好比说如今有一行数据,第一个是整形,另两个是字符串。那么row里面就会有一个Int,还有两个String。可是咱们知道Java在对象上,它会有一些额外的空间的一些开销。

  2. 另外对于主类型的访问,会有装箱和拆箱的开销。

  3. 还有在算Row的hashcode、序列化、反序列化时,须要去迭代Row里面数组的每个元素的hashcode方法、反序列化方法、序列化方法,这就会涉及到不少额外的虚函数调用的开销。

  4. 最后一点是对于一些稍微高级一点的数据结构,好比说排序器 sort,还有agg join中的一些hashtable,hashmap的这种二进制的数据结构,基于Row的这种封装,很难去作到极致的效率。

因此针对这些问题,咱们也提出了一个全新的数据结构BinaryRow,而后它是彻底基于二进制的结构来设计的。BinaryRow 分红了定长区和变长区,在定长区开头是一个null bit的一个区间,用来记录每一个字段是不是null值。而后像int,long,double这种定长的数据类型,咱们会直接把这个直接存在定长区里面,而后string这种变长形的数据,咱们会把他的变长的数据存在变长区,而后把他的指针还有他的长度存在定长区。在存放数据的时候,BinaryRow 中每个数据块都是八字节对齐的。 为何八字节对齐?一方面是为了更快的 random access,查找字段时不须要从头遍历,直接就能定位到字段的位置。另外一方面是可以作到更好的cpu的缓存。

BinaryRow 有一个比较重要的优势:延迟反序列化。例如从网络过来的二进制数据、从state拿到的二进制数据,不会立刻反序列化出来,而是会 wrap 成 BinaryRow,当须要的时候才进行反序列化,这能节省不少没必要要的反序列化操做,从而提高性能。通过测试,这个数据结构不只在批处理中表现的很是优秀,在流处理中也获得了一倍的性能提高。

05 实现Runtime共享

在Runtime的实现上,咱们已经有不少实现是共用的,好比说source,sink,Correlate,CodeGeneration。这里咱们展开讲讲维表关联和内存管理的流批复用。

1 维表关联

维表关联,你们应该都比较了解,就是一个流要去join一个存在外部的数据库。咱们会拿流数据的 ID 去 lookup 维表,这个lookup的过程,咱们会实现成同步的模式或者是异步的模式。 咱们知道 DataStream 上支持了异步IO接口,可是DataSet是没有的。不过因为咱们统一了Operator层,因此说批能够直接复用流的 operator 实现。虽然在传统的批处理中,若是要查询维表,会先把维表scan下来再作 JOIN。但若是说维表特别大,probe端特别小,这样多是不划算的,使用lookup的方式可能会更高效一些,因此说这也是弥补了批在某些场景的一个短板。

2 流的微批处理

为了不对state的频繁操做,咱们在流上引入了Micro-Batch 机制。实现方式就是在数据流中插入了一些 micro-batch 的事件。而后在Aggregate的Operator里面,收到数据的时候,咱们就会把它存到或直接聚合到二进制的哈希表里面(缓存到内存中)。而后当收到 micro-batch 事件的时候,再去触发二进制的映射表(BinaryHashMap),将缓存的结果刷到 state 中,并将输出最终结果。 这里 BinaryHashMap 是彻底和批这边复用的。流这边没有去从新造一套,在性能上也获得了十倍的提高。

4、性能表现

咱们先测试了一个批的性能,拿的是TPC-H去作一个测试。咱们与Flink1.6.0进行了比较,这个图是在1T的数据量下每一个query的一个耗时的对比,因此说耗时越小,它的性能也就越好。能够看出每个query,Blink都会比Flink1.6 要优秀不少。 平均性能上要比Flink1.6要快十倍。另外借助统一的架构,流也成功的攻克了全部的TBCH的query。值得一提的是,这是目前其余引擎作不到的。还有在今年的天猫双11上流计算,达到了17亿的TPS这么大的一个吞吐量。能达到这么高的性能表现,离不开咱们今天聊的统一流批架构。

5、将来计划

咱们会继续探索流和批的一些结合,由于流和批并非非黑即白的,不是说批就是批做业,流就是流做业,流和批之间还有不少比较大的空间值得咱们去探索。好比说一个做业,他可能一部分是一个一直运行的流做业,另外一部分是一个间隔调度的批做业,他们之间是融合运行着的。再好比一个批做业运行完以后,怎么样可以无缝地把它迁移成一个流做业,这些都是咱们将来尝试去作的一些研究的方向。

更多资讯请访问 Apache Flink 中文社区网站

相关文章
相关标签/搜索