工做中用Flink作批量和流式处理有段时间了,感受只看Flink文档是对Flink ProgramRuntime的细节描述不是不少, 程序员仍是看代码最简单和有效。因此想写点东西,记录一下,若是能对别人有所帮助,善莫大焉。html
说一下个人工做,在一个项目里咱们在Flink-SQL基础上构建了一个SQL Engine, 使懂SQL非技术人员可以使用SQL代替程序员直接实现Application, 而后在此基础上在加上一些拖拽的界面,使不懂SQL非技术人员 利用拖拽实现批量或流式数据处理的Application 。 公司的数据源多样且庞大,发布渠道也很丰富, 咱们在SQL Engine 里实现了各类各样的Table Source (数据源) , Table Sink (数据发布)和 UDF (计算器), 公司里有不少十分懂业务专业分析员,若是他们真的能够简简单单,托托拽拽的操做大数据,创建计算模型,而后快速上线和发布,这样的产品应该前景广阔。java
但是后台并不是提及来这么简单,SQL使用不善,难以达到业务想要的效果,数据量一上来各类问题会出现,后端须要大量的优化工做, 好比 数据倾斜, 是最常发生的事情。SQL基本上是一个Join Language。用户常常会将一个大数据源和一个小数据源作Inner Join, 若是大数据源的数据项很大部分都使用极少数的几个join key, 就很容易出现数据倾斜。现实倾斜的或不均衡的,好比国际资本>80%以用美圆计价,世界人口50%属于某两个国家, 财富主要有20%的人拥有, 等等 。 Flink 若是把SQL join 执行成Hash Join, 最后的结果是不管你实现分配了多少个TaskSlots, 若是80%的数据都跑到某一个TaskSlot里,缓慢运行直至将个这Slot的资源耗尽,整个job失败。这种状况最好是将小数据集广播给全部的下游通道, 大数据集按原始的分片并行,这样的join因分配均衡而快速。然而标准SQL里没有办法指定joinhint , Flink sql也不支持这个,只能经过debug flink 来看看哪里能作一些改变解决这个问题。咱们在最后一章,从Flink client , flink optimizer, flink run-time (job manager, task manager) 一步一步的 在源码里设置断点, debug, 将数据流过一遍,看看有哪些方案能够将这个小数据集合广播起来。git
为了使本文读起来流畅一些, 我先经过几个章节大概介绍一下Flink 。本文关心架构, 因此不会涉及不少关于API的东西(好比Flink streaming 的windowing, watermark, Dataset, DataStream, 及SQL的API等, 网上应该有不少关于这些的文章)。只是想大概梳理一个Flink的架构,使架构对应到源码结构里, 了解一下Flink的 Graph metadata, 高可靠性的设计,不一样cluster环境里 depoloyment的实现等, 最后利用IntelliJ IDEA 通過一個小例子带你们debug一下Flink 。若是对flink的架构有较好的理解(好比主要类及metadata),就比较容易在准确的地方设置断点,debug Flink代码将更有效率,从而解决问题就会更有效率, 这就是本文的目的。大概了解一下框架,但并不会面面俱到。当若是你须要深刻了解一下Flink某方面的细节, 本文可以告诉你入口在哪里,或者经过对架构了解过程当中获得的common sense , 再加上一点想象力, 你或许直接可以获得解决问题的方案, 而后再经过阅读源码及调试来加以验证。 程序员
(图-1 Flink Runtime 来自:https://ci.apache.org/projects/flink/flink-docs-release-1.6/concepts/runtime.html)github
关于架构,先上一个官方的图,尽管对于Flink架构,上图不是很准确(好比client与JobManager的通信已经改成REST 方式, 而非AKKA的actor system),咱们仍是能够知道一些要点:web
(图-2,JobManager的内部结构) 算法
如上一章所述, JobManager 是一个单独的进程(JVM), 它是一个Flink Cluster的 master 、中心和大脑, 他由一堆services 组成(主要是Dispather, JobMaster 和ResourceManager),链接cluster里其余分布式组件 (TaskManager, client及其余外部组件),指挥、得到协助、或提供服务。sql
从以上所述, JobManager是一组Service的总称, 其中真正管理Job调度的组件叫JobMaster ,负责资源管理的组件叫ResoruceManager, 负责接收client端请求的组件叫Dispatcher(包括Dispatcher和DispatchRestEndpoint)。其实Flink源码里有叫JobManager的包和类,功能上也是负责Job调度管理以及snapshot管理,但它应该在Flink某个版本之后就legacy了(估计是从version1.3开始)。这三个服务统称为还叫 JobManager,上真正管理做业的是JobMaster。这一点在读code时让人迷惑,好比JobManagerRunner启动的倒是叫JobMaster的类。可是他不叫JobMasterRunner,这也体现了JobMaster实际是取代了JobManager类,保留legacy类是为了向后兼容。如下是Client, 展开的JobManger(受HA 保护的Dispather, JobMaster, ResourceManager)和TaskManager处理submitJob的流程图,这个比较图-1更能体现当前的Flink runtime架构 (Flink 1.6):docker
(图-3)展开JobManager后的Flink 架构, 来自于《 Stream Processing with Apache Flink》shell
以上的架构严格来说在Flink里被称做 SessionMode ( Cluster的EntryPoint类都是SessionClusterEntryPoint的子类), 若是没有外部命令 terminate cluster, 在这种模式下的FlinkCluster 是Long running 的, 多个job能够同时运行在同一个flinkcluster里。 SessionMode 在Flink的各类部署都是支持的, 包括Standalone, Kubernetes, Yarn, Mesos, 上图实际上是StandaloneSessionCluster的流程。 还有一种模式叫作JobMode, 区别就是Job(或application) 的main class 和 jar 和在JobManager 启动时经过的启动参数装载的, 不须要submitJob的过程, job运行完毕, cluster自动终结, 全部资源释放。 在这种模式下, Dispather并不负责处理job的提交, 但其余 Client发给DispathcherRESTEndPoint的请求(好比Query, CancelJob), 仍是由Dispatcher处理。
Flink的每一种部署模式(deployment mode)都是既支持Session Mode又支持JobMode的 (或partialy support), 区别如上所述, 但在架构上是一致的。 当有由外部的ResourceManager协助硬件资源分配时,流程略有全部不一样, 以 FlinkCluster in Yarn 为例, SessionMode下, 区别只限于多了RM经过Yarn自动启动TM 的过程(4,5)。
(图-4)FlinkCluster in Yarn Session mode, 来自于《 Stream Processing with Apache Flink》
关于deployment的细节,请参照后面的将Deployment的章节。
如图一所示,JobMaster的主要工做是:
1. JobGraph的scheduler : 将Client提交的JobGraph按照逻辑的向后关系(source -> transform -> sink), 以及并行关系(每一个operator的子任务只负责所有数据的中一部分), 将子任务分配到TaskManager的Slot中, 并按期的获取每个子任务的运行状态 (status)。
2. 触发和管理Job的checkpoint snapshot:对于streaming job,按期的将运行中的每一个operator 的状态(State)数据存入规定的存储设备, 这些state数据能够用于在Job恢复运行时,恢复相关子任务的失败前的现场。
(图-5)JobMaster内部结构
EG是面向Job 并行运行的图结构,在JobGraph的基础上它加入了对Operator并行执行的子任务,以及子任务的输入输出的描述 。
图-6 Execution Graph
Execution: 但EV被分配执行时,Exception对象会被建立做为EV的一次尝试, 分配slot, 将EV打包 成 TDD(TaskDeploymentDescriptor)并同TaskManagerGateWay 发送给TM (submitTask) 执行。Exception若是失败, 新的Exception会被建立做为另外一次尝试。
EG的Scheduling模式由两种,一个叫Lazy, 一个叫Eager 。
Lazy的方式适用于Batch Job, 它先将全部的处理数据输入的sub task 分配执行, 当TaskManager 返回 (同过JobMasterGateway) 已分配成功的信息, EG在根据EJV的上下游关系, 再给相应的EJV分配slot执行。分配的过程如上一小节所述, EJV全部EV都会经过TDD打包,而后要求SlotPool提供slot, 而后将tdd和slot信息都发送给相应的TM去实例化这个sub task而后运行起来 (再TM细述这个时怎么实现的)。值得一说的是, EJV全部EV都应该一块儿scheduling , 但当集群里没有足够的slot时, 同一个EJV可能只有部分EV被schedule了,若是那些没有分配的相同EJV的EV再一个timeout(default 5分钟)以后还没法获得slot, task 这时候会失败, job 也会失败。因此在计划Job使用的资源时,计划的总slot数 (好比当使用Yarn管理resource 时, yn * ys 是job向Yarn申请的总slot数 )必定要大于总的source sub task的总数量(source operator 的数量 * paralelism ), 不然部分soure sub task 得不到资源,再timeout以后就会出发failGlobal 使job 失败。
Eager 方式使用与Streaming job, 在Eager模式下, 全部EV都必须都能获得slot, 不然schduling 失败, job 失败。
Flink的Checkpoint这个概念仍是有必要简单说明一下, 固然参考Flink文档会获得更全面的理解。Flink主要是一个Stream computation的架构,(固然它也能够作Batch, 但Batch并非Flink的强项), Flink Streaming processing 的一个特性就是Stated Streaming 。 意思就是在它的流式计算的工做流里, operator的都是能够有状态的。什么是有状态的?至关于一我的睡醒以后还记的本身是谁,而后还能继续下来的生活,作完没有作完的事情的意思 : 由于大脑里存储了过去的信息。Flink 的Operator能够像这样生活的,建立一下StateFull的变量 (好比ValueState<T> ), 而后周期性的将这些状态存储一个地方 ,当job重启, operator 从新实例化的时候, 经过加载这些Sate信息,就可以重新回到上次重启前的状态,而后继续这个operator的人生。周期性的(Periodically)存储operator的State 信息, Flink称做Checkpoint, 每一次存储叫作一次snapshot 。
CheckpointCordinator的主要功能就是协调促使工做流里全部的operator都周期性的触发Checkpoint snapshot 。
换句话说,CheckpointCordinator的主要功能就是向全部的Execution (看前面回顾一下Execution的概念) triggerCheckPoint 。 每个EG都会建立一个CheckpointCordinator (CC), CC用内建的timer (Executor)定时(根据可配置的interval)的经过RPC向全部的TM触发他们运载的全部Source Exection对应的Task的CheckpointBarrier。 上一句比较长,分解来讲, 每个EG的Execution 都会对应一个Task (准确的说时SubTask)运行在某一个TM 里, triggerCheckPoint 就是CC定时的经过RPC调用全部 TM 上全部的Source Task ( 是工做流里开始位置的处理Source 数据的Task, 不包括Sink Task 也不包括 普通的Transformation Task ) 的triggerCheckpointBarrier方法。当一个SourceTask收到triggerCheckpointBarrier时, 它会命令内嵌的Invokable对象 (Operator, 或Operator Chain的封装对象)执行 performCheckPoint , 这个过程大概有以下几步, 不少多步骤的时异步执行的:
至于CheckPoint怎么配置,State数据, StateBackend 包含那些, 以及CheckpointBarrier再工做流里的联动过程, 我就不赘述了,网上 应该不少介绍, 不过经过代码阅读,想强调以下几点。
TwoPhaseCommitSinkFunction
。https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html。 大概意思时SinkOperator向external 下游发送数据时须要分两步走(TwoPhase), 不过目前没目前 Flink1.6.1 的 Sink都没有实现这个功能 。
在源码里, TaskManager 类同 JobManager被JobMaster取代 同样,TaskExecutor取代了legcy TaskManager 并发挥着它的做用。本文里TM指的是TaskManager的整个进程, JM表明JobManager整个进程。JM的核心类是JobMaster (固然还有ClusterEntryPoint, Dispatcher, WebMonitor和ResourceManager, 可是都起的是辅助做用), TM里的核心类是TaskExecutor 。还有一个比较混乱的Term就是Task。Task对应JobGraph的一个节点,是一个Operator。在ExecutionGraph里Tast被分解为多个并行执行的subtask 。每一个subtask做为一个excution分配到TM里执行。但比较然人抓狂的是在TM里这个subtask的概念由一个叫Task的类来实现。因此TM 里谈论的Task对象实际上对应的是EG里的一个subtask ,若是须要表述Task的概念,用Operator。先澄清一下Terminology ,以避免语言混乱。
图-7 TaskExecutor
我的以为Task之间的数据交换,是TM里的核心和重点,也是 Flink runtime的核心、重点和难点,它的质量是区别于其余大数据系统的关键,它的质量直接影响Streaming/Batch 任务的延迟及吞吐量两大指标。人们选择一个大数据流式框架最想先了解的就是这两个指标, 至因而不是Statefull, 可靠性可用性集成度怎么样,编程接口是否简单不是不重要,但不是最关键的,是次要考虑的。因此本节做为本文的重点,咱们深刻解读一下TM管理的数据交换。
首先强调一下Task之间的在网络里或在本地的数据交换, 不是Task管理的,是由TM (或TaskExecutor)管理的。以下图所示。
图-8 JM和TM的数据交换
JM将EG的Execution 提交给 TM,TM将Execution转化为Task执行, 并负责Task之间的数据传输.
回想一下EG的数据结构:
(R
图-9 EG 的数据结构 和 数据流
为了将JM的EG的逻辑工做流在物理机上执行起来, TM建立了相应的物理数据结构。
ResultPartition(RP)概念上对应着EG中的IRP(IntermediateResultPartition), 它负责存放一个Task生成的全部结果数据。
ResultSubpartition (RS) 是对应着RP的数据Repartion以后shuffle数的中的一个parition ,它存放着RP从新分区后发送下游的某一个sub task的分区数据。
InputGate(IG): 在EE中, IRP是source, EV 是target, 它描述了分区数据的流向, 但对不物理实现这样的结构仍是不够的。 IG 由此而引入, RP是EE上的数据发送端(IRP的物理实现),IG是接收端上与RP功能相似的组件, 负责收集来自上游(RP)的数据区 (data buffer)。
InputChannel(IC) 是接受端与RS功能相似的组件,IG使用不一样的IC链接不一样EE上的RP 中特定分区的数据区。 好比在data shuffle的过程当中多个RP都产生键值为a1的RS , 这些RSa1中的数据最终都会流向同一个IG中的IC, 这其中每个IC承担接受上游的RP中每个RS-a1中的数据。
数据在EE上的通信, 由RP到IG ,数据是以binary 形式传输的。也就是说数据进入RP前,会由总Serializer 将data record 序列化成binary format, 并存入data buffer 中。 Data data buffer 由IG 接受传递给下游的Oparor 前, 由Deseriealizer将数据从DataBuffer反序列化成data record, 供该Task消费使用。 Data buffer至关于高速公路上运输乘客的大巴车, buffer中的数据至关于乘客 。 每一个大巴车的形状, 运载量也是固定的, 不装满不发车 。它能极大的增长了整体数据传递传输量和创数率, 增长系统的吞吐量, 但同时也增长的单个数据的延迟 。缺省状况下2048 个buffer会被建立, 每个32K字节 。对与比较大的record 须要多个buffer 承载 。每个RS和IC有一些大buffer组成, RP和IG就是这些大巴车的装载者和卸载者 。
关于DataBuffer若是建立和管理, 参考后面的内存管理章节。
另外值得一提的是, 不一样的对于RS的实现决定了实际的数据传输的方式。
PipelinedSubpartition支持streaming模式下的数据传输 (大部分的RS都是这种实现): 数据压满一个buffer (buffer size是configuration 指定)就向下传递 。 SpillableSubpartition 只有当RP的类型为BLOCKING是才会建立出来(Batch job 中的部分RS 是这种实现)。它支持在事先分配的Buffer不够用的状况下, 将Buffer中的数据Spill到硬盘中,从而该RS占有的Buffer得以释放。由于他会涉及IOManager (应该只有它会使用到IOManager), 因此我们细说一下。
那什么是BLOCKING类型的? 从ResultPartitionType是这样定义:BLOCKING(false, false, false)能够看到, 3个false 分别以下:
简单来讲只有Batch job 里的一些RP类型是Blocking的 ,由于BatchJob总一些须要shffule 输出的operator才会有才会启动Blocking模式, SpillableSubpartition才会被建立。
注意 上面说到了的三个模式:概念比较混乱。
1. JobMode (Batch or Streaming):任务模式, 因为决定ExecutionConext
2. ExecutionMode (PIPELINED (default), BATCH, PIPELINED_FORCE, BATCH_FORCE):可配置的数据流总体模式, 目的是经过一个可配置的ExecutionMode(可在ExecutionConfig中配置)来决定全部Operator的DataExchangeMode。它是Optimizer在优化Edge的shipStrategy和DataExchangeMode作策略选择的依据。 详细看DataExchgeMode代码中,DataExchgeMode与ExecutionMode的Mapping关系。
3. DataExchangeMode(PIPELINED, BATCH, PIPELINE_WITH_BATCH_FALLBACK ) : 根据下游Operator,和 ExecutionMode, 由Optimizer 决定 数据下发模式 ,
下面是当JobMode 为Batch, ExecutionMode 为PIPELINE时, DataExchangeMode应该优化的结果。能够看出来只要下游须要Shuffle, DataExchangeMode就会被优化成BATCH模式, 此时Flink会建立SpillableSubpartition 。
DataExchangeMode.PIPELINED, // to map
DataExchangeMode.PIPELINED, // to combiner connections are pipelined
DataExchangeMode.BATCH, // to reduce
DataExchangeMode.BATCH, // to filter
DataExchangeMode.PIPELINED, // to sink after reduce
DataExchangeMode.PIPELINED, // to join (first input)
DataExchangeMode.BATCH, // to join (second input)
DataExchangeMode.PIPELINED, // combiner connections are pipelined
DataExchangeMode.BATCH, // to other reducer
DataExchangeMode.PIPELINED, // to flatMap
DataExchangeMode.PIPELINED, // to sink after flatMap
DataExchangeMode.PIPELINED, // to coGroup (first input)
DataExchangeMode.PIPELINED, // to coGroup (second input)
DataExchangeMode.PIPELINED // to sink after coGroup
由于batch 工做模式下的 shuffle一般会伴随的对整个group的排序, aggregation等, 下游须要获得(该group的)全集才可作这些操做。全局数据量比较大, 物理内存Buffer极可能不够用, 这时候SpillableSubpartition(在IOManager的帮助下)可将一部分硬盘当Buffer来用, 者极大的帮助了RP端的数据缓存。但SpillToDisk必须实如今RP端吗?不能够在接受端(InputGate)实现吗? 接受端的计算须要作密集的内存访问(sort, hash, etc ), 这些算法都是在整个数据集上的操做, 因此数据须要缓存在MemoryManager管理的MemorySegment中 从而提升存取效率, 也能为使Flink对内存作有效的管理。可现实是Flink的BatchJob须要消耗巨大的内存, 这跟接收端不使用硬盘作Buffer由很大关系。至少Flink 1.6.1的Memory是不使用硬盘作缓存的,虽然有HybridOffHeapMemoryPool, 并且它也使用了DirectMemory 来分配MemorySegment, 但并无实现用磁盘文件来映射内存 , 全部当上游数据量很大, 但内存不足时, Flink task会很快的out of memory。以后看看Flink的最新版本是否有改善。
R
图-10 使用EG控制物流数据流(数据交换)
图-10描述了Task之间数据交换的大概流程。
图-11给出了跨TM的两个Task之间的数据交换的更多细节。
1.3.2 Task 提交与执行 (TDD, Task, AbstractInvokable , Driver , ...)
根据前面的介绍,Task是由JM(经过EG) 通过Scheduler (申请和Offer slot resource和根据schedule 策略,等等) 提交给TM执行的。申请resource 和提交Task都是经过JM, RM 和TM之间的RPC通道完成的。那么具体提交了什么呢?Task若是执行的呢?Task如何知道谁是上游从而创建InputChannel的?
实际上这是一个 从EV->TDD->Task->AbstractInvokable->Driver ->具体的Oparator 实现类变形的过程。
1. 从EV 到TDD :
TaskDeploymentDescriptor(TDD) : 是TM在submitTask是提交给TM的数据结构。 他包含了关于Task的全部描述信息:
不难看出,EV包含上述的全部信息,EV的一个方法createDeploymentDescriptor,完成了上述变形。JM 在向TM submitTask时,传递的是TDD不是EV。为何要作此变形的而不是将EV直接传过去既然他们很相似? 我想这个一个设计模式问题, EV是做为ExcutionGraph的中的顶点 ,它最好只存在于JobMaster 的物理图中, 而不是做为参数传递给其余组件,从而维护它的独立性和单一性。参见Descripor设计模式。
当TaskExecutor(TM)接受submitTask 的RPC调用从而获得TDD时, 他会将TDD实例化为一个在TM上能够执行的对象 : Task 。
2. Task :
Task 是一个Runnable 对象, TM接受到TDD 后会用它实例化成一个Task对象, 并启动一个线程执行Task的Run方法。
Task实例化时, 他会将TDD中的IGD实例化成InputGate (IG) 和 InputChannel(IC), 将RPD实例化成RP . 在 Task 的Run 方法被调用时, 它根据TDD的 TaskInfo, 使用URLClassLoader将用户的operator类从HDFS
加载, 并实例化TaskInfo所描述的AbstractInvokable对象, 并将IG, IC, RP , 还有其余全部的AbstractInvokable须要的服务类(MemoryManager, IOManager, CheckoutResponder, TaskConfig etc )都传递进去, 而后调用AbstractInvokable 的 invoke 方法。
3. AbstractInvokable
如以前给出的一些例子 : DataSourceTask, DataSinkTask, BatchTask, StreamTask 。 它们是Task.Run()时, 经过TaskInfo加载并实例化的。 这些Task Operator的源代码都在
org.apache.flink.runtime.operators 或org.apache.flink.streaming.runtime.tasks下面 。每一个BaskTask的须要的工具都是相似的, 只是计算的流程不一样 , 因此BaskTask的invoke方法调用时, 它根据TaskConfig(信息继承与TDD的TaskInfo) 加载和实例化不一样的Driver 类, 并调用Driver的run方法由Driver指挥流程(input, calcuate, out , etc )。
4. Driver
Driver类和AbstractInvokable位于同一个包内。 每个Driver的run() 大都是一个循环。 不停向IG 要next record , 写metrics, 调用function 计算, 而后见结果发给RP。只不过有些drver一次计算须要两个record, 有些driver 须要两个record 来自不一样的IG , 有些须要将全部的input所有收完才计算, 有些在window expired后才计算, 等等。下面是FlatmapDriver 的run() 。
while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
在stream模式下数据处理是有界限(bondary)的, 每一个window的处理所使用的内存是相对比较小 的, 因此Flink stream job 一般使用的内存较小。
但在batch 模式下, 数据处理是无界的 (所谓无界就是没有Window),如前面所示, 不少job 须要将上游全部的input都取干净,才开始计算, 如sort, hash join, cache 等, 此时所须要的内存是巨大的, 好比上游operator 读取300G文件而后map成record, 下游operator 须要将这些record 同另外一组输入作outer join 。因此Flink内存管理主要针对的是这些batch job 。
总起来将, Flink的内存管理仍是比较失败的, 至少在我用的版本里(1.6.1) , 主要缘由仍是 MemoryManager并无联合IOManager Disk去扩展内存。但我觉的MemoryManager的引入, 就是为了管理Flink的内存, 以防止OutOfMemoryException的发生 (如Spark同样 )。 防止溢出最直接的方法就是当系统内存不足或超过throttle时, 使用DiskFile以补充内存 , 从而完成 那些内存消耗巨大的操做。 只是version 1.6.1 还没作好 ,或许之后版本会作好 。
先看下Flink的内存管理机制吧 。
从概念上将, Flink 将JVM的heap分红三个区域。
"taskmanager.network.numberOfBuffers"调整。
"taskmanager.memory.fraction" 配置。
"taskmanager.memory.size"
配置一个绝对值, 好比10GB。
Network buffers pool 和 memory segment pool 都是TM启动时就分配的, 它们生存于JVM的老年代, 因此不会被GC回收的。只有Free Heap区域在新生代里生存。
IG/IC, 和RP/RS 的record存储在network buffer 里,有些RS (SpillableSubpartion)须要Spill to Disk ,所以他们须要IOManager 的帮助。
须要sort/hash/cache的Task瞬时内存消耗很是大,所以从IG接收record后就将他们存储memory segment pool, 以后在对serialized record 应用 算法 。
其余的Task大都是pipeline 方式, 来一个消费一个, 瞬时内存消耗不会大。此时这些Task会使用FeekHeap 区域的内存。
若是没有人为错误, 系统不会有瞬时的大量内存申请, 因此不会有OutOfMemoryException , 因此FreeHeap的区域应该是比较轻松的。
但是问题是, memory segment pool 目前没有使用DiskFile 做为OffHeapMemory, 它可以装载上游下来的巨量 blocking 输入吗? 这就是Flink 1.6.1 的问题。不过这个MemoryManager实现的好很差的问题, 设计上是有防止OutOfMemoryException 的组件的。这个就期待Flink 新版本 补强这部分功能吧。
前面说“serialized record 应用 算法“,这是怎么作到的呢? Flink 实现了大量的 TypeSerializer 和 TypeComparator, 他们懂得record 是如何序列化到字节数组里的,因此也知道如何将record部分地deserialize 。一般算法只是使用record的某个或某些field。 部分deserialize极大的下降内存使用, 提高了数据存取的速度, 从而极大的提成了内存的使用效率。 serialized 形式的record 数据是Flink 可以将他们在TM之间, 跨进程和跨网络的传输, 它是分布式系统须要的数据形式, 同时, 它也为MemoryManager 将它们在内存和硬盘之间交换提供了条件。
IOManager 使用FileChannel将MemorySegment同DiskFile创建映射, 从而实现将数据在MemorySegment和DiskFile之间写入和读回。
因此,虽然MemoryManager目前还有些问题, 在这种设计下, Flink的内存使用效率确定会进化的极好 。
RM在Flink内部是Flink cluster 里slot资源的管理者 , TM 提供slot,JM (JobMaster)消费slot 。 RM同JM和TM 都要保持心跳,以保持slot市场的活跃 ,以及在TM或JM失败的时候通知给对方。
总起来讲RM的做用主要包括:
第2、三项功能比较好实现。第一项功能须要同外部的集群管理器合做才能实现。所用RM是一个随环境不一样而不一样的组件。在不一样的集群环境里, RM有不一样的实现类。
市场上比较流行的集群资源管理器主要有Yarn, Mesos, Kubernetes, 和 AWS ECS 。其中Yarn, Mesos中, 能够利用ApplicationMaster/Scheduler是资源调度器,只要将RM在ApplicationMaster中运行起来,理论上 RM就能够Yarn, Mesos的master通讯,为TM分配容器和启动TM。实际上JM的全部组件(Dispatcher, RestEndPoint, JM, ClusterEntryPoint, etc)都在ApplicationMaster启动的。
Kubernetes和ECS 没有ApplicationMaster的概念, 但TM能够做为一个有多个副本的deployment (K8s) 或 service (ECS) 运行。此时 TM 的多少(规模)是由deployment/service 根据必定策略自动扩容的而不是根据Flink须要的slot数量。Flink并无实现特殊的ResourceManager和K8s/ECS集成,此时的FlinkCluster使StandaloneCluster。若是能一个K8s/ECS 特殊的RM和集群的master通信,使TM可以能按需扩容而不是自动扩容,那就和在Yarn里同样完美融合, 并且可以用上Docker的服务, 毕竟Yarn目前只是使用JVM做为容器,并无真正达到真正资源的隔离 。
RM在不一样cluster以不一样的方式启动新的TM以补充slot资源,固然当一个job 结束时,它也会同集群的master通信,释放container,关闭彻底空闲的TM。
在不一样的集群环境里,RM可以管理TM的生命周期,那么谁来启动和结束JM 呢? 请参考后面的 Flink Deployment 。
Flink的 HighAvialbility 主要有两个服务组成 : LeaderEleketronService和LeaderRetrievalService 。
该Service是用来选举Leader的 。假设系统里启动了两个Dispatcher 。先回顾一下什么是Dispatcher , 看图-2, JobMananager 的进程是ClusterEntryPoint的main 函数启动的,ClusterEntryPoint启动了 WebUI, Dispatcher 和ResourceManager。当用户提交了 Job时, Dispather 实例化一个新的JobMaster 来管理这个job, Dispatcher也是CheckPointBarier 的发起者, 同时也是来自WebUI REST 后台的请求的处理者。Dispatcher 的做用承上启下, 做用很是核心,不可缺失,因此Flink启用HA服务来保护它。 Flink 系统里, 使用HA 保护的核心服务还包括ResourceManager ,JobMaster 还有WebUI 的REST Endpoint。当系统里启动两个Dispatcher 谁来当Leader呢? 这就要LeaderEleketronService (LES) 的决定了。
目前Flink的起做用的LES使用ZooKeeper 实现的。实现方式就是两个Dispather都试图抢占的特定ZooKeeper Path (dispather latch path) 的LeaderLatch(参见 curator framework),谁先抢到了, 就被选举成Leader , leader的 AKKA URI 和UUID被被写道一个特定的ZooKeeper path 中(leader path) 。只有Leader 是工做的,由于其余组件在使用Dispather时会向获取Leader Dispath的URI, 而后才RPC的 。 其余竞争者不会接受到RPC, 他们只有继续监听,若是当前的Leader 退出了, LeaderLatch被释放了, 从而新的leader会被选举出来。
那么对于Dispatcher的使用者, 怎么知道谁是Leader呢? 这就须要LeaderRetrievalService了 (LES ) 。 对于用ZooKeeper 实现的LES, 它只须要监听一下leader path, 它就知道谁是leader 了 。
好比, 当你用fink run 命令提交job 时, 假如系统里有多个Fink REST Enpoint, Flink的ClusterClient 对先使用LES获取Leader REST Endpoint, 而后才会将job 发送过去。
除了ZooKeeper的实现, HAService 还能够以来外部的名字服务(如DNS, LoadBalencer , etc ) 实现。在LES和LRS 永远返回 HA保护的服务的URI(不管是AKKA PRC URI, 或REST URI )。 该URI永远保持不变, 若是提供服务的server失败, 名字服务会将给URI映射到另一个工做的server 。 具体请参考 StandaloneHaServices的代码。
目前为止, Flink 支持大概4种部署方式或4种cluster 类型,MiniCluster(或LocalCluster), StandaloneCluster,YarnCluster,MesosCluster 。 虽然前面提到过Kubernetes , ECS , Docker , 但他们的本质是将JM和TM docker 化 , 从而是他们运行在真正的 dockers里面 , cluster 仍是 StandaloneCluster 。
图-3 是StandaloneCluster , 图-4是YarnCluster , MesosCluster 与YarnCluster 相似, MiniCluster是运行在IDE(e.g. IntelliJ )的虚拟cluster , 它主要用于 FlinkApplicaiton的调试 。在Session 模式下,表面上能够看来StandaloneCluster 与YarnCluster基本流程是一致的, 只不过是启动JM 和TM的启动机制不一样而已。YarnCluster/MesosCluster 里TM是由他们各自的ResourceManager实现根据须要启动的。 在 StandaloneCluster下, TM是手工启动的。其实在YarnCluster下, JM 也是Yarn启动的。其实, 如前所述, StandaloneCluster与YarnCluster / MesosCluster的本质区别是, 前者的用于TM硬件资源是管理员手工分配的, 后置是有RM同集群管理器协调自动分配的。
从Flink内部看, 是为了支持这些异构环境, Flink 对于不一样的Cluster, 实现了不一样的ClusterEntryPoint用于启动不一样ResoureManager ,以及Dispatcher 。不一样集群类型的ClusterEntryPoint对JobMode和SessionMode有 不一样的实现, 好比YearnSessionClusterEntryPoint和YarnJobClusterEntryPoint。Session模式下, Dispatcher使用的时StandaloneDispatcher, Job模式下,使用的是MiniDispather 。 关于什么是JobMode和SessionMode, 请参考图-3下面的解释。
如前所述,JM接受jobGraph的对象或对象文件(序列化后),jobGraph由FlinkClient生成, FlinkClient和JM位于不一样的JVM (MiniCluster除外)。若是JobCluster没法由FlinkClient启动 (经过某种方式),则JobGraph生成之后须要存到指定位置,在手工启动JobCluster读入,这样的过程比较复杂。基于目前的架构,尽管Flink在各类集群环境里对job mode , session mode 在代码上都由支持, 但job mode 一般因为集成度不够好,用户没法方便使用。
我用一个表来描述他们跟具体的不一样以及Flink是如何支持这些异构环境。为了使这个表不至于太庞大,先把不一样的cluster 类型表述下先。
1. MiniCluser
用于在IDE(IntelliJ, Eclipse) 运行FlinkApplication 的小型FlinkCluster环境,JM和TM运行在同一个进程里,主要用于在IDE里调试Flink Application。
2. StandaloneCluster
能够运行在单机或多机上的FlinkCluster环境, JM和TM运行在不一样的JVM里。只要有JRE , 不管在Windows ,Linux均可以搭建StandaloneCluster。
用户可使用FlinkClient 的command line,或API 直接向JM REST URI 提交job ,具体看Flink CLI 的 “-m" 选项。StandaloneCluster很是有利于搭建单元测试,集成测试以及演示环境。
3. YarnCluster
在Hadoop集群里搭建的FlinkCluster, JM和TM都运行在Yarn管理的容器里。JM作为Yarn里ApplicationMaster ,Flink cluster做为Yarn的一个Application运行。
Yarn是Flink与之集成度最好的集群管理器。 用户能够直接使用FlinkClient 的command line,或API 直接向Yarn提交job , YARN 会自动的启动(或链接现有的)Flink JM, 自动启动须要的TM,然后在该Flinkcluster运行任务。
具体看Flink CLI 的“-m yarn-cluster" 和 ”applicationId" 命令行用法。
4. MesosCluster
在Mesos集群里搭建的FlinkCluster, JM作为Mesos里Scheduler , 一个 Flink cluster做为Mesos的一个Framework运行, JM和TM均可以运行在Mesos管理的容器里。
TM由JM的resource Manager同Mesosmaster 协调按需自动启动和销毁。JM 须要先使用Marathon 建立服务, 由Marathon 启动。Marathon服务的后端都是运行在Mesos的容器里,
并且Marathon服务一个高可用性服务。
用户能够直接使用FlinkClient的command line,或API 直接向Marathon 建立服务提交job ,具体看Flink CLI 的 “-m" 选项。
我的以为Flink 与Mesos的集成度能够在提升一些。目前用户须要手动的为JM建立Marathon Service, 可参考下面写一个简单的配置文件, 调用flink的shell脚本mesos-appmaster.sh启动Flink
MesosSessionCluster的JM (该JM会启动MemsosResourceManager用来启动须要的TM)。
yhttps://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/mesos.html
Flink 彻底能够改进一下Client端, 添加集成Mesos Marathon须要的 ClusterDescrptor类,CLI类(好比MesosJobClusterDescripor, MesosSessionClusterDescripor,FlinkMesosSessionCLI ),丰富“-m" 选项,
由ClusterDescrptor类自动建立和销毁JM (Marathon 服务)
5. Kubernetes/ECS/Docker
Flink 对于Kubernetes/ECS/Docker在源码级别并无任何支持。只是说,能够将StandaloneCluster的JM和TM运行在这三个以Docker 做为容器服务的
集群环境里。因此是,Flink 与他们的集成度, 比较低 。Kubernetes的具体作法分别对StandaloneCluster的JM和TM作两个deployment(就是能够平行扩展的docker group ), 它们分别启动StandaloneCluster的JM和TM (经过start-console.sh脚本), 而后对JM的
deployment作一个Service使其具备高可用性,固然须要将该Service的URI传递给TM,前面说过StandaloneHAService是靠统一的URI来提供HA服务的。更详细的请参考下面。
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/kubernetes.html
ECS和Docer的实现同Kubernetes 相似, 只是在各自使用的技术名称不一样而已。ECS的AutoScalingGroup等价于Kubernetes的Deployment。 common sense是同样的, 只是各家用各家喜欢的名字。
总之,将Flink的部署在上述集群里, 目前手工的工做仍是比较多的,并且TM的数量是预先设定的,或按策略自动扩容的, 并非最优的由Flink RM 指导的按需扩容。获得的好处最大应该是Docker的容器服务。毕竟Yarn或Mesos对Docker的支持并非很好。
我的以为Flink 与他们的集成度能够在提升一些。同提升Mesos的集成度的想法同样, 在目前Fink的框架下,只须要改进FlinkClient,添加集成Kubernetes API-Server须要的 ClusterDescrptor类,CLI类(好比K8sJobClusterDescripor, K8sSessionClusterDescripor,
FlinkK8sSessionCLI ),丰富“-m" 选项,由ClusterDescrptor类自动建立和销毁JM (JM 的Deployment和 Serice ), 添加K8sResourceManager, 用它来建立及扩容TM的deployment 。
Cluster Type | JM 启动方式 | TM启动方式 | ClusterEntryPoint | ResourceManager | SuportedMode |
Mini | 调试环境启动的Flink Client是 LocalEnvironen或 LocalStreamEvronment, 它们的execute方法会 先启动MiniClusterEntryPoint |
由MiniClusterEntryPoint启动 | MiniClusterEntryPoint .java |
StandaloneResource Manager.java |
只支持JobMode |
Standalone | 手工,如sart-cluster.sh脚本 | 手工, |
StandaloneSession ClusterEntryPoint.java
StandaloneJobCluster EntryPoint.java |
StandaloneResource Manager.java |
支持SessionMode job mode 有支持,但用户没法方便使用。 |
Yarn | jobmode下 由FlinkClient里的 YarnClusterDescriptor, 调用Yarn的API经过YARN启动。 JM作为ApplicationMaster自动启动。
SessionMode JM须要手工运行 yarn-session.sh, JM和TM的内存不是per-job, 只有管理员才能设定。 |
JM 里的YarnResourceManager 利用YARN的API启动TM。 |
YarnSessionCluster EntryPoint.java
YarnJobCluster EntryPoint.java
|
YarnResource Manager.java |
支持JobMode和SessionMode
|
Mesos |
经过Marathon Service启动, JM运行在Mesos的容器里。 但Marathon Service须要手 工建立。 |
JM 里的MesosResource Manager 利用Mesos的API启动TM。 |
MesosSessionCluster EntryPoint.java
MesosJobCluster EntryPoint.java |
MesosResource Manager.java |
支持SessionMode job mode有支持,但不方便使用。须要 提高集成度。 |
Kubernetes /ECS/Docker |
由JM service 启动 | 由TM deployment启动 | StandaloneSession ClusterEntryPoint.java
StandaloneJobCluster EntryPoint.java |
StandaloneResource Manager.java |
支持SessionMode job mode有支持,但不方便使用。 须要提高集成度。 |
表-1 Flink Deployment
如前所述FinkCluster里 (参考图-3), 最核心的组件并且交互最频繁的组件是Dispatcher, ResourceManager, JobMaster, 和TaskManager 。
其中对于Dispatcher, ResourceManager, JobMaster,一般是ClusterEntryPoint启动Dispatcher和ResourceManager,而后当有job提交时,Dispatcher启动JobMaster。(MiniCluster, JobCluster略有不一样,但相似)。尽管他们之间使用RPC通信,他们生存在同一个JVM里,若是其中一个失败,同时使整个JVM(因为异常崩溃了)失败了,全部组件都失败了。那么他们中若是有承担Leader责任的,也都会经过HA Service 切换到另外一个工做的JM后端的RPCEndPoint。
若是TM失败了, JM可以感知到heatbeat 機制感知到, JM 和RM都与主动向TM发送和hearbeat , 因此TM heartbeat 一旦timeout , JM会释放该TM 提供 过来的slot,与此同时将slot上运行的Execution 的状态设置为失败, 参考 SingleLogicalSlot::signalPayloadRelease .
在stream mode 下,JM 会尝试将该TM上失败的Task重现分配到别的TM上(若是slot资源时有的,或能够分配的)。
在Batch mode 下, JM 会将整个job 失败,然後嘗試从新啓動整個job 。
从新啓動的streaming Task會根據上一次的checkpoint 回復狀態, 繼續運行。
to be filled .
上一章介紹Flink了的架構, 它包括了那些主要組件? 組件是怎麽工做的? 組件之間是怎麽工做的?組件使用的資源(包括服務器,容器,内存和CPU和Disk)是若是分配的? 以及組件是如何部署的?一個job是怎樣分割成小的Task并行執行在cluster裏面的?
怎麽保證組件HignAvailablity 以及組件是如何作 失敗處理的?以及Flink關於系統安全的設計等等。 這些關於架構的介紹,重點在於瞭解flink cluster中各個組件的互操做行以及容錯處理, 瞭解框架, 以便於再出現問題的時候我們能夠對它有比較針對性的debug 。關於Flink引以爲傲的statefull operator, window watermak, checkpoint barrier , stream/batch API 以及web monitor本文都沒有介紹,請參考相关的Flink的官方文檔。
Flink的源碼還是比較清晰易懂的, 尤为是瞭解了她的架構后, 大部分的實現都很是符合common sense 。 不想在這裏貼一堆代碼段然後加注釋解讀了, 本文的篇幅已經太長了。過一下全部的包,解釋一下他們的主要做用,以及须要框架裏使用的主要類吧。用一個大表來列舉比較合適。
Artifect | 功能介紹 |
Flink-client | FlinkCommandLine 入口類 (CliFrontEnd) 解析Flink 命令行 (DefaultCLI, YarnSessionCLI ) 負責將從用戶jar 文件中main函數生成Plan (PackagedProgramUtils),用通過LcoalExecutor或RemoteExecutor調用flink-Optimizer生成 jobGraph . 使用ClusterDescriptor啓動jobManager (好比在yarn), 使用并將jobGraph 用ClusterClient提交給本地或遠程的JM。 |
Flink-runtime | Flink 源碼的核心。 框架的核心組件都在裏面, 包括 Diskpatcher, JobMaster, TaskExecutor, ResourceManager, ClusterEntryPoint, WebUI以及他們依賴的子組件以及核心數據模型, JobGroup, ExecutionGraph, Execution, Task, Invokable, Operator, Driver, Function, NetworkBufferPool, ChannelManager, IOMemory, MemoryManager, PRCService, HAService, HeartbeatService , CheckPointCoordinator , BackPressure, etc . 這些類的名字和做用在上一章都或多或少提到過, 最好結合架構的介紹, 理解他們的代碼。 |
Flink-runtime-web | Flink Web monitor 的界面和handler . handler會調用Dispacher 的方法處理客戶請求, 好比sumitJob. |
Flink-java Fink-scala |
用Java 和 scala實現的Flink Batch API , Dataset, ExecutionEnrionment, etc . |
Flink-stream-java Flink-stream-scala |
用Java 和 scala實現的Flink stream API , DataStream, ExecutionEnrionment, , StreamExecutionEnvironment, windowing, etc . 和對streaming提供支持的runtime, Checkpoint, StreamTask, StreamPartitioner , BarrierTracker, WindowOperator, etc . |
Flink-optimizer |
主要功能是優化Plan 和生成JobGraph。Flink-optimizer是一個很client端使用重要的庫,它決定了從客戶代碼(application) 到jobGraph造成過過程。 我個人覺得一般也是調查和解決 application問題的根源, 好比application 的寫法是否是合適的,有問題的, 或最優的等等。 在前面架構極少裏面,并沒有談及Flink-optimizer, 因此在這簡單介紹下。 fink application 開發者使用Flink API 編寫application, flink-client 为了将application 最终能在Flink cluster里運行起來是, 它首先通過讀取用戶jar 文件中的main函數生成Plan:以DataSink為根的一個或多個树结构, 樹的節點都是application使用的 API operator, 每一個節點的輸入來自與樹的下一層節點。 不難想象树的叶子节点都是DataSoruce: 他们没有输入节点,但有用于读取数据源的inputFormat, 根节点是DataSink :他们既有输入节点,也有用于写入目标系统的outputFormat, 中间节点都会有一个多个的输入节点。Plan数据结构描述了同用户的application彻底相同的数据流的节点, 但它只是一个逻辑树状结构, 并无对链接节点的边作描述, 也没有对application编写的数据流作任何修改。o 而后使用Flink-optimizer将Plan根据时根据优化策略设置节点的边上的数据传输方式,并同时用OptimizerNode生成多种优化方案, 最后选择cost 最小的方案产生的方案 做为OptimizedPlan 。 好比将数据装载方式(ShipStrategyType) 设置 FORWARD, 而不是 PARTITION_HASH而 应为 FORWARD 的网络cost 最低(0) 。OptimizedPlan是一个图结构, 图中的顶点(PlanNode) 记录了自身的cost以及从source 开始到它的累计的cost 。OptimizedPlan主要针对与join和interation操做。 而后Flink-optimizer将OptimizedPlan 编译成JobGraph。编译的过程应该基本上是一对一的翻译(从PlanNode 到 JobVertex), 但若是一串PlanNode 知足Chaining 条件 (好比数据在每一个oparator 都不需从新分区, 流过operator 以后, 直接forward到下一个operator), Flink-optimizer就像这些 operator 链接到一块而后在JobGraph里面只建立一个 ChainedOperator jobVertex, ChainedOperator同Spark里面的stage 概念相似, 是优化的一部分。 最后flink-client将jobGraph提交给FlinkCluster ,jobGraph 变形为 ExceutionGraph在JM和TM上执行。 能够从Optimizer 的compile 和JogGraphGenerator的 compileJobGraph展开看, 他们分别complie的是Plan和OptimizedPlan 。 Flink-optimizer优化的是parition 的选择以及算法的选择, 而不是DAG 的workflow 。
|
Flink-Table
|
用户能够用flink-java/flink-stream-java里的api编写flink application, 也能够用 Flink-Table 的table api和 flink-sql写 application 。 Flink-Table将数据源(Dataset, DataStream)都 generalize 成Table (row based ),用户能够用相似关系型数据库的操做方式操做Flink的数据源, 这种方式虽然屏蔽了一些flink-api的特性 (好比 broadcast), 但极大的下降了application 的开发难度,减小了客户程序的代码量,从而极大的提升系统的重用度。 Flink-table 的底层仍是依赖dataset/datastream api, 因此基于table api 或SQL 的程序 (Program) 最终会翻译成 Flink-optimizer 的Plan , 通过前面所述一样的编译优化过程, 最终一个EG的形式运行在JM和TM上。 固然,在被翻译成Plan以前, Flink-table 的Program 也会有自身的优化过程, 好比SQL Plan optimization . 代码须要看, 如何实现一个Table : StreamTableSource, BatchTableSource, BatchTableSink,AppendStreamTableSink 如何扩充FlinkSQL : UserDefinedFunction, ScalarFunction,TableFunction,AggregateFunction |
Flink-yarn Flink-mesos Flink-container |
Flink 怎么支持异构环境的, 包括不一样环境里的, 主要是异构环境里的不一样ResourceManager (启动TM ),以及 ClusterDescriptor (启动JM)如何实现的。 |
Flink-library |
flink-cep, flink-gelly, flink-ml |
Flink-connector Flink-format |
链接外围数据系统(数据源和输出)系统的InputFormat, OutputFormat . |
Flink-filesystem |
Flink支持的分布式文件系统, hadoop, s3, mapr |
Flink-metrics |
flink 的metrics 系统 |
Flink-statebackends Flink-queryable-satate |
flink 的 state的存储 |
flink-jepsen |
Flink的UT和集成测试。 |
表-2 Flink packages
须要安装JDK1.8和Scala的plugin, 不然Flink没法在IntelliJ里编译。
下载flink1.6.1的代码,到下面的地址 download ,而后用maven 编译 。
https://github.com/apache/flink/tree/release-1.6.1
下载例子程序: https://github.com/kaixin1976/flink-arch-debug 。 因为github的限制, 上传的instrument文件相对较小,读者能够经过自我复制扩大文件尺寸。
运行例子程序, 有两个参数 :
第一个是 joinType,可取值为"normal" 或"broadcast"。normal 方式彻底依赖FlinkOptimizer产生 优化的方案, broadcast是运用一下方法, 使join的第一路输入广播, 第二路输入自由平均分配。
第二个使api类型, 可取值为 "api" 或 "sql" 。api方式指的是Flink Java API, sql 使 Flink SQL API 。对于Join Flink Java API 能够指定JoinHint或Parateters从而影响优化, sql 没有这些接口, 只能在DataSoource 上作文章。
具体看JoinTest::main()。
如前所述, Flink分布式环境的主要包含三部分:
1. FlinkCient : 生成、优化和提交JobGraph.
FlinkClient的main 函数在CliFrontend中, VM 的classpath 和工做路径设置为本地安装的一个flink路径 , 用于找到全部须要的jar文件和 flink confugration .
程序参数跟flink run 同样,
图-13 Flink Client debug configuration
2. JM : 生成ExecutionGraph,并为其中的全部的的子任务分配slot资源, 并将子任务发送到slot所在TM上运行。
JM 使用StandaloneSessionClusterEntrypoint , 他会启动Standalone Session Cluster 的entry point .
图-14 Flink JobManager debug configuration
3. TM: 运行ExecutionGraph的子任务。
图-14 Flink TaskManager debug configuration
例子程序是一个利用Flink SQL 把一大一小两个数据源join再一块儿的Flink application 。大数据源叫instruments 包含了一些假造金融工具(好比股票期权等)的基础数据(其中包括该工具交易货币的ID),小数据源是货币currencies 包含货币ID,和货币名称。join目的是给与金融工具的货币ID得到对应的货币名称(好比,人民币,欧元,美圆等)。
join的过程是比较简单的,首先 从文件建立两个数据源(instrument 和 currency 的 TableSource), 而后用调用 SQL 作join, 最后建立TableSink将join的结果输出到文件里。程序的主体以下。
public void joinSmallWithBig(String joinType) throws Exception { "StrikePriceMultiplier,LotSize,CurrencyCode,AssetState " + "FROM currencyTable join instruments on currencyTable.CurrencyId=instruments.CurrencyID "); |
表-3 例子程序
问题是join的过程发生了严重的数据倾斜 (data skew )。全世界的金融工具分布式极不平均的, 绝大部分使用美圆,欧元计价 (好比例子程序里假造的instrument的数据源,大概有13/14 的instrument是以欧元计价的)。若是使用经常使用的 hash join 或 sort-merage join, 数据倾斜是必然发生的。 调用joinSmallWithBig,并传入任意字符串 作为joinType 时, 程序调用的registerCurrencyTableSource方法, 该方法就是建立了一个普通的CsvTableSourcei并注册, 以后利用sql join 将两个数据join起来,此时会发生数据倾斜。以下图所示:
图-15 flink-webmonitor 上的currency 数据源
图-16 flink-webmonitor 上的instrument数据源
能够看出currency 的CsvTableSource一共有492条记录分4个split读入,数据量确实小,instrument数据源记录数大概14million 条记录有,比较大,一样分4个split读入, 分配不可谓不均衡 。但当join后, 均衡完全打破了,以下图所示:
图-17 flink-webmonitor 上的hash-join
两个数据源的输出策略(或JoinOperator的输入策略)也就是链接数据源与JoinOperator的那条边的ShipStrategyType被设置为一个currencyId 为key的HashPartition。 因为本例中约13/14的instrument以欧元计价,大概有13 million个instrument 记录依照hash code 被发送到一个task的IG里, 剩下约1 million的其余三个task 。
借这张图我们须要重温一下的前面提到的task, RP,RS,IG,IC的关系, instrument数据源 有4 个task因此4个RP, 每一个RP会产生4个RS以装载不一样的hash code,共16个RS 。 下游join operator 有两个IG (分别链接instrument数据源 和currency), 每个IG 的ShipStrategy都是HashParition。每个IG 有4个 IC, 每一个IC 都链接上游标号相同的RS (好比JoinOperator task1的 IG1 的IC1,IC2,IC3,IC4)只链接上游 instrumentSource task1, 2,3,4 的 RS1) 。 JoinOperator是一个DualInputOperator, 一般的operator只有一个IG 。
上图中13 million的record received (13,836,997)是joinOperator的第三个 task 的IG1 和 IG2中相同标号IC的记录数量(好比IC2),因为currency数据量很小,构成13 million 这个数量级的来源是instrument中以某几个货币计价的instrument, 如前述主要是欧元 , 此例中欧元计价占比大概为13/14 。
数据倾斜的结果形成joinOperator的第三个task须要多于别的 task的100倍的内存,一般会超过容器预先分配的内存配额。罪魁祸首是 join operator 输入边 的 ShipStrategy : hash partition 。 SQL 里没法指定 join 使用哪一种策略, 为何flink 会在翻译(将sql 翻译成jobGraph)的过程当中将 join 输入册率 优化成hash partition 呢?一个小数据集和大数据集join,最好的方式是将小数据集广播给下游的全部task, 大数据平均分配,而后再join operator内部作hash join, 这样的cost是最低的 。Flink为何不能作到这样的优化呢?若是使用Flink API (而不是 SQL) 状况会好吗?
初步的感受是Flink-optimizer 并无作到这样的优化: 当join的两个数据源大小悬殊时, 小数据集广播,大数据集均分的输入给join operator 。Flink这么经常使用的优化都没作到吗?
让咱们经过debug Flink-optimizer 代码, 看看他时怎么将本例的join input的ShipStrategy优化成HashPartition 吧。
下载例子代码,而后编译,package 成flink-arch-debug-0.0.1.jar, 而后下载flink-1.6.1 的代码,编译, 按照图-13设置flink-client的运行选项。
program arguments 设置为 run flink-arch-debug-0.0.1.jar normal sql, 使用 "normal ","sql" 方式, 以下:
C:\projects\flink-arch-debug\target\flink-arch-debug-0.0.1.jar normal sql
Flink Optimizer 的优化过程(参考Optimizer::compile方法)大概是:
首先将利用GraphCreatingVisitor 将Flink API (API, Table, 或 SQL) 产生的Plan翻译成dag 包里的OptimizerNode和DagConnection组成图,
而后从图的SinkNode(一个或多个)先前递归的调用getAlternativePlans方法从产生最优的Plan , 最优的Plan 是由plan包里的PlanNode和Channel组成的Graph 。ShipStratigy是channel的一个属性, 它描述数据从一个operator以那种方式发布到RP中的RS中的。
最后由JobGraphGenerator将最优图翻译成JobGraph
那么有多少种ShipStrategy呢? 连续按两下SHIFT, 再键入类名(ShipStategyType.java)。参考一下本文最后面的IntelliJ的经常使用键吧.
public enum ShipStrategyType { FORWARD(false, false), //数据按原分区号传递到本地运行的Task相同分区,不跨网络,不须要比较器(用于排序或Hash) PARTITION_RANDOM(true, false),//数据随机传递到本地运行的Task任意分区,不跨网络,不须要比较器 PARTITION_HASH(true, true),//数据按照指定键值的Hash决定分区,到目标Task须要跨网络传递,须要比较器 PARTITION_RANGE(true, true),//数据按照指定键值的排序顺序决定分区,到目标Task须要跨网络传递,须要比较器 BROADCAST(true, false) // 将数据复制到任意一个分区中,不须要比较器 PARTITION_FORCED_REBALANCE(true, false), // 将数据平均到各个分区中,不须要比较器 PARTITION_CUSTOM(true, true); // 将数据按用户指定的分区器分配分区,不须要比较器 ... |
表-4 ShipStrategyType
搜索一下PARTITION_HASH, 看谁将它设置给最优图的Channel 对象, 过滤一下,应该由两个地方:
一个是TwoInputNode::setInput 方法中, 它根据join operator 的JoinHint 或 Parameters 来设置 input Channel的ShipStrategy 。 好比, 若是想让join的第一个input(Currencies)用广播方式传递, 就可以使用”BROADCAST_HASH_FIRST“ hint, 或者将这样的参数"INPUT_LEFT_SHIP_STRATEGY"="SHIP_BROADCAST"传递给join operator 。不过这些都只能在Flink Java API中 使用, (具体参考flink-arch-debug中的ApiJointTest类),没法在SQL API 中使用。
第二个在TwoInputNode::getAlternativePlans中。如前文所述, getAlternativePlans首先会 OptimizerNode (本例中主要是JoinNode) 节点用于产生多个plan , 每一个 plan的输入输出的节点都是相同, 不一样的是边上ShipStrategy 。对于每一个Plan , getAlternativePlans都会调用RequestedGlobalProperties::parameterizeChannel从而设置这个Plan的InputChannel的ShipStrategy 。
在SQL application 中, 用户没法像在使用Java API 时那样经过设置JoinHint或Parameter 来选择想要的ShipStategy , 实际上若是指定了这两项中的任意一个,也等同于跳过了getAlternativePlans (优化过程)。 由于既然用户已选择了想要的ShipStategy,就没有必要用Optimizer 根据计算每一个方案的cost 来选择最优方案了。 但是即便是由Optimizer 来选择, 对于本例(小数据集join大数据集), 根据表-4 ShipStrategyType的 定义,PARTITION_HASH的cost 确定不是最低的 。对于大数据集 (instruments), 若是采用PARTITION_HASH将数据传递给下游, 网络cost 必然是很高的, 由于PARTITION_HASH是一个shuffule 的过程, 下游 必然有位于不一样TM远程Task , 所以网络传递的cost 必然很高。 看看Flink 怎么定义Cost 和怎么估计cost 的?
public class Costs implements Comparable<Costs>, Cloneable { private double cpuCost; // 本节点算法在计算中使用CPU 的costs, 好比Hash, sort , merage, etc
|
表-5 Cost定义
根据表-5 Flink cost的定义, 一个节点(好比JoinNode)的cost ,包括上游数据由上游传递过来的network cost, 数据本地缓存的 disk cost, 还有算法的cpu cost 。 图-17第二条边(Instrments)使用的HASH_PARTITION策略确定比FORWARD cost高不少, 由于若是使用FORWARD 的network cost 为0 (参考表-4的定义),diskCost和cpuCost 跟算法有关(好比使用HashTable或sort merage 去作join)。 第一条边(Currencies)使用的HASH_PARTITION相比BROADCAST, cost 确定低一些, 可是Currencies数据源 数量特别小, 它的cost 和 这两个cost 之间的差异 在Instrments引发的cost面前均可以忽略不记。因此感受上 HH(两边都是HASH_PARTITION)和BF(currency 边Broadcuast, instrument边Forward)相比, BF方案的cost必定低不少。 但为何getAlternativePlans没有选择 BF呢?JoinNode没有给出BF的选择权吗?看看JoinNode::getDataProperties()的代码:
private List<OperatorDescriptorDual> getDataProperties(InnerJoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint, switch (joinHint) { } |
表-6 JoinNode的输入节点要求的属性
从表-6 JoinNode的输入节点要求的属性可知, 但由Optimizer 来决定(OPTIMIZER_CHOOSES)输入节点的ShipStrategy(输出分区策略)时, JoinNode对上游的要求时很是宽泛的,几乎就是什么都行 。
SortMergeInnerJoinDescriptor(this.keys1, this.keys2),HashJoinBuildFirstProperties(this.keys1, this.keys2),HashJoinBuildSecondProperties(this.keys1, this.key),
实际上就是他们为JoinNode和它的input定义了三类需求 :
第一, Join算法但是时Sort-merge, 或者用一路输入作HashTable的HashJoin ,或者用二路输入作HashTable的HashJoin
第二, 第一路第二路输入的partition 策略: 也就是所谓的RequestGlobalPropertities, 能够是RANDOM_PARTITIONED (对应FORWARD shipStategyType),或者HASH_PARTITIONED(PARTITION_HASH),或者FULL_REPLICATION(对应BROADCAST),
或者ANY_PARTITIONING(它是一个通配策略,意思是RANDOM or HASH均可以)
第三,输入的排序策略 :就是所谓的RequestGlobalPropertities, 排序或者不排序。
想想这三类需求的可选项组合起来,一共会有多少组合? 弄个表, 描述一下可能更清楚些。
序号 | Input1 的 partition 备选 策略 |
Input2 的 partition 备选 策略 |
算法 | Input1/Input2 的 排序备选策略 | 兼容性 | network Cost |
1 |
FORWARD |
HASH |
SortMergeInnerJoin |
sort/sort | yes | |
2 |
FORWARD |
HASH |
HashJoinBuildFirst |
sort/sort | yes | |
3 |
FORWARD |
HASH |
HashJoinBuildFirst |
not sort/ not sort | yes | |
4 |
FORWARD |
HASH |
HashJoinBuildFirst |
sort/not sort | yes | |
5 |
FORWARD |
HASH |
HashJoinBuildFirst |
not sort /sort | yes | |
6 |
FORWARD |
HASH |
HashJoinBuildSecond |
sort/sort | yes | |
7 |
FORWARD |
HASH |
HashJoinBuildSecond |
not sort/ not sort | yes | |
8 |
FORWARD |
HASH |
HashJoinBuildSecond |
sort/not sort | yes | |
9 | FORWARD |
HASH |
HashJoinBuildSecond |
not sort /sort | yes |
表-6 GloabalProperties, LocalProperties 和算法的组合
从表-6 可知, 对于input1和input2的partition策略(表-6实际上用的是shipStategy, 他们是1-1对应的)一个组合FH(Forward, Hash ), 一共有9个能够接受的分区-算法-排序的方案,每一种方案都有本身的cost, getAlternativePlans会最后选择cost最低的方案。 除了FH组合, 合法组合还有
HB:Hash-Broadcast
HH:Hash-Hash
BB:Broadcast-Broadcast
BF:Broadcast-Forward
FB:Forward-Broadcast
固然还有HF,FH ,FF,这些都是不合法的,对于join操做会有数据丢失的组合。 6个合法的input1和input2的partition策略组合,根据表-6能够计算, 一共能有6*9=54可互相兼容的分区-算法-排序的组合方案 。实际上若是在TwoInputNode.java:516行上设计断点 (getAlternativePlans方法内)并运行例子程序,
你会发现变量 outputPlans里有90个备选方案,那么其中36个必定是重复和多徐的。
那么, BF方案是既然在备选方案里,并且它的cost理论上是最低的, 并且getAlternativePlans的算法是选择cost最低方案, 为何cost并不低的HH方案最终被选择了 ? 答案只能有一个 : cost estimation有问题。
不贴Cost, CostEstimiator, DefualtCostEstimator (costs包的三个类)的代码了。总起来所, CostEstimator 是根据OptimizerNode 的 estimatedOutputSize 成员变量(包括本节点和数据节点的)来计算 network和disk cost 的 , estimatedOutputSize是由节点成员函数computeOperatorSpecificDefaultEstimates()设定, 能够参考DataSourceNode中该函数的实现:estimatedOutputSize 被设置成来自于InputFormat的数据源实际物理尺寸。 但是FlatMapNode, MapNode 只是简简单单的设置了estimatedNumRecords (使之同 source的 同样),而并无计算estimatedOutputSize 。
重看一下图-4的ExcutionGraph可知, Join的第一个input是FlatMap , 若是Flatmap没有estimatedOutputSize 会致使它的下一个节点JoionNode没法计算cost. 请参考DefualtCostEstimator::addHybridHashCosts()的实现, 当上游的estimatedOutputSize为负数(Unknown)时,本节点的cost为被设置为Unknown 。Unknown cost在选择cheapest cost的plan时是不能作比较的, 只能依靠假设的Cost (如heuristicNetworkCost ), 这个东西很不靠谱, 同时也是HH方案被选上的缘由。
解决方案应该修改Flink代码在FlatmapNode::computeOperatorSpecificDefaultEstimates 加上对estimatedOutputSize的估计(好比保持和上游相同, 或加一个discount), 或者修改DefualtCostEstimator 使之 在计算Cost时能经过estimatedNumRecords估计estimatedOutputSize (下策, 没有 前者好)。 总之,Flink Optimizter 在 Cost Estimation 作的不够好, 改善是应该的, 我刚刚看了Flink 1.9 (最新版)的 Optimizer , 这一块仍是没有修改。忽然还有加入Flink community 的冲动。
若是不想修改 Flink 源码, 或改了也没法发布, 那就看看有没有替换方案了。
... |
表-7 FullyReplicated DataSource 。
这段代码表示若是上游节点是一个FullyReplicated DataSource , 那么就不须要备选方案的选择过程, 直接将这个输入边的shipStagy 设置为Forward 。 FullyReplicated DataSource意思是, 这个Data Source 的每一个Parition输出的都是数据源的全集 而不是不部分。那么只须要将Currencies作成这种数据源,问题不就解决了吗?!
虽然ShipStategy是Forward, 但实际下游(Join Node)的每个Task都获得了Currencies的这个数据集的全集, 这个同Broadcast的效果是同样的。并且第一路为F的组合有FF, FB , FH , 选择的范围小了。 这不对阿,前面不是说FF, FH, HF是不合法的吗? 要注意此时的F是因为FullyReplicated DataSource而既决定的F,不是普通的Forward , 是合法的。FF, FB , FH产生了27种可选方案(参考表-6), 根据heuristicNetworkCost , FF是cost最低的。 那确定是最低的阿, Forward的数据都是本地传输的, network cost 是 0 。
下面就是如何将Currencies作成FullyReplicated DataSource了, 根据DataSourceNode.java:92, DataSource 的InputFormat只要是ReplicatingInputFormat, 则就是FullyReplicated 。在看看ReplicatingInputFormat的代码, 只须要将原有的InputFormat (好比CsvInputFormat)外面包一层ReplicatingInputFormat就能够了, 具体的看例子的代码吧 : SqlJoinTest.java:95。运行结果以下。
图-18 flink-webmonitor 上FullyReplicated Currencies, 每个partition都输出数据集的所有, 492条记录。
图-19 flink-webmonitor 上instrument , 跟以前没什么区别。
图-20 flink-webmonitor 上的join , 数据在各个并行Task上很是均衡。
虽然这个例子只用到了Flink-client上的知识, 并无与JM和TM联合调试, 可是问题的发现,分析和解决的方法是同样的。 都是须要根据对架构的认识,缩小怀疑的范围,然会反复阅读和调式相关代码,找到问题,以及找到解决方案, 试想一下,若是没有架构的知识,你怎么可以怀疑这必定是Flink-optimizer的问题呢? 并且有时解决问题的方法并不必定是直接修改的出问题的代码(固然这样是最好的方案),根据相关代码找到一个合适的替代方案也是解决问题的方法。
没有想到描述一个这样的小问题,这一章运用这么长的篇幅, 对于复杂的问题,简直能写本书了。但愿抛砖引玉,可以运用这样的方法学来解决使用Flink遇到的问题: 架构->缩小范围->阅读和调式代码->发现和分析问题->找到解决方案。
CTRL+ALT+Enter to complete line
SHIFT+SHIT search class in source code
CTRL+SHIFT+F serarch string in soruce code
Alt + F1 show current java in project view
CTRL+ALT+B navigate to implementation classes
CTRL+U navigate to super method
ALT + F7 Show reference
ALT + 4 Show run window
ALT + F12 Show terminal window
ALT+Enter Create A testClass
ALT+Insert TestMethod
CTRL+home move to file start
CTRL+end move to file end
参考
Flink conference Page: https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
很不错的Blog: http://www.javashuo.com/article/p-khdfegoi-mh.html
Flink1.6 documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.6/
Flink Github: https://github.com/apache/flink