目录html
分布式系统须要解决:分配和管理在集群的计算资源、处理配合、持久和可访问的数据存储、失败恢复。Fink专一分布式流处理。算法
Components of a Flink Setupapache
task是最基本的调度单位,由一个线程执行,里面包含一个或多个operator。多个operators就成为operation chain,须要上下游并发度一致,且传递模式(以前的Data exchange strategies)是forward。缓存
slot是TM的资源子集。结合下面Task Execution的图,一个slot并不表明一个线程,它里面并不必定只放一个task。多个task在一个slot就涉及slot sharing group。一个jobGraph的任务须要多少slot,取决于最大的并发度,这样的话,并发1和并发2就不会放到一个slot中。Co-Location Group是在此基础上,数据的forward形式,即一个slot中,若是它处理的是key1的数据,那么接下来的task也是处理key1的数据,此时就达到Co-Location Group。安全
尽管有slot sharing group,但一个group里串联起来的task各自所需资源的大小并很差肯定。阿里平常用得最多的仍是一个task一个slot的方式。bash
Session模式(上图):预先启动好AM和TM,每提交一个job就启动一个Job Manager并向Flink的RM申请资源,不够的话,Flink的RM向YARN的RM申请资源。适合规模小,运行时间短的做业。./bin/flink run ./path/to/job.jar
网络
Job模式:每个job都从新启动一个Flink集群,完成后结束Flink,且只有一个Job Manager。资源按需申请,适合大做业。./bin/flink run -m yarn-cluster ./path/to/job.jar
session
下面是简单例子,详细看官网。数据结构
# 启动yarn-session,4个TM,每一个有4GB堆内存,4个slot cd flink-1.7.0/ ./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m -s 4 # 启动做业 ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
细节取决于具体环境,如不一样的RM多线程
Application Deployment
Framework模式:Flink做业为JAR,并被提交到Dispatcher or JM or YARN。
Library模式:Flink做业为application-specific container image,如Docker image,适合微服务。
Task Execution
做业调度:在流计算中预先启动好节点,而在批计算中,每当某个阶段完成计算才启动下一个节点。
资源管理:slot做为基本单位,有大小和位置属性。JM有SlotPool,向Flink RM申请Slot,FlinkRM发现本身的SlotManager中没有足够的Slot,就会向集群RM申请。后者返回可用TM的ip,让FlinkRM去启动,TM启动后向FlinkRM注册。后者向TM请求Slot,TM向JM提供相应Slot。JM用完后释放Slot,TM会把释放的Slot报告给FlinkRM。在Blink版本中,job模式会根据申请slot的大小分配相应的TM,而session模式则预先设置好TM大小,每有slot申请就从TM中划分相应的资源。
任务能够是相同operator (data parallelism),不一样 operator (task parallelism),甚至不一样application (job parallelism)。TM提供必定数量的slots来控制并行的任务数。
上图A和C是source function,E是sink function,小数字表示并行度。
一个TM是一个JVM进程,它经过多线程完成任务。线程的隔离不太好,一个线程失败有可能致使整个TM失败。
Highly-Available Setup
从失败中恢复须要重启失败进程、做业和恢复它的state。
当一个TM挂掉而RM又没法找到空闲的资源时,就只能暂时下降并行度,直到有空闲的资源重启TM。
当JM挂掉就靠ZK来从新选举,和找到JM存储到远程storage的元数据、JobGraph。重启JM并从最后一个完成的checkpoint开始。
JM在执行期间会获得每一个task checkpoints的state存储路径(task将state写到远程storage)并写到远程storage,同时在ZK的存储路径留下pointer指明到哪里找上面的存储路径。
背压
数据涌入的速度大于处理速度。在source operator中,可经过Kafka解决。在任务间的operator有以下机制应对:
Local exchange:task1和2在同一个工做节点,那么buffer pool能够直接交给下一个任务,但下一个任务task2消费buffer pool中的信息速度减慢时,当前任务task1填充buffer pool的速度也会减慢。
Remote exchange:TM保证每一个task至少有一个incoming和一个outgoing缓冲区。当下游receiver的处理速度低于上有的sender的发送速度,receiver的incoming缓冲区就会开始积累数据(须要空闲的buffer来放从TCP链接中接收的数据),当挤满后就再也不接收数据。上游sender利用netty水位机制,当网络中的缓冲数据过多时暂停发送。
TM负责数据在tasks间的转移,转移以前会存储到buffer(这又变回micro-batches)。每一个TM有32KB的网络buffer用于接收和发送数据。若是sender和receiver在不一样进程,那么会经过操做系统的网络栈来通讯。每对TM保持permanent TCP链接来交换数据。每一个sender任务可以给全部receiving任务发送数据,反之,全部receiver任务可以接收全部sender任务的数据。TM保证每一个任务都至少有一个incoming和outgoing的buffer,并增长额外的缓冲区分配约束来避免死锁。
若是sender和receiver任务在同一个TM进程,sender会序列化结果数据到buffer,若是满了就放到队列。receiver任务经过队列获得数据并进行反序列化。这样的好处是解耦任务并容许在任务中使用可变对象,从而减小了对象实例化和垃圾收集。一旦数据被序列化,就能安全地修改。而缺点是计算消耗大,在一些条件下可以把task穿起来,避免序列化。(C10)
Flow Control with Back Pressure
receiver放到缓冲区的数据变为队列,sender将要发送的数据变为队列,最后sender减慢发送速度。
event time处理的数据必须有时间戳(Long unix timestamp)并定义了watermarks。watermark是一种特殊的records holding a timestamp long value。它必须是递增的(防止倒退),有一个timestamp t(下图的5),暗示全部接下来的数据都会大于这个值。后来的,小于这个值,就被视为迟来数据,Flink有其余机制处理。
Watermarks and Event Time
WM在Flink是一种特殊的record,它会被operator tasks接收和释放。
tasks有时间服务来维持timers(timers注册到时间服务上),在time-window task中,timers分别记录了各个window的结束时间。当任务得到一个watermark时,task会根据这个watermark的timestamp更新内部的event-time clock。任务内部的时间服务肯定全部timers时间是否小于watermark的timestamp,若是大于则触发call-back算子来释放记录并返回结果。最后task还会将更新的event-time clock的WM进行广播。(结合下图理解)
只有ProcessFunction能够读取和修改timestamp或者watermark(The ProcessFunction
can read the timestamp of a currently processed record, request the current event-time of the operator, and register timers)。下面是PF的行为。
当收到WM大于全部目前拥有的WM,就会把event-time clock更新为全部WM中最小的那个,并广播这个最小的WM。即使是多个streams输入,机制也同样,只是增长Paritition WM数量。这种机制要求得到的WM必须是累加的,并且task必须有新的WM接收,不然clock就不会更新,task的timers就不会被触发。另外,当多个streams输入时,timers会被WM比较离散的stream主导,从而使更密集的stream的state不断积累。
Timestamp Assignment and Watermark Generation
当streaming application消化流时产生。Flink有三种方式产生:
AssignerWithPeriodicWatermarks
提取每条记录的timestamp,并周期性的查询当前WM,即上图的Partition WM。AssignerWithPunctuatedWatermarks
能够从每条数据提取WM。上面两个User-defined timestamp assignment functions一般用在source operator附近,由于stream一经处理就很难把握record的时间顺序了。因此UDF能够修改timestamp和WM,但在数据处理时使用不是一个好主意。
由任务维护并用于计算函数结果的全部数据都属于任务的state。其实state能够理解为task业务逻辑的本地或实例变量。
在Flink,state老是和特定的operator关联。operator须要注册它的state,而state有两种类型:
上面两种state的存在方式有两种:raw和managed,通常都是用后者,也推荐用后者(更好的内存管理、不需造轮子)。
State Backends
state backend决定了state如何被存储、访问和维持。它的主要职责是本地state管理和checkpoint state到远程。在管理方面,可选择将state存储到内存仍是磁盘。checkpoint方面在C8详细介绍。
MemoryStateBackend, FsStateBackend, RocksDBStateBackend适合愈来愈大的state。都支持异步checkpoint,其中RocksDB还支持incremental的checkpoint。
Scaling Stateful Operators
Flink会根据input rate调整并发度。对于stateful operators有如下4种方式:
keyed state:根据key group来调整,即分为同一组的key-value会被分到相同的task
list state:全部list entries会被收集并从新均匀分布,当增长并发度时,要新建list
union list state:增长并发时,广播整个list,因此rescaling后,全部task都有全部的list state。
Flink’s Lightweight Checkpointing Algorithm
在分布式开照算法Chandy-Lamport的基础上实现。有一种特殊的record叫checkpoint barrier(由JM产生),它带有checkpoint ID来把流进行划分。在CB前面的records会被包含到checkpoint,以后的会被包含在以后的checkpoint。
当source task收到这种信息,就会中止发送recordes,触发state backend对本地state的checkpoint,并广播checkpoint ID到全部下游task。当checkpoint完成时,state backend唤醒source task,后者向JM肯定相应的checkpoint ID已经完成任务。
当下游得到其中一个CB时,就会暂停处理这个CB对应的source的数据(完成checkpoint后发送的数据),并将这些数据存到缓冲区,直到其余相同ID的CB都到齐,就会把state(下图的十二、8)进行checkpoint,并广播CB到下游。直到全部CB被广播到下游,才开始处理排队在缓冲区的数据。固然,其余没有发送CB的source的数据会继续处理。
最后,当全部sink会向JM发送BC肯定checkpoint已完成。
这种机制还有两个优化:
Recovery from Consistent Checkpoints
上图队列中的7和6之因此能恢复,取决于数据源是否resettable,如Kafka,不会由于发送信息就把信息删除。这才能实现处理过程的exactly-once state consistency(严格来说,数据仍是被重复处理,可是在读档后重复的)。可是下游系统有可能接收到多个结果。这方面,Flink提供sink算子实现output的exactly-once,例如给checkpoint提交records释放记录。另外一个方法是idempotent updates,详细看C7。
Savepoints
checkpoints加上一些额外的元数据,功能也是在checkpoint的基础上丰富。不一样于checkpoints,savepoint不会被Flink自动创造(由用户或者外部scheduler触发创造)和销毁。savepoint能够重启不一样但兼容的做业,从而:
也能够用于暂停做业,经过savepoint查看做业状况。
参考
Stream Processing with Apache Flink by Vasiliki Kalavri; Fabian Hueske