Apache Flink 端到端(end-to-end)Exactly-Once特性概览 (翻译)

Apache Flink 端到端(end-to-end)Exactly-Once特性概览

本文是flink博文的翻译,原文连接https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.htmlhtml

2017年12月份发布的Apache Flink 1.4版本,引进了一个重要的特性:TwoPhaseCommitSinkFunction (关联Jirahttps://issues.apache.org/jira/browse/FLINK-7210) ,它抽取了两阶段提交协议的公共部分,使得构建端到端Excatly-Once的Flink程序变为了可能。这些外部系统包括Kafka0.11及以上的版本,以及一些其余的数据输入(data sources)和数据接收(data sink)。它提供了一个抽象层,须要用户本身手动去实现Exactly-Once语义。java

若是仅仅是使用,能够查看这个文档https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html算法

若是想要了解更多,这篇文章咱们会深刻了解这个特性,以及Flink背后作的工做。apache

纵览全篇,有如下几点:api

  • 描述Flink checkpoints的做用,以及它是如何保障Flink程序Exactly-Once的语义的。
  • 展示Flink如何与两阶段提交协议与输入输出(data sources and data sinks)交互,借此传递端到端的Exactly-Once语义保证。
  • 经过一个简单的例子来展示如何使用TwoPhaseCommitSinkFunction,来实现Exactly-Once的文件输出(file sink)。

Apache Flink程序的Exactly-Once语义

当咱们在讨论Exactly-Once语义的时候,咱们指的是每个到来的事件仅会影响最终结果一次。就算机器宕机或者软件崩溃,即没有数据重复,也没有数据丢失。网络

Flink好久以前就提供了Exactly-Once语义。在过去的几年时间里,咱们对Flink的checkpoint作了深刻的描述 ,这个是Flink可以提供Exactly-Once语义的核心。Flink文档也对这个特性作了深刻的介绍异步

在咱们继续以前,有一个关于checkpoint算法的简要介绍,这对于了解更广的主题来讲是十分必要的。分布式

一个checkpoint是Flink的一致性快照,它包括:性能

  1. 程序当前的状态
  2. 输入流的位置

Flink经过一个可配置的时间,周期性的生成checkpoint,将它写入到存储中,例如S3或者HDFS。写入到存储的过程是异步的,意味着Flink程序在checkpoint运行的同时还能够处理数据。翻译

在机器或者程序遇到错误重启的时候,Flink程序会使用最新的checkpoint进行恢复。Flink会恢复程序的状态,将输入流回滚到checkpoint保存的位置,而后从新开始运行。这意味着Flink能够像没有发生错误同样计算结果。

在Flink 1.4.0版本以前,Flink仅保证Flink程序内部的Exactly-Once语义,没有扩展到在Flink数据处理完成后存储的外部系统。

Flink程序能够和不一样的接收器(sink)交互,开发者须要有能力在一个组件的上下文中维持Exactly-Once语义。

为了提供端到端Exactly-Once语义,除了Flink应用程序自己的状态,Flink写入的外部存储也须要知足这个语义。也就是说,这些外部系统必须提供提交或者回滚的方法,而后经过Flink的checkpoint来协调。

在分布式系统中,协调提交和回滚的通用作法是两阶段提交。接下来,咱们讨论Flink的TwoPhaseCommitSinkFunction如何使用两阶段提交协议来保证端到端的Exactly-Once语义。

Flink程序端到端的Exactly-Once语义

咱们简略的看一下两阶段提交协议,以及它如何在一个读写Kafka的Flink实例程序中提供端到端的Exactly-Once语义。Kafka是一个流行的消息中间件,常常被拿来和Flink一块儿使用,Kafka 在最近的0.11版本中添加了对事务的支持。这意味着如今Flink读写Kafka有了必要的支持,使之能提供端到端的Exactly-Once语义。

Flink对端到端的Exactly-Once语义不只仅局限在Kafka,你可使用任一输入输出源(source、sink),只要他们提供了必要的协调机制。例如Pravega ,来自DELL/EMC的流数据存储系统,经过Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once语义。

在这个示例程序中,咱们有:

  • 从Kafka读取数据的data source(KafkaConsumer,在Flink中)
  • 窗口聚合
  • 将数据写回到Kafka的data sink(KafkaProducer,在Flink中)

在data sink中要保证Exactly-Once语义,它必须将全部的写入数据经过一个事务提交到Kafka。在两个checkpoint之间,一个提交绑定了全部要写入的数据。

这保证了当出错的时候,写入的数据能够被回滚。

然而在分布式系统中,一般拥有多个并行执行的写入任务,简单的提交和回滚是效率低下的。为了保证一致性,全部的组件必须先达成一致,才能进行提交或者回滚。Flink使用了两阶段提交协议以及预提交阶段来解决这个问题。

在checkpoint开始的时候,即两阶段提交中的预提交阶段。首先,Flink的JobManager在数据流中注入一个checkpoint屏障(它将数据流中的记录分割开,一些进入到当前的checkpoint,另外一些进入下一个checkpoint)。

屏障经过operator传递。对于每个operator,它将触发operator的状态快照写入到state backend。

data source保存了Kafka的offset,以后把checkpoint屏障传递到后续的operator。

这种方式仅适用于operator有他的内部状态。内部状态是指,Flink state backends保存和管理的内容-举例来讲,第二个operator中window聚合算出来的sum。当一个进程有它的内部状态的时候,除了在checkpoint以前将须要将数据更改写入到state backend,不须要在预提交阶段作其余的动做。在checkpoint成功的时候,Flink会正确的提交这些写入,在checkpoint失败的时候会终止提交。

然而,当一个进程有外部状态的时候,须要用一种不一样的方式来处理。外部状态一般由须要写入的外部系统引入,例如Kafka。所以,为了提供Exactly-Once保证,外部系统必须提供事务支持,借此和两阶段提交协议交互。

咱们知道在咱们的例子中,因为须要将数据写到Kafka,data sink有外部的状态。所以,在预提交阶段,除了将状态写入到state backend以外,data sink必须预提交本身的外部事务。

当checkpoint屏障在全部operator中都传递了一遍,以及它触发的快照写入完成,预提交阶段结束。这个时候,快照成功结束,整个程序的状态,包括预提交的外部状态是一致的。万一出错的时候,咱们能够经过checkpoint从新初始化。

下一步是通知全部operator,checkpoint已经成功了。这时两阶段提交中的提交阶段,Jobmanager为程序中的每个operator发起checkpoint已经完成的回调。data source和window operator没有外部的状态,在提交阶段中,这些operator不会执行任何动做。data sink拥有外部状态,因此经过事务提交外部写入。

让咱们对上述的知识点汇总一下:

  • 一旦全部的operator完成预提交,就提交一个commit。
  • 若是至少有一个预提交失败,其余的都会失败,这时回滚到上一个checkpoint保存的位置。
  • 预提交成功后,提交的commit也须要保障最终成功-operator和外部系统须要提供这个保障。若是commit失败了(好比网络中断引发的故障),整个flink程序也所以失败,它会根据用户的重启策略重启,可能还会有一个尝试性的提交。这个过程很是严苛,由于若是提交没有最终生效,会致使数据丢失。

所以,咱们能够肯定全部的operator赞成checkpoint的最终结果:要么都赞成提交数据,要么提交被终止而后回滚。

在Flink程序中实现两阶段提交

完整的实现两阶段提交协议可能会有一点复杂,所以Flink将通用逻辑提取到一个abstract的类TwoPhaseCommitSinkFunction。

让咱们经过一个简单的文件操做例子来讲明如何使用TwoPhaseCommitSinkFunction。咱们只须要实现四个method,并使sink呈现Exactly-Once语义。

  1. beginTransaction - 在事务开始前,咱们在目标文件系统上面的临时目录上建立一个临时文件。随后,咱们在程序处理的时候能够将数据写入到这个文件。
  2. preCommit - 在预提交阶段,咱们刷新文件到磁盘,关闭文件,不要从新打开写入。咱们也会为下一个checkpoint的文件写入开启一个新的事务。
  3. commit - 在提交阶段,咱们原子性的将预提交阶段的文件移动到真正的目标目录。须要注意的是,这增长了输出数据的可见性的延迟。
  4. abort - 在终止阶段,咱们删除临时文件。

咱们知道,若是步骤中有任何错误,Flink会经过最新的checkpoint来恢复程序状态。在一个罕见的场景中,预提交成功了,在通知到达operator以前失败了。这时候,Flink将operator的状态恢复到预提交阶段,即还未真正提交的时候。

为了能在重启的时候可以正确的终止或者提交事务,咱们须要在预提交阶段将足够的信息保存到checkpoint中。在这个例子中,这些信息是临时文件以及目标目录的地址。

TwoPhaseCommitSinkFunction 已经把这个场景考虑进去了,在从checkpoint恢复的时候,它会优先提交一个commit。咱们的任务是将commit实现成一个幂等的操做。通常的,这不是难题。在这个例子中,咱们能够发现这种状况:临时文件不在临时目录中,可是已经移动到目标目录了。

在TwoPhaseCommitSinkFunction中,有一些其余的边界条件也考虑在内了。经过Flink文档查看更多。

总结

若是你看到了这么后面,很感谢你通读这个详细的帖子。如下是咱们主要覆盖的关键点:

  • Flink的checkpoint系统是它支撑两阶段协议和保障Exactly-Once语义的基础设施,
  • 这种实现方案的优势是,Flink不像其余系统那样,经过网络传输存储数据 - 它不须要像大部分批处理程序那样,将每个计算结果保存到磁盘。
  • Flink的TwoPhaseCommitSinkFunction提取了两阶段提交协议的通用部分,经过这个方法结合Flink以及支持事务的外部系统,能够构建端到端的Exactly-Once程序。
  • Flink 1.4.0开始,Pravega和Kafka 0.11 producer提供了Exactly-Once语义;经过Kafka在0.11版本第一次引入的事务,为在Flink中使用Kafka producer提供Exactly-Once语义提供了可能性。
  • Kafka 0.11版本的producer 是在TwoPhaseCommitSinkFunction基础上实现的,它at-least-once的producer的基础上增长了很小的开销。

咱们为这个特性能提供的功能感到很兴奋,从此但愿能找到更多支持 TwoPhaseCommitSinkFunction的producer。

相关文章
相关标签/搜索