Flink在饿了么的应用与实践

本文做者:易伟平(饿了么)java

整理:姬平(阿里巴巴实时计算部)python

本文将为你们展现饿了么大数据平台在实时计算方面所作的工做,以及计算引擎的演变之路,你能够借此了解Storm、Spark、Flink的优缺点。如何选择一个合适的实时计算引擎?Flink凭借何种优点成为饿了么首选?本文将带你一一解开谜题。mysql

平台现状

下面是目前饿了么平台现状架构图:程序员

来源于多个数据源的数据写到 kafka 里,计算引擎主要是 Storm , Spark 和 Flink,计算引擎出来的结果数据再落地到各类存储上。

目前 Storm 任务大概有100多个,Spark任务有50个左右,Flink暂时还比较少。redis

目前咱们集群规模天天数据量有60TB,计算次数有1000000000,节点有400个。这里要提一下,Spark 和 Flink都是 on yarn 的,其中Flink onyarn主要是用做任务间 jobmanager 隔离, Storm 是 standalone 模式。sql

应用场景

1.一致性语义

在讲述咱们应用场景以前,先强调实时计算一个重要概念, 一致性语义:apache

  1. at-most-once:即 fire and forget,咱们一般写一个 java 的应用,不去考虑源头的 offset 管理,也不去考虑下游的幂等性的话,就是简单的at-most-once,数据来了,无论中间状态怎样,写数据的状态怎样,也没有ack机制。api

  2. at-least-once: 重发机制,重发数据保证每条数据至少处理一次。性能优化

  3. exactly-once: 使用粗 checkpoint 粒度控制来实现 exactly-once,咱们讲的 exactly-once 大多数指计算引擎内的 exactly-once,即每一步的 operator 内部的状态是否能够重放;上一次的 job 若是挂了,可否从上一次的状态顺利恢复,没有涉及到输出到 sink 的幂等性概念。架构

  4. at-least-one + idempotent = exactly-one:若是咱们能保证说下游有幂等性的操做,好比基于mysql实现 update on duplicate key;或者你用es, cassandra之类的话,能够经过主键key去实现upset的语义, 保证at-least-once的同时,再加上幂等性就是exactly-once。

2. Storm

饿了么早期都是使用Storm,16年以前仍是Storm,17年才开始有Sparkstreaming, Structed-streaming。Storm用的比较早,主要有下面几个概念:

  1. 数据是 tuple-based

  2. 毫秒级延迟

  3. 主要支持java, 如今利用apache beam也支持python和go。

  4. Sql的功能还不完备,咱们本身内部封装了typhon,用户只须要扩展咱们的一些接口,就可使用不少主要的功能;flux是Storm的一个比较好的工具,只须要写一个yaml文件,就能够描述一个Storm任务,某种程度上说知足了一些需求,但仍是要求用户是会写java的工程师,数据分析师就使用不了。

★ 2.1 总结
  1. 易用性:由于使用门槛高,从而限制了它的推广。

2)StateBackend:更多的须要外部存储,好比redis之类的kv存储。

  1. 资源分配方面:用worker和slot提早设定的方式,另外因为优化点作的较少,引擎吞吐量相对比较低一点。

3. Sparkstreaming

有一天有个业务方过来提需求说 咱们能不能写个sql,几分钟内就能够发布一个实时计算任务。 因而咱们开始作Sparkstreaming。它的主要概念以下:

  1. Micro-batch:须要提早设定一个窗口,而后在窗口内处理数据。

  2. 延迟是秒级级别,比较好的状况是500ms左右。

  3. 开发语言是java和scala。

  4. Streaming SQL,主要是咱们的工做,咱们但愿提供 Streaming SQL 的平台。

特色:

  1. Spark生态和 SparkSQL: 这是Spark比较好的地方,技术栈是统一的,SQL,图计算,machine learning的包都是能够互调的。由于它先作的是批处理,和Flink不同,因此它自然的实时和离线的api是统一的。

  2. Checkpointon hdfs。

  3. On Yarn:Spark是属于 hadoop 生态体系,和 yarn 集成度高。

  4. 高吞吐: 由于它是 micro-batch 的方式,吞吐也是比较高的。

下面给你们大体展现一下咱们平台用户快速发布一个实时任务的操做页面,它须要哪些步骤。咱们这里不是写 DDL 和 DML 语句,而是 UI 展现页面的方式。

页面里面会让用户选一些必要的参数, 首先会选哪个 kafka 集群,每一个分区消费多少,反压也是默认开启的。消费位置须要让用户每次去指定,有可能用户下一次重写实时任务的时候,能够根据业务需求去选择offset消费点。

中间就是让用户描述 pipeline。 SQL 就是 kafka 的多个 topic,输出选择一个输出表,SQL 把上面消费的 kafka DStream 注册成表,而后写一串 pipeline,最后咱们帮用户封装了一些对外 sink (刚刚提到的各类存储都支持,若是存储能实现 upsert 语义的话,咱们都是支持了的)。

★ 3.1 MultiStream-Join

虽然刚刚知足通常无状态批次内的计算要求,但就有用户想说, 我想作流的 join 怎么办, 早期的 Spark1.5 能够参考 Spark-streamingsql 这个开源项目把 DStream 注册为一个表,而后对这个表作 join 的操做,但这只支持1.5以前的版本,Spark2.0 推出 structured streaming 以后项目就废弃了。咱们有一个 tricky 的方式:

让 Sparkstreaming 去消费多个 topic,可是我根据一些条件把消费的 DStream 里面的每一个批次 RDD 转化为DataFrame,这样就能够注册为一张表,根据特定的条件,切分为两张表,就能够简单的作个 join,这个 join 的问题彻底依赖于本次消费的数据,它们 join 的条件是不可控的,是比较 tricky 的方式。好比说下面这个例子,消费两个 topic,而后简单经过 filer 条件,拆成两个表,而后就能够作个两张表的 join,但它本质是一个流。

★ 3.2 Exactly-once

exactly-once 须要特别注意一个点:

咱们必需要求数据 sink 到外部存储后,offset 才能 commit,不论是到 zookeeper,仍是 mysql 里面,你最好保证它在一个 transaction 里面,并且必须在输出到外部存储(这里最好保证一个 upsert 语义,根据 unique key 来实现upset语义)以后,而后这边源头driver再根据存储的 offeset 去产生 kafka RDD,executor 再根据 kafka 每一个分区的 offset 去消费数据。若是知足这些条件,就能够实现端到端的 exactly-once 这是一个大前提。

★ 3.3 总结
  1. Stateful Processing SQL ( <2.x mapWithState、updateStateByKey):咱们要实现跨批次带状态的计算的话,在1.X版本,咱们经过这两个接口去作,但仍是须要把这个状态存到 hdfs 或者外部去,实现起来比较麻烦一点。

  2. Real Multi-Stream Join:没办法实现真正的多个流join的语义。

  3. **End-To-End Exactly-Once Semantics:**它的端到端的 exactly-once 语义实现起来比较麻烦,须要sink到外部存储后还须要手动的在事务里面提交offset。

4. STRUCTURED STREAMING

咱们调研而后并去使用了 Spark2.X 以后带状态的增量计算。下面这个图是官方网站的:

全部的流计算都参照了 Google 的 data flow,里面有个重要的概念:数据的 processing time 和 event time,即数据的处理时间和真正的发生时间有个 gap 。因而流计算领域还有个 watermark,当前进来的事件水位须要watermark 来维持,watermark 能够指定时间 delay 的范围,在延迟窗口以外的数据是能够丢弃的,在业务上晚到的数据也是没有意义的。

下面是 structured streaming 的架构图:

这里面就是把刚才 sparkstreaming 讲 exactly-once 的步骤1,2,3都实现了,它本质上仍是分批的 batch 方式,offset 本身维护,状态存储用的 hdfs,对外的 sink 没有作相似的幂等操做,也没有写完以后再去 commit offset,它只是再保证容错的同时去实现内部引擎的 exactly-once。

★ 4.1 特色
  1. Stateful Processing SQL&DSL:能够知足带状态的流计算

  2. Real Multi-Stream Join:能够经过 Spark2.3 实现多个流的 join,多个流的 join 作法和 Flink 相似,你须要先定义两个流的条件(主要是时间做为一个条件),好比说有两个topic的流进来,而后你但愿经过某一个具体的 schema 中某个字段(一般是 event time)来限定须要 buffer 的数据,这样能够实现真正意义上的流的 join。

3)比较容易实现端到端的 exactly-once 的语义,只须要扩展sink的接口支持幂等操做是能够实现 exactly-once的。

特别说一下,structured streaming 和原生的 streaming 的 API 有一点区别,它建立表的 Dataframe 的时候,是须要指定表的 schema 的,意味着你须要提早指定 schema。另外它的 watermark 是不支持 SQL 的,因而咱们加了一个扩展,实现彻底写 SQL,能够从左边到右边的转换(下图),咱们但愿用户不止是程序员,也但愿不会写程序的数据分析师等同窗也能用到。

★ 4.2 总结
  1. Trigger(Processing Time、 Continuous ):2.3以前主要基于processing Time,每一个批次的数据处理完了立马触发下一批次的计算。2.3推出了record by record的持续处理的trigger。

  2. Continuous Processing (Only Map-Like Operations):目前它只支持map like的操做,同时sql的支持度也有些限制。

  3. LowEnd-To-End Latency With Exactly-Once Guarantees:端到端的exactly-once的保证须要本身作一些额外的扩展, 咱们发现kafka0.11版本提供了事务的功能,是能够从基于这方面考虑从而去实现从source到引擎再到sink,真正意义上的端到端的exactly-once。

  4. CEP(Drools):咱们发现有业务方须要提供 CEP 这样复琐事件处理的功能,目前咱们的语法没法直接支持,咱们让用户使用规则引擎 Drools,而后跑在每一个 executor 上面,依靠规则引擎功能去实现 CEP。

因而基于以上几个 Spark Structured Streaming 的特色和缺点,咱们考虑使用 Flink 来作这些事情。

5.Flink

Flink 目标是对标 Spark,流这块是领先比较多,它野心也比较大,图计算,机器学习等它都有,底层也是支持yarn,tez等。对于社区用的比较多的存储,Flink 社区官方都支持比较好,相对来讲。

Flink 的框架图:

Flink中的 JobManager,至关于 Spark 的 Driver 角色,TaskManger 至关于 Executor,里面的 Task 也有点相似Spark 的那些 Task。 不过 Flink 用的 RPC 是 akka,同时 Flink Core 自定义了内存序列化框架,另外 Task 无需像Spark 每一个 Stage 的 Task 必须相互等待而是处理完后即往下游发送数据。

Flink binary data处理operator:

Spark 的序列化用户通常会使用 kryo 或者 java 默认的序列化,同时也有 Tungsten 项目对 Spark 程序作一 JVM 层面以及代码生成方面的优化。相对于 Spark,Flink本身实现了基于内存的序列化框架,里面维护着key和pointer 的概念,它的 key 是连续存储,在 CPU 层面会作一些优化,cache miss 几率极低。比较和排序的时候不须要比较真正的数据,先经过这个 key 比较,只有当它相等的时候,才会从内存中把这个数据反序列化出来,再去对比具体的数据,这是个不错的性能优化点。

Flink Task Chain:

Task中 operator chain,是比较好的概念。若是上下游数据分布不须要从新 shuffle 的话,好比图中 source 是kafka source,后面跟的 map 只是一个简单的数据 filter,咱们把它放在一个线程里面,就能够减小线程上下文切换的代价。

并行度概念

好比说这里面会有 5 个 Task,就会有几个并发线程去跑,chain 起来的话放在一个线程去跑就能够提高数据传输性能。Spark 是黑盒的,每一个 operator 没法设并发度,而 Flink 能够对每一个 operator 设并发度,这样能够更灵活一点,做业运行起来对资源利用率也更高一点。

Spark 通常经过 Spark.default.parallelism 来调整并行度,有 shuffle 操做的话,并行度通常是通Spark.sql.shuffle.partitions 参数来调整,实时计算的话其实应该调小一点,好比咱们生产中和 kafka 的 partition 数调的差很少,batch 在生产上会调得大一点,咱们设为1000,左边的图咱们设并发度为2,最大是10,这样首先分2个并发去跑,另外根据 key 作一个分组的概念,最大分为10组,就能够作到把数据尽可能的打散。

State & Checkpoint

由于 Flink 的数据是一条条过来处理,因此 Flink 中的每条数据处理完了立马发给下游,而不像 spark,须要等该operator 所在的 stage 全部的 task 都完成了再往下发。

Flink 有粗粒度的 checkpoint 机制,以很是小的代价为每一个元素赋予一个 snapshot 概念,只有当属于本次snapshot 的全部数据都进来后才会触发计算,计算完后,才把 buffer 数据往下发,目前 Flink sql 没有提供控制buffer timeout 的接口,即个人数据要buffer多久才往下发。能够在构建 Flink context 时,指定 buffer timeout为 0,处理完的数据才会立马发下去,不须要等达到必定阈值后再往下发。

Backend 默认是维护在 jobmanager 内存,咱们更多使用的的是写到 hdfs 上,每一个 operator 的状态写到 rocksdb 上,而后异步周期增量同步到外部存储。

容错

图中左半部分的红色节点发生了 failover,若是是 at-least-once,则其最上游把数据重发一次就好;但若是是exactly-once,则须要每一个计算节点从上一次失败的时机重放。

Exactly Once Two-Phase Commit

Flink1.4 以后有两阶段提交来支持 exactly-once 。它的概念是从上游 kafka 消费数据后,每一步都会发起一次投票,来记录状态,经过checkpoint的屏障来处理标记,只有最后再写到kafka(0.11以后的版本),只有最后完成以后,才会把每一步的状态让 jobmanager 中的 cordinator 去通知能够固化下来,这样实现 exactly-once。

Savepoints

还有一点 Flink 比较好的就是,基于它的 checkpoint 来实现 savepoint 功能。业务方须要每一个应用恢复节点不同,但愿恢复到的版本也是能够指定的,这是比较好的。这个 savepoint 不仅是数据的恢复,也有计算状态的恢复。

特色:

  1. Trigger (Processing Time、 Event Time、IngestionTime):对比下,Flink支持的流式语义更丰富,不只支持 Processing Time, 也支持 Event timeIngestion Time

2)Continuous Processing & Window:支持纯意义上的持续处理,record by record的,window 也比 Spark处理的好。

  1. Low End-To-End Latency With Exactly-Once Guarantees:由于有两阶段提交,用户是能够选择在牺牲必定吞吐量的状况下,根据业务需求状况来调整来保证端到端的exactly-once。

  2. CEP:支持得好。

  3. Savepoints:能够根据业务的需求作一些版本控制。

也有作的还很差的:

1)SQL (Syntax Function、Parallelism):SQL功能还不是很完备,大部分用户是从hive迁移过来,Spark支持hive覆盖率达到99%以上。 SQL函数不支持,目前还没法对单个operator作并行度的设置。

  1. ML、Graph等:机器学习,图计算等其余领域比Spark要弱一点,但社区也在着力持续改进这个问题。

后续规划

由于如今饿了么已经属于阿里的一员,后续会更多地使用 Flink,也期待用到 Blink。

更多资讯请访问 Apache Flink 中文社区网站

相关文章
相关标签/搜索