In the previous section we saw the task being placed into a thread pool on the executor. Now let's analyze the actual execution process!
================================================================================
jdb azkaban.execapp.AzkabanExecutorServer -conf /root/azkb/azkaban_3.0.0_debug/conf
stop in azkaban.execapp.FlowRunner.run
run
================================================================================
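As you know, a topology graph has the notion of a starting point.
A graph may have more than one start node. Here is how Azkaban computes them (any node with no in-nodes is a start node):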
public List<String> getStartNodes() {
  // Computed lazily on first call, then cached in startNodes.
  if (startNodes == null) {
    startNodes = new ArrayList<String>();
    for (ExecutableNode node : executableNodes.values()) {
      // No in-nodes means no dependencies, so the node can start right away.
      if (node.getInNodes().isEmpty()) {
        startNodes.add(node.getId());
      }
    }
  }
  return startNodes;
}
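To make the rule concrete, here is a minimal standalone sketch of the same check. The `StartNodeDemo` class and the map-of-dependencies graph representation are my own assumptions for illustration, not Azkaban types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class StartNodeDemo {
  // Hypothetical graph representation: node id -> ids of its in-nodes (dependencies).
  static List<String> startNodes(Map<String, Set<String>> inNodes) {
    List<String> starts = new ArrayList<>();
    for (Map.Entry<String, Set<String>> e : inNodes.entrySet()) {
      // Same rule as getStartNodes(): no in-nodes means nothing to wait for.
      if (e.getValue().isEmpty()) {
        starts.add(e.getKey());
      }
    }
    return starts;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> graph = new LinkedHashMap<>(); // keeps insertion order
    graph.put("a", Set.of());
    graph.put("b", Set.of());
    graph.put("c", Set.of("a", "b"));
    graph.put("d", Set.of("c"));
    System.out.println(startNodes(graph)); // prints [a, b]
  }
}
```

With two independent roots `a` and `b`, both are returned, which is exactly the "multiple start nodes" case.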
================================================================================
And here is how execution kicks off from those start nodes:
// node is a (sub)flow here: run each of its start nodes
for (String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {
  ExecutableNode startNode = flow.getExecutableNode(startNodeId);
  runReadyJob(startNode);
}
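This loop only launches the start nodes; every other node has to wait until its in-nodes are done. As a rough, single-threaded illustration of that idea (a simplification of my own, not Azkaban's actual status-based logic), the hypothetical `ReadyRunDemo.runOrder` below "runs" a node only once all of its dependencies have finished, assuming every job succeeds.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ReadyRunDemo {
  // Hypothetical, single-threaded simulation: node id -> ids of its in-nodes.
  // Returns the order in which nodes get "run" if a node may only start after
  // all of its in-nodes have finished (every job is assumed to succeed).
  static List<String> runOrder(Map<String, Set<String>> inNodes) {
    Set<String> finished = new HashSet<>();
    Deque<String> ready = new ArrayDeque<>();
    // Seed with the start nodes, like the loop over getStartNodes().
    for (Map.Entry<String, Set<String>> e : inNodes.entrySet()) {
      if (e.getValue().isEmpty()) {
        ready.add(e.getKey());
      }
    }
    List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      String node = ready.poll();
      order.add(node);    // "run" the node
      finished.add(node); // pretend it completed successfully
      // A node that has not run yet becomes ready once all its in-nodes are finished.
      for (Map.Entry<String, Set<String>> e : inNodes.entrySet()) {
        String id = e.getKey();
        if (!finished.contains(id) && !ready.contains(id)
            && finished.containsAll(e.getValue())) {
          ready.add(id);
        }
      }
    }
    return order;
  }
}
```

For a graph built in a `LinkedHashMap` in the order `a`, `b`, `c` (depends on `a` and `b`), `d` (depends on `c`), this yields `[a, b, c, d]`.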
================================================================================
After a few intermediate steps, the code that actually ends up running is:
stop in azkaban.execapp.FlowRunner.runExecutableNode
private void runExecutableNode(ExecutableNode node) throws IOException {
  // Collect output props from the job's dependencies.
  prepareJobProperties(node);
  node.setStatus(Status.QUEUED);
  JobRunner runner = createJobRunner(node);
  logger.info("Submitting job '" + node.getNestedId() + "' to run.");
  try {
    // Hand the JobRunner to the thread pool, then track it as active.
    executorService.submit(runner);
    activeJobRunners.add(runner);
  } catch (RejectedExecutionException e) {
    // The pool refused the job (e.g. it has been shut down); this is only logged.
    logger.error(e);
  }
}
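`runExecutableNode` submits the runner first and records it in `activeJobRunners` only if the submission succeeds; a rejected job is merely logged. The standalone sketch below reproduces that submit-then-track pattern with a plain `ExecutorService`. `FakeJobRunner` and `SubmitDemo` are hypothetical names, and shutting the pool down is just one easy way to trigger a `RejectedExecutionException`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class SubmitDemo {
  // Hypothetical stand-in for Azkaban's JobRunner: a named Runnable that does nothing.
  static final class FakeJobRunner implements Runnable {
    final String jobId;

    FakeJobRunner(String jobId) {
      this.jobId = jobId;
    }

    @Override
    public void run() {
      // A real runner would execute the job here.
    }
  }

  // Same shape as runExecutableNode: submit first, record as active only on success,
  // and merely log a rejected submission.
  static boolean submit(ExecutorService pool, List<FakeJobRunner> active, FakeJobRunner runner) {
    try {
      pool.submit(runner);
      active.add(runner);
      return true;
    } catch (RejectedExecutionException e) {
      System.err.println("Rejected job '" + runner.jobId + "': " + e);
      return false;
    }
  }

  // Returns the number of tracked runners, or -1 if the post-shutdown submit was accepted.
  static int demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<FakeJobRunner> active = new CopyOnWriteArrayList<>();
    submit(pool, active, new FakeJobRunner("a"));
    submit(pool, active, new FakeJobRunner("b"));
    pool.shutdown(); // from now on, new submissions are rejected
    boolean acceptedAfterShutdown = submit(pool, active, new FakeJobRunner("c"));
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return acceptedAfterShutdown ? -1 : active.size();
  }
}
```

`SubmitDemo.demo()` returns 2: the two jobs submitted before shutdown are tracked, while the third is rejected and never added to the active list.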
In essence, each job is simply dropped into a thread pool to run; after that, the following loop executes:
while (!flowFinished) {
  synchronized (mainSyncObj) {
    if (flowPaused) {
      // Paused: wait up to CHECK_WAIT_MS, then check again.
      try {
        mainSyncObj.wait(CHECK_WAIT_MS);
      } catch (InterruptedException e) {
      }
      continue;
    } else {
      if (retryFailedJobs) {
        retryAllFailures();
      } else if (!progressGraph()) {
        // Nothing advanced: wait up to CHECK_WAIT_MS (or until notified) before re-checking.
        try {
          mainSyncObj.wait(CHECK_WAIT_MS);
        } catch (InterruptedException e) {
        }
      }
    }
  }
}
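The rest is mostly scheduling policy (pausing, retrying failed jobs, advancing the graph), and I couldn't be bothered to dig into it.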
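The one mechanism worth noting in that loop is `mainSyncObj.wait(CHECK_WAIT_MS)`: a timed wait that releases the lock so other threads can change state, and that returns either when notified or when the timeout expires. Below is a stripped-down sketch of that pattern. The pause and retry branches are omitted; `WaitLoopDemo`, the job counter, and the 50 ms value are assumptions; and having workers call `notifyAll()` on completion is my own choice for the demo, not something verified in Azkaban's source.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, stripped-down version of the flow runner's main loop (pause/retry omitted).
class WaitLoopDemo {
  private static final long CHECK_WAIT_MS = 50; // assumed value, for illustration only
  private final Object mainSyncObj = new Object();
  private int remaining;        // unfinished jobs, guarded by mainSyncObj
  private boolean flowFinished; // guarded by mainSyncObj

  WaitLoopDemo(int jobs) {
    this.remaining = jobs;
  }

  // Called from worker threads when a job completes; wakes the main loop early.
  void jobFinished() {
    synchronized (mainSyncObj) {
      remaining--;
      mainSyncObj.notifyAll();
    }
  }

  // Stand-in for progressGraph(): returns true if it changed the flow's state.
  // Caller must hold mainSyncObj.
  private boolean progressGraph() {
    if (remaining == 0) {
      flowFinished = true;
      return true;
    }
    return false;
  }

  void runLoop() {
    synchronized (mainSyncObj) {
      while (!flowFinished) {
        if (!progressGraph()) {
          try {
            // Releases mainSyncObj while waiting; returns on notifyAll() or after
            // CHECK_WAIT_MS, whichever comes first, and then the loop re-checks.
            mainSyncObj.wait(CHECK_WAIT_MS);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }
  }

  boolean isFinished() {
    synchronized (mainSyncObj) {
      return flowFinished;
    }
  }

  // Runs `jobs` no-op jobs on a small pool and reports whether the loop saw the flow finish.
  static boolean demo(int jobs) {
    WaitLoopDemo flow = new WaitLoopDemo(jobs);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    for (int i = 0; i < jobs; i++) {
      pool.execute(flow::jobFinished);
    }
    flow.runLoop();
    pool.shutdown();
    return flow.isFinished();
  }
}
```

Because the state is checked while holding the lock, a notification cannot slip in between the check and the `wait`; the timeout simply bounds how long the loop sleeps when nothing notifies it.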
Next, we'll focus on how an individual JobRunner runs!