本文是flink博文的翻译,原文连接https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.htmlhtml
2017年12月份发布的Apache Flink 1.4版本,引进了一个重要的特性:TwoPhaseCommitSinkFunction (关联Jirahttps://issues.apache.org/jira/browse/FLINK-7210) ,它抽取了两阶段提交协议的公共部分,使得构建端到端Excatly-Once的Flink程序变为了可能。这些外部系统包括Kafka0.11及以上的版本,以及一些其余的数据输入(data sources)和数据接收(data sink)。它提供了一个抽象层,须要用户本身手动去实现Exactly-Once语义。java
若是仅仅是使用,能够查看这个文档https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html。算法
若是想要了解更多,这篇文章咱们会深刻了解这个特性,以及Flink背后作的工做。apache
纵览全篇,有如下几点:api
当咱们在讨论Exactly-Once语义的时候,咱们指的是每个到来的事件仅会影响最终结果一次。就算机器宕机或者软件崩溃,即没有数据重复,也没有数据丢失。网络
Flink好久以前就提供了Exactly-Once语义。在过去的几年时间里,咱们对Flink的checkpoint作了深刻的描述 ,这个是Flink可以提供Exactly-Once语义的核心。Flink文档也对这个特性作了深刻的介绍 。异步
在咱们继续以前,有一个关于checkpoint算法的简要介绍,这对于了解更广的主题来讲是十分必要的。分布式
一个checkpoint是Flink的一致性快照,它包括:性能
Flink经过一个可配置的时间,周期性的生成checkpoint,将它写入到存储中,例如S3或者HDFS。写入到存储的过程是异步的,意味着Flink程序在checkpoint运行的同时还能够处理数据。翻译
在机器或者程序遇到错误重启的时候,Flink程序会使用最新的checkpoint进行恢复。Flink会恢复程序的状态,将输入流回滚到checkpoint保存的位置,而后从新开始运行。这意味着Flink能够像没有发生错误同样计算结果。
在Flink 1.4.0版本以前,Flink仅保证Flink程序内部的Exactly-Once语义,没有扩展到在Flink数据处理完成后存储的外部系统。
Flink程序能够和不一样的接收器(sink)交互,开发者须要有能力在一个组件的上下文中维持Exactly-Once语义。
为了提供端到端Exactly-Once语义,除了Flink应用程序自己的状态,Flink写入的外部存储也须要知足这个语义。也就是说,这些外部系统必须提供提交或者回滚的方法,而后经过Flink的checkpoint来协调。
在分布式系统中,协调提交和回滚的通用作法是两阶段提交。接下来,咱们讨论Flink的TwoPhaseCommitSinkFunction如何使用两阶段提交协议来保证端到端的Exactly-Once语义。
咱们简略的看一下两阶段提交协议,以及它如何在一个读写Kafka的Flink实例程序中提供端到端的Exactly-Once语义。Kafka是一个流行的消息中间件,常常被拿来和Flink一块儿使用,Kafka 在最近的0.11版本中添加了对事务的支持。这意味着如今Flink读写Kafka有了必要的支持,使之能提供端到端的Exactly-Once语义。
Flink对端到端的Exactly-Once语义不只仅局限在Kafka,你可使用任一输入输出源(source、sink),只要他们提供了必要的协调机制。例如Pravega ,来自DELL/EMC的流数据存储系统,经过Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once语义。
在这个示例程序中,咱们有:
在data sink中要保证Exactly-Once语义,它必须将全部的写入数据经过一个事务提交到Kafka。在两个checkpoint之间,一个提交绑定了全部要写入的数据。
这保证了当出错的时候,写入的数据能够被回滚。
然而在分布式系统中,一般拥有多个并行执行的写入任务,简单的提交和回滚是效率低下的。为了保证一致性,全部的组件必须先达成一致,才能进行提交或者回滚。Flink使用了两阶段提交协议以及预提交阶段来解决这个问题。
在checkpoint开始的时候,即两阶段提交中的预提交阶段。首先,Flink的JobManager在数据流中注入一个checkpoint屏障(它将数据流中的记录分割开,一些进入到当前的checkpoint,另外一些进入下一个checkpoint)。
屏障经过operator传递。对于每个operator,它将触发operator的状态快照写入到state backend。
data source保存了Kafka的offset,以后把checkpoint屏障传递到后续的operator。
这种方式仅适用于operator有他的内部状态。内部状态是指,Flink state backends保存和管理的内容-举例来讲,第二个operator中window聚合算出来的sum。当一个进程有它的内部状态的时候,除了在checkpoint以前将须要将数据更改写入到state backend,不须要在预提交阶段作其余的动做。在checkpoint成功的时候,Flink会正确的提交这些写入,在checkpoint失败的时候会终止提交。
然而,当一个进程有外部状态的时候,须要用一种不一样的方式来处理。外部状态一般由须要写入的外部系统引入,例如Kafka。所以,为了提供Exactly-Once保证,外部系统必须提供事务支持,借此和两阶段提交协议交互。
咱们知道在咱们的例子中,因为须要将数据写到Kafka,data sink有外部的状态。所以,在预提交阶段,除了将状态写入到state backend以外,data sink必须预提交本身的外部事务。
当checkpoint屏障在全部operator中都传递了一遍,以及它触发的快照写入完成,预提交阶段结束。这个时候,快照成功结束,整个程序的状态,包括预提交的外部状态是一致的。万一出错的时候,咱们能够经过checkpoint从新初始化。
下一步是通知全部operator,checkpoint已经成功了。这时两阶段提交中的提交阶段,Jobmanager为程序中的每个operator发起checkpoint已经完成的回调。data source和window operator没有外部的状态,在提交阶段中,这些operator不会执行任何动做。data sink拥有外部状态,因此经过事务提交外部写入。
让咱们对上述的知识点汇总一下:
所以,咱们能够肯定全部的operator赞成checkpoint的最终结果:要么都赞成提交数据,要么提交被终止而后回滚。
完整的实现两阶段提交协议可能会有一点复杂,所以Flink将通用逻辑提取到一个abstract的类TwoPhaseCommitSinkFunction。
让咱们经过一个简单的文件操做例子来讲明如何使用TwoPhaseCommitSinkFunction。咱们只须要实现四个method,并使sink呈现Exactly-Once语义。
咱们知道,若是步骤中有任何错误,Flink会经过最新的checkpoint来恢复程序状态。在一个罕见的场景中,预提交成功了,在通知到达operator以前失败了。这时候,Flink将operator的状态恢复到预提交阶段,即还未真正提交的时候。
为了能在重启的时候可以正确的终止或者提交事务,咱们须要在预提交阶段将足够的信息保存到checkpoint中。在这个例子中,这些信息是临时文件以及目标目录的地址。
TwoPhaseCommitSinkFunction 已经把这个场景考虑进去了,在从checkpoint恢复的时候,它会优先提交一个commit。咱们的任务是将commit实现成一个幂等的操做。通常的,这不是难题。在这个例子中,咱们能够发现这种状况:临时文件不在临时目录中,可是已经移动到目标目录了。
在TwoPhaseCommitSinkFunction中,有一些其余的边界条件也考虑在内了。经过Flink文档查看更多。
若是你看到了这么后面,很感谢你通读这个详细的帖子。如下是咱们主要覆盖的关键点:
咱们为这个特性能提供的功能感到很兴奋,从此但愿能找到更多支持 TwoPhaseCommitSinkFunction的producer。