做者:孙梦瑶 整理:韩非java
本文主要分享内容以下:git
首先举一个无状态计算的例子:消费延迟计算。假设如今有一个消息队列,消息队列中有一个生产者持续往消费队列写入消息,多个消费者分别从消息队列中读取消息。从图上能够看出,生产者已经写入 16 条消息,Offset 停留在 15 ;有 3 个消费者,有的消费快,而有的消费慢。消费快的已经消费了 13 条数据,消费者慢的才消费了 七、8 条数据。github
如何实时统计每一个消费者落后多少条数据,如图给出了输入输出的示例。能够了解到输入的时间点有一个时间戳,生产者将消息写到了某个时间点的位置,每一个消费者同一时间点分别读到了什么位置。刚才也提到了生产者写入了 15 条,消费者分别读取了 十、七、12 条。那么问题来了,怎么将生产者、消费者的进度转换为右侧示意图信息呢?apache
consumer 0 落后了 5 条,consumer 1 落后了 8 条,consumer 2 落后了 3 条,根据 Flink 的原理,此处需进行 Map 操做。Map 首先把消息读取进来,而后分别相减,便可知道每一个 consumer 分别落后了几条。Map 一直往下发,则会得出最终结果。api
你们会发现,在这种模式的计算中,不管这条输入进来多少次,输出的结果都是同样的,由于单条输入中已经包含了所需的全部信息。消费落后等于生产者减去消费者。生产者的消费在单条数据中能够获得,消费者的数据也能够在单条数据中获得,因此相同输入能够获得相同输出,这就是一个无状态的计算。数组
相应的什么是有状态的计算?性能优化
以访问日志统计量的例子进行说明,好比当前拿到一个 Nginx 访问日志,一条日志表示一个请求,记录该请求从哪里来,访问的哪一个地址,须要实时统计每一个地址总共被访问了多少次,也即每一个 API 被调用了多少次。能够看到下面简化的输入和输出,输入第一条是在某个时间点请求 GET 了 /api/a;第二条日志记录了某个时间点 Post /api/b ;第三条是在某个时间点 GET了一个 /api/a,总共有 3 个 Nginx 日志。从这 3 条 Nginx 日志能够看出,第一条进来输出 /api/a 被访问了一次,第二条进来输出 /api/b 被访问了一次,紧接着又进来一条访问 api/a,因此 api/a 被访问了 2 次。不一样的是,两条 /api/a 的 Nginx 日志进来的数据是同样的,但输出的时候结果可能不一样,第一次输出 count=1 ,第二次输出 count=2,说明相同输入可能获得不一样输出。输出的结果取决于当前请求的 API 地址以前累计被访问过多少次。第一条过来累计是 0 次,count = 1,第二条过来 API 的访问已经有一次了,因此 /api/a 访问累计次数 count=2。单条数据其实仅包含当前此次访问的信息,而不包含全部的信息。要获得这个结果,还须要依赖 API 累计访问的量,即状态。微信
这个计算模式是将数据输入算子中,用来进行各类复杂的计算并输出数据。这个过程当中算子会去访问以前存储在里面的状态。另一方面,它还会把如今的数据对状态的影响实时更新,若是输入 200 条数据,最后输出就是 200 条结果。网络
什么场景会用到状态呢?下面列举了常见的 4 种:数据结构
管理状态最直接的方式就是将数据都放到内存中,这也是很常见的作法。好比在作 WordCount 时,Word 做为输入,Count 做为输出。在计算的过程当中把输入不断累加到 Count。
但对于流式做业有如下要求:
基于以上要求,内存的管理就会出现一些问题。因为内存的容量是有限制的。若是要作 24 小时的窗口计算,将 24 小时的数据都放到内存,可能会出现内存不足;另外,做业是 7*24,须要保障高可用,机器若出现故障或者宕机,须要考虑如何备份及从备份中去恢复,保证运行的做业不受影响;此外,考虑横向扩展,假如网站的访问量不高,统计每一个 API 访问次数的程序能够用单线程去运行,但若是网站访问量忽然增长,单节点没法处理所有访问数据,此时须要增长几个节点进行横向扩展,这时数据的状态如何平均分配到新增长的节点也问题之一。所以,将数据都放到内存中,并非最合适的一种状态管理方式。
最理想的状态管理须要知足易用、高效、可靠三点需求:
Managed State 是 Flink 自动管理的 State,而 Raw State 是原生态 State,二者的区别以下:
Managed State 分为两种,一种是 Keyed State;另一种是 Operator State。在Flink Stream模型中,Datastream 通过 keyBy 的操做能够变为 KeyedStream 。 每一个 Key 对应一个 State,即一个 Operator 实例处理多个 Key,访问相应的多个 State,并由此就衍生了 Keyed State。Keyed State 只能用在 KeyedStream 的算子中,即在整个程序中没有 keyBy 的过程就没有办法使用 KeyedStream。
相比较而言,Operator State 能够用于全部算子,相对于数据源有一个更好的匹配方式,经常使用于 Source,例如 FlinkKafkaConsumer。相比 Keyed State,一个 Operator 实例对应一个 State,随着并发的改变,Keyed State 中,State 随着 Key 在实例间迁移,好比原来有 1 个并发,对应的 API 请求过来,/api/a 和 /api/b 都存放在这个实例当中;若是请求量变大,须要扩容,就会把 /api/a 的状态和 /api/b 的状态分别放在不一样的节点。因为 Operator State 没有 Key,并发改变时须要选择状态如何从新分配。其中内置了 2 种分配方式:一种是均匀分配,另一种是将全部 State 合并为全量 State 再分发给每一个实例。
在访问上,Keyed State 经过 RuntimeContext 访问,这须要 Operator 是一个Rich Function。Operator State 须要本身实现 CheckpointedFunction 或 ListCheckpointed 接口。在数据结构上,Keyed State 支持的数据结构,好比 ValueState、ListState、ReducingState、AggregatingState 和 MapState;而 Operator State 支持的数据结构相对较少,如 ListState。
Keyed State 有不少种,如图为几种 Keyed State 之间的关系。首先 State 的子类中一级子类有 ValueState、MapState、AppendingState。AppendingState 又有一个子类 MergingState。MergingState 又分为 3 个子类分别是ListState、ReducingState、AggregatingState。这个继承关系使它们的访问方式、数据结构也存在差别。
几种 Keyed State 的差别具体体如今:
下面以 ValueState 为例,来阐述一下具体如何使用,以状态机的案例来说解 。
源代码地址:github.com/apache/flin…
感兴趣的同窗可直接查看完整源代码,在此截取部分。如图为 Flink 做业的主方法与主函数中的内容,前面的输入、后面的输出以及一些个性化的配置项都已去掉,仅保留了主干。
首先 events 是一个 DataStream,经过 env.addSource 加载数据进来,接下来有一个 DataStream 叫 alerts,先 keyby 一个 sourceAddress,而后在 flatMap 一个StateMachineMapper。StateMachineMapper 就是一个状态机,状态机指有不一样的状态与状态间有不一样的转换关系的结合,以买东西的过程简单举例。首先下订单,订单生成后状态为待付款,当再来一个事件状态付款成功,则事件的状态将会从待付款变为已付款,待发货。已付款,待发货的状态再来一个事件发货,订单状态将会变为配送中,配送中的状态再来一个事件签收,则该订单的状态就变为已签收。在整个过程当中,随时均可以来一个事件,取消订单,不管哪一个状态,一旦触发了取消订单事件最终就会将状态转移到已取消,至此状态就结束了。
Flink 写状态机是如何实现的?首先这是一个 RichFlatMapFunction,要用 Keyed State getRuntimeContext,getRuntimeContext 的过程当中须要 RichFunction,因此须要在 open 方法中获取 currentState ,而后 getState,currentState 保存的是当前状态机上的状态。
若是刚下订单,那么 currentState 就是待付款状态,初始化后,currentState 就表明订单完成。订单来了后,就会走 flatMap 这个方法,在 flatMap 方法中,首先定义一个 State,从 currentState 取出,即 Value,Value 取值后先判断值是否为空,若是 sourceAddress state 是空,则说明没有被使用过,那么此状态应该为刚建立订单的初始状态,即待付款。而后赋值 state = State.Initial,注意此处的 State 是本地的变量,而不是 Flink 中管理的状态,将它的值从状态中取出。接下来在本地又会来一个变量,而后 transition,将事件对它的影响加上,刚才待付款的订单收到付款成功的事件,就会变成已付款,待发货,而后 nextState 便可算出。此外,还须要判断 State 是否合法,好比一个已签收的订单,又来一个状态叫取消订单,会发现已签收订单不能被取消,此时这个状态就会下发,订单状态为非法状态。
若是不是非法的状态,还要看该状态是否已经没法转换,好比这个状态变为已取消时,就不会在有其余的状态再发生了,此时就会从 state 中 clear。clear 是全部的 Flink 管理 keyed state 都有的公共方法,意味着将信息删除,若是既不是一个非法状态也不是一个结束状态,后面可能还会有更多的转换,此时须要将订单的当前状态 update ,这样就完成了 ValueState 的初始化、取值、更新以及清零,在整个过程当中状态机的做用就是将非法的状态进行下发,方便下游进行处理。其余的状态也是相似的使用方式。
Flink 状态保存主要依靠 Checkpoint 机制,Checkpoint 会定时制做分布式快照,对程序中的状态进行备份。分布式快照是如何实现的能够参考【第二课时】的内容,这里就不在阐述分布式快照具体是如何实现的。分布式快照 Checkpoint 完成后,看成业发生故障了如何去恢复?假如做业分布跑在 3 台机器上,其中一台挂了。这个时候须要把进程或者线程移到 active 的 2 台机器上,此时还须要将整个做业的全部 Task 都回滚到最后一次成功 Checkpoint 中的状态,而后从该点开始继续处理。
若是要从 Checkpoint 恢复,必要条件是数据源须要支持数据从新发送。Checkpoint恢复后, Flink 提供两种一致性语义,一种是刚好一次,一种是至少一次。在作 Checkpoint时,可根据 Barries 对齐来判断是刚好一次仍是至少一次,若是对齐,则为刚好一次,不然没有对齐即为至少一次。若是做业是单线程处理,也就是说 Barries 是不须要对齐的;若是只有一个 Checkpoint 在作,无论何时从 Checkpoint 恢复,都会恢复到刚才的状态;若是有多个节点,假如一个数据的 Barries 到了,另外一个 Barries 尚未来,内存中的状态若是已经存储。那么这 2 个流是不对齐的,恢复的时候其中一个流可能会有重复。
Checkpoint 经过代码的实现方法以下:
上面讲过,除了故障恢复以外,还须要能够手动去调整并发从新分配这些状态。手动调整并发,必需要重启做业并会提示 Checkpoint 已经不存在,那么做业如何恢复数据?
一方面 Flink 在 Cancel 时容许在外部介质保留 Checkpoint ;另外一方面,Flink 还有另一个机制是 SavePoint。
Savepoint 与 Checkpoint 相似,一样是把状态存储到外部介质。看成业失败时,能够从外部恢复。Savepoint 与 Checkpoint 有什么区别呢?
Checkpoint 的存储,第一种是内存存储,即 MemoryStateBackend,构造方法是设置最大的StateSize,选择是否作异步快照,这种存储状态自己存储在 TaskManager 节点也就是执行节点内存中的,由于内存有容量限制,因此单个 State maxStateSize 默认 5 M,且须要注意 maxStateSize <= akka.framesize 默认 10 M。Checkpoint 存储在 JobManager 内存中,所以总大小不超过 JobManager 的内存。推荐使用的场景为:本地测试、几乎无状态的做业,好比 ETL、JobManager 不容易挂,或挂掉影响不大的状况。不推荐在生产场景使用。
另外一种就是在文件系统上的 FsStateBackend ,构建方法是须要传一个文件路径和是否异步快照。State 依然在 TaskManager 内存中,但不会像 MemoryStateBackend 有 5 M 的设置上限,Checkpoint 存储在外部文件系统(本地或 HDFS),打破了总大小 Jobmanager 内存的限制。容量限制上,单 TaskManager 上 State 总量不超过它的内存,总大小不超过配置的文件系统容量。推荐使用的场景、常规使用状态的做业、例如分钟级窗口聚合或 join、须要开启HA的做业。
还有一种存储为 RocksDBStateBackend ,RocksDB 是一个 key/value 的内存存储系统,和其余的 key/value 同样,先将状态放到内存中,若是内存快满时,则写入到磁盘中,但须要注意 RocksDB 不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。不过 RocksDB 支持增量的 Checkpoint,也是目前惟一增量 Checkpoint 的 Backend,意味着每次用户不须要将全部状态都写进去,将增量的改变的状态写进去便可。它的 Checkpoint 存储在外部文件系统(本地或HDFS),其容量限制只要单个 TaskManager 上 State 总量不超过它的内存+磁盘,单 Key最大 2G,总大小不超过配置的文件系统容量便可。推荐使用的场景为:超大状态的做业,例如天级窗口聚合、须要开启 HA 的做业、最好是对状态读写性能要求不高的做业。
前面提到有状态的做业要有有状态的逻辑,有状态的逻辑是由于数据之间存在关联,单条数据是没有办法把全部的信息给表现出来。因此须要经过状态来知足业务逻辑。
使用了状态,为何要管理状态?由于实时做业须要7*24不间断的运行,须要应对不可靠的因素而带来的影响。
那如何选择状态的类型和存储方式?结合前面的内容,能够看到,首先是要分析清楚业务场景;好比想要作什么,状态到底大不大。比较各个方案的利弊,选择根据需求合适的状态类型和存储方式便可。
视频回顾:www.bilibili.com/video/av497…
▼ Apache Flink 社区推荐 ▼
Apache Flink 及大数据领域顶级盛会 Flink Forward Asia 2019 重磅开启,目前正在征集议题,限量早鸟票优惠ing。了解 Flink Forward Asia 2019 的更多信息,请查看:
developer.aliyun.com/special/ffa…
首届 Apache Flink 极客挑战赛重磅开启,聚焦机器学习与性能优化两大热门领域,40万奖金等你拿,加入挑战请点击:
tianchi.aliyun.com/markets/tia…
关注 Flink 官方社区微信公众号,了解更多 Flink 资讯!