flink ---- 系统内部消息传递的exactly once语义

At Most once,At Least once和Exactly once

在分布式系统中,组成系统的各个计算机是独立的。这些计算机有可能fail。html

一个sender发送一条message到receiver。根据receiver出现fail时sender如何处理fail,能够将message delivery分为三种语义:node

 

At Most once: 对于一条message,receiver最多收到一次(0次或1次).算法

能够达成At Most Once的策略:异步

sender把message发送给receiver.不管receiver是否收到message,sender都再也不重发message.分布式

 

At Least once: 对于一条message,receiver最少收到一次(1次及以上).3d

能够达成At Least Once的策略:htm

sender把message发送给receiver.当receiver在规定时间内没有回复ACK或回复了error信息,那么sender重发这条message给receiver,直到sender收到receiver的ACK.blog

 

Exactly once: 对于一条message,receiver确保只收到一次内存

 

Flink的Exactly once模式

Flink实现Exactly once的策略: Flink会持续地对整个系统作snapshot,而后把global state(根据config文件设定)储存到master node或HDFS.当系统出现failure,Flink会中止数据处理,而后把系统恢复到最近的一次checkpoint.get

什么是分布式系统的global state?

分布式系统由空间上分立的process和链接这些process的channel组成.

空间上分立的含义是,这些process不共享memory,而是经过在communication channel上进行的message pass来异步交流.

分布式系统的global state就是全部process,channel的local state的集合.

process的local state取决于the state of local memory and the history of its activity.

channel的local state是上游process发送进channel的message集减去下游process从channel接收的message的差集.

什么是一致性global state?

假设有两个银行帐户A,B.A中初始有600美圆,B中初始有200美圆. SA, SB, CAB, CBA由A和B分别记录,组成了global state.

在t0时刻,A向B转帐50美圆;在t1时刻,B向A转帐80美圆.

若是SA, SB记录于(t0, t1), CAB, CBA记录于(t1, t2),那么global state = 550+200+50+80 = 880,比真实值多了$80. 这就是不一致性global state.

若是 SA, SB, CAB, CBA同属于一个时间区间,那么获得的global state就是一致性的.

Snapshot算法得到一致性global state的难点是什么?

分布式系统没有共享内存(globally shared memory)和全局时钟(global clock).

若是分布式系统有共享内存,那么能够从共享内存中直接获取整个分布式系统的snapshot,无需分别得到各个process,channel的local state再组合成global state.

若是分布式系统有global clock,那么全部的process能在同一时刻各自记录local state,这样就保证了state的一致性.

得到一致性global state的算法 ---- Chandy-Lamport算法

精髓:该算法在普通message中插入了control message – marker

前提:

1)       message的传输可能有delay,但必定会到达

2)       每两个process之间都有一条communication path(可能由多条channel组成)

3)       Channel是单向的FIFO

描述:

Marker sending rule for process Pi

(1)     Process Pi 记录自身state

(2)     Pi在记录自身state后,发送下一条message前,Pi向本身全部的outgoing channel发送marker

Marker receiving rule for process Pj on receiving a marker along channel C

若是Pj第一次接收到marker,那么

         把channel C的state记为空集

         执行marker sending rule

不然(并不是第一次接收到marker)

         把记录自身state(或最近一次记录另外一个channel的state)后,收到这个marker前的message集记为C的state

 

每一个process会记录自身的state和它的incoming channel的state

图解:

A,B,C,D表明4个process.有向线段表明FIFO的channel.绿色圆形表明普通message,橙色矩形表明marker.蓝色的节点和线段表明已经记录state的process和channel

Process A启动snapshot算法,A执行marker sending rule(记录自身state,而后发送marker):

Process B接收到marker,执行marker receiving rule:将channel AB的state记为空集,而后记录自身state并向下发送marker:

 

Process C接收到marker, 执行marker receiving rule:将channel AC的state记为空集,而后记录自身state并向下发送marker:

 

Process D接收到来自于process B的marker, 执行marker receiving rule:将channel BD的state记为空集,而后记录自身state并向下发送marker:

 

 

Process D接收到来自于process C的marker, 执行marker receiving rule:这是process D第二次接收到marker,将channel CD的state记为{5},不会向下发送marker:

自此process A,B,C,D的local state和全部Channel的state都记录完毕. 将这些local state组合,所获得的就是global state

Flink的snapshot算法 ---- Asynchronous Barrier Snapshotting(ABS)

为了消去记录channel state这一步骤,process在接收到第一个barrier后不会立刻作snapshot,

而是等待接受其余上游channel的barrier.

在等待期间,process会把barrier已到的channel的record放入input buffer.

当全部上游channel的barrier到齐后,process才记录自身state,以后向全部下游channel发送barrier.

由于先到的barrier会等待后到的barrier,全部全部barrier至关于同时到达process,

所以,该process的上游channel的state都是空集.这就避免了去记录channel的state

图解:

A是JobManager, B C是source,D是普通task.

JobManager发起一次snapshot:向全部source发送barrier.

每一个Barrier前后到达各自的source.Source在收到barrier后记录自身state,而后向下游节点发送barrier

Barrier (from)B 到达process D,但不会进行snapshot

Barrier (from)B已经到达process D,

因此当来自于channel BD的record 6 7到达后,process D不会处理它们,而是将它们放入input buffer.

而Barrier (from)C还没有到达process D,因此当来自于channel CD的record 4到达后,process D会处理它.

Barrier C也到达process D.

这样,process D已经接收到了全部上游process的barrier.process D记录自身state,而后向下游节点发送barrier

ABS的at least once模式

当process接收到barrier后,会马上作snapshot. Process会继续处理全部channel的record.后来的snapshot会覆盖以前的snapshot.

Record 6本不属于此次checkpoint,却包含在process D的local state中.

在recovery时,source认为record 6尚未被处理过,因此重发record 6. 这就致使stream中出现了两个record 6,形成了at least once.

 

这里的问题在于,当第二个barrier到达时,节点D再次对自身作了snapshot.

而在Chandy-Lamport的算法中,第二个barrier到达时,节点D应该对barrier来源的channel作snapshot.

 

对单一input channel的算子来讲,没有Alignment这个概念.这些算子在at least once模式下也是呈现exactly once的行为

 

原文:http://www.javashuo.com/article/p-swokzjdg-ea.html

相关文章
相关标签/搜索