Flink源码阅读(二)——checkpoint源码分析

前言

  在Flink原理——容错机制一文中,已对checkpoint的机制有了较为基础的介绍,本文着重从源码方面去分析checkpoint的过程。固然本文只是分析作checkpoint的调度过程,只是尽可能弄清楚总体的逻辑,没有弄清楚其实现细节,仍是有遗憾的,后期仍是努力去分析实现细节。文中如果有误,欢迎大伙留言指出html

  本文基于Flink1.9。java

一、参数设置

  1.1 有关checkpoint常见的参数以下:

1 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 2 env.enableCheckpointing(10000); //默认是不开启的   3 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //默认为EXACTLY_ONCE 4 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);  //默认为0,最大值为1年 5 env.getCheckpointConfig().setCheckpointTimeout(150000);  //默认为10min 6 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);  //默认为1

   上述参数的默认值可见flink-streaming-java*.jar中的CheckpointConfig.java,配置值是经过该类中私有configureCheckpointing()的jobGraph.setSnapshotSettings(settings)传递给runtime层的,更多设置也能够参见该类。并发

  1.2 参数分析

  这里着重分析enableCheckpointing()设置的baseInterval和minPauseBetweenCheckpoint之间的关系。为分析二者的关系,这里先给出源码中定义app

1     /** The base checkpoint interval. Actual trigger time may be affected by the 2  * max concurrent checkpoints and minimum-pause values */ 3 //checkpoint触发周期,时间触发时间还受maxConcurrentCheckpointAttempts和minPauseBetweenCheckpointsNanos影响 4 private final long baseInterval; 5 6 /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to 7  * enforce minimum processing time between checkpoint attempts */ 8 //在能够触发checkpoint的时,两次checkpoint之间的时间间隔 9 private final long minPauseBetweenCheckpointsNanos;

   当baseInterval<minPauseBetweenCheckpoint时,在CheckpointCoordinator.java源码中定义以下:异步

1     // it does not make sense to schedule checkpoints more often then the desired 2 // time between checkpoints 3 long baseInterval = chkConfig.getCheckpointInterval(); 4 if (baseInterval < minPauseBetweenCheckpoints) { 5 baseInterval = minPauseBetweenCheckpoints; 6 }

   今后能够看出,checkpoint的触发虽然设置为周期性的,可是实际触发状况,还得考虑minPauseBetweenCheckpoint和maxConcurrentCheckpointAttempts,若maxConcurrentCheckpointAttempts为1,就算知足触发时间也需等待正在执行的checkpoint结束。async

二、checkpoint调用过程

  将JobGraph提交到Dispatcher后,会createJobManagerRunner和startJobManagerRunner,能够关注Dispatcher类中的createJobManagerRunner(...)方法。ide

  2.1 createJobManagerRunner阶段

  该阶段会建立一个JobManagerRunner实例,在该过程和checkpoint有关的是会启动listener去监听job的状态。ui

 1   #JobManagerRunner.java  2 public JobManagerRunner(...) throws Exception {  3  4 //..........  5  6 // make sure we cleanly shut down out JobManager services if initialization fails  7 try {  8 //..........  9 //加载JobGraph、library、leader选举等 10 11 // now start the JobManager 12 //启动JobManager 13 this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader); 14  } 15 catch (Throwable t) { 16 //...... 17  } 18  } 19 20 //在DefaultJobMasterServiceFactory类的createJobMasterService()中新建一个JobMaster对象 21 //#JobMaster.java 22 public JobMaster(...) throws Exception { 23 24 //........ 25 //该方法中主要作了参数检查,slotPool的建立、slotPool的schedul的建立等一系列的事情 26 27 //建立一个调度器 28 this.schedulerNG = createScheduler(jobManagerJobMetricGroup); 29 //...... 30 }

   在建立调度器中核心的语句以下:this

 1   //#LegacyScheduler.java中的LegacyScheduler()  2 //建立ExecutionGraph  3 this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));  4     5  6 private ExecutionGraph createAndRestoreExecutionGraph(  7  JobManagerJobMetricGroup currentJobManagerJobMetricGroup,  8 ShuffleMaster<?> shuffleMaster,  9 PartitionTracker partitionTracker) throws Exception { 10 11 12 ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker); 13 14 final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator(); 15 16 if (checkpointCoordinator != null) { 17 // check whether we find a valid checkpoint 18 //若state没有被恢复是否能够经过savepoint恢复 19 //...... 20  } 21  } 22 23 return newExecutionGraph; 24 }

   经过调用到达生成ExecutionGraph的核心类ExecutionGraphBuilder的在buildGraph()方法,其中该方法主要是生成ExecutionGraph和设置checkpoint,下面给出其中的核心代码:atom

 1     //..............  2 //生成ExecutionGraph的核心方法,这里后期会详细分析  3  executionGraph.attachJobGraph(sortedTopology);  4  5 //.......................  6  7 //在enableCheckpointing中设置CheckpointCoordinator  8  executionGraph.enableCheckpointing(  9  chkConfig, 10  triggerVertices, 11  ackVertices, 12  confirmVertices, 13  hooks, 14  checkpointIdCounter, 15  completedCheckpoints, 16  rootBackend, 17 checkpointStatsTracker);

   在enableCheckpointing()方法中主要是建立了checkpoint失败是的manager、设置了checkpoint的核心类CheckpointCoordinator。

 1     //#ExecutionGraph.java  2 public void enableCheckpointing(  3  CheckpointCoordinatorConfiguration chkConfig,  4 List<ExecutionJobVertex> verticesToTrigger,  5 List<ExecutionJobVertex> verticesToWaitFor,  6 List<ExecutionJobVertex> verticesToCommitTo,  7 List<MasterTriggerRestoreHook<?>> masterHooks,  8  CheckpointIDCounter checkpointIDCounter,  9  CompletedCheckpointStore checkpointStore, 10  StateBackend checkpointStateBackend, 11  CheckpointStatsTracker statsTracker) { 12 //Job的状态必须为Created, 13 checkState(state == JobStatus.CREATED, "Job must be in CREATED state"); 14 checkState(checkpointCoordinator == null, "checkpointing already enabled"); 15 //checkpointing的不一样状态 16 ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger); 17 ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor); 18 ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo); 19 20 checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker"); 21 //checkpoint失败manager,如果checkpoint失败会根据设置来决定下一步 22 CheckpointFailureManager failureManager = new CheckpointFailureManager( 23  chkConfig.getTolerableCheckpointFailureNumber(), 24 new CheckpointFailureManager.FailJobCallback() { 25  @Override 26 public void failJob(Throwable cause) { 27 getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause)); 28  } 29 30  @Override 31 public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) { 32 getJobMasterMainThreadExecutor().execute(() -> failGlobalIfExecutionIsStillRunning(cause, failingTask)); 33  } 34  } 35  ); 36 37 // create the coordinator that triggers and commits checkpoints and holds the state 38 //checkpoint的核心类CheckpointCoordinator 39 checkpointCoordinator = new CheckpointCoordinator( 40  jobInformation.getJobId(), 41  chkConfig, 42  tasksToTrigger, 43  tasksToWaitFor, 44  tasksToCommitTo, 45  checkpointIDCounter, 46  checkpointStore, 47  checkpointStateBackend, 48  ioExecutor, 49  SharedStateRegistry.DEFAULT_FACTORY, 50  failureManager); 51 52 // register the master hooks on the checkpoint coordinator 53 for (MasterTriggerRestoreHook<?> hook : masterHooks) { 54 if (!checkpointCoordinator.addMasterHook(hook)) { 55 LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier()); 56  } 57  } 58 //checkpoint统计 59  checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker); 60 61 // interval of max long value indicates disable periodic checkpoint, 62 // the CheckpointActivatorDeactivator should be created only if the interval is not max value 63 //设置为Long.MAX_VALUE标识关闭周期性的checkpoint 64 if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) { 65 // the periodic checkpoint scheduler is activated and deactivated as a result of 66 // job status changes (running -> on, all other states -> off) 67 //只有在job的状态为running时,才会开启checkpoint的scheduler 68 //createActivatorDeactivator()建立一个listener监听器 69 //registerJobStatusListener()将listener加入监听器集合jobStatusListeners中 70  registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator()); 71  } 72  } 73 74 75 //#CheckpointCoordinator.java 76 / ------------------------------------------------------------------------ 77 // job status listener that schedules / cancels periodic checkpoints 78 // ------------------------------------------------------------------------ 79 //建立一个listener监听器checkpointCoordinator.createActivatorDeactivator() 80 public JobStatusListener createActivatorDeactivator() { 81 synchronized (lock) { 82 if (shutdown) { 83 throw new IllegalArgumentException("Checkpoint coordinator is shut down"); 84  } 85 86 if (jobStatusListener == null) { 87 jobStatusListener = new CheckpointCoordinatorDeActivator(this); 88  } 89 90 return jobStatusListener; 91  } 92 }

   至此,createJobManagerRunner阶段结束了,ExecutionGraph中checkpoint的配置就设置好了。

  2.2 startJobManagerRunner阶段

  在该阶段中,在得到leaderShip以后,就会启动startJobExecution,这里只给出调用涉及的类和方法:

1     //#JobManagerRunner.java类中 2 //grantLeadership(...)==>verifyJobSchedulingStatusAndStartJobManager(...) 3 //==>startJobMaster(...),该方法中核心代码为 4 startFuture = jobMasterService.start(new JobMasterId(leaderSessionId)); 5 6 //进一步调用#JobMaster.java类中的start()==>startJobExecution(...) 

   startJobExecution()方法是JobMaster类中的私有方法,具体代码分析以下:

 1   //----------------------------------------------------------------------------------------------  2 // Internal methods  3 //----------------------------------------------------------------------------------------------  4  5 //-- job starting and stopping -----------------------------------------------------------------  6  7 private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {  8  9  validateRunsInMainThread(); 10 11 checkNotNull(newJobMasterId, "The new JobMasterId must not be null."); 12 13 if (Objects.equals(getFencingToken(), newJobMasterId)) { 14 log.info("Already started the job execution with JobMasterId {}.", newJobMasterId); 15 16 return Acknowledge.get(); 17  } 18 19  setNewFencingToken(newJobMasterId); 20 //启动slotPool并申请资源,该方法能够具体看看申请资源的过程 21  startJobMasterServices(); 22 23 log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId); 24 //执行ExecuteGraph的切入口,先判断job的状态是否为created的,后调执行executionGraph.scheduleForExecution(); 25  resetAndStartScheduler(); 26 27 return Acknowledge.get(); 28 }

   在LegacyScheduler类中的方法scheduleForExecution()调度过程以下:

 1     public void scheduleForExecution() throws JobException {  2 
 3  assertRunningInJobMasterMainThread();  4 
 5         final long currentGlobalModVersion = globalModVersion;  6         //任务执行以前进行状态切换从CREATED到RUNNING,  7         //transitionState(...)方法中会经过notifyJobStatusChange(newState, error)通知jobStatusListeners集合中listeners状态改变
 8         if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {  9             //根据启动算子调度模式不一样,采用不一样的调度方案
10             final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule( 11  scheduleMode, 12  getAllExecutionVertices(), 13                 this); 14             
15             //..............
16  } 17         else { 18             throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED); 19  } 20  } 21     
22     private void notifyJobStatusChange(JobStatus newState, Throwable error) { 23         if (jobStatusListeners.size() > 0) { 24             final long timestamp = System.currentTimeMillis(); 25             final Throwable serializedError = error == null ? null : new SerializedThrowable(error); 26 
27             for (JobStatusListener listener : jobStatusListeners) { 28                 try { 29  listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError); 30                 } catch (Throwable t) { 31                     LOG.warn("Error while notifying JobStatusListener", t); 32  } 33  } 34  } 35  } 36     
37     
38     //#CheckpointCoordinatorDeActivator.java
39     public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) { 40         if (newJobStatus == JobStatus.RUNNING) { 41             // start the checkpoint scheduler 42             //触发checkpoint的核心方法
43  coordinator.startCheckpointScheduler(); 44         } else { 45             // anything else should stop the trigger for now
46  coordinator.stopCheckpointScheduler(); 47  } 48     }

   下面具体分析触发checkpoint的核心方法startCheckpointScheduler()。

  startCheckpointScheduler()方法结合注释仍是比较好理解的,但因为方法太长这里就不所有贴出来了,先分析一下大体作什么了,而后给出其核心代码:

  1)检查触发checkpoint的条件。如coordinator被关闭、周期性checkpoint被禁止、在没有开启强制checkpoint的状况下没有达到最小的checkpoint间隔以及超过并发的checkpoint个数等;

  2)检查是否全部须要checkpoint和须要响应checkpoint的ACK(的task都处于running状态,不然抛出异常;

  3)若均符合,执行checkpointID = checkpointIdCounter.getAndIncrement();以生成一个新的checkpointID,而后生成一个PendingCheckpoint。其中,PendingCheckpoint仅是一个启动了的checkpoint,可是尚未被确认,直到全部的task都确认了本次checkpoint,该checkpoint对象才转化为一个CompletedCheckpoint;

  4)调度timer清理失败的checkpoint;

  5)定义一个超时callback,若是checkpoint执行了好久还没完成,就把它取消;

  6)触发MasterHooks,用户能够定义一些额外的操做,用以加强checkpoint的功能(如准备和清理外部资源);

  核心代码以下:

1 // send the messages to the tasks that trigger their checkpoint 2 //遍历ExecutionVertex,是否异步触发checkpoint 3 for (Execution execution: executions) { 4 if (props.isSynchronous()) { 5  execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime); 6 } else { 7  execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions); 8  } 9 }

   不论是否以异步的方式触发checkpoint,最终调用的方法是Execution类中的私有方法triggerCheckpointHelper(...),具体代码以下:

 1   //Execution.java  2 private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {  3  4 final CheckpointType checkpointType = checkpointOptions.getCheckpointType();  5 if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {  6 throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");  7  }  8  9 final LogicalSlot slot = assignedResource; 10 11 if (slot != null) { 12 //TaskManagerGateway是用于与taskManager通讯的组件 13 final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); 14 15  taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime); 16 } else { 17 LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running."); 18  } 19 }

   至此,checkpointCoordinator就将作checkpoint的命令发送到TaskManager去了,下面着重分析TM中checkpoint的执行过程。

  2.3 TaskManager中checkpoint

  TaskManager 接收到触发checkpoint的RPC后,会触发生成checkpoint barrier。RpcTaskManagerGateway做为消息入口,其triggerCheckpoint(...)会调用TaskExecutor的triggerCheckpoint(...),具体过程以下:

 1   //RpcTaskManagerGateway.java  2 public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {  3  taskExecutorGateway.triggerCheckpoint(  4  executionAttemptID,  5  checkpointId,  6  timestamp,  7  checkpointOptions,  8  advanceToEndOfEventTime);  9  } 10 11 //TaskExecutor.java 12  @Override 13 public CompletableFuture<Acknowledge> triggerCheckpoint( 14  ExecutionAttemptID executionAttemptID, 15 long checkpointId, 16 long checkpointTimestamp, 17  CheckpointOptions checkpointOptions, 18 boolean advanceToEndOfEventTime) { 19 log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID); 20 21 //........... 22 23 if (task != null) { 24 //核心方法,触发生成barrier 25  task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime); 26 27 return CompletableFuture.completedFuture(Acknowledge.get()); 28 } else { 29 final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.'; 30 31 //......... 32  } 33 }

   在Task类的triggerCheckpointBarrier(...)方法中生成了一个Runable匿名类用于执行checkpoint,而后以异步的方式触发了该Runable,具体代码以下:

 1     public void triggerCheckpointBarrier(  2 final long checkpointID,  3 final long checkpointTimestamp,  4 final CheckpointOptions checkpointOptions,  5 final boolean advanceToEndOfEventTime) {  6  7 final AbstractInvokable invokable = this.invokable;  8 //建立一个CheckpointMetaData,该对象仅有checkpointID、checkpointTimestamp两个属性  9 final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp); 10 11 if (executionState == ExecutionState.RUNNING && invokable != null) { 12 13 //.............. 14 15 Runnable runnable = new Runnable() { 16  @Override 17 public void run() { 18 // set safety net from the task's context for checkpointing thread 19 LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName()); 20  FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); 21 22 try { 23 //根据SourceStreamTask和StreamTask调用不一样的方法 24 boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); 25 if (!success) { 26  checkpointResponder.declineCheckpoint( 27  getJobID(), getExecutionId(), checkpointID, 28 new CheckpointException("Task Name" + taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY)); 29  } 30  } 31 catch (Throwable t) { 32 if (getExecutionState() == ExecutionState.RUNNING) { 33 failExternally(new Exception( 34 "Error while triggering checkpoint " + checkpointID + " for " + 35  taskNameWithSubtask, t)); 36 } else { 37 LOG.debug("Encountered error while triggering checkpoint {} for " + 38 "{} ({}) while being not in state running.", checkpointID, 39  taskNameWithSubtask, executionId, t); 40  } 41 } finally { 42 FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null); 43  } 44  } 45  }; 46 //以异步的方式触发Runnable 47  executeAsyncCallRunnable( 48  runnable, 49 String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)); 50  } 51 else { 52 LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId); 53 54 // send back a message that we did not do the checkpoint 55  checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, 56 new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY)); 57  } 58 }

   SourceStreamTask和StreamTask调用triggerCheckpoint最终都是调用StreamTask类中的triggerCheckpoint(...)方法,其核心代码为:

1   //#StreamTask.java 2 return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);

   在performCheckpoint(...)方法中,主要有如下两件事:

  一、若task是running,则能够进行checkpoint,主要有如下三件事:

    1)为checkpoint作准备,通常是什么不作的,直接接受checkpoint;

    2)生成barrier,并以广播的形式发射到下游去;

    3)触发本task保存state;

  二、若不是running,通知下游取消本次checkpoint,方法是发送一个CancelCheckpointMarker,这是相似于Barrier的另外一种消息。

   具体代码以下:

 1   //#StreamTask.java  2 private boolean performCheckpoint(  3  CheckpointMetaData checkpointMetaData,  4  CheckpointOptions checkpointOptions,  5  CheckpointMetrics checkpointMetrics,  6 boolean advanceToEndOfTime) throws Exception {  7 //......  8  9 synchronized (lock) { 10 if (isRunning) { 11 12 if (checkpointOptions.getCheckpointType().isSynchronous()) { 13  syncSavepointLatch.setCheckpointId(checkpointId); 14 15 if (advanceToEndOfTime) { 16  advanceToEndOfEventTime(); 17  } 18  } 19 20 // All of the following steps happen as an atomic step from the perspective of barriers and 21 // records/watermarks/timers/callbacks. 22 // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream 23 // checkpoint alignments 24 25 // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work. 26 // The pre-barrier work should be nothing or minimal in the common case. 27  operatorChain.prepareSnapshotPreBarrier(checkpointId); 28 29 // Step (2): Send the checkpoint barrier downstream 30  operatorChain.broadcastCheckpointBarrier( 31  checkpointId, 32  checkpointMetaData.getTimestamp(), 33  checkpointOptions); 34 35 // Step (3): Take the state snapshot. This should be largely asynchronous, to not 36 // impact progress of the streaming topology 37  checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics); 38 39 return true; 40  } 41 else { 42 //....... 43  } 44  } 45 }

    接下来分析checkpointState(...)过程。

  checkpointState(...)方法最终会调用StreamTask类中executeCheckpointing(),其中会建立一个异步对象AsyncCheckpointRunnable,用以报告该检查点已完成,关键代码以下:

 1   //#StreamTask.java类中executeCheckpointing()  2 public void executeCheckpointing() throws Exception {  3 startSyncPartNano = System.nanoTime();  4  5 try {  6 //调用StreamOperator进行snapshotState的入口方法,依算子不一样而变  7 for (StreamOperator<?> op : allOperators) {  8  checkpointStreamOperator(op);  9  } 10 //......... 11 12 // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit 13 AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable( 14  owner, 15  operatorSnapshotsInProgress, 16  checkpointMetaData, 17  checkpointMetrics, 18  startAsyncPartNano); 19 20  owner.cancelables.registerCloseable(asyncCheckpointRunnable); 21  owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable); 22 23 //......... 24 } catch (Exception ex) { 25 //....... 26  } 27 }

   进入AsyncCheckpointRunnable(...)中的run()方法,其中会调用StreamTask类中reportCompletedSnapshotStates(...)(对于一个无状态的job返回的null),进而调用TaskStateManagerImpl类中的reportTaskStateSnapshots(...)将TM的checkpoint汇报给JM,关键代码以下:

1     //TaskStateManagerImpl.java
2  checkpointResponder.acknowledgeCheckpoint( 3  jobId, 4  executionAttemptID, 5  checkpointId, 6  checkpointMetrics, 7             acknowledgedState);

  其逻辑是逻辑是经过rpc的方式远程调JobManager的相关方法完成报告事件。

  2.4 JobManager处理checkpoint

  经过RpcCheckpointResponder类中acknowledgeCheckpoint(...)来响应checkpoint返回的消息,该方法以后的调度过程和涉及的核心方法以下:

 1    //#JobMaster类中acknowledgeCheckpoint==>  2 //#LegacyScheduler类中acknowledgeCheckpoint==>  3 //#CheckpointCoordinator类中receiveAcknowledgeMessage(...)==>  4 //completePendingCheckpoint(checkpoint);  5  6 //<p>Important: This method should only be called in the checkpoint lock scope  7 private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {  8 final long checkpointId = pendingCheckpoint.getCheckpointId();  9 final CompletedCheckpoint completedCheckpoint; 10 11 // As a first step to complete the checkpoint, we register its state with the registry 12 Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates(); 13  sharedStateRegistry.registerAll(operatorStates.values()); 14 15 try { 16 try { 17 //完成checkpoint 18 completedCheckpoint = pendingCheckpoint.finalizeCheckpoint(); 19  failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId()); 20  } 21 catch (Exception e1) { 22 // abort the current pending checkpoint if we fails to finalize the pending checkpoint. 23 if (!pendingCheckpoint.isDiscarded()) { 24  failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1); 25  } 26 27 throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', 28  CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1); 29  } 30 31 // the pending checkpoint must be discarded after the finalization 32 Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null); 33 34 try { 35 //添加新的checkpoints,如有必要(completedCheckpoints.size() > maxNumberOfCheckpointsToRetain)删除旧的 36  completedCheckpointStore.addCheckpoint(completedCheckpoint); 37 } catch (Exception exception) { 38 // we failed to store the completed checkpoint. Let's clean up 39 executor.execute(new Runnable() { 40  @Override 41 public void run() { 42 try { 43  completedCheckpoint.discardOnFailedStoring(); 44 } catch (Throwable t) { 45 LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t); 46  } 47  } 48  }); 49 50 throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', 51  CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception); 52  } 53 } finally { 54  pendingCheckpoints.remove(checkpointId); 55 56  triggerQueuedRequests(); 57  } 58 59  rememberRecentCheckpointId(checkpointId); 60 61 // drop those pending checkpoints that are at prior to the completed one 62 //删除在其以前未完成的checkpoint(优先级高的) 63  dropSubsumedCheckpoints(checkpointId); 64 65 // record the time when this was completed, to calculate 66 // the 'min delay between checkpoints' 67 lastCheckpointCompletionNanos = System.nanoTime(); 68 69 LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job, 70  completedCheckpoint.getStateSize(), completedCheckpoint.getDuration()); 71 72 if (LOG.isDebugEnabled()) { 73 StringBuilder builder = new StringBuilder(); 74 builder.append("Checkpoint state: "); 75 for (OperatorState state : completedCheckpoint.getOperatorStates().values()) { 76  builder.append(state); 77 builder.append(", "); 78  } 79 // Remove last two chars ", " 80 builder.setLength(builder.length() - 2); 81 82  LOG.debug(builder.toString()); 83  } 84 85 // send the "notify complete" call to all vertices 86 final long timestamp = completedCheckpoint.getTimestamp(); 87 88 //通知全部(TM中)operator该checkpoint已完成 89 for (ExecutionVertex ev : tasksToCommitTo) { 90 Execution ee = ev.getCurrentExecutionAttempt(); 91 if (ee != null) { 92  ee.notifyCheckpointComplete(checkpointId, timestamp); 93  } 94  } 95 }

   至此,checkpoint的总体流程分析完毕建议结合原理去理解,参考的三篇文献都是写的很好的,有时间建议看看。

 

Ref:

[1]https://www.jianshu.com/p/a40a1b92f6a2

[2]http://www.javashuo.com/article/p-khdfegoi-mh.html

[3] https://blog.csdn.net/qq475781638/article/details/92698301

相关文章
相关标签/搜索