Flink架构,源码及debug

        工做中用Flink作批量和流式处理有段时间了,感受只看Flink文档是对Flink ProgramRuntime的细节描述不是不少, 程序员仍是看代码最简单和有效。因此想写点东西,记录一下,若是能对别人有所帮助,善莫大焉。html

        说一下个人工做,在一个项目里咱们在Flink-SQL基础上构建了一个SQL Engine, 使懂SQL非技术人员可以使用SQL代替程序员直接实现Application, 而后在此基础上在加上一些拖拽的界面,使不懂SQL非技术人员 利用拖拽实现批量或流式数据处理的Application 。 公司的数据源多样且庞大,发布渠道也很丰富, 咱们在SQL Engine 里实现了各类各样的Table Source (数据源) , Table Sink (数据发布)和 UDF (计算器), 公司里有不少十分懂业务专业分析员,若是他们真的能够简简单单,托托拽拽的操做大数据,创建计算模型,而后快速上线和发布,这样的产品应该前景广阔。java

        但是后台并不是提及来这么简单,SQL使用不善,难以达到业务想要的效果,数据量一上来各类问题会出现,后端须要大量的优化工做, 好比 数据倾斜, 是最常发生的事情。SQL基本上是一个Join Language。用户常常会将一个大数据源和一个小数据源作Inner Join, 若是大数据源的数据项很大部分都使用极少数的几个join key, 就很容易出现数据倾斜。现实倾斜的或不均衡的,好比国际资本>80%以用美圆计价,世界人口50%属于某两个国家, 财富主要有20%的人拥有, 等等 。 Flink 若是把SQL join 执行成Hash Join, 最后的结果是不管你实现分配了多少个TaskSlots, 若是80%的数据都跑到某一个TaskSlot里,缓慢运行直至将个这Slot的资源耗尽,整个job失败。这种状况最好是将小数据集广播给全部的下游通道, 大数据集按原始的分片并行,这样的join因分配均衡而快速。然而标准SQL里没有办法指定joinhint , Flink sql也不支持这个,只能经过debug flink 来看看哪里能作一些改变解决这个问题。咱们在最后一章,从Flink client , flink optimizer, flink run-time (job manager, task manager) 一步一步的 在源码里设置断点, debug, 将数据流过一遍,看看有哪些方案能够将这个小数据集合广播起来。git

       为了使本文读起来流畅一些, 我先经过几个章节大概介绍一下Flink 。本文关心架构, 因此不会涉及不少关于API的东西(好比Flink streaming 的windowing, watermark, Dataset, DataStream, 及SQL的API等, 网上应该有不少关于这些的文章)。只是想大概梳理一个Flink的架构,使架构对应到源码结构里, 了解一下Flink的 Graph metadata, 高可靠性的设计,不一样cluster环境里 depoloyment的实现等, 最后利用IntelliJ IDEA 通過一個小例子带你们debug一下Flink 。若是对flink的架构有较好的理解(好比主要类及metadata),就比较容易在准确的地方设置断点,debug Flink代码将更有效率,从而解决问题就会更有效率, 这就是本文的目的。大概了解一下框架,但并不会面面俱到。当若是你须要深刻了解一下Flink某方面的细节, 本文可以告诉你入口在哪里,或者经过对架构了解过程当中获得的common sense , 再加上一点想象力, 你或许直接可以获得解决问题的方案, 而后再经过阅读源码及调试来加以验证。 程序员

1.  Flink的架构简介

1.1  Flink 分布式运行环境(官方图)

(图-1 Flink Runtime 来自:https://ci.apache.org/projects/flink/flink-docs-release-1.6/concepts/runtime.htmlgithub

关于架构,先上一个官方的图,尽管对于Flink架构,上图不是很准确(好比client与JobManager的通信已经改成REST 方式, 而非AKKA的actor system),咱们仍是能够知道一些要点:web

  • FlinkCluster: Flink的分布式运行环境是由一个(起做用的)JobManager 和多个TaskManager组成。每个JobManager(JM)或TaskManager(TM)都运行在一个独立的JVM里,他们之间经过AKKA的Actor system (创建的RPC service)通信。全部的TaskManager 都有JobManager管理,Flink distributed runtime其实是一个没有硬件资源管理 的软件集群 ( FlinkCluster ), JM是这个FlinkCluster 的master, TM是worker。  因此将Flink运行在真正的cluster 环境里(可以动态分配硬件资源的cluster,好比Yarn, Mesos, kubernetes), 只须要将JM 和 TM运行在这些集群资源管理器分配的容器里,配置网络环境和集群服务使AKKA能工做起来, Flink cluster 看起来就能够工做了。具体的关于怎么将Flink 部署到不一样的环境, 以后有介绍, 虽然没有上面说的这么简单,还有一些额外的工做, 不过大概就是这样:由于Flink runtime 自身已经经过AKKA的 sharding cluster创建了FlinkCluster, 部署到外围的集群管理只是为了获取硬件资源服务。 Flink不是搭建在零基础上的框架,任何功能都要本身从新孵化,实际上它使用了大量的优秀的开源框架, 好比用AKKA实现软件集群及远程方法调用服务(RPC), 用ZooKeeper提升JM的高可用性, 用HDFS,S3, RocksDB 永久存储数据, 用 Yarn/Mesos/Kubernetes作容器资管理, 用Netty 作高速数据流传输等等。
  • JobManager: JobManager 做为FlinkCluster的manager,它是由一些Services 组成的,有的service 接受从flink client端 提交的Dataflow Graph(JobGraph),并将JobGraph schedule 到TaskManager里运行,有的Service 协调作每一个operator 的checkpoint以备job graph运行失败后及时恢复到失败前的现场从而继续运行 , 有的Service负责资源管理, 有的service 负责高可用性,后面在详细介绍。值得一提的是,集群里有且只有一个工做的JM, 它会对每个job实例化一个Job
  • TaskManager : TaskManager是slot  的提供者和sub task的执行者。一般Flink Cluster里会有多个TM, 每一个TM都拥有可以同时运行多个SubTask的限额,Flink称之为TaskSlot。当TM启动后, TM 将slot限额注册到Cluster里的JM的ResourceManager(RM), RM知道从而Cluster中的slot 总量,并要求TM将必定数量的slot 提供给JM,从而JM 能够将Dataflow Graph的task(sub task)分配给TM 去执行。TM是运行并行子任务(sub Task) 的载体 (一个Job workflow 须要分解成不少task, 每个task 分解成一个过多个并行子任务:sub Task) , TM须要把这些sub task在本身的进程空间里运行起来, 并且负责传递他们之间的输入输出数据, 这些数据 包括是本地task的和运行在另外一个TM里的远程Task 。关于如何具体excute Tasks, 和交换数据 后面介绍。
  • Client:Client端(Flink Program)经过invoke 用户jar文件 (flink run 中提供的 jar file)的里main函数 (注册data source, apply operators, 注册data sink, apply data sink),从而在ExecutionEnvironment或StreamingExecutionEnvironment里创建sink operator 为根的一个或多个FlinkPlan(以sink为根, source 为叶子, 其余operator为中间节点的树状结构), 以后client用Flink-Optimizer将Plan优化成OptimimizedPlan(根据Cost estimator计算出来的cost 优化operator在树中的原始顺序, 同时加入了Operator与Operator链接的边 , 并根据规则设置每一个边的shipingStrategy, 实际上OptimizedPlan已经从一个树结构转换成一个图结构), 以后使用GraphGenerator(或StreamingGraphGenerator)将OptimizedPlan转化成JobGraph提交给JobManager, 这个提交是经过JM的DispatcherRestEndPoint提交的。
  • Communication: JobManager 与Taskanager都是AKKA cluster里的注册的actor, 他们之间很容易经过AKKA(实现的RPCService)通信。 client与JobManager在之前(Version 1.4及之前)也是经过AKKA(实现的RPCService)通信的,但Version1.5及之后版本的JobManager里引入DispatcherRestEndPoint (目的是使Client请求能够在穿过Firewall ?),今后client端与JobManager提供的REST EndPoint通信。Task与Task之间的数据(data stream records)(好比一个reduce task的input来自与graph上前一个map, output 给graph上的另外一个map), 若是这两个Task运行在不一样的TM上,数据是经过由TM上的channel manager 管理的tcp channels传递的。

 

1.2  JobManager

 (图-2,JobManager的内部结构) 算法

 如上一章所述, JobManager 是一个单独的进程(JVM),  它是一个Flink Cluster的 master 、中心和大脑, 他由一堆services 组成(主要是Dispather, JobMaster 和ResourceManager),链接cluster里其余分布式组件 (TaskManager, client及其余外部组件),指挥、得到协助、或提供服务。sql

  • ClusuterEntryPoint是JobManager的入口,它有一个main method ,用来启动HearBeatService, HA Sercie, BlobServer,  Dispather RESTEndPoint, Dispather, ResourceManager 。不一样的FlinkCluster有不一样的ClusuterEntryPoint 的子类,用于启动这些Service在不一样Cluster里的不一样实现类或子类。Flink目前(version1.6.1)实现的FlinkCluster 包括:
    • MiniCluster : JM和TM都运行在同一个JVM里,主要用于在 IDE (IntelliJ或Eclipse)调试 Flink Program (也叫作 application )。
    • Standalone cluster : 不链接External Service (上图中灰色组件,如HA,Distributed storage, hardware Resoruce manager), JM和TM运行在不一样的JVM里。 Flink release 中start-cluster.sh启动的就是StandaloneCluster.
    • YarnCluster : Yarn管理的FlinkCluster, JM的ResourceManager链接Yarn的ResourceManager建立容器运行TaskManager。BlobServer, HAService 链接外部服务,使JM更可靠。
    • MesosCluster : Mesos管理的FlinkCluster, JM的ResourceManager链接Mesos的ResourceManager建立容器运行TaskManager。BlobServer, HAService 链接外部服务,使JM更可靠。
  • HighAvailabilityService:重复以前的话:JM是一个Flink Cluster的 master 、中心和大脑, 若是JM崩溃了,整个cluster就没法运行了。HAService可以使多个JobManager同时运行,并选举一个JM做为Leader, 当Leader失败后在从新选举,使另个健康的JM取而代之成为leader, 从HA存储中读取MetaData(Graph,snapshot)从而 继续管理Cluster的运行。HighAvailabilityService 只保护JM里的DispatcherRestEndpoint, Dispatcher, ResourceManager 和JobMaster 4个核心服务, 从理论上来说, 这些service的各自的leader有可能来自不一样的JM, 这就要看外部作Coordination的服务的Leader Election策略会不会把他们都从一个JM 选了。目前,Flink支持的和在使用的HighAvailbilityService有ZooKeeperHaService和StandaloneHaService。
    • ZooKeeperHaService:链接外部的ZooKeeper cluster作多个JM的Leader Election,从指定的存储(一般是HDFS)存取JM metadata, 从而当JM takeover 或从新启动时可以获取失败以前的snapshot or savepoint, 从而继续服务。
    • StandaloneHaService : 不支持多个JM Election。但支持从指定的存储存取JM metadata, 作失败后重启恢复。
  • BlobServer 使用来存储Client端提交的Flink program jar,  jobGraph file, JM 的全部services , 和全部的TM都链接同一个BlobServer (能够是LocalDisk, HDFS, S3 , 或其余的 Blob数据库)读取这些数据。
  • HeatBeatService , 用来运行JM 与TaskManager之间的心跳服务。 好比 ResourceManager 与JobMaster和全部TaskManager之间的心跳, JobMaster与全部TaskManager之间的心跳。若是心跳消失, 相应的HA 容错措施就要启动。 好比一个TM与JM的心跳没了,那么相应的容错措施就会执行了。好比JobMaster的心跳消失,HA就会从新选举新的JobMaster Leader;TM的心跳消失,ResourceManager就要将task分配到其余空闲的TM的slot里,若是没有空闲的slot ,RM 就会向外部的ResoureManager申请新硬件和启动新的 TM以提供空闲的 slot。Flink的心跳消息是经过AKKA 传递的。
  • DispatcherRESTEndPoint是JM的4大核心服务之一(其余三个分别为Dispatcher, JobMaster和ResourceManager),受HAService的保护, 是Flink客户端与JM交互的REST接口, 也是Flink custer 的WebMonitor。非核心服务实际上都是一些UtilityService, 他们非JM独有,须要用时可随时实例化:好比Client端也会使用HAService来获取DispatcherRESTEndPoint的leader的地址和端口, TM也会使用BlobServer 。DispatcherRESTEndPoint是用Netty搭建的RESTService, 它建立了大概有290个handler 对应不容的资源地址及方法。这些handler大都须要经过RPC方式调用Dispatcher 的远程方法来知足客户的请求。
  • Diaptcher是DispatcherRESTEndPoint的后端服务层,它实现了RestDispatcher接口, 从客户端(包括FlinkClient和Flink Web Dashboard)提交给又有来自于EndPoint的请求,都由这个接口里的方法服务, 这其中最总要的方法就是submitJob。当Dispather受到submitJob的调用时,他会先在本JVM里建立一个JobMaster服务,并将 JobGraph和Flink applicaiton 的jar file , 转交给这个JobMaster去安排job具体的运行。
  • JobMaster的是用于一个Job的Master, 当集群里由多个Job同时运行则会有多个JobMaster同时运行,每个JobMaster只会负责一个job。当接收到jobGraph时, JobMaster首先会将jobGraph转换成ExecutionGraph:一个能够指导task并行运行的数据流程图,并向ResouceManager(RM)申请运行这个ExecutionGaph须要的资源(TaskSlot):好比一个并行度为8的job,必须有8个TaskSlot才能运行起来, 而后按照ExecutionGraph将task schedle到Taskslot中去, 并定时的对task作checkpoint, 以备重启时恢复到崩溃前的现场。
  • ResourceManager负责管理FlinkCluster里全部TaskManager的TaskSlot资源(至关于TM里的一个运行线程)。当一个TM启动时,它会将本身的TaskSlot注册到RM。当JobMaster向RM申请slot时,RM会要求TM将它空闲的slot(已注册到RM,因此TM知道全部slot的状态)提供给JobMaster使用,以后JobMaster才会将相应的Task 安排到slot里运行。若是集群里的TaskSlot不够, RM会向外部的ResourceManager(好比Yarn/Mesos/Hubernetes)申请新的容器(container) 去启动新的TM从而知足JobMaster的slot资源的需求。

1.2.1  展开JobManager后的Flink架构

从以上所述, JobManager是一组Service的总称, 其中真正管理Job调度的组件叫JobMaster ,负责资源管理的组件叫ResoruceManager, 负责接收client端请求的组件叫Dispatcher(包括Dispatcher和DispatchRestEndpoint)。其实Flink源码里有叫JobManager的包和类,功能上也是负责Job调度管理以及snapshot管理,但它应该在Flink某个版本之后就legacy了(估计是从version1.3开始)。这三个服务统称为还叫 JobManager,上真正管理做业的是JobMaster。这一点在读code时让人迷惑,好比JobManagerRunner启动的倒是叫JobMaster的类。可是他不叫JobMasterRunner,这也体现了JobMaster实际是取代了JobManager类,保留legacy类是为了向后兼容。如下是Client, 展开的JobManger(受HA 保护的Dispather, JobMaster, ResourceManager)和TaskManager处理submitJob的流程图,这个比较图-1更能体现当前的Flink runtime架构 (Flink 1.6):docker

 

(图-3)展开JobManager后的Flink 架构, 来自于《 Stream Processing with Apache Flink》shell

 

以上的架构严格来说在Flink里被称做 SessionMode ( Cluster的EntryPoint类都是SessionClusterEntryPoint的子类),  若是没有外部命令 terminate cluster, 在这种模式下的FlinkCluster 是Long running 的, 多个job能够同时运行在同一个flinkcluster里。 SessionMode 在Flink的各类部署都是支持的, 包括Standalone, Kubernetes, Yarn, Mesos, 上图实际上是StandaloneSessionCluster的流程。 还有一种模式叫作JobMode, 区别就是Job(或application) 的main class 和 jar 和在JobManager 启动时经过的启动参数装载的, 不须要submitJob的过程, job运行完毕, cluster自动终结, 全部资源释放。 在这种模式下, Dispather并不负责处理job的提交, 但其余 Client发给DispathcherRESTEndPoint的请求(好比Query, CancelJob), 仍是由Dispatcher处理。

Flink的每一种部署模式(deployment mode)都是既支持Session Mode又支持JobMode的 (或partialy support), 区别如上所述, 但在架构上是一致的。 当有由外部的ResourceManager协助硬件资源分配时,流程略有全部不一样, 以 FlinkCluster in Yarn 为例, SessionMode下, 区别只限于多了RM经过Yarn自动启动TM 的过程(4,5)。

(图-4)FlinkCluster in Yarn Session mode,  来自于《 Stream Processing with Apache Flink》

关于deployment的细节,请参照后面的将Deployment的章节。

1.2.2  JobMaster

如图一所示,JobMaster的主要工做是:

1.  JobGraph的scheduler : 将Client提交的JobGraph按照逻辑的向后关系(source -> transform -> sink), 以及并行关系(每一个operator的子任务只负责所有数据的中一部分), 将子任务分配到TaskManager的Slot中, 并按期的获取每个子任务的运行状态 (status)。

2. 触发和管理Job的checkpoint snapshot:对于streaming job,按期的将运行中的每一个operator 的状态(State)数据存入规定的存储设备, 这些state数据能够用于在Job恢复运行时,恢复相关子任务的失败前的现场。

 

 (图-5)JobMaster内部结构

  •  ExcutionGraph (EG) 是JobMaster 最核心的组件,它承担了JobMaster 上述的的两大责任: job scheduling 和 checkpoint snapshot  。EG的细节下节展开。
  • SlotPool 存放由全部TM Offer 过来的slot 。Offer 的过程就是图-3中的3,4,5 或图-4中的3,4,5,6,7,8。当EG须要slot去执行给sub Task时, 它就从SlotPool里根据必定的策略poll 一个slot ,而后将SubTask打包 (这个在TM讲解中展开) 发送相应的TM 去执行 。 SlotPool实现了一个RPCEndPoint : SlotPoolGateway, 如图-5中所示,感受这个Gatway是为TM OfferSlot准备的。 实际上TM调用的是JobMasterGateway (到JobMaster), 而后JobMaster 经过SlotPoolGateway这个RPC 接口与SlotPool通信的。 看代码时看到SlotPoolGateway时比较奇怪的, 由于它做为JobMaster的组件,是没有必要实现为PCEndPoint的。集群中运行的每个Job, 都会由一个JobMaster建立出来为之服务, 每个JobMaster 都有一个SlotPool存放这个Job分配的Slot 。有一种多是Slotpool的实现这打算将slotpool共享给全部的的JobMaster ? 若是那样的Slotpool  须要由Zookeepr 管理作Leader Selection 和 FailOver, 其实也没什么必要。
  • JobMasterGateway 是外界(ResourceManager, TaskManager)用来 同JobMaster通信的RPC接口。
  • RMConnection 和TMConnection(多个)是JobMaster 同TM 和TM 通信的PRC 通道。这些通道里包裹了RM和TM的PRCEndPoint的AKKA地址,以及永远RPC call 的 XXXXGateway接口。好比ResourceManagerGateway 和TaskManagerGateay。
  • HearbeatManager 会以Interval为(10,000 ms),timeout 为(50,000ms) 向TM和RM发送heartBeat, 若是timeout 发生则相应的ErrorHandling 会出发, 好比从新链接RM,切断timeout的TM 。interval 和 timeout都是可配置的, 前面的两个数值是缺省值。
  • FatalErrorHandler : 一般指向ClusterEntryPoint (回顾一下图-2)。JobMaster 在没法链接和注册有效的RM时会触发FatalErrorHandler的onFatalError方法。onFatalError一般会简单记下log, 而后推出JVM 。
  • RestartStrategy用于在EG中,但Job失败时,尝试重启Job, RestartStrategy 能够在Flink Java/Scala API种指定 。
  • BackPressureTracker,  当一个operator的处理速度小于的上游的下发速度, 数据就会在input buffer 里积压, 当buffer满了的状况, 数据就会无处可放。 Flink将这种状况称做为BackPressure 。Dispatch 会持续的经过JM的BackPressureTracker对每个TM每一个 sub Task作Stack trace(100 stack traces every 50ms , configurable) ,而后用可能有BP的stack trace (好比访问buffer, 访问网络栈等)同total tack trace 的比例决定系统是否有Back Pressure风险 。好比 <10%是OK的, <50%是低危的, >50是高危的。这个比率是能够在Flink WebMonitor的Metrics里看到的。若是是高危的怎么办, 实际上Flink就是把他经过Metrics发了出来,没有作任何handling , 目的是让用户手工在工做流种作相应调整, 好比加速和降速Datasource 的输出速率, 在某个operator 上加cache等。
  •  

 1.2.3  ExcutionGraph 

EG是面向Job 并行运行的图结构,在JobGraph的基础上它加入了对Operator并行执行的子任务,以及子任务的输入输出的描述 。 

                                                 图-6 Execution Graph

 

  • ExecutionJobVertex : 对于每一个 Operator 或Task(单独的或chained Opertor) ,EG 都会建立一个ExecutionJobVertex(EJV)对应 。
  • ExecutionVertex: 对于它 的每个并行子任务  (sub task), EVJ都会建立一个ExecutionVertex(EV)对应 。每个EV都知道输出到哪里 (IntermediateResult), 到哪里获取input (ExecutionEdges : 底层数据也来自IRP ), 和执行的Operator类。
  • IntermediateResultPartition(IRP)  : 表明IntermediateResult(IR)的一个Partition 。 它描述了它是由哪一个EJV提供数据, 并由哪一个EE消费数据的。
  • ExecutionEdge (EE) : 是每一个EV的input的描述, 好比source 来自与哪一个partition, edge 是sub task的第几个input 。
  • Execution: 但EV被分配执行时,Exception对象会被建立做为EV的一次尝试, 分配slot,  将EV打包 成 TDD(TaskDeploymentDescriptor)并同TaskManagerGateWay 发送给TM (submitTask)  执行。Exception若是失败, 新的Exception会被建立做为另外一次尝试。

  • TaskDeploymentDescriptor (TDD): 包括了该Sub Task 全部信息的描述: sub task的执行类 (operator 的类名), 输入和输出的描述, job的描述。 TaskManager收到TDD以后建立一系列物理对象执行的对象,把这些些建立在分散TM上对象拼在一张图上, 实际就造成了EG的物理执行图。 这个TM的章节在展开。
  • 总起来讲EG经过EJV, EV,  IR, IRP, EE构成了一个包含了并行子任务 以及各个子任务间输入输出关系的总工做流图。当scheduling EG的时候, 每一个EV都打包成TDD发给TM。TM会将TDD里的子任务,输出Partition和输入Channel建立在TM的物理机上。 把TM的这些物理对象拼接起来,就造成了该工做流物理执行图 。 Dispatcher就是经过收集这些物理对象的metrics和状态信息,从而在WebMonitor上更新EG的。

 1.2.3.1  任务分配执行(Scheduler)

EG的Scheduling模式由两种,一个叫Lazy, 一个叫Eager 。

Lazy的方式适用于Batch Job, 它先将全部的处理数据输入的sub task 分配执行, 当TaskManager 返回 (同过JobMasterGateway) 已分配成功的信息, EG在根据EJV的上下游关系, 再给相应的EJV分配slot执行。分配的过程如上一小节所述, EJV全部EV都会经过TDD打包,而后要求SlotPool提供slot, 而后将tdd和slot信息都发送给相应的TM去实例化这个sub task而后运行起来 (再TM细述这个时怎么实现的)。值得一说的是, EJV全部EV都应该一块儿scheduling , 但当集群里没有足够的slot时, 同一个EJV可能只有部分EV被schedule了,若是那些没有分配的相同EJV的EV再一个timeout(default 5分钟)以后还没法获得slot, task 这时候会失败, job 也会失败。因此在计划Job使用的资源时,计划的总slot数 (好比当使用Yarn管理resource 时, yn * ys 是job向Yarn申请的总slot数 )必定要大于总的source sub task的总数量(source operator 的数量 * paralelism ), 不然部分soure sub task 得不到资源,再timeout以后就会出发failGlobal 使job 失败。

Eager 方式使用与Streaming job, 在Eager模式下, 全部EV都必须都能获得slot, 不然schduling 失败, job 失败。

 

1.2.3.2  CheckpointCordinator

Flink的Checkpoint这个概念仍是有必要简单说明一下, 固然参考Flink文档会获得更全面的理解。Flink主要是一个Stream computation的架构,(固然它也能够作Batch, 但Batch并非Flink的强项), Flink Streaming processing 的一个特性就是Stated Streaming 。 意思就是在它的流式计算的工做流里, operator的都是能够有状态的。什么是有状态的?至关于一我的睡醒以后还记的本身是谁,而后还能继续下来的生活,作完没有作完的事情的意思 : 由于大脑里存储了过去的信息。Flink 的Operator能够像这样生活的,建立一下StateFull的变量 (好比ValueState<T> ), 而后周期性的将这些状态存储一个地方 ,当job重启, operator 从新实例化的时候, 经过加载这些Sate信息,就可以重新回到上次重启前的状态,而后继续这个operator的人生。周期性的(Periodically)存储operator的State 信息, Flink称做Checkpoint, 每一次存储叫作一次snapshot 。

CheckpointCordinator的主要功能就是协调促使工做流里全部的operator都周期性的触发Checkpoint snapshot 。

换句话说,CheckpointCordinator的主要功能就是向全部的Execution (看前面回顾一下Execution的概念) triggerCheckPoint 。 每个EG都会建立一个CheckpointCordinator (CC), CC用内建的timer (Executor)定时(根据可配置的interval)的经过RPC向全部的TM触发他们运载的全部Source Exection对应的Task的CheckpointBarrier。 上一句比较长,分解来讲, 每个EG的Execution 都会对应一个Task (准确的说时SubTask)运行在某一个TM 里, triggerCheckPoint 就是CC定时的经过RPC调用全部 TM 上全部的Source Task ( 是工做流里开始位置的处理Source 数据的Task, 不包括Sink Task 也不包括 普通的Transformation Task ) 的triggerCheckpointBarrier方法。当一个SourceTask收到triggerCheckpointBarrier时,  它会命令内嵌的Invokable对象 (Operator, 或Operator Chain的封装对象)执行 performCheckPoint , 这个过程大概有以下几步, 不少多步骤的时异步执行的:

  1. CC 对全部的Soruce Execution triggerCheckPoint 
  2. TM 对 全部的 SourceTask triggerCheckpointBarrier
  3. SourceTask对应的Invokable 对象执行performCheckPoint 
  4. 首先 作Barrier 前的工做: 好比对齐和比较多个input channel的barrier 等。
  5. 其次建立Barrier event (只有source须要建立)并向下游传递 : 下游的Operator 的收到这个Barrier , 也会作这5个步骤 ,只是当有多个input channel的时候(Input), 步骤稍微复杂一些而已。Sink 不须要建立Barrier ,由于没有下游。
  6. 而后对Invokable对象的全部Sate述,拍Snapshot (克隆一份)
  7. 而后Invokable 将State数据传递回JobMaster,
  8. 最后JobMaster再persist 到指定的存储中。 

至于CheckPoint怎么配置,State数据, StateBackend 包含那些, 以及CheckpointBarrier再工做流里的联动过程, 我就不赘述了,网上 应该不少介绍, 不过经过代码阅读,想强调以下几点。

  • Checkpoint是对Streaming Job有效的, Batch Operator 不须要有状态的。
  • Streaming Job 缺省状态不开启Checkpoint, 也不能经过Flink configuration 开启, 只能经过Streaming API 再程序中开启。 缺省状态下, CheckPoint 周期被设置为无穷大,所以永远不会被执行。
  • AtLeastOnce只要开启CheckPoint就能达到。
  • 对于ExtractlyOnce, 不少网上不少文章都声称这个 Flink的买点。 实际分析一下以上步骤, Checkpoint Snapshot 存储的是上当barrier 到达operator是它的状态, 但并非Operator 意外退出的状态。因此恢复时,只能恢复到触发barrier 时的现场,这没法保证source的数据无重复下发。
  • 下面的文档提供了ExactlyOnce的解决方案,这须要SinkOperator实现TwoPhaseCommitSinkFunction 。https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html。 大概意思时SinkOperator向external 下游发送数据时须要分两步走(TwoPhase), 不过目前没目前 Flink1.6.1 的 Sink都没有实现这个功能 。
  1. 数据先暂存到临时存储里, 好比存储在临时文件或buffer里 ,这个叫PreCommit .
  2. 当全部的非Sink operator 都作完了CheckPoint , 当barrier 到达时, Sink再将临时存储中的数据一次性发送给下游。
  3. 固然若是下游支持Trasaction的话 (好比, precommit, commit ), 临时存储就不须要了。

 

1.3  TaskManager

在源码里, TaskManager 类同 JobManager被JobMaster取代 同样,TaskExecutor取代了legcy TaskManager 并发挥着它的做用。本文里TM指的是TaskManager的整个进程, JM表明JobManager整个进程。JM的核心类是JobMaster (固然还有ClusterEntryPoint, Dispatcher, WebMonitor和ResourceManager, 可是都起的是辅助做用), TM里的核心类是TaskExecutor 。还有一个比较混乱的Term就是Task。Task对应JobGraph的一个节点,是一个Operator。在ExecutionGraph里Tast被分解为多个并行执行的subtask 。每一个subtask做为一个excution分配到TM里执行。但比较然人抓狂的是在TM里这个subtask的概念由一个叫Task的类来实现。因此TM 里谈论的Task对象实际上对应的是EG里的一个subtask ,若是须要表述Task的概念,用Operator。先澄清一下Terminology ,以避免语言混乱。

 

                                           图-7 TaskExecutor

  • TaskSlotTable 是TaskExeutor最核心的数据结构, 它存放着TM全部的TaskSlot以及再Slot里运行的Task。 TaskSlot只是一个逻辑单位,它并不绑定或链接任何资源, 但它规定了TM里可以并行执行的SubTask的总数量。当TM 启动时,总slot数由命令行参数传入(-ys,default 为1或flink configuration 里设置的), TM建立这个指定数量的TaskSlot后供分配给SubTask使用。如前所示,TM里的Task实际指的是EG里的SubTask ,后面会详述,Task的数据结构和执行过程。
  • NetworkBufferPool(NBP)是用来为InputGate(IG)和ResultPartion(RP) 分配BufferPool的。一个Task要经过InputGate 从远程另外一个Task的ResultPartition 要input数据,这个Task 同时也要将输出的数据放到本身的ResultPartition里。IG和RP都须要Buffer,而这些Buffer都从NetworkBufferPool去申请, NBP的poolsize由flink configuraiton 指定。
  • MemoryManager : 用于大量分配内存。在Bash模式下,输入数据unbouned 的。一些subtask  须要对全体输入数据进行 Sort或者Hash, 好比outJoin 。此时MemoryManager用大快速和大量的分配内存。关于Flink的内存管理,后面有一节详述 。
  • IOManager:用于将内存的数据和硬盘之间交换。一样在Bash模式下,输入数据unbouned 的,若是EG很是复杂,Task的数量巨大。此时NetworkBuffer Pool分配的buffer不是够用的。 IOManager可以用hard disk做为Buffer还缓存数据,当Localbuffer够用时,再将数据从硬盘里换进,供本地或远程消费。
  • ChannelManager是TaskExeutor很是关键的服务,他负责RP与IG之间快速的数据交换,后面专门有一节细述ChannelManager 。
  • BlobCacheService 用于加载将客户的jar文件 ,Task里年的Invokable须要调用jar文件里代码, 好比 source, sink, tranformation operator, 以及他们的依赖。
  • LocalStateStoreanager用于存取TM本地硬盘上的Sate数据, 但Task作CheckPoint是,除了向JM返回snapshot,它也会在本地存储。
  • HeartBeatManager和FatalErrorHandler, 和JM里的相似。
  • TaskExectorGatway, 同其余的Gateway同样,是用于cluster里的其余组件(JM和RM)远程调用TM的stub interface .
  • ResourceManagerConnection 和 JobManagerConnections (注意,能够链接多个JM ) 用于远程RPC call RM 和 JM。

1.3.1  Data Exchange (ChannelManager)

我的以为Task之间的数据交换,是TM里的核心和重点,也是 Flink runtime的核心、重点和难点,它的质量是区别于其余大数据系统的关键,它的质量直接影响Streaming/Batch 任务的延迟及吞吐量两大指标。人们选择一个大数据流式框架最想先了解的就是这两个指标, 至因而不是Statefull, 可靠性可用性集成度怎么样,编程接口是否简单不是不重要,但不是最关键的,是次要考虑的。因此本节做为本文的重点,咱们深刻解读一下TM管理的数据交换。

首先强调一下Task之间的在网络里或在本地的数据交换, 不是Task管理的,是由TM (或TaskExecutor)管理的。以下图所示。

图-8 JM和TM的数据交换

JM将EG的Execution 提交给 TM,TM将Execution转化为Task执行, 并负责Task之间的数据传输.

回想一下EG的数据结构:

  • 它包含多个EV对象, 每个EV表明一个并行子任务(sub task ),
  • EV 产生的结果存储在termediateResultPartition(IRP)里。
  • 多个属于相同EJV的EV链接的IRP组成了IR(IntermediateResult)表明由一个Task节点产生的结果数据,
  • 这个结果数据Re-Partition后,每一个IRP须要根据新的Key从新分区(sub partition )而后经过EE (Execution Edge)发送给下游的SubTask 。
  • 每一条EE的source 是一个IRP, target 是另外一个EV .
  • 以下图所示。

(R

图-9 EG 的数据结构 和 数据流

为了将JM的EG的逻辑工做流在物理机上执行起来, TM建立了相应的物理数据结构。

ResultPartition(RP)概念上对应着EG中的IRP(IntermediateResultPartition), 它负责存放一个Task生成的全部结果数据。

ResultSubpartition (RS) 是对应着RP的数据Repartion以后shuffle数的中的一个parition ,它存放着RP从新分区后发送下游的某一个sub task的分区数据。

InputGate(IG): 在EE中, IRP是source, EV 是target, 它描述了分区数据的流向, 但对不物理实现这样的结构仍是不够的。 IG 由此而引入, RP是EE上的数据发送端(IRP的物理实现),IG是接收端上与RP功能相似的组件, 负责收集来自上游(RP)的数据区 (data buffer)。

InputChannel(IC) 是接受端与RS功能相似的组件,IG使用不一样的IC链接不一样EE上的RP 中特定分区的数据区。 好比在data shuffle的过程当中多个RP都产生键值为a1的RS ,  这些RSa1中的数据最终都会流向同一个IG中的IC, 这其中每个IC承担接受上游的RP中每个RS-a1中的数据。

数据在EE上的通信, 由RP到IG ,数据是以binary 形式传输的。也就是说数据进入RP前,会由总Serializer 将data record 序列化成binary format, 并存入data buffer 中。 Data data buffer 由IG 接受传递给下游的Oparor 前, 由Deseriealizer将数据从DataBuffer反序列化成data record, 供该Task消费使用。 Data buffer至关于高速公路上运输乘客的大巴车, buffer中的数据至关于乘客 。 每一个大巴车的形状, 运载量也是固定的,  不装满不发车 。它能极大的增长了整体数据传递传输量和创数率, 增长系统的吞吐量,  但同时也增长的单个数据的延迟 。缺省状况下2048 个buffer会被建立,   每个32K字节 。对与比较大的record 须要多个buffer 承载 。每个RS和IC有一些大buffer组成, RP和IG就是这些大巴车的装载者和卸载者 。

关于DataBuffer若是建立和管理, 参考后面的内存管理章节。

 

另外值得一提的是, 不一样的对于RS的实现决定了实际的数据传输的方式。

PipelinedSubpartition支持streaming模式下的数据传输 (大部分的RS都是这种实现): 数据压满一个buffer (buffer size是configuration 指定)就向下传递 。 SpillableSubpartition 只有当RP的类型为BLOCKING是才会建立出来(Batch job 中的部分RS 是这种实现)。它支持在事先分配的Buffer不够用的状况下, 将Buffer中的数据Spill到硬盘中,从而该RS占有的Buffer得以释放。由于他会涉及IOManager (应该只有它会使用到IOManager), 因此我们细说一下。

那什么是BLOCKING类型的? 从ResultPartitionType是这样定义:BLOCKING(false, false, false)能够看到, 3个false 分别以下:

  1. BLOCKING类型的RP的DataExchangeMode不是PIPELINED,   对于Batch Job, 只要下游须要shuffle, DataExchangeMode 就会被设置为 BATCH mode,而不是PIPELINED 。
  2. Task没有BackPressure的, 对于Batch Job, 全部的 operator 都没有Backpressure , Backpressure 在streaming job Backpressure才会被enabled 。
  3. 数据不是Bounded 。对于Batch Job, 由于BatchJob的数据流是unbounded (没有window的界限), streaming job才会有Window operaor, 才会有界限。

简单来讲只有Batch job 里的一些RP类型是Blocking的 ,由于BatchJob总一些须要shffule 输出的operator才会有才会启动Blocking模式, SpillableSubpartition才会被建立。

注意 上面说到了的三个模式:概念比较混乱。

1. JobMode (Batch or Streaming):任务模式, 因为决定ExecutionConext

2. ExecutionMode (PIPELINED (default), BATCH, PIPELINED_FORCE, BATCH_FORCE):可配置的数据流总体模式, 目的是经过一个可配置的ExecutionMode(可在ExecutionConfig中配置)来决定全部Operator的DataExchangeMode。它是Optimizer在优化Edge的shipStrategy和DataExchangeMode作策略选择的依据。 详细看DataExchgeMode代码中,DataExchgeMode与ExecutionMode的Mapping关系。

 

3. DataExchangeMode(PIPELINED, BATCH, PIPELINE_WITH_BATCH_FALLBACK ) : 根据下游Operator,和 ExecutionMode, 由Optimizer 决定 数据下发模式 , 

下面是当JobMode 为Batch, ExecutionMode 为PIPELINE时, DataExchangeMode应该优化的结果。能够看出来只要下游须要Shuffle,  DataExchangeMode就会被优化成BATCH模式, 此时Flink会建立SpillableSubpartition 。

DataExchangeMode.PIPELINED,   // to map
DataExchangeMode.PIPELINED, // to combiner connections are pipelined
DataExchangeMode.BATCH, // to reduce
DataExchangeMode.BATCH, // to filter
DataExchangeMode.PIPELINED, // to sink after reduce
DataExchangeMode.PIPELINED, // to join (first input)
DataExchangeMode.BATCH, // to join (second input)
DataExchangeMode.PIPELINED, // combiner connections are pipelined
DataExchangeMode.BATCH, // to other reducer
DataExchangeMode.PIPELINED, // to flatMap
DataExchangeMode.PIPELINED, // to sink after flatMap
DataExchangeMode.PIPELINED, // to coGroup (first input)
DataExchangeMode.PIPELINED, // to coGroup (second input)
DataExchangeMode.PIPELINED // to sink after coGroup

由于batch 工做模式下的 shuffle一般会伴随的对整个group的排序, aggregation等, 下游须要获得(该group的)全集才可作这些操做。全局数据量比较大, 物理内存Buffer极可能不够用, 这时候SpillableSubpartition(在IOManager的帮助下)可将一部分硬盘当Buffer来用, 者极大的帮助了RP端的数据缓存。但SpillToDisk必须实如今RP端吗?不能够在接受端(InputGate)实现吗? 接受端的计算须要作密集的内存访问(sort, hash, etc ),  这些算法都是在整个数据集上的操做, 因此数据须要缓存在MemoryManager管理的MemorySegment中 从而提升存取效率, 也能为使Flink对内存作有效的管理。可现实是Flink的BatchJob须要消耗巨大的内存, 这跟接收端不使用硬盘作Buffer由很大关系。至少Flink 1.6.1的Memory是不使用硬盘作缓存的,虽然有HybridOffHeapMemoryPool, 并且它也使用了DirectMemory 来分配MemorySegment, 但并无实现用磁盘文件来映射内存 , 全部当上游数据量很大, 但内存不足时, Flink task会很快的out of memory。以后看看Flink的最新版本是否有改善。

 

R

 

 图-10 使用EG控制物流数据流(数据交换)

 图-10描述了Task之间数据交换的大概流程。

  • 这是一个最简单的MapReduce工做图: 由一个Map和一个Reduce组成。这个Job并行度为2, 并运行在两个TM上。
  • M1, M2是同一个Map Operator的两个并行的子任务。R1, R2是同一个Map Operator的两个并行的子任务 。 
  • M1产生的数据存入RP1 中 (arrow 1)。RP1 通知JobManager(准确的说是JobMaster)(arrow 2) RP1中有数据产生。
  • RP1中其实产生一些SubParitition(RS) . JM 会通知R1和R2他们分别感兴趣的RS已经准备好。(arrow 3a,3b)
  • R1, R2向 RP1发起数据请求 (4a,4b), 这些请求会触发数据在两个Task之间的传输 (5a,5b) 。之中5a是本地传输, 5b是跨TM经过网络传输。

 

 图-11, 跨TM的两个Task之间的数据交换

 图-11给出了跨TM的两个Task之间的数据交换的更多细节。

  • M1中的MapDriver持续的产生record对象,而后传递给RecordWriter 对象。RecordWriter 由ChannelSelector,  一系列RecordSerializer (每个下游的RS都有一个对应的Serializer)和一个BufferWritter(更准确的说ResultPartitionWriter )组成。
  • 不一样ChannelSelector的实现 (BroadcastPartitioner, ShfflePartitioner, ForwardPartitioner, etc)会由不一样的选择RecordSerializer 的策略。好比BroadcastPartitioner会将record发给全部的RecordSerializer, ShfflePartitioner会根据record的key决定发送给相应的RecordSerializer 。
  • RecordSerializer 将record序列化成二进制数据, 并把他们存入一个固定大小的buffer中, 当buffer 写满后, 由BufferWriter将该Buffer 写入相应的RS中 (本例中是RS2)。
  • RP通知JM 本RP中的RS2已经有填满数据,可供消费。 JM经过EG查找全部的消费这个RS的Task,并经过TaskExecutor通知到这些Task对应的IC, 本例中为IC1。
  • IC1向RP申请传送RS2中的数据 。RP将Databuffer 交给基于netty实现的的ChannelManager,(发送后, RS2中的buffer 得以释放,还给NetworkBufferPool) 并由它发送给对端TM的ChannelManager , 从而数据将存储到IC1中。
  • ReduceDriver 或者其余的Driver/Invokable的run的方法是每一个一个Task的Engine 。ReduceDriver 的 Run()方法是一个while循环, 不停的从RecordReader(或更准确的名字是MutableObjectIterator)
    读取next record, 根据他们的key 来决定他们是否来自同一个group, 而后调用相应的ReduceFunction进行reduce 。
  • MutableObjectIterator的next record的binary实际上来自于IC(本里中为IC2)中buffer, 并经过Deserializer将binary转化为相应的Record。  当buffer 中的数据所有读完, 该Buffer 得以释放,还给NetworkBufferPool 。
  • 今后完成一个buffer从M1到R1的数据传递。

 

1.3.2  Task 提交与执行 (TDD, Task, AbstractInvokable , Driver , ...)

根据前面的介绍,Task是由JM(经过EG) 通过Scheduler (申请和Offer slot resource和根据schedule 策略,等等) 提交给TM执行的。申请resource 和提交Task都是经过JM, RM 和TM之间的RPC通道完成的。那么具体提交了什么呢?Task若是执行的呢?Task如何知道谁是上游从而创建InputChannel的?

实际上这是一个 从EV->TDD->Task->AbstractInvokable->Driver ->具体的Oparator 实现类变形的过程。

 

1. 从EV 到TDD : 

TaskDeploymentDescriptor(TDD) : 是TM在submitTask是提交给TM的数据结构。 他包含了关于Task的全部描述信息:
  • TaskInfo : 包含该Task 执行的java 类 , 该类是某个 AbstractInvokable的实现类 , 固然也是某个operator的实现类 (好比DataSourceTask, DataSinkTask, BatchTask,StreamTask 等), 
  • IG描述 :一般包含一个或两个InputGateDeploymentDescriptor(IGD),
    •   一般一个Operator 有一个或多个逻辑的输入, 好比Map/redue 只会有一个输入, join会有两个输入。因此IGD描述是一个数组。
    •   每个IGD都会包含一组InputChannelDeploymentDescriptor(ICD), 每个ICD是该子任务对应的一个inputChannel 。 
    •   每个ICD 包含上游RP的ID和IP 地址。那么IC怎么知道应该消费RP的哪一个RS呢?
      •  当IG 和 source RP 的并行度相同时, 每个IC都会去消费各个SourceRP 中同子任务序号相同的RS .
      •  当IG 和 source RP 的并行度相同时,    经过对双方平行度进行一番取余, 来决定该IC 要消费的RS: 0个或多个。
  • 目标RP的描述: ParitionId, PartitionType, RS个数等等
  • 其余的一些描述 : JobId, ExecutionId, SlotNumber,  subTaskIndex (子任务序号) , 等等。

 

不难看出,EV包含上述的全部信息,EV的一个方法createDeploymentDescriptor,完成了上述变形。JM 在向TM submitTask时,传递的是TDD不是EV。为何要作此变形的而不是将EV直接传过去既然他们很相似? 我想这个一个设计模式问题, EV是做为ExcutionGraph的中的顶点 ,它最好只存在于JobMaster 的物理图中, 而不是做为参数传递给其余组件,从而维护它的独立性和单一性。参见Descripor设计模式。

当TaskExecutor(TM)接受submitTask 的RPC调用从而获得TDD时, 他会将TDD实例化为一个在TM上能够执行的对象 : Task 。

 

2.  Task : 

Task 是一个Runnable 对象, TM接受到TDD 后会用它实例化成一个Task对象, 并启动一个线程执行Task的Run方法。

Task实例化时, 他会将TDD中的IGD实例化成InputGate (IG) 和 InputChannel(IC), 将RPD实例化成RP . 在 Task 的Run 方法被调用时, 它根据TDD的 TaskInfo, 使用URLClassLoader将用户的operator类从HDFS加载, 并实例化TaskInfo所描述的AbstractInvokable对象, 并将IG, IC, RP , 还有其余全部的AbstractInvokable须要的服务类(MemoryManager, IOManager, CheckoutResponder, TaskConfig etc )都传递进去, 而后调用AbstractInvokable 的 invoke 方法。

3.  AbstractInvokable

如以前给出的一些例子 : DataSourceTask, DataSinkTask, BatchTask, StreamTask 。 它们是Task.Run()时, 经过TaskInfo加载并实例化的。 这些Task Operator的源代码都在

org.apache.flink.runtime.operators 或org.apache.flink.streaming.runtime.tasks下面 。每一个BaskTask的须要的工具都是相似的, 只是计算的流程不一样 , 因此BaskTask的invoke方法调用时, 它根据TaskConfig(信息继承与TDD的TaskInfo) 加载和实例化不一样的Driver 类, 并调用Driver的run方法由Driver指挥流程(input, calcuate, out , etc )。

4. Driver 

Driver类和AbstractInvokable位于同一个包内。 每个Driver的run() 大都是一个循环。 不停向IG 要next record , 写metrics, 调用function 计算, 而后见结果发给RP。只不过有些drver一次计算须要两个record, 有些driver 须要两个record 来自不一样的IG , 有些须要将全部的input所有收完才计算, 有些在window expired后才计算, 等等。下面是FlatmapDriver 的run() 。

while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}

 

1.3.3  内存管理(NetworkBufferPool, MemoryManager, IOManager 

在stream模式下数据处理是有界限(bondary)的, 每一个window的处理所使用的内存是相对比较小 的,  因此Flink stream job 一般使用的内存较小。

但在batch 模式下, 数据处理是无界的 (所谓无界就是没有Window),如前面所示, 不少job 须要将上游全部的input都取干净,才开始计算, 如sort, hash join, cache 等, 此时所须要的内存是巨大的, 好比上游operator 读取300G文件而后map成record, 下游operator 须要将这些record 同另外一组输入作outer join  。因此Flink内存管理主要针对的是这些batch job 。

总起来将, Flink的内存管理仍是比较失败的, 至少在我用的版本里(1.6.1) , 主要缘由仍是 MemoryManager并无联合IOManager Disk去扩展内存。但我觉的MemoryManager的引入, 就是为了管理Flink的内存, 以防止OutOfMemoryException的发生 (如Spark同样 )。 防止溢出最直接的方法就是当系统内存不足或超过throttle时, 使用DiskFile以补充内存 , 从而完成 那些内存消耗巨大的操做。 只是version 1.6.1 还没作好 ,或许之后版本会作好 。

先看下Flink的内存管理机制吧 。

从概念上将, Flink 将JVM的heap分红三个区域。

  • Network buffers pool:  是一些 32 KiByte MemorySegment用于TM以前数据的批量传递。回忆一下前面围绕图-11的介绍。Network buffers Pool 在 TaskManager启动时分配的。 缺省2048 buffers 被分配,可经过 "taskmanager.network.numberOfBuffers"调整。
  • Memory Manager : 是另外大量的  32 KiBytes MemorySegment, runtime algorithms (sort/hash/cache)使用这些buffer 用于将 records存入缓冲区 以后应用算法 ,record 是以serialized 的形式存储在 MemorySegment里的 。 这些MemorySegment Pool 是在TaskManager启动时分配的 。MemorySegment Pool 的大小,能够用两种方式设置。
    •   Relative value (缺省): MemoryManager在全部的service 启动后(包括network buffer pool ) 会计算heap 剩余总量, 而后按必定比例 (by default 0.7) 做为 pool size 。 这个比例可经过 "taskmanager.memory.fraction" 配置。
    •   Absolute value:  科经过 "taskmanager.memory.size" 配置一个绝对值, 好比10GB。
  • Remaining (Free) Heap:  剩下的Heap用于存储 TaskManager's 数据结构, 好比 network buffer  Deserialized 后的 record 。

              

 

 图-12 Flink Memory

Network buffers pool 和 memory segment pool 都是TM启动时就分配的, 它们生存于JVM的老年代, 因此不会被GC回收的。只有Free Heap区域在新生代里生存。 

IG/IC, 和RP/RS 的record存储在network buffer 里,有些RS (SpillableSubpartion)须要Spill to Disk ,所以他们须要IOManager 的帮助。

须要sort/hash/cache的Task瞬时内存消耗很是大,所以从IG接收record后就将他们存储memory segment pool, 以后在对serialized record 应用 算法 。

其余的Task大都是pipeline 方式, 来一个消费一个, 瞬时内存消耗不会大。此时这些Task会使用FeekHeap 区域的内存。

若是没有人为错误, 系统不会有瞬时的大量内存申请, 因此不会有OutOfMemoryException , 因此FreeHeap的区域应该是比较轻松的。

但是问题是, memory segment pool 目前没有使用DiskFile 做为OffHeapMemory, 它可以装载上游下来的巨量 blocking 输入吗? 这就是Flink 1.6.1 的问题。不过这个MemoryManager实现的好很差的问题, 设计上是有防止OutOfMemoryException 的组件的。这个就期待Flink 新版本 补强这部分功能吧。

前面说“serialized record 应用 算法“,这是怎么作到的呢?  Flink 实现了大量的 TypeSerializer TypeComparator,  他们懂得record 是如何序列化到字节数组里的,因此也知道如何将record部分地deserialize 。一般算法只是使用record的某个或某些field。 部分deserialize极大的下降内存使用, 提高了数据存取的速度, 从而极大的提成了内存的使用效率。 serialized 形式的record 数据是Flink 可以将他们在TM之间, 跨进程和跨网络的传输, 它是分布式系统须要的数据形式, 同时, 它也为MemoryManager 将它们在内存和硬盘之间交换提供了条件。

IOManager 使用FileChannel将MemorySegment同DiskFile创建映射, 从而实现将数据在MemorySegment和DiskFile之间写入和读回。

因此,虽然MemoryManager目前还有些问题,  在这种设计下, Flink的内存使用效率确定会进化的极好 。

 

1.4  ResourceManager

RM在Flink内部是Flink cluster 里slot资源的管理者 , TM 提供slot,JM (JobMaster)消费slot 。 RM同JM和TM 都要保持心跳,以保持slot市场的活跃 ,以及在TM或JM失败的时候通知给对方。

总起来讲RM的做用主要包括:

  • 启动和得到TM, 从而获取Slots 。
  • 提供JM和TM发送对方的失败通知。 如一个TM的心跳中止了, JM会通知消费该TM slot的JM。
  • 当注册过来的TM的slot有剩余時, RM会缓存起来。

第2、三项功能比较好实现。第一项功能须要同外部的集群管理器合做才能实现。所用RM是一个随环境不一样而不一样的组件。在不一样的集群环境里, RM有不一样的实现类。

市场上比较流行的集群资源管理器主要有Yarn, Mesos, Kubernetes, 和 AWS ECS 。其中Yarn, Mesos中, 能够利用ApplicationMaster/Scheduler是资源调度器,只要将RM在ApplicationMaster中运行起来,理论上 RM就能够Yarn, Mesos的master通讯,为TM分配容器和启动TM。实际上JM的全部组件(Dispatcher, RestEndPoint, JM, ClusterEntryPoint, etc)都在ApplicationMaster启动的。

Kubernetes和ECS 没有ApplicationMaster的概念, 但TM能够做为一个有多个副本的deployment (K8s) 或 service (ECS) 运行。此时 TM 的多少(规模)是由deployment/service 根据必定策略自动扩容的而不是根据Flink须要的slot数量。Flink并无实现特殊的ResourceManager和K8s/ECS集成,此时的FlinkCluster使StandaloneCluster。若是能一个K8s/ECS 特殊的RM和集群的master通信,使TM可以能按需扩容而不是自动扩容,那就和在Yarn里同样完美融合, 并且可以用上Docker的服务, 毕竟Yarn目前只是使用JVM做为容器,并无真正达到真正资源的隔离 。

RM在不一样cluster以不一样的方式启动新的TM以补充slot资源,固然当一个job 结束时,它也会同集群的master通信,释放container,关闭彻底空闲的TM。

在不一样的集群环境里,RM可以管理TM的生命周期,那么谁来启动和结束JM 呢? 请参考后面的 Flink Deployment 。

1.5  HighAvailability 

 Flink的 HighAvialbility 主要有两个服务组成 : LeaderEleketronService和LeaderRetrievalService 。

1.5.1.  LeaderEleketronService

该Service是用来选举Leader的 。假设系统里启动了两个Dispatcher 。先回顾一下什么是Dispatcher , 看图-2, JobMananager 的进程是ClusterEntryPoint的main 函数启动的,ClusterEntryPoint启动了 WebUI, Dispatcher 和ResourceManager。当用户提交了 Job时, Dispather 实例化一个新的JobMaster 来管理这个job, Dispatcher也是CheckPointBarier 的发起者, 同时也是来自WebUI REST 后台的请求的处理者。Dispatcher 的做用承上启下, 做用很是核心,不可缺失,因此Flink启用HA服务来保护它。 Flink 系统里, 使用HA 保护的核心服务还包括ResourceManager ,JobMaster 还有WebUI 的REST Endpoint。当系统里启动两个Dispatcher 谁来当Leader呢? 这就要LeaderEleketronService (LES) 的决定了。

目前Flink的起做用的LES使用ZooKeeper 实现的。实现方式就是两个Dispather都试图抢占的特定ZooKeeper Path (dispather latch path)  的LeaderLatch(参见 curator framework),谁先抢到了, 就被选举成Leader , leader的 AKKA URI 和UUID被被写道一个特定的ZooKeeper path 中(leader path)  。只有Leader 是工做的,由于其余组件在使用Dispather时会向获取Leader Dispath的URI, 而后才RPC的 。 其余竞争者不会接受到RPC, 他们只有继续监听,若是当前的Leader 退出了, LeaderLatch被释放了, 从而新的leader会被选举出来。

1.5.2.  LeaderRetrievalService

那么对于Dispatcher的使用者, 怎么知道谁是Leader呢? 这就须要LeaderRetrievalService了 (LES ) 。 对于用ZooKeeper 实现的LES, 它只须要监听一下leader path, 它就知道谁是leader 了 。

好比, 当你用fink run 命令提交job 时, 假如系统里有多个Fink REST Enpoint, Flink的ClusterClient 对先使用LES获取Leader REST Endpoint, 而后才会将job  发送过去。 

 

除了ZooKeeper的实现, HAService 还能够以来外部的名字服务(如DNS, LoadBalencer , etc ) 实现。在LES和LRS 永远返回 HA保护的服务的URI(不管是AKKA PRC URI, 或REST URI )。 该URI永远保持不变, 若是提供服务的server失败, 名字服务会将给URI映射到另一个工做的server 。 具体请参考 StandaloneHaServices的代码。

 

1.6 Flink Deployment

目前为止, Flink 支持大概4种部署方式或4种cluster 类型,MiniCluster(或LocalCluster),  StandaloneCluster,YarnCluster,MesosCluster 。 虽然前面提到过Kubernetes , ECS , Docker , 但他们的本质是将JM和TM docker 化 , 从而是他们运行在真正的 dockers里面 , cluster 仍是 StandaloneCluster 。

图-3 是StandaloneCluster , 图-4是YarnCluster , MesosCluster 与YarnCluster 相似, MiniCluster是运行在IDE(e.g. IntelliJ )的虚拟cluster , 它主要用于 FlinkApplicaiton的调试 。在Session 模式下,表面上能够看来StandaloneCluster 与YarnCluster基本流程是一致的, 只不过是启动JM 和TM的启动机制不一样而已。YarnCluster/MesosCluster 里TM是由他们各自的ResourceManager实现根据须要启动的。 在 StandaloneCluster下, TM是手工启动的。其实在YarnCluster下, JM 也是Yarn启动的。其实, 如前所述, StandaloneCluster与YarnCluster / MesosCluster的本质区别是, 前者的用于TM硬件资源是管理员手工分配的, 后置是有RM同集群管理器协调自动分配的。

从Flink内部看, 是为了支持这些异构环境, Flink 对于不一样的Cluster, 实现了不一样的ClusterEntryPoint用于启动不一样ResoureManager ,以及Dispatcher 。不一样集群类型的ClusterEntryPoint对JobMode和SessionMode有 不一样的实现, 好比YearnSessionClusterEntryPoint和YarnJobClusterEntryPoint。Session模式下, Dispatcher使用的时StandaloneDispatcher, Job模式下,使用的是MiniDispather 。 关于什么是JobMode和SessionMode, 请参考图-3下面的解释。

如前所述,JM接受jobGraph的对象或对象文件(序列化后),jobGraph由FlinkClient生成, FlinkClient和JM位于不一样的JVM (MiniCluster除外)。若是JobCluster没法由FlinkClient启动 (经过某种方式),则JobGraph生成之后须要存到指定位置,在手工启动JobCluster读入,这样的过程比较复杂。基于目前的架构,尽管Flink在各类集群环境里对job mode , session mode 在代码上都由支持, 但job mode 一般因为集成度不够好,用户没法方便使用。

 

我用一个表来描述他们跟具体的不一样以及Flink是如何支持这些异构环境。为了使这个表不至于太庞大,先把不一样的cluster 类型表述下先。

1.  MiniCluser 

用于在IDE(IntelliJ, Eclipse) 运行FlinkApplication 的小型FlinkCluster环境,JM和TM运行在同一个进程里,主要用于在IDE里调试Flink Application。

2.   StandaloneCluster

能够运行在单机或多机上的FlinkCluster环境,  JM和TM运行在不一样的JVM里。只要有JRE , 不管在Windows ,Linux均可以搭建StandaloneCluster。

用户可使用FlinkClient 的command line,或API 直接向JM REST URI 提交job ,具体看Flink CLI 的 “-m" 选项。StandaloneCluster很是有利于搭建单元测试,集成测试以及演示环境。

3.   YarnCluster

在Hadoop集群里搭建的FlinkCluster, JM和TM都运行在Yarn管理的容器里。JM作为Yarn里ApplicationMaster ,Flink cluster做为Yarn的一个Application运行。

Yarn是Flink与之集成度最好的集群管理器。 用户能够直接使用FlinkClient 的command line,或API 直接向Yarn提交job , YARN 会自动的启动(或链接现有的)Flink JM, 自动启动须要的TM,然后在该Flinkcluster运行任务。

具体看Flink CLI 的“-m yarn-cluster" 和 ”applicationId" 命令行用法。


4.   MesosCluster

 在Mesos集群里搭建的FlinkCluster, JM作为Mesos里Scheduler , 一个 Flink cluster做为Mesos的一个Framework运行, JM和TM均可以运行在Mesos管理的容器里。 

TM由JM的resource Manager同Mesosmaster 协调按需自动启动和销毁。JM 须要先使用Marathon 建立服务, 由Marathon 启动。Marathon服务的后端都是运行在Mesos的容器里,

并且Marathon服务一个高可用性服务。  

用户能够直接使用FlinkClient的command line,或API 直接向Marathon 建立服务提交job ,具体看Flink CLI 的 “-m" 选项。

 

我的以为Flink 与Mesos的集成度能够在提升一些。目前用户须要手动的为JM建立Marathon Service, 可参考下面写一个简单的配置文件, 调用flink的shell脚本mesos-appmaster.sh启动Flink

MesosSessionCluster的JM (该JM会启动MemsosResourceManager用来启动须要的TM)。

yhttps://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/mesos.html

  

Flink 彻底能够改进一下Client端, 添加集成Mesos Marathon须要的 ClusterDescrptor类,CLI类(好比MesosJobClusterDescripor, MesosSessionClusterDescripor,FlinkMesosSessionCLI ),丰富“-m" 选项,

由ClusterDescrptor类自动建立和销毁JM (Marathon 服务)

 

5.   Kubernetes/ECS/Docker

Flink 对于Kubernetes/ECS/Docker在源码级别并无任何支持。只是说,能够将StandaloneCluster的JM和TM运行在这三个以Docker 做为容器服务的

集群环境里。因此是,Flink 与他们的集成度, 比较低 。Kubernetes的具体作法分别对StandaloneCluster的JM和TM作两个deployment(就是能够平行扩展的docker group ), 它们分别启动StandaloneCluster的JM和TM (经过start-console.sh脚本), 而后对JM的

deployment作一个Service使其具备高可用性,固然须要将该Service的URI传递给TM,前面说过StandaloneHAService是靠统一的URI来提供HA服务的。更详细的请参考下面。

https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/kubernetes.html

ECS和Docer的实现同Kubernetes 相似, 只是在各自使用的技术名称不一样而已。ECS的AutoScalingGroup等价于Kubernetes的Deployment。 common sense是同样的, 只是各家用各家喜欢的名字。

总之,将Flink的部署在上述集群里, 目前手工的工做仍是比较多的,并且TM的数量是预先设定的,或按策略自动扩容的, 并非最优的由Flink RM 指导的按需扩容。获得的好处最大应该是Docker的容器服务。毕竟Yarn或Mesos对Docker的支持并非很好。

我的以为Flink 与他们的集成度能够在提升一些。同提升Mesos的集成度的想法同样, 在目前Fink的框架下,只须要改进FlinkClient,添加集成Kubernetes API-Server须要的 ClusterDescrptor类,CLI类(好比K8sJobClusterDescripor, K8sSessionClusterDescripor,

FlinkK8sSessionCLI ),丰富“-m" 选项,由ClusterDescrptor类自动建立和销毁JM (JM 的Deployment和 Serice ), 添加K8sResourceManager, 用它来建立及扩容TM的deployment 。

 

Cluster Type  JM 启动方式  TM启动方式       ClusterEntryPoint   ResourceManager SuportedMode
 Mini

调试环境启动的Flink Client是

LocalEnvironen或

LocalStreamEvronment,

它们的execute方法会

先启动MiniClusterEntryPoint

由MiniClusterEntryPoint启动  MiniClusterEntryPoint
.java

 StandaloneResource

Manager.java

只支持JobMode
 Standalone 手工,如sart-cluster.sh脚本
手工,
如flink-consol.sh
脚本

StandaloneSession

ClusterEntryPoint.java

 

StandaloneJobCluster

EntryPoint.java

 StandaloneResource

Manager.java

支持SessionMode

job mode 有支持,但用户没法方便使用。 

 Yarn

jobmode下 由FlinkClient里的

YarnClusterDescriptor,

调用Yarn的API经过YARN启动。

JM作为ApplicationMaster自动启动。 

 

SessionMode JM须要手工运行

yarn-session.sh,

JM和TM的内存不是per-job,

只有管理员才能设定。

 JM 里的YarnResourceManager

利用YARN的API启动TM。

 YarnSessionCluster

EntryPoint.java

 

 YarnJobCluster

EntryPoint.java

 

 YarnResource

Manager.java

 支持JobMode和SessionMode

 

Mesos

经过Marathon Service启动,

JM运行在Mesos的容器里。

但Marathon Service须要手

工建立。

 JM 里的MesosResource

Manager

利用Mesos的API启动TM。

 

 MesosSessionCluster

EntryPoint.java

 

 MesosJobCluster

EntryPoint.java

 MesosResource

Manager.java

 支持SessionMode

job mode有支持,但不方便使用。须要

提高集成度。

Kubernetes

/ECS/Docker

 由JM service 启动  由TM deployment启动

StandaloneSession

ClusterEntryPoint.java

 

StandaloneJobCluster

EntryPoint.java

 StandaloneResource

Manager.java

 

支持SessionMode

job mode有支持,但不方便使用。

须要提高集成度。

 

 

 表-1 Flink Deployment

 

1.7 Failure Detection and Reaction

如前所述FinkCluster里 (参考图-3), 最核心的组件并且交互最频繁的组件是Dispatcher, ResourceManager, JobMaster, 和TaskManager 。

其中对于Dispatcher, ResourceManager, JobMaster,一般是ClusterEntryPoint启动Dispatcher和ResourceManager,而后当有job提交时,Dispatcher启动JobMaster。(MiniCluster, JobCluster略有不一样,但相似)。尽管他们之间使用RPC通信,他们生存在同一个JVM里,若是其中一个失败,同时使整个JVM(因为异常崩溃了)失败了,全部组件都失败了。那么他们中若是有承担Leader责任的,也都会经过HA Service 切换到另外一个工做的JM后端的RPCEndPoint。

若是TM失败了,  JM可以感知到heatbeat 機制感知到, JM 和RM都与主动向TM发送和hearbeat , 因此TM heartbeat 一旦timeout , JM会释放该TM 提供 过来的slot,与此同时将slot上运行的Execution 的状态设置为失败, 参考 SingleLogicalSlot::signalPayloadRelease .

在stream mode 下,JM 会尝试将该TM上失败的Task重现分配到别的TM上(若是slot资源时有的,或能够分配的)。

在Batch mode 下, JM 会将整个job 失败,然後嘗試从新啓動整個job 。

从新啓動的streaming Task會根據上一次的checkpoint 回復狀態, 繼續運行。

 

1.8 Flink Security

to be filled .

2.  Flink的源码结构

上一章介紹Flink了的架構, 它包括了那些主要組件? 組件是怎麽工做的? 組件之間是怎麽工做的?組件使用的資源(包括服務器,容器,内存和CPU和Disk)是若是分配的? 以及組件是如何部署的?一個job是怎樣分割成小的Task并行執行在cluster裏面的?

怎麽保證組件HignAvailablity 以及組件是如何作 失敗處理的?以及Flink關於系統安全的設計等等。 這些關於架構的介紹,重點在於瞭解flink cluster中各個組件的互操做行以及容錯處理, 瞭解框架, 以便於再出現問題的時候我們能夠對它有比較針對性的debug 。關於Flink引以爲傲的statefull operator, window watermak, checkpoint barrier , stream/batch API 以及web monitor本文都沒有介紹,請參考相关的Flink的官方文檔。

Flink的源碼還是比較清晰易懂的, 尤为是瞭解了她的架構后, 大部分的實現都很是符合common sense 。 不想在這裏貼一堆代碼段然後加注釋解讀了, 本文的篇幅已經太長了。過一下全部的包,解釋一下他們的主要做用,以及须要框架裏使用的主要類吧。用一個大表來列舉比較合適。

 Artifect    功能介紹
Flink-client

FlinkCommandLine 入口類 (CliFrontEnd)

解析Flink 命令行 (DefaultCLI, YarnSessionCLI )

負責將從用戶jar 文件中main函數生成Plan (PackagedProgramUtils),用通過LcoalExecutor或RemoteExecutor調用flink-Optimizer生成 jobGraph .

使用ClusterDescriptor啓動jobManager (好比在yarn), 使用并將jobGraph 用ClusterClient提交給本地或遠程的JM。

Flink-runtime

Flink 源碼的核心。 框架的核心組件都在裏面, 包括 Diskpatcher, JobMaster, TaskExecutor, ResourceManager, ClusterEntryPoint, WebUI以及他們依賴的子組件以及核心數據模型,

JobGroup, ExecutionGraph, Execution, Task, Invokable, Operator, Driver, Function, NetworkBufferPool, ChannelManager, IOMemory, MemoryManager, PRCService,

HAService, HeartbeatService , CheckPointCoordinator ,  BackPressure, etc .

這些類的名字和做用在上一章都或多或少提到過, 最好結合架構的介紹, 理解他們的代碼。

Flink-runtime-web

Flink Web monitor 的界面和handler .  handler會調用Dispacher 的方法處理客戶請求, 好比sumitJob.

Flink-java

Fink-scala

用Java 和 scala實現的Flink Batch API  , Dataset, ExecutionEnrionment, etc .

Flink-stream-java

Flink-stream-scala

用Java 和 scala實現的Flink stream API ,  DataStream, ExecutionEnrionment, , StreamExecutionEnvironment, windowing, etc .

和對streaming提供支持的runtime, Checkpoint, StreamTask, StreamPartitioner , BarrierTracker,  WindowOperator, etc .

Flink-optimizer

主要功能是優化Plan 和生成JobGraph。Flink-optimizer是一個很client端使用重要的庫,它決定了從客戶代碼(application) 到jobGraph造成過過程。 我個人覺得一般也是調查和解決

application問題的根源, 好比application 的寫法是否是合適的,有問題的, 或最優的等等。

在前面架構極少裏面,并沒有談及Flink-optimizer, 因此在這簡單介紹下。

fink application 開發者使用Flink API 編寫application, flink-client 为了将application 最终能在Flink cluster里運行起來是,

它首先通過讀取用戶jar 文件中的main函數生成Plan:以DataSink為根的一個或多個树结构, 樹的節點都是application使用的 API operator, 每一個節點的輸入來自與樹的下一層節點。

不難想象树的叶子节点都是DataSoruce: 他们没有输入节点,但有用于读取数据源的inputFormat,  根节点是DataSink :他们既有输入节点,也有用于写入目标系统的outputFormat,

中间节点都会有一个多个的输入节点。Plan数据结构描述了同用户的application彻底相同的数据流的节点, 但它只是一个逻辑树状结构, 并无对链接节点的边作描述,

也没有对application编写的数据流作任何修改。o

而后使用Flink-optimizer将Plan根据时根据优化策略设置节点的边上的数据传输方式,并同时用OptimizerNode生成多种优化方案, 最后选择cost 最小的方案产生的方案 做为OptimizedPlan 。

好比将数据装载方式(ShipStrategyType) 设置 FORWARD, 而不是 PARTITION_HASH而 应为 FORWARD 的网络cost 最低(0) 。OptimizedPlan是一个图结构, 图中的顶点(PlanNode)

记录了自身的cost以及从source 开始到它的累计的cost 。OptimizedPlan主要针对与join和interation操做。

而后Flink-optimizer将OptimizedPlan 编译成JobGraph。编译的过程应该基本上是一对一的翻译(从PlanNode 到 JobVertex), 但若是一串PlanNode 知足Chaining 条件

(好比数据在每一个oparator 都不需从新分区, 流过operator 以后, 直接forward到下一个operator), Flink-optimizer就像这些 operator 链接到一块而后在JobGraph里面只建立一个

ChainedOperator jobVertex, ChainedOperator同Spark里面的stage 概念相似, 是优化的一部分。 

最后flink-client将jobGraph提交给FlinkCluster ,jobGraph 变形为 ExceutionGraph在JM和TM上执行。

能够从Optimizer 的compile 和JogGraphGenerator的 compileJobGraph展开看, 他们分别complie的是Plan和OptimizedPlan 。

Flink-optimizer优化的是parition 的选择以及算法的选择, 而不是DAG 的workflow 。

 

Flink-Table

 

用户能够用flink-java/flink-stream-java里的api编写flink application, 也能够用 Flink-Table 的table api和 flink-sql写 application 。 Flink-Table将数据源(Dataset, DataStream)都

generalize 成Table (row based ),用户能够用相似关系型数据库的操做方式操做Flink的数据源, 这种方式虽然屏蔽了一些flink-api的特性 (好比 broadcast), 但极大的下降了application

的开发难度,减小了客户程序的代码量,从而极大的提升系统的重用度。

Flink-table 的底层仍是依赖dataset/datastream api, 因此基于table api 或SQL 的程序 (Program) 最终会翻译成 Flink-optimizer 的Plan , 通过前面所述一样的编译优化过程,

最终一个EG的形式运行在JM和TM上。 固然,在被翻译成Plan以前, Flink-table 的Program 也会有自身的优化过程, 好比SQL Plan optimization .

代码须要看, 如何实现一个Table : StreamTableSource, BatchTableSource, BatchTableSink,AppendStreamTableSink

如何扩充FlinkSQL : UserDefinedFunction, ScalarFunction,TableFunction,AggregateFunction
TableEnvironment

Flink-yarn

Flink-mesos

Flink-container

 Flink 怎么支持异构环境的, 包括不一样环境里的, 主要是异构环境里的不一样ResourceManager (启动TM ),以及 ClusterDescriptor (启动JM)如何实现的。

Flink-library

 flink-cep, flink-gelly, flink-ml

Flink-connector

Flink-format

链接外围数据系统(数据源和输出)系统的InputFormat, OutputFormat .

Flink-filesystem

 Flink支持的分布式文件系统, hadoop, s3, mapr

Flink-metrics

 flink 的metrics 系统

Flink-statebackends

Flink-queryable-satate

 flink 的 state的存储

flink-jepsen
flink-test

Flink的UT和集成测试。

 表-2 Flink packages                     

 

3.  Debug Flink

3.1.  Intellij 准备工做

须要安装JDK1.8和Scala的plugin, 不然Flink没法在IntelliJ里编译。

下载flink1.6.1的代码,到下面的地址 download ,而后用maven 编译 。

 https://github.com/apache/flink/tree/release-1.6.1

 

下载例子程序:  https://github.com/kaixin1976/flink-arch-debug 。 因为github的限制, 上传的instrument文件相对较小,读者能够经过自我复制扩大文件尺寸。

运行例子程序, 有两个参数 :

第一个是 joinType,可取值为"normal" 或"broadcast"。normal 方式彻底依赖FlinkOptimizer产生 优化的方案, broadcast是运用一下方法, 使join的第一路输入广播, 第二路输入自由平均分配。

第二个使api类型, 可取值为 "api" 或 "sql" 。api方式指的是Flink Java API, sql 使 Flink SQL API 。对于Join Flink Java API 能够指定JoinHint或Parateters从而影响优化, sql 没有这些接口, 只能在DataSoource 上作文章。

具体看JoinTest::main()。

3.1.2  Intellij debug configration 

如前所述, Flink分布式环境的主要包含三部分:

1.  FlinkCient : 生成、优化和提交JobGraph.

FlinkClient的main 函数在CliFrontend中, VM 的classpath 和工做路径设置为本地安装的一个flink路径 , 用于找到全部须要的jar文件和 flink confugration .

程序参数跟flink run 同样, 

 图-13  Flink Client debug configuration

 

2.  JM : 生成ExecutionGraph,并为其中的全部的的子任务分配slot资源, 并将子任务发送到slot所在TM上运行。

JM 使用StandaloneSessionClusterEntrypoint , 他会启动Standalone Session Cluster 的entry point .

 图-14  Flink JobManager debug configuration

 

3.  TM: 运行ExecutionGraph的子任务。

  图-14  Flink TaskManager debug configuration

  

3.2  例子程序 和问题描述

例子程序是一个利用Flink SQL  把一大一小两个数据源join再一块儿的Flink  application 。大数据源叫instruments 包含了一些假造金融工具(好比股票期权等)的基础数据(其中包括该工具交易货币的ID),小数据源是货币currencies  包含货币ID,和货币名称。join目的是给与金融工具的货币ID得到对应的货币名称(好比,人民币,欧元,美圆等)。

join的过程是比较简单的,首先 从文件建立两个数据源(instrument 和 currency 的 TableSource), 而后用调用 SQL 作join, 最后建立TableSink将join的结果输出到文件里。程序的主体以下。

 public void joinSmallWithBig(String joinType) throws Exception {

String currencies = "currencies";
String instruments = "instruments";
//0. create the table environment
ExecutionEnvironment env = buildLocalEnvironment() ;

env.setParallelism(4);
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

//1. create and register instrument table source
registerInstrumentTableSource(tableEnv, instruments, System.getProperty("user.dir") +"/data/instruments.csv");

//2. create and register currency table source
if(joinType.equalsIgnoreCase("broadcast")){
this.registerCurrencyBroadcastTableSource(tableEnv,currencies, System.getProperty("user.dir") +
                                              "/data/currency.csv");

}else {
this.registerCurrencyTableSource(tableEnv,currencies, System.getProperty("user.dir") + "/data/currency.csv");
}

//3. join
Table currencyTable = tableEnv.sqlQuery(
"SELECT CurrencyId AS CurrencyId," +
"(CASE " +
"WHEN ISOCode IS NULL OR ISOCode ='' THEN ISOExtended ELSE ISOCode " +
"END ) AS CurrencyCode FROM " + currencies);
tableEnv.registerTable("currencyTable", currencyTable);


Table result = tableEnv.sqlQuery(
"SELECT RIC,Asset,AssetClass,Exchange,Periodicity,ContractType,CallPutOption,ExpiryDate,StrikePrice,
" +        
        "StrikePriceMultiplier,LotSize,CurrencyCode,AssetState " +
  "FROM currencyTable join instruments on currencyTable.CurrencyId=instruments.CurrencyID ");

//4. create sink and output the result
CsvTableSink sink = new CsvTableSink(System.getProperty("user.dir") + "/data/sql_join_result.csv", ",", 1,
           FileSystem.WriteMode.OVERWRITE);

result.writeToSink(sink);

env.execute();

}
 

 表-3  例子程序

 

问题是join的过程发生了严重的数据倾斜 (data skew )。全世界的金融工具分布式极不平均的, 绝大部分使用美圆,欧元计价 (好比例子程序里假造的instrument的数据源,大概有13/14 的instrument是以欧元计价的)。若是使用经常使用的 hash join 或 sort-merage join, 数据倾斜是必然发生的。 调用joinSmallWithBig,并传入任意字符串 作为joinType 时, 程序调用的registerCurrencyTableSource方法, 该方法就是建立了一个普通的CsvTableSourcei并注册, 以后利用sql join 将两个数据join起来,此时会发生数据倾斜。以下图所示:

图-15 flink-webmonitor 上的currency 数据源

 

  

 图-16 flink-webmonitor 上的instrument数据源

能够看出currency 的CsvTableSource一共有492条记录分4个split读入,数据量确实小,instrument数据源记录数大概14million 条记录有,比较大,一样分4个split读入, 分配不可谓不均衡 。但当join后, 均衡完全打破了,以下图所示:

图-17 flink-webmonitor 上的hash-join

两个数据源的输出策略(或JoinOperator的输入策略)也就是链接数据源与JoinOperator的那条边的ShipStrategyType被设置为一个currencyId 为key的HashPartition。  因为本例中约13/14的instrument以欧元计价,大概有13 million个instrument 记录依照hash code 被发送到一个task的IG里, 剩下约1 million的其余三个task 。

借这张图我们须要重温一下的前面提到的task, RP,RS,IG,IC的关系, instrument数据源 有4 个task因此4个RP, 每一个RP会产生4个RS以装载不一样的hash code,共16个RS 。 下游join operator 有两个IG (分别链接instrument数据源 和currency), 每个IG 的ShipStrategy都是HashParition。每个IG 有4个 IC, 每一个IC 都链接上游标号相同的RS (好比JoinOperator task1的 IG1 的IC1,IC2,IC3,IC4)只链接上游 instrumentSource task1, 2,3,4 的 RS1 。 JoinOperator是一个DualInputOperator, 一般的operator只有一个IG 。

上图中13 million的record received (13,836,997)是joinOperator的第三个 task 的IG1 和 IG2中相同标号IC的记录数量(好比IC2),因为currency数据量很小,构成13 million 这个数量级的来源是instrument中以某几个货币计价的instrument, 如前述主要是欧元 , 此例中欧元计价占比大概为13/14 。

数据倾斜的结果形成joinOperator的第三个task须要多于别的 task的100倍的内存,一般会超过容器预先分配的内存配额。罪魁祸首是 join operator 输入边 的 ShipStrategy : hash partition 。 SQL 里没法指定 join 使用哪一种策略, 为何flink 会在翻译(将sql 翻译成jobGraph)的过程当中将 join 输入册率 优化成hash partition 呢?一个小数据集和大数据集join,最好的方式是将小数据集广播给下游的全部task, 大数据平均分配,而后再join operator内部作hash join, 这样的cost是最低的 。Flink为何不能作到这样的优化呢?若是使用Flink API (而不是 SQL) 状况会好吗?

 

3.3  分析问题设置断点

初步的感受是Flink-optimizer 并无作到这样的优化: 当join的两个数据源大小悬殊时, 小数据集广播,大数据集均分的输入给join operator 。Flink这么经常使用的优化都没作到吗?

让咱们经过debug Flink-optimizer 代码, 看看他时怎么将本例的join input的ShipStrategy优化成HashPartition 吧。

3.3.1. 首先,准备调式环境

下载例子代码,而后编译,package 成flink-arch-debug-0.0.1.jar, 而后下载flink-1.6.1 的代码,编译, 按照图-13设置flink-client的运行选项。

program arguments 设置为 run flink-arch-debug-0.0.1.jar normal sql, 使用 "normal ","sql" 方式, 以下:

 C:\projects\flink-arch-debug\target\flink-arch-debug-0.0.1.jar normal sql

3.3.2. 而后,了解ShipStrategy

Flink Optimizer 的优化过程(参考Optimizer::compile方法)大概是:

首先将利用GraphCreatingVisitor 将Flink API (API, Table, 或 SQL) 产生的Plan翻译成dag 包里的OptimizerNode和DagConnection组成图,

而后从图的SinkNode(一个或多个)先前递归的调用getAlternativePlans方法从产生最优的Plan , 最优的Plan 是由plan包里的PlanNode和Channel组成的Graph 。ShipStratigy是channel的一个属性, 它描述数据从一个operator以那种方式发布到RP中的RS中的。

最后由JobGraphGenerator将最优图翻译成JobGraph

那么有多少种ShipStrategy呢? 连续按两下SHIFT, 再键入类名(ShipStategyType.java)。参考一下本文最后面的IntelliJ的经常使用键吧.

 

public enum ShipStrategyType {   

NONE(false, false), //没有策略
   FORWARD(false, false), //数据按原分区号传递到本地运行的Task相同分区,不跨网络,不须要比较器(用于排序或Hash)
 PARTITION_RANDOM(true, false),//数据随机传递到本地运行的Task任意分区,不跨网络,不须要比较器
   PARTITION_HASH(true, true),//数据按照指定键值的Hash决定分区,到目标Task须要跨网络传递,须要比较器
   PARTITION_RANGE(true, true),//数据按照指定键值的排序顺序决定分区,到目标Task须要跨网络传递,须要比较器  
  BROADCAST(true, false) // 将数据复制到任意一个分区中,不须要比较器
   PARTITION_FORCED_REBALANCE(true, false), // 将数据平均到各个分区中,不须要比较器
   PARTITION_CUSTOM(true, true); // 将数据按用户指定的分区器分配分区,不须要比较器
...
表-4 ShipStrategyType

搜索一下PARTITION_HASH, 看谁将它设置给最优图的Channel 对象, 过滤一下,应该由两个地方:

一个是TwoInputNode::setInput 方法中, 它根据join operator 的JoinHint 或 Parameters 来设置 input Channel的ShipStrategy 。 好比, 若是想让join的第一个input(Currencies)用广播方式传递, 就可以使用”BROADCAST_HASH_FIRST“ hint, 或者将这样的参数"INPUT_LEFT_SHIP_STRATEGY"="SHIP_BROADCAST"传递给join operator 。不过这些都只能在Flink Java API中 使用, (具体参考flink-arch-debug中的ApiJointTest类),没法在SQL API 中使用。

第二个在TwoInputNode::getAlternativePlans中。如前文所述, getAlternativePlans首先会 OptimizerNode (本例中主要是JoinNode) 节点用于产生多个plan , 每一个 plan的输入输出的节点都是相同, 不一样的是边上ShipStrategy 。对于每一个Plan , getAlternativePlans都会调用RequestedGlobalProperties::parameterizeChannel从而设置这个Plan的InputChannel的ShipStrategy 。

在SQL application 中, 用户没法像在使用Java API 时那样经过设置JoinHint或Parameter 来选择想要的ShipStategy , 实际上若是指定了这两项中的任意一个,也等同于跳过了getAlternativePlans (优化过程)。 由于既然用户已选择了想要的ShipStategy,就没有必要用Optimizer 根据计算每一个方案的cost 来选择最优方案了。 但是即便是由Optimizer 来选择, 对于本例(小数据集join大数据集), 根据表-4 ShipStrategyType的 定义,PARTITION_HASH的cost 确定不是最低的 。对于大数据集 (instruments), 若是采用PARTITION_HASH将数据传递给下游, 网络cost 必然是很高的, 由于PARTITION_HASH是一个shuffule 的过程, 下游 必然有位于不一样TM远程Task , 所以网络传递的cost 必然很高。 看看Flink 怎么定义Cost 和怎么估计cost 的?

public class Costs implements Comparable<Costs>, Cloneable {

public static final double UNKNOWN = -1;

private double networkCost; // 上游数据传递到本节点在网络上传输的cost, in transferred bytes

private double diskCost; //本节点缓存数据到磁盘上读写的cost , in bytes, 回忆一下前面说的SpillableSubpartition
 private double cpuCost; // 本节点算法在计算中使用CPU 的costs, 好比Hash, sort , merage, etc 
   
private double heuristicNetworkCost; // 如下时假设的cost, 很是高不许确

private double heuristicDiskCost;

private double heuristicCpuCost;
...

表-5 Cost定义

根据表-5 Flink cost的定义, 一个节点(好比JoinNode)的cost ,包括上游数据由上游传递过来的network cost, 数据本地缓存的 disk cost, 还有算法的cpu cost 。 图-17第二条边(Instrments)使用的HASH_PARTITION策略确定比FORWARD cost高不少, 由于若是使用FORWARD 的network cost 为0 (参考表-4的定义),diskCostcpuCost 跟算法有关(好比使用HashTable或sort merage 去作join)。 第一条边(Currencies)使用的HASH_PARTITION相比BROADCAST, cost 确定低一些, 可是Currencies数据源 数量特别小, 它的cost 和 这两个cost 之间的差异 在Instrments引发的cost面前均可以忽略不记。因此感受上 HH(两边都是HASH_PARTITION)和BF(currency 边Broadcuast, instrument边Forward)相比, BF方案的cost必定低不少。 但为何getAlternativePlans没有选择 BF呢?JoinNode没有给出BF的选择权吗?看看JoinNode::getDataProperties()的代码:

private List<OperatorDescriptorDual> getDataProperties(InnerJoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint,   
Partitioner<?> customPartitioner )
{
...
  
switch (joinHint) {
case BROADCAST_HASH_FIRST:
list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
break;
case BROADCAST_HASH_SECOND:
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
break;
case REPARTITION_HASH_FIRST:
list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
break;
case REPARTITION_HASH_SECOND:
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
break;
case REPARTITION_SORT_MERGE:
list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2, false, false, true));
break;
case OPTIMIZER_CHOOSES:
list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2));
list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
break;
default:
throw new CompilerException("Unrecognized join hint: " + joinHint);
}

...
}

表-6  JoinNode的输入节点要求的属性

从表-6  JoinNode的输入节点要求的属性可知, 但由Optimizer 来决定(OPTIMIZER_CHOOSES)输入节点的ShipStrategy(输出分区策略)时, JoinNode对上游的要求时很是宽泛的,几乎就是什么都行 。

SortMergeInnerJoinDescriptor(this.keys1, this.keys2),HashJoinBuildFirstProperties(this.keys1, this.keys2),HashJoinBuildSecondProperties(this.keys1, this.key),
实际上就是他们为JoinNode和它的input定义了三类需求 :
第一, Join算法但是时Sort-merge, 或者用一路输入作HashTable的HashJoin ,或者用二路输入作HashTable的HashJoin
第二, 第一路第二路输入的partition 策略: 也就是所谓的RequestGlobalPropertities, 能够是RANDOM_PARTITIONED (对应FORWARD shipStategyType),或者HASH_PARTITIONED(PARTITION_HASH),或者FULL_REPLICATION(对应BROADCAST),
或者ANY_PARTITIONING(它是一个通配策略,意思是RANDOM or HASH均可以)
第三,输入的排序策略 :就是所谓的RequestGlobalPropertities, 排序或者不排序。
想想这三类需求的可选项组合起来,一共会有多少组合? 弄个表, 描述一下可能更清楚些。
     序号                Input1 的 
partition 备选 策略
 Input2 的 
partition 备选 策略
算法  Input1/Input2 的 排序备选策略  兼容性   network Cost   
1
FORWARD
 
HASH
SortMergeInnerJoin
 sort/sort yes  
2
FORWARD
 
HASH
 
HashJoinBuildFirst
 sort/sort  yes  
3
 
FORWARD
 
HASH
 
HashJoinBuildFirst
 not sort/ not sort  yes  
4
 
FORWARD
 
HASH
 
HashJoinBuildFirst
 sort/not sort  yes  
5
 
FORWARD
 
HASH
 
HashJoinBuildFirst
 not sort /sort  yes  
6
 
FORWARD
 
HASH
 
HashJoinBuildSecond
  sort/sort  yes  
7
 
FORWARD
 
HASH
 
HashJoinBuildSecond
 not sort/ not sort  yes  
8
 
FORWARD
 
HASH
 
HashJoinBuildSecond
  sort/not sort yes   
9  
FORWARD
 
HASH
 
HashJoinBuildSecond
 not sort /sort  yes  

表-6  GloabalProperties, LocalProperties 和算法的组合
从表-6 可知, 对于input1和input2的partition策略(表-6实际上用的是shipStategy, 他们是1-1对应的)一个组合FH(Forward, Hash ), 一共有9个能够接受的分区-算法-排序的方案,每一种方案都有本身的cost, getAlternativePlans会最后选择cost最低的方案。 除了FH组合, 合法组合还有
HB:Hash-Broadcast
HH:Hash-Hash
BB:Broadcast-Broadcast
BF:Broadcast-Forward
FB:Forward-Broadcast
固然还有HF,FH ,FF,这些都是不合法的,对于join操做会有数据丢失的组合。 6个合法的input1和input2的partition策略组合,根据表-6能够计算, 一共能有6*9=54可互相兼容的分区-算法-排序的组合方案 。实际上若是在TwoInputNode.java:516行上设计断点 (getAlternativePlans方法内)并运行例子程序,
你会发现变量 outputPlans里有90个备选方案,那么其中36个必定是重复和多徐的。

那么, BF方案是既然在备选方案里,并且它的cost理论上是最低的, 并且getAlternativePlans的算法是选择cost最低方案, 为何cost并不低的HH方案最终被选择了 ? 答案只能有一个 : cost estimation有问题。

  

3.3 发现问题和给出解决方案 

 不贴Cost, CostEstimiator, DefualtCostEstimator (costs包的三个类)的代码了。总起来所, CostEstimator 是根据OptimizerNode 的 estimatedOutputSize 成员变量(包括本节点和数据节点的)来计算 network和disk cost 的 , estimatedOutputSize是由节点成员函数computeOperatorSpecificDefaultEstimates()设定, 能够参考DataSourceNode中该函数的实现:estimatedOutputSize 被设置成来自于InputFormat的数据源实际物理尺寸。 但是FlatMapNode, MapNode 只是简简单单的设置了estimatedNumRecords (使之同 source的 同样),而并无计算estimatedOutputSize 。

重看一下图-4的ExcutionGraph可知, Join的第一个input是FlatMap , 若是Flatmap没有estimatedOutputSize 会致使它的下一个节点JoionNode没法计算cost. 请参考DefualtCostEstimator::addHybridHashCosts()的实现, 当上游的estimatedOutputSize为负数(Unknown)时,本节点的cost为被设置为Unknown 。Unknown cost在选择cheapest cost的plan时是不能作比较的, 只能依靠假设的Cost (如heuristicNetworkCost ), 这个东西很不靠谱, 同时也是HH方案被选上的缘由。

解决方案应该修改Flink代码在FlatmapNode::computeOperatorSpecificDefaultEstimates 加上对estimatedOutputSize的估计(好比保持和上游相同, 或加一个discount), 或者修改DefualtCostEstimator 使之 在计算Cost时能经过estimatedNumRecords估计estimatedOutputSize (下策, 没有 前者好)。 总之,Flink Optimizter 在 Cost Estimation 作的不够好, 改善是应该的, 我刚刚看了Flink 1.9 (最新版)的 Optimizer , 这一块仍是没有修改。忽然还有加入Flink community 的冲动。

 若是不想修改 Flink 源码, 或改了也没法发布, 那就看看有没有替换方案了。

 

...
if (child1.getGlobalProperties().isFullyReplicated()) {
// fully replicated input is always locally forwarded if parallelism is not changed
if (dopChange1) {
// can not continue with this child
childrenSkippedDueToReplicatedInput = true;
continue;
} else {
this.input1.setShipStrategy(ShipStrategyType.FORWARD);
}
}
...

表-7 FullyReplicated DataSource 。

这段代码表示若是上游节点是一个FullyReplicated DataSource , 那么就不须要备选方案的选择过程, 直接将这个输入边的shipStagy 设置为Forward 。 FullyReplicated DataSource意思是, 这个Data Source 的每一个Parition输出的都是数据源的全集 而不是不部分。那么只须要将Currencies作成这种数据源,问题不就解决了吗?!

虽然ShipStategy是Forward, 但实际下游(Join Node)的每个Task都获得了Currencies的这个数据集的全集, 这个同Broadcast的效果是同样的。并且第一路为F的组合有FF, FB , FH , 选择的范围小了。 这不对阿,前面不是说FF, FH, HF是不合法的吗? 要注意此时的F是因为FullyReplicated DataSource而既决定的F,不是普通的Forward , 是合法的。FF, FB , FH产生了27种可选方案(参考表-6), 根据heuristicNetworkCost , FF是cost最低的。 那确定是最低的阿, Forward的数据都是本地传输的, network cost 是 0 。

下面就是如何将Currencies作成FullyReplicated DataSource了, 根据DataSourceNode.java:92, DataSource 的InputFormat只要是ReplicatingInputFormat, 则就是FullyReplicated  。在看看ReplicatingInputFormat的代码, 只须要将原有的InputFormat (好比CsvInputFormat)外面包一层ReplicatingInputFormat就能够了, 具体的看例子的代码吧 : SqlJoinTest.java:95。运行结果以下。

 

  图-18 flink-webmonitor 上FullyReplicated Currencies, 每个partition都输出数据集的所有, 492条记录。

 

 图-19 flink-webmonitor 上instrument , 跟以前没什么区别。

 

 图-20 flink-webmonitor 上的join , 数据在各个并行Task上很是均衡。

 

虽然这个例子只用到了Flink-client上的知识, 并无与JM和TM联合调试, 可是问题的发现,分析和解决的方法是同样的。 都是须要根据对架构的认识,缩小怀疑的范围,然会反复阅读和调式相关代码,找到问题,以及找到解决方案, 试想一下,若是没有架构的知识,你怎么可以怀疑这必定是Flink-optimizer的问题呢? 并且有时解决问题的方法并不必定是直接修改的出问题的代码(固然这样是最好的方案),根据相关代码找到一个合适的替代方案也是解决问题的方法。

没有想到描述一个这样的小问题,这一章运用这么长的篇幅, 对于复杂的问题,简直能写本书了。但愿抛砖引玉,可以运用这样的方法学来解决使用Flink遇到的问题: 架构->缩小范围->阅读和调式代码->发现和分析问题->找到解决方案。

 

 3.4. IntelliJ的经常使用键

CTRL+ALT+Enter to complete line

SHIFT+SHIT search class in source code

CTRL+SHIFT+F serarch string in soruce code

Alt + F1 show current java in project view
CTRL+ALT+B navigate to implementation classes


CTRL+U navigate to super method
ALT + F7 Show reference
ALT + 4 Show run window
ALT + F12 Show terminal window

ALT+Enter Create A testClass
ALT+Insert TestMethod

CTRL+home move to file start
CTRL+end move to file end

 

参考

Flink conference Page: https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals

很不错的Blog: http://www.javashuo.com/article/p-khdfegoi-mh.html

Flink1.6 documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.6/

Flink Github: https://github.com/apache/flink

相关文章
相关标签/搜索