【Flink】Flink做业调度流程分析

1. 概述

当向Flink集群提交用户做业时,从用户角度看,只须要做业处理逻辑正确,输出正确的结果便可;而不用关心做业什么时候被调度的,做业申请的资源又是如何被分配的以及做业什么时候会结束;可是了解做业在运行时的具体行为对于咱们深刻了解Flink原理有很是大的帮助,而且对咱们如何编写更合理的做业逻辑有指导意义,所以本文详细分析做业的调度及资源分配以及做业的生命周期。html

2. 流程分析

基于社区master主线(1.11-SNAPSHOT),commit: 12f7873db54cfbc5bf853d66ccd4093f9b749c9a ,HA基于ZK实现分析web

Flink做业申请流程图
上图归纳了Flink做业从Client端提交到到Flink集群的提交的基本流程[1]。缓存

当运行./flink run脚本提交用户做业至Dispathcer后,Dispatcher会拉起JobManagerRunner,然后JobManagerRunner会向Zookeeper注册竞争Leader。对于以前流程有兴趣能够参考深刻理解Flink-On-Yarn模式异步

JobManagerRunner竞争成为Leader时,会调用JobManagerRunnerImpl#grantLeadership,此时便开始处理做业,会经过以下的代码调用路径启动JobMaster。ide

  • JobManagerRunnerImpl#grantLeadership
  • JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobManager
  • JobManagerRunnerImpl#startJobMaster。
    startJobMaster方法会首先将该做业的ID写入对应的ZK目录并置为RUNNING状态,写入该目录可用于在Dispathcer接收做业时,判断该做业是否重复提交或恢复做业时使用;在JobManagerRunner调度做业时也在从ZK上拉取做业信息来判断做业状态,若为DONE状态,则无需调度。启动JobMaster时会先启动其RPC Endpoint,以便与其余组件进行RPC调用,以后JobMaster便经过JobMaster#startJobExecution开始执行做业,执行做业前会有些前置校验,如必须确保运行在主线程中;启动JobMaster上的一些服务(组件),如TaskManager和ResourceManager的心跳管理;启动SlotPool、Scheduler;重连至ResourceManager,而且在ZK中注册监听ResourceManager Leader的变化的Retriever等。
    当初始化完JobMaster上相应服务(组件)后,便开始调度,会有以下代码调用路径
  • JobMaster#start
  • JobMaster#startJobExecution
  • JobMaster#resetAndStartScheduler
  • JobMaster#startScheduling
  • SchedulerBase#startScheduling。

咱们知道用户编写的做业是以JobGraph提交到Dispatcher,可是在实际调度时会将JobGraph转化为ExecutionGraph,JobGraph生成ExecutionGraph是在SchedulerBase对象初始化的时候完成转化,以下图所示表示了典型的转化过程(JobVertex与ExecutionJobVertex一一对应),而具体的转化逻辑实现可参考如何生成ExecutionGraph及物理执行图oop

JobGraph->ExecutionGraph

在SchedulerBase初始化时生成ExecutionGraph后,以后便基于ExecutionGraph调度,而调度基类SchedulerBase默认实现为DefaultScheduler,会继续经过DefaultScheduler#startSchedulingInternal调度做业,此时会将做业(ExecutionGraph)的状态从CREATED状态变动为RUNNING状态,此时在Flink web界面查看任务的状态便已经为RUNNING,但注意此时做业(各顶点)实际并未开始调度,顶点仍是处于CREATED状态,任做业状态与顶点状态不彻底相关联,有其各自的演化生命周期,具体可参考Flink做业调度[2];而后根据不一样的策略EagerSchedulingStrategy(主要用于流式做业,全部顶点(ExecutionVertex)同时开始调度)和LazyFromSourcesSchedulingStrategy(主要用于批做业,从Source开始开始调度,其余顶点延迟调度)调度。ui

当提交流式做业时,会有以下代码调用路径:spa

  • EagerSchedulingStrategy#startScheduling
  • EagerSchedulingStrategy#allocateSlotsAndDeploy,在部署以前会根据待部署的ExecutionVertex生成对应的ExecutionVertexDeploymentOption,而后调用DefaultScheduler#allocateSlotsAndDeploy开始部署。一样,在部署以前也须要进行一些前置校验(ExecutionVertex对应的Execution的状态必须为CREATED),接着将待部署的ExecutionVertex对应的Execution状态变动为SCHEDULED,而后开始为ExecutionVertex分配Slot。会有以下的调用代码路径:
  • DefaultScheduler#allocateSlots(该过程会ExecutionVertex转化为ExecutionVertexSchedulingRequirements,会封装包含一些location信息、sharing信息、资源信息等)
  • DefaultExecutionSlotAllocator#allocateSlotsFor,该方法会开始逐一异步部署各ExecutionVertex,部署也是根据不一样的Slot提供策略来分配,接着会通过以下代码调用路径层层转发,SlotProviderStrategy#allocateSlot -> SlotProvider#allocateSlot(SlotProvider默认实现为SchedulerImpl) -> SchedulerImpl#allocateSlotInternal -> SchedulerImpl#internalAllocateSlot(该方法会根据vertex是否共享slot来分配singleSlot/SharedSlot),以singleSlot为例说明。
    在分配slot时,首先会在JobMaster中SlotPool中进行分配,具体是先SlotPool中获取全部slot,而后尝试选择一个最合适的slot进行分配,这里的选择有两种策略,即按照位置优先和按照以前已分配的slot优先;若从SlotPool没法分配,则经过RPC请求向ResourceManager请求slot,若此时并未链接上ResourceManager,则会将请求缓存起来,待链接上ResourceManager后再申请。

当ResourceManager收到申请slot请求时,若发现该JobManager未注册,则直接抛出异常;不然将请求转发给SlotManager处理,SlotManager中维护了集群全部空闲的slot(TaskManager会向ResourceManager上报本身的信息,在ResourceManager中由SlotManager保存Slot和TaskManager对应关系),并从其中找出符合条件的slot,而后向TaskManager发送RPC请求申请对应的slot。线程

等待全部的slot申请完成后,而后会将ExecutionVertex对应的Execution分配给对应的Slot,即从Slot中分配对应的资源给Execution,完成分配后可开始部署做业。
部署做业代码调用路径以下:3d

  • DefaultScheduler#waitForAllSlotsAndDeploy
  • DefaultScheduler#deployAll
  • DefaultScheduler#deployOrHandleError
  • DefaultScheduler#deployTaskSafe
  • DefaultExecutionVertexOperations#deploy
  • ExecutionVertex#deploy
  • Execution#deploy(每次调度ExecutionVertex,都会有一个Execute,在此阶段会将Execution的状态变动为DEPLOYING状态,而且为该ExecutionVertex生成对应的部署描述信息,而后从对应的slot中获取对应的TaskManagerGateway,以便向对应的TaskManager提交Task)
  • RpcTaskManagerGateway#submitTask(此时便将Task经过RPC提交给了TaskManager)。

TaskManager(TaskExecutor)在接收到提交Task的请求后,会通过一些初始化(如从BlobServer拉取文件,反序列化做业和Task信息、LibaryCacheManager等),而后这些初始化的信息会用于生成Task(Runnable对象),而后启动该Task,其代码调用路径以下 Task#startTaskThread(启动Task线程)-> Task#run(将ExecutionVertex状态变动为RUNNING状态,此时在FLINK web前台查看顶点状态会变动为RUNNING状态,另外还会生成了一个AbstractInvokable对象,该对象是FLINK衔接执行用户代码的关键,然后会通过以下调用

  • AbstractInvokable#invoke(AbstractInvokable有几个关键的子类实现, BatchTask/BoundedStreamTask/DataSinkTask/DataSourceTask/StreamTask/SourceStreamTask。对于streaming类型的Source,会调用StreamTask#invoke)
  • StreamTask#invoke
  • StreamTask#beforeInvoke
  • StreamTask#initializeStateAndOpen(初始化状态和进行初始化,这里会调用用户的open方法(如自定义实现的source))-> StreamTask#runMailboxLoop,便开始处理Source端消费的数据,并流入下游算子处理。

至此做业从提交到资源分配及调度运行总体流程就已经分析完毕,对于流式做业而言,正常状况下其会一直运行,不会结束。

3. 总结

对于做业的运行,会先提交至Dispatcher,由Dispatcher拉起JobManagerRunner,在JobManagerRunner成为Leader后,便开始处理做业,首先会根据JobGraph生成对应的ExecutionGraph,而后开始调度,做业的状态首先会变动为RUNNING,而后对各ExecutionVertex申请slot,申请slot会涉及JM与RM、TM之间的通讯,当在TM上分配完slot后,即可将Task提交至TaskManager,而后TaskManager会为每一个提交的Task生成一个单独的线程处理。

参考

  1. https://www.infoq.cn/article/RWTM9o0SHHV3Xr8o8giT
  2. https://flink.sojb.cn/internals/job_scheduling.html
相关文章
相关标签/搜索