Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

文章目录: android

1. Apache Flink 应用程序中的 Exactly-Once 语义web

2. Flink 应用程序端到端的 Exactly-Once 语义cookie

3. 示例 Flink 应用程序启动预提交阶段网络

4. 在 Flink 中实现两阶段提交 Operatorsession

5. 总结并发


Apache Flink 自2017年12月发布的1.4.0版本开始,为流计算引入了一个重要的里程碑特性:TwoPhaseCommitSinkFunction(相关的 Jira)。它提取了两阶段提交协议的通用逻辑,使得经过 Flink 来构建端到端的 Exactly-Once 程序成为可能。同时支持一些数据源(source)和输出端(sink),包括 Apache Kafka  0.11及更高版本。它提供了一个抽象层,用户只须要实现少数方法就能实现端到端的 Exactly-Once 语义。app


有关 TwoPhaseCommitSinkFunction 的使用详见文档: TwoPhaseCommitSinkFunction。或者能够直接阅读 Kafka 0.11 sink 的文档: kafka异步


接下来会详细分析这个新功能以及Flink的实现逻辑,分为以下几点。分布式


  • 描述 Flink checkpoint 机制是如何保证Flink程序结果的 Exactly-Once 的ide

  • 显示 Flink 如何经过两阶段提交协议与数据源和数据输出端交互,以提供端到端的 Exactly-Once 保证

  • 经过一个简单的示例,了解如何使用 TwoPhaseCommitSinkFunction 实现 Exactly-Once 的文件输出

Flink 应用程序中的 Exactly-Once 语义

当咱们说『Exactly-Once』时,指的是每一个输入的事件只影响最终结果一次。即便机器或软件出现故障,既没有重复数据,也不会丢数据。


Flink 好久以前就提供了 Exactly-Once 语义。在过去几年中,咱们对 Flink 的 checkpoint 机制有过深刻的描述,这是 Flink 有能力提供 Exactly-Once 语义的核心。Flink 文档还提供了该功能的全面概述


在继续以前,先看下对 checkpoint 机制的简要介绍,这对理解后面的主题相当重要。


一次 checkpoint 是如下内容的一致性快照:

 

  • 应用程序的当前状态

  • 输入流的位置


Flink 能够配置一个固定的时间点,按期产生 checkpoint,将 checkpoint 的数据写入持久存储系统,例如 S3 或 HDFS 。将 checkpoint 数据写入持久存储是异步发生的,这意味着 Flink 应用程序在 checkpoint 过程当中能够继续处理数据。


若是发生机器或软件故障,从新启动后,Flink 应用程序将从最新的 checkpoint 点恢复处理; Flink 会恢复应用程序状态,将输入流回滚到上次 checkpoint 保存的位置,而后从新开始运行。这意味着 Flink 能够像从未发生过故障同样计算结果。


在 Flink 1.4.0 以前,Exactly-Once 语义仅限于 Flink 应用程序内部,并无扩展到 Flink 数据处理完后发送的大多数外部系统。Flink 应用程序与各类数据输出端进行交互,开发人员须要有能力本身维护组件的上下文来保证 Exactly-Once 语义。


为了提供端到端的 Exactly-Once 语义 - 也就是说,除了 Flink 应用程序内部, Flink 写入的外部系统也须要能知足 Exactly-Once 语义 - 这些外部系统必须提供提交或回滚的方法,而后经过 Flink 的 checkpoint 机制来协调。


分布式系统中,协调提交和回滚的经常使用方法是两阶段提交协议。在下一节中,咱们将讨论 Flink 的 TwoPhaseCommitSinkFunction 是如何利用两阶段提交协议来提供端到端的 Exactly-Once 语义。


Flink 应用程序端到端的 Exactly-Once 语义

咱们将介绍两阶段提交协议,以及它如何在一个读写 Kafka 的 Flink 程序中实现端到端的 Exactly-Once 语义。Kafka 是一个流行的消息中间件,常常与 Flink 一块儿使用。Kafka 在最近的 0.11 版本中添加了对事务的支持。这意味着如今经过 Flink 读写 Kafka ,并提供端到端的 Exactly-Once 语义有了必要的支持


Flink 对端到端的 Exactly-Once 语义的支持不只局限于 Kafka ,您能够将它与任何一个提供了必要的协调机制的源/输出端一块儿使用。例如 Pravega,来自 DELL/EMC 的开源流媒体存储系统,经过 Flink 的 TwoPhaseCommitSinkFunction 也能支持端到端的 Exactly-Once 语义。



640.png


在今天讨论的这个示例程序中,咱们有:


  • 从 Kafka 读取的数据源( Flink 内置的 KafkaConsumer

  • 窗口聚合

  • 将数据写回 Kafka 的数据输出端( Flink 内置的 KafkaProducer )


要使数据输出端提供 Exactly-Once 保证,它必须将全部数据经过一个事务提交给 Kafka。提交捆绑了两个 checkpoint 之间的全部要写入的数据。这可确保在发生故障时能回滚写入的数据。可是在分布式系统中,一般会有多个并发运行的写入任务的,简单的提交或回滚是不够的,由于全部组件必须在提交或回滚时“一致”才能确保一致的结果。Flink 使用两阶段提交协议及预提交阶段来解决这个问题。


在 checkpoint 开始的时候,即两阶段提交协议的“预提交”阶段。当 checkpoint 开始时,Flink 的 JobManager 会将 checkpoint barrier(将数据流中的记录分为进入当前 checkpoint 与进入下一个 checkpoint )注入数据流。


brarrier 在 operator 之间传递。对于每个 operator,它触发 operator 的状态快照写入到 state backend。


640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1

exactly-once-two-phase-commit-2


数据源保存了消费 Kafka 的偏移量(offset),以后将 checkpoint barrier 传递给下一个 operator。


这种方式仅适用于 operator 具备『内部』状态。所谓内部状态,是指 Flink statebackend 保存和管理的 -例如,第二个 operator 中 window 聚合算出来的 sum 值。当一个进程有它的内部状态的时候,除了在 checkpoint 以前须要将数据变动写入到 state backend ,不须要在预提交阶段执行任何其余操做。Flink 负责在 checkpoint 成功的状况下正确提交这些写入,或者在出现故障时停止这些写入。


640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1

exactly-once-two-phase-commit-3

示例 Flink 应用程序启动预提交阶段


可是,当进程具备『外部』状态时,须要做些额外的处理。外部状态一般以写入外部系统(如 Kafka)的形式出现。在这种状况下,为了提供 Exactly-Once 保证,外部系统必须支持事务,这样才能和两阶段提交协议集成。


在本文示例中的数据须要写入 Kafka,所以数据输出端( Data Sink )有外部状态。在这种状况下,在预提交阶段,除了将其状态写入 state backend 以外,数据输出端还必须预先提交其外部事务。


640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1

exactly-once-two-phase-commit-4


当 checkpoint barrier 在全部 operator 都传递了一遍,而且触发的 checkpoint 回调成功完成时,预提交阶段就结束了。全部触发的状态快照都被视为该 checkpoint 的一部分。checkpoint 是整个应用程序状态的快照,包括预先提交的外部状态。若是发生故障,咱们能够回滚到上次成功完成快照的时间点。


下一步是通知全部 operator,checkpoint 已经成功了。这是两阶段提交协议的提交阶段,JobManager 为应用程序中的每一个 operator 发出 checkpoint 已完成的回调。


数据源和 windnow operator 没有外部状态,所以在提交阶段,这些 operator 没必要执行任何操做。可是,数据输出端(Data Sink)拥有外部状态,此时应该提交外部事务。


640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1

exactly-once-two-phase-commit-5


咱们对上述知识点总结下:


  • 一旦全部 operator 完成预提交,就提交一个 commit。

  • 若是至少有一个预提交失败,则全部其余提交都将停止,咱们将回滚到上一个成功完成的 checkpoint 。

  • 在预提交成功以后,提交的 commit 须要保证最终成功 - operator 和外部系统都须要保障这点。若是 commit 失败(例如,因为间歇性网络问题),整个 Flink 应用程序将失败,应用程序将根据用户的重启策略从新启动,还会尝试再提交。这个过程相当重要,由于若是 commit 最终没有成功,将会致使数据丢失。


所以,咱们能够肯定全部 operator 都赞成 checkpoint 的最终结果:全部 operator 都赞成数据已提交,或提交被停止并回滚


在 Flink 中实现两阶段提交 Operator

完整的实现两阶段提交协议可能有点复杂,这就是为何 Flink 将它的通用逻辑提取到抽象类 TwoPhaseCommitSinkFunction 中的缘由。

接下来基于输出到文件的简单示例,说明如何使用 TwoPhaseCommitSinkFunction 。用户只须要实现四个函数,就能为数据输出端实现 Exactly-Once 语义:


  • beginTransaction - 在事务开始前,咱们在目标文件系统的临时目录中建立一个临时文件。随后,咱们能够在处理数据时将数据写入此文件。

  • preCommit - 在预提交阶段,咱们刷新文件到存储,关闭文件,再也不从新写入。咱们还将为属于下一个 checkpoint 的任何后续文件写入启动一个新的事务。

  • commit - 在提交阶段,咱们将预提交阶段的文件原子地移动到真正的目标目录。须要注意的是,这会增长输出数据可见性的延迟。

  • abort - 在停止阶段,咱们删除临时文件。


咱们知道,若是发生任何故障,Flink 会将应用程序的状态恢复到最新的一次 checkpoint 点。一种极端的状况是,预提交成功了,但在此次 commit 的通知到达 operator 以前发生了故障。在这种状况下,Flink 会将 operator 的状态恢复到已经预提交,但还没有真正提交的状态。


咱们须要在预提交阶段保存足够多的信息到 checkpoint 状态中,以便在重启后能正确的停止或提交事务。在这个例子中,这些信息是临时文件和目标目录的路径。


TwoPhaseCommitSinkFunction 已经把这种状况考虑在内了,而且在从 checkpoint 点恢复状态时,会优先发出一个 commit 。咱们须要以幂等方式实现提交,通常来讲,这并不难。在这个示例中,咱们能够识别出这样的状况:临时文件不在临时目录中,但已经移动到目标目录了。


在 TwoPhaseCommitSinkFunction 中,还有一些其余边界状况也会考虑在内,请参考 Flink 文档了解更多信息。


总结

总结下本文涉及的一些要点:


  • Flink 的 checkpoint 机制是支持两阶段提交协议并提供端到端的 Exactly-Once 语义的基础。

  • 这个方案的优势是: Flink 不像其余一些系统那样,经过网络传输存储数据 - 不须要像大多数批处理程序那样将计算的每一个阶段写入磁盘。

  • Flink 的 TwoPhaseCommitSinkFunction 提取了两阶段提交协议的通用逻辑,基于此将 Flink 和支持事务的外部系统结合,构建端到端的 Exactly-Once 成为可能。

  • 从 Flink 1.4.0 开始,Pravega 和 Kafka 0.11 producer 都提供了 Exactly-Once 语义;Kafka 在0.11版本首次引入了事务,为在 Flink 程序中使用 Kafka producer 提供 Exactly-Once 语义提供了可能性。

  • Kafka 0.11 producer的事务是在 TwoPhaseCommitSinkFunction 基础上实现的,和 at-least-once producer 相比只增长了很是低的开销。


这是个使人兴奋的功能,期待 Flink TwoPhaseCommitSinkFunction 在将来支持更多的数据接收端。

转自:https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247483737&idx=1&sn=666557f58ee7be285e3edcc08a386fde&chksm=fd3b8f1bca4c060dbf0af968f3e871d7e3e734c4d23859ff66ab2cbc064036249245d4129368&xtrack=1&scene=90&subscene=93&sessionid=1558483923&clicktime=1558483932&ascene=56&devicetype=android-25&version=2700043a&nettype=WIFI&abtest_cookie=BAABAAoACwASABMABgAjlx4AVpkeAMaZHgDcmR4A%2BJkeAAOaHgAAAA%3D%3D&lang=zh_CN&pass_ticket=3QYkRaqlS27lCm7tC4NENdPcU7WhFO0wuwvcaH8DF%2B0Qlx9BMXZd6uAp2lFq6p1P&wx_header=1

相关文章
相关标签/搜索