Apache Flink 自2017年12月发布的1.4.0版本开始,为流计算引入了一个重要的里程碑特性:TwoPhaseCommitSinkFunction(相关的Jira)。它提取了两阶段提交协议的通用逻辑,使得经过Flink来构建端到端的Exactly-Once程序成为可能。同时支持一些数据源(source)和输出端(sink),包括Apache Kafka 0.11及更高版本。它提供了一个抽象层,用户只须要实现少数方法就能实现端到端的Exactly-Once语义。网络
有关TwoPhaseCommitSinkFunction的使用详见文档: TwoPhaseCommitSinkFunction。或者能够直接阅读Kafka 0.11 sink的文档: kafka。并发
接下来会详细分析这个新功能以及Flink的实现逻辑,分为以下几点。异步
当咱们说『Exactly-Once』时,指的是每一个输入的事件只影响最终结果一次。即便机器或软件出现故障,既没有重复数据,也不会丢数据。分布式
Flink好久以前就提供了Exactly-Once语义。在过去几年中,咱们对Flink的checkpoint机制有过深刻的描述,这是Flink有能力提供Exactly-Once语义的核心。Flink文档还提供了该功能的全面概述。函数
在继续以前,先看下对checkpoint机制的简要介绍,这对理解后面的主题相当重要。url
Flink能够配置一个固定的时间点,按期产生checkpoint,将checkpoint的数据写入持久存储系统,例如S3或HDFS。将checkpoint数据写入持久存储是异步发生的,这意味着Flink应用程序在checkpoint过程当中能够继续处理数据。spa
若是发生机器或软件故障,从新启动后,Flink应用程序将从最新的checkpoint点恢复处理; Flink会恢复应用程序状态,将输入流回滚到上次checkpoint保存的位置,而后从新开始运行。这意味着Flink能够像从未发生过故障同样计算结果。中间件
在Flink 1.4.0以前,Exactly-Once语义仅限于Flink应用程序内部,并无扩展到Flink数据处理完后发送的大多数外部系统。Flink应用程序与各类数据输出端进行交互,开发人员须要有能力本身维护组件的上下文来保证Exactly-Once语义。进程
为了提供端到端的Exactly-Once语义 – 也就是说,除了Flink应用程序内部,Flink写入的外部系统也须要能知足Exactly-Once语义 – 这些外部系统必须提供提交或回滚的方法,而后经过Flink的checkpoint机制来协调。事件
分布式系统中,协调提交和回滚的经常使用方法是两阶段提交协议。在下一节中,咱们将讨论Flink的TwoPhaseCommitSinkFunction是如何利用两阶段提交协议来提供端到端的Exactly-Once语义。
咱们将介绍两阶段提交协议,以及它如何在一个读写Kafka的Flink程序中实现端到端的Exactly-Once语义。Kafka是一个流行的消息中间件,常常与Flink一块儿使用。Kafka在最近的0.11版本中添加了对事务的支持。这意味着如今经过Flink读写Kafaka,并提供端到端的Exactly-Once语义有了必要的支持。
Flink对端到端的Exactly-Once语义的支持不只局限于Kafka,您能够将它与任何一个提供了必要的协调机制的源/输出端一块儿使用。例如Pravega,来自DELL/EMC的开源流媒体存储系统,经过Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once语义。
在今天讨论的这个示例程序中,咱们有:
要使数据输出端提供Exactly-Once保证,它必须将全部数据经过一个事务提交给Kafka。提交捆绑了两个checkpoint之间的全部要写入的数据。这可确保在发生故障时能回滚写入的数据。可是在分布式系统中,一般会有多个并发运行的写入任务的,简单的提交或回滚是不够的,由于全部组件必须在提交或回滚时“一致”才能确保一致的结果。Flink使用两阶段提交协议及预提交阶段来解决这个问题。
在checkpoint开始的时候,即两阶段提交协议的“预提交”阶段。当checkpoint开始时,Flink的JobManager会将checkpoint barrier(将数据流中的记录分为进入当前checkpoint与进入下一个checkpoint)注入数据流。
brarrier在operator之间传递。对于每个operator,它触发operator的状态快照写入到state backend。
数据源保存了消费Kafka的偏移量(offset),以后将checkpoint barrier传递给下一个operator。
这种方式仅适用于operator具备『内部』状态。所谓内部状态,是指Flink state backend保存和管理的 -例如,第二个operator中window聚合算出来的sum值。当一个进程有它的内部状态的时候,除了在checkpoint以前须要将数据变动写入到state backend,不须要在预提交阶段执行任何其余操做。Flink负责在checkpoint成功的状况下正确提交这些写入,或者在出现故障时停止这些写入。
可是,当进程具备『外部』状态时,须要做些额外的处理。外部状态一般以写入外部系统(如Kafka)的形式出现。在这种状况下,为了提供Exactly-Once保证,外部系统必须支持事务,这样才能和两阶段提交协议集成。
在本文示例中的数据须要写入Kafka,所以数据输出端(Data Sink)有外部状态。在这种状况下,在预提交阶段,除了将其状态写入state backend以外,数据输出端还必须预先提交其外部事务。
当checkpoint barrier在全部operator都传递了一遍,而且触发的checkpoint回调成功完成时,预提交阶段就结束了。全部触发的状态快照都被视为该checkpoint的一部分。checkpoint是整个应用程序状态的快照,包括预先提交的外部状态。若是发生故障,咱们能够回滚到上次成功完成快照的时间点。
下一步是通知全部operator,checkpoint已经成功了。这是两阶段提交协议的提交阶段,JobManager为应用程序中的每一个operator发出checkpoint已完成的回调。
数据源和 widnow operator没有外部状态,所以在提交阶段,这些operator没必要执行任何操做。可是,数据输出端(Data Sink)拥有外部状态,此时应该提交外部事务。
咱们对上述知识点总结下:
所以,咱们能够肯定全部operator都赞成checkpoint的最终结果:全部operator都赞成数据已提交,或提交被停止并回滚。
完整的实现两阶段提交协议可能有点复杂,这就是为何Flink将它的通用逻辑提取到抽象类TwoPhaseCommitSinkFunction中的缘由。
接下来基于输出到文件的简单示例,说明如何使用TwoPhaseCommitSinkFunction。用户只须要实现四个函数,就能为数据输出端实现Exactly-Once语义:
咱们知道,若是发生任何故障,Flink会将应用程序的状态恢复到最新的一次checkpoint点。一种极端的状况是,预提交成功了,但在此次commit的通知到达operator以前发生了故障。在这种状况下,Flink会将operator的状态恢复到已经预提交,但还没有真正提交的状态。
咱们须要在预提交阶段保存足够多的信息到checkpoint状态中,以便在重启后能正确的停止或提交事务。在这个例子中,这些信息是临时文件和目标目录的路径。
TwoPhaseCommitSinkFunction已经把这种状况考虑在内了,而且在从checkpoint点恢复状态时,会优先发出一个commit。咱们须要以幂等方式实现提交,通常来讲,这并不难。在这个示例中,咱们能够识别出这样的状况:临时文件不在临时目录中,但已经移动到目标目录了。
在TwoPhaseCommitSinkFunction中,还有一些其余边界状况也会考虑在内,请参考Flink文档了解更多信息。
总结下本文涉及的一些要点:
这是个使人兴奋的功能,期待Flink TwoPhaseCommitSinkFunction在将来支持更多的数据接收端。
原文连接 本文为云栖社区原创内容,未经容许不得转载。