本文主要介绍 Flink Runtime 的做业执行的核心机制。首先介绍 Flink Runtime 的总体架构以及 Job 的基本执行流程,而后介绍在这个过程,Flink 是怎么进行资源管理、做业调度以及错误恢复的。最后,本文还将简要介绍 Flink Runtime 层当前正在进行的一些工做。算法
Flink 的总体架构如图 1 所示。Flink 是能够运行在多种不一样的环境中的,例如,它能够经过单进程多线程的方式直接运行,从而提供调试的能力。它也能够运行在 Yarn 或者 K8S 这种资源管理系统上面,也能够在各类云环境中执行。缓存
针对不一样的执行环境,Flink 提供了一套统一的分布式做业执行引擎,也就是 Flink Runtime 这层。Flink 在 Runtime 层之上提供了 DataStream 和 DataSet 两套 API,分别用来编写流做业与批做业,以及一组更高级的 API 来简化特定做业的编写。本文主要介绍 Flink Runtime 层的总体架构。网络
Flink Runtime 层的主要架构如图 2 所示,它展现了一个 Flink 集群的基本结构。Flink Runtime 层的整个架构主要是在 FLIP-6 中实现的,总体来讲,它采用了标准 master-slave 的结构,其中左侧白色圈中的部分便是 master,它负责管理整个集群中的资源和做业;而右侧的两个 TaskExecutor 则是 Slave,负责提供具体的资源并实际执行做业。数据结构
其中,Master 部分又包含了三个组件,即 Dispatcher、ResourceManager 和 JobManager。其中,Dispatcher 负责接收用户提供的做业,而且负责为这个新提交的做业拉起一个新的 JobManager 组件。ResourceManager 负责资源的管理,在整个 Flink 集群中只有一个 ResourceManager。JobManager 负责管理做业的执行,在一个 Flink 集群中可能有多个做业同时执行,每一个做业都有本身的 JobManager 组件。这三个组件都包含在 AppMaster 进程中。多线程
基于上述结构,当用户提交做业的时候,提交脚本会首先启动一个 Client进程负责做业的编译与提交。它首先将用户编写的代码编译为一个 JobGraph,在这个过程,它还会进行一些检查或优化等工做,例如判断哪些 Operator 能够 Chain 到同一个 Task 中。而后,Client 将产生的 JobGraph 提交到集群中执行。此时有两种状况,一种是相似于 Standalone 这种 Session 模式,AM 会预先启动,此时 Client 直接与 Dispatcher 创建链接并提交做业便可。另外一种是 Per-Job 模式,AM 不会预先启动,此时 Client 将首先向资源管理系统 (如Yarn、K8S)申请资源来启动 AM,而后再向 AM 中的 Dispatcher 提交做业。架构
看成业到 Dispatcher 后,Dispatcher 会首先启动一个 JobManager 组件,而后 JobManager 会向 ResourceManager 申请资源来启动做业中具体的任务。这时根据 Session 和 Per-Job 模式的区别, TaskExecutor 可能已经启动或者还没有启动。若是是前者,此时 ResourceManager 中已有记录了 TaskExecutor 注册的资源,能够直接选取空闲资源进行分配。不然,ResourceManager 也须要首先向外部资源管理系统申请资源来启动 TaskExecutor,而后等待 TaskExecutor 注册相应资源后再继续选择空闲资源进程分配。目前 Flink 中 TaskExecutor 的资源是经过 Slot 来描述的,一个 Slot 通常能够执行一个具体的 Task,但在一些状况下也能够执行多个相关联的 Task,这部份内容将在下文进行详述。ResourceManager 选择到空闲的 Slot 以后,就会通知相应的 TM “将该 Slot 分配分 JobManager XX ”,而后 TaskExecutor 进行相应的记录后,会向 JobManager 进行注册。JobManager 收到 TaskExecutor 注册上来的 Slot 后,就能够实际提交 Task 了。并发
TaskExecutor 收到 JobManager 提交的 Task 以后,会启动一个新的线程来执行该 Task。Task 启动后就会开始进行预先指定的计算,并经过数据 Shuffle 模块互相交换数据。负载均衡
以上就是 Flink Runtime 层执行做业的基本流程。能够看出,Flink 支持两种不一样的模式,即 Per-job 模式与 Session 模式。如图 3 所示,Per-job 模式下整个 Flink 集群只执行单个做业,即每一个做业会独享 Dispatcher 和 ResourceManager 组件。此外,Per-job 模式下 AppMaster 和 TaskExecutor 都是按需申请的。所以,Per-job 模式更适合运行执行时间较长的大做业,这些做业对稳定性要求较高,而且对申请资源的时间不敏感。与之对应,在 Session 模式下,Flink 预先启动 AppMaster 以及一组 TaskExecutor,而后在整个集群的生命周期中会执行多个做业。能够看出,Session 模式更适合规模小,执行时间短的做业。dom
本节对 Flink 中资源管理与做业调度的功能进行更深刻的说明。实际上,做业调度能够看作是对资源和任务进行匹配的过程。如上节所述,在 Flink 中,资源是经过 Slot 来表示的,每一个 Slot 能够用来执行不一样的 Task。而在另外一端,任务即 Job 中实际的 Task,它包含了待执行的用户逻辑。调度的主要目的就是为了给 Task 找到匹配的 Slot。逻辑上来讲,每一个 Slot 都应该有一个向量来描述它所能提供的各类资源的量,每一个 Task 也须要相应的说明它所须要的各类资源的量。可是实际上在 1.9 以前,Flink 是不支持细粒度的资源描述的,而是统一的认为每一个 Slot 提供的资源和 Task 须要的资源都是相同的。从 1.9 开始,Flink 开始增长对细粒度的资源匹配的支持的实现,但这部分功能目前仍在完善中。分布式
做业调度的基础是首先提供对资源的管理,所以咱们首先来看下 Flink 中资源管理的实现。如上文所述,Flink 中的资源是由 TaskExecutor 上的 Slot 来表示的。如图 4 所示,在 ResourceManager 中,有一个子组件叫作 SlotManager,它维护了当前集群中全部 TaskExecutor 上的 Slot 的信息与状态,如该 Slot 在哪一个 TaskExecutor 中,该 Slot 当前是否空闲等。当 JobManger 来为特定 Task 申请资源的时候,根据当前是 Per-job 仍是 Session 模式,ResourceManager 可能会去申请资源来启动新的 TaskExecutor。当 TaskExecutor 启动以后,它会经过服务发现找到当前活跃的 ResourceManager 并进行注册。在注册信息中,会包含该 TaskExecutor中全部 Slot 的信息。 ResourceManager 收到注册信息后,其中的 SlotManager 就会记录下相应的 Slot 信息。当 JobManager 为某个 Task 来申请资源时, SlotManager 就会从当前空闲的 Slot 中按必定规则选择一个空闲的 Slot 进行分配。当分配完成后,如第 2 节所述,RM 会首先向 TaskManager 发送 RPC 要求将选定的 Slot 分配给特定的 JobManager。TaskManager 若是尚未执行过该 JobManager 的 Task 的话,它须要首先向相应的 JobManager 创建链接,而后发送提供 Slot 的 RPC 请求。在 JobManager 中,全部 Task 的请求会缓存到 SlotPool 中。当有 Slot 被提供以后,SlotPool 会从缓存的请求中选择相应的请求并结束相应的请求过程。
当 Task 结束以后,不管是正常结束仍是异常结束,都会通知 JobManager 相应的结束状态,而后在 TaskManager 端将 Slot 标记为已占用但未执行任务的状态。JobManager 会首先将相应的 Slot 缓存到 SlotPool 中,但不会当即释放。这种方式避免了若是将 Slot 直接还给 ResourceManager,在任务异常结束以后须要重启时,须要马上从新申请 Slot 的问题。经过延时释放,Failover 的 Task 能够尽快调度回原来的 TaskManager,从而加快 Failover 的速度。当 SlotPool 中缓存的 Slot 超过指定的时间仍未使用时,SlotPool 就会发起释放该 Slot 的过程。与申请 Slot 的过程对应,SlotPool 会首先通知 TaskManager 来释放该 Slot,而后 TaskExecutor 通知 ResourceManager 该 Slot 已经被释放,从而最终完成释放的逻辑。
除了正常的通讯逻辑外,在 ResourceManager 和 TaskExecutor 之间还存在定时的心跳消息来同步 Slot 的状态。在分布式系统中,消息的丢失、错乱不可避免,这些问题会在分布式系统的组件中引入不一致状态,若是没有定时消息,那么组件没法从这些不一致状态中恢复。此外,当组件之间长时间未收到对方的心跳时,就会认为对应的组件已经失效,并进入到 Failover 的流程。
在 Slot 管理基础上,Flink 能够将 Task 调度到相应的 Slot 当中。如上文所述,Flink 还没有彻底引入细粒度的资源匹配,默认状况下,每一个 Slot 能够分配给一个 Task。可是,这种方式在某些状况下会致使资源利用率不高。如图 5 所示,假如 A、B、C 依次执行计算逻辑,那么给 A、B、C 分配分配单独的 Slot 就会致使资源利用率不高。为了解决这一问题,Flink 提供了 Share Slot 的机制。如图 5 所示,基于 Share Slot,每一个 Slot 中能够部署来自不一样 JobVertex 的多个任务,可是不能部署来自同一个 JobVertex 的 Task。如图5所示,每一个 Slot 中最多能够部署同一个 A、B 或 C 的 Task,可是能够同时部署 A、B 和 C 的各一个 Task。当单个 Task 占用资源较少时,Share Slot 能够提升资源利用率。 此外,Share Slot 也提供了一种简单的保持负载均衡的方式。
基于上述 Slot 管理和分配的逻辑,JobManager 负责维护做业中 Task执行的状态。如上文所述,Client 端会向 JobManager 提交一个 JobGraph,它表明了做业的逻辑结构。JobManager 会根据 JobGraph 按并发展开,从而获得 JobManager 中关键的 ExecutionGraph。ExecutionGraph 的结构如图 5 所示,与 JobGraph 相比,ExecutionGraph 中对于每一个 Task 与中间结果等均建立了对应的对象,从而能够维护这些实体的信息与状态。
在一个 Flink Job 中是包含多个 Task 的,所以另外一个关键的问题是在 Flink 中按什么顺序来调度 Task。如图 7 所示,目前 Flink 提供了两种基本的调度逻辑,即 Eager 调度与 Lazy From Source。Eager 调度如其名子所示,它会在做业启动时申请资源将全部的 Task 调度起来。这种调度算法主要用来调度可能没有终止的流做业。与之对应,Lazy From Source 则是从 Source 开始,按拓扑顺序来进行调度。简单来讲,Lazy From Source 会先调度没有上游任务的 Source 任务,当这些任务执行完成时,它会将输出数据缓存到内存或者写入到磁盘中。而后,对于后续的任务,当它的前驱任务所有执行完成后,Flink 就会将这些任务调度起来。这些任务会从读取上游缓存的输出数据进行本身的计算。这一过程继续进行直到全部的任务完成计算。
在 Flink 做业的执行过程当中,除正常执行的流程外,还有可能因为环境等缘由致使各类类型的错误。总体上来讲,错误可能分为两大类:Task 执行出现错误或 Flink 集群的 Master 出现错误。因为错误不可避免,为了提升可用性,Flink 须要提供自动错误恢复机制来进行重试。
对于第一类 Task 执行错误,Flink 提供了多种不一样的错误恢复策略。如图 8 所示,第一种策略是 Restart-all,即直接重启全部的 Task。对于 Flink 的流任务,因为 Flink 提供了 Checkpoint 机制,所以当任务重启后能够直接从上次的 Checkpoint 开始继续执行。所以这种方式更适合于流做业。第二类错误恢复策略是 Restart-individual,它只适用于 Task 之间没有数据传输的状况。这种状况下,咱们能够直接重启出错的任务。
因为 Flink 的批做业没有 Checkpoint 机制,所以对于须要数据传输的做业,直接重启全部 Task 会致使做业从头计算,从而致使必定的性能问题。为了加强对 Batch 做业,Flink 在1.9中引入了一种新的Region-Based的Failover策略。在一个 Flink 的 Batch 做业中 Task 之间存在两种数据传输方式,一种是 Pipeline 类型的方式,这种方式上下游 Task 之间直接经过网络传输数据,所以须要上下游同时运行;另一种是 Blocking 类型的试,如上节所述,这种方式下,上游的 Task 会首先将数据进行缓存,所以上下游的 Task 能够单独执行。基于这两种类型的传输,Flink 将 ExecutionGraph 中使用 Pipeline 方式传输数据的 Task 的子图叫作 Region,从而将整个 ExecutionGraph 划分为多个子图。能够看出,Region 内的 Task 必须同时重启,而不一样 Region 的 Task 因为在 Region 边界存在 Blocking 的边,所以,能够单独重启下游 Region 中的 Task。
基于这一思路,若是某个 Region 中的某个 Task 执行出现错误,能够分两种状况进行考虑。如图 8 所示,若是是因为 Task 自己的问题发生错误,那么能够只重启该 Task 所属的 Region 中的 Task,这些 Task 重启以后,能够直接拉取上游 Region 缓存的输出结果继续进行计算。
另外一方面,如图若是错误是因为读取上游结果出现问题,如网络链接中断、缓存上游输出数据的 TaskExecutor 异常退出等,那么还须要重启上游 Region 来从新产生相应的数据。在这种状况下,若是上游 Region 输出的数据分发方式不是肯定性的(如 KeyBy、Broadcast 是肯定性的分发方式,而 Rebalance、Random 则不是,由于每次执行会产生不一样的分发结果),为保证结果正确性,还须要同时重启上游 Region 全部的下游 Region。
除了 Task 自己执行的异常外,另外一类异常是 Flink 集群的 Master 进行发生异常。目前 Flink 支持启动多个 Master 做为备份,这些 Master 能够经过 ZK 来进行选主,从而保证某一时刻只有一个 Master 在运行。当前活路的 Master 发生异常时,某个备份的 Master 能够接管协调的工做。为了保证 Master 能够准确维护做业的状态,Flink 目前采用了一种最简单的实现方式,即直接重启整个做业。实际上,因为做业自己可能仍在正常运行,所以这种方式存在必定的改进空间。
Flink目前仍然在Runtime部分进行不断的迭代和更新。目前来看,Flink将来可能会在如下几个方式继续进行优化和扩展:
本文为云栖社区原创内容,未经容许不得转载。