漫谈流式计算的一致性

参考,html

http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/算法

http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/数据库

 

image

对于batch分析,fault-tolerant很容易作,失败只须要replay,就能够完美作到容错。apache

对于streaming分析, 数据流自己是动态,没有所谓的开始或结束,虽然能够replay buffer的部分数据,但fault-tolerant作起来会复杂的多编程

当前主流的一些streaming分析平台,都有一些各自特有的fault-tolerant的机制,在此分析和总结一下,windows

无状态流数据处理,网络

这是种比较简单的流式数据的场景,典型的应用是数据ETL,数据存储,数据流过是没有状态的分布式

保证at least once语义, 
分钟级别,Storm的acker机制,就能够很好的保证, http://storm.apache.org/documentation/Guaranteeing-message-processing.html 
message没有被正确处理,收到ack时,能够选择重发,这样每条message对能够保证被处理到,但可能会被重复处理性能

小时,天级别,利用kafka的replay,通常达到天级别的cache优化

保证exactly once语义, 
对于无状态数据流,其实只要依赖最终存储的去重性(deduplication), 就能够达到exactly once 
好比对于数据库,经过unique key和insert ignore就能够解决这个问题,不管你以前重复处理多少次,最终我只存储一次。

若是最终存储不支持去重,或者场景比较复杂不只仅是存储,好比作叠加计数 或 update 
作叠加计数,当前的机制,你没法知道这个message是否加过 
作update的时候,更新的时序性很重要,这个是ack机制没法保证的

Storm 0.7就提供transactional topology特性,http://storm.apache.org/documentation/Transactional-topologies.html

首先给message加上transaction id,这样有两个好处,能够保证时序性,在写入存储的时候,能够按transaction id顺序写入 
而且在能够外部存储上记录当前最新的transaction id,保证相同的transaction,不会被重复写入 
这个是transactional topology的核心思路,这样确实是能够保证强一致性,exactly once语义 
但这个方案只适用于无状态,或是依赖外部存储的,状态必需要存储在外部存储上

至于使用batch,或将topology分为processing和commit阶段,都是对性能的优化,并不会提高一致性的保障 
但因为使用micro-batch是必须的,因此也称这类方案是micro-batch方案,除了transactional topology,还有Apache Spark Streaming 
micro-batch的坏处, 
1. 改变编程模型,伪流式 
2. windows based聚合的限制,只能是micro-batch的倍数,好比micro-batch是3分钟,你想作个5分钟聚合,无法作 
2. 延迟变大,若是自己秒级别,但若是micro-batch是1分钟,那延迟就至少1分钟 

有状态流数据处理,

典型的场景,就是windows-based的聚合或计算,好比计算1分钟内的计数或平均值,这样会有部分数据须要cache在内存中 
这样当fail-over时,如何能够恢复cache,并保证exactly once语义

最直接的想法,

局部的snapshot

每一个component对cache按期作snapshot,而后在fail-over后,各自恢复本身的cache, 
这样作的问题, 
1. snapshot很难增量作,若是cache比较大,成本会比较高 
2. snapshot只能按期作,会有部分丢失 
3. 最关键的,对于分布式系统,各个compoent独立的进行snapshot,很难达到同一个状态,每一个component的处理速度都是不同的,有的处理到n作了snapshot,而有的可能作到n+1才作, 
缺少一个统一的参照系。

 

change-log 
每一个 component,当接收到一个 message 的时候,产生一条 change log 记录该 message 和更新的状态,存入 transactional log 和数据库 
当作 fail-over 的时候,只须要每一个 component 将数据库中的 log,拿出来 replay 便可 
这种方式使用的平台如 Google Cloud Dataflow,Apache Samza

对于 Apache Samza,会将 change log 放入kafka中,

image

当fail-over后,每一个task从相应的kafka topic里面读出change-log,完成local state的replay

这样作的好处,是不用直接去snapshot local cache,若是cache比较大的话,这样是比较划算的 
可是若是数据流很big的话,这样作也不合适了,由于change-log会很是大

 

Distributed Snapshots (Apache Flink),全局的 snapshot

针对前面提到的局部 snapshot 最关键的问题,提出全局 snapshot 的方法, 
其实最大的问题仍然是分布式系统的根本问题,统一参照系的问题,如何让每一个 component 在同一的状态下,进行 snapshot

这个原理来自 Chandy and Lamport, 1985,的paper “Distributed Snapshots: Determining Global States of Distributed Systems”

http://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/

局部的snapshot会有的问题,

状态丢失,以下图,但状态中传输的时候,对P和Q进行snapshot,会致使队列中的绿蓝橙状态丢失

image

状态重复,brown状态中P和Q的snapshot里面同时出现

image

怎么解这样的问题?分布式系统中缺少统一参照系的状况下,只有经过通讯才能肯定偏序的问题 
因此这里使用marker来作组件间的同步,并防止丢失状态,会同时对组件,以及队列同时作snapshot, 以下图

image

P作snapshot,而后发送marker到Q 
Q收到marker的时候,知道P作了snapshot,那么我也要作snapshot 
同时还要对PQ channel作snapshot,此时channel中有个green,可是因为green是在marker后面的,说明它在P的snapshot里面已经作过,不须要再作,因此此时PQ的snapshot为空 
Q在作完snapshot后,还须要把marker返回给P,由于在过程当中orange从Q被发送到P 
当P收到Q返回的marker时,因为P的snapshot已经作过,没法改变 
因此把orange放在QP channel的snapshot中

最终作出的全局的snapshot为,

P(red, green, blue)
channel PQ ()
Q(brown, pink)
channel QP (orange)

这样就解决了状态丢或重复的问题

 

Flink’s distributed snapshotting实现基于stream barriers

image

可见,barrier能够将流拆分红一段段的数据,每一个barrier都是一个snapshot点,可是这种拆分不一样于micro-batch,并不会影响到正常的流式处理 
在DAG,即有向无环图的case下,是不须要对channel作snapshot的,场景会比较简单 
只是每一个组件收到barrier的时候去作snapshot就好,该算法的几个前提: 
1. 网络可靠,消息FIFO; 
2. channel能够block,unblock,支持对全部output channel进行广播 
3. 可自动识别注入的barrier

完成过程如图,这是个有两条入边的case,相对复杂些 
当收到一条channel的barrier时,须要先block该channel,而后等待另外一个channel中的barrier 
当两条channel的barrier都到达时,说明达到统一状态,进行checkpoint 
而后unblock以前block的channel,并对全部的output channel广播该barrier

image

当DAG上的全部组件都完成snapshot时,那么一个全局的snapshot就完成了,以barrier为惟一标识

比较抽象,下图以kafka为例子解释一下,https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html

image

对于kafka而言,不一样的partition须要不一样的线程读, 
图中,4个source thread分别从4个partition读取数据 
其中由惟一的master来发起checkpoint流程, 
过程是, 
1. Master给全部的source thread发checkpoint请求 
2. source thread接收到cp请求后,会记录当前的offset,好比5791,并作该offset的message前发出streaming barrier 
    并将offset返回给master

3. 这样master收到全部source的ack offset,就至关于对source作了snapshot,恢复时只须要将相应的source置到该offset便可 
4. 中间每一个组件,当收到全部input channel的barrier时,将cp存入数据库,并通知Master 
5. 层层下去,直到全部Sink节点,最终节点,完成snapshot

6. master接收到全部节点的作完cp的ack,知道此次checkpoint所有完成

这个方案的最大的问题是,当多个input channel时,须要等全部的barrier到齐,这个明显会增长latency 
Flink的优化是,不等,看到barrier就打snapshot,这样的问题就是没法保证exactly once,会重复, 
由于后来的barrier打checkpoint时会覆盖先前的cp, 
此时barrier先到的channel已经处理了一些barrier以后的数据,这部分结果也会存在cp中

但当fail-over的时候,由于replay是根据你发送barrier的offset来重发的,因此这部分会重复

相关文章
相关标签/搜索