Azkaban3.45git
https://azkaban.github.io/github
Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.ajax
Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.sql
Azkaban是由LinkedIn为了解决Hadoop环境下任务依赖问题而开发的,LinkedIn团队有不少任务须要按照顺序运行,包括ETL任务以及数据分析任务;数据库
Azkaban一开始是单server方案,如今已经演化为一个更健壮的方案;(惋惜当前版本的WebServer仍是单点)json
Azkaban consists of 3 key components:oop
Azkaban有3个核心组件:Mysql、WebServer、ExecutorServer;fetch
projects:项目编码
project_flows:工做流定义spa
execution_flows:工做流实例
execution_jobs:任务实例
triggers:调度定义
ps:表中不少数据都是编码的,enc_type是编码类型(对应的枚举为EncodingType),2是gzip编码,其余为无编码,2须要调用GZIPUtils.transformBytesToObject解析获得原始字符串;
l Job:最小的执行单元,做为DAG的一个结点,即任务
l Flow:由多个Job组成,并经过dependent配置Job的依赖属性,即工做流
l Tirgger:根据指定Cron信息触发Flow,即调度
AzkabanWebServer.main
launch
prepareAndStartServer
configureRoutes
TriggerManager.start
FlowTriggerService.start
recoverIncompleteTriggerInstances
SELECT %s FROM execution_dependencies WHERE trigger_instance_id in (SELECT trigger_instance_id FROM execution_dependencies WHERE dep_status = %s or dep_status = %s or (dep_status = %s and flow_exec_id = %s))
FlowTriggerScheduler.start
ExecutorManager
setupExecutors
loadRunningFlows
QueueProcessorThread.run
ExecutingManagerUpdaterThread.run
AzkabanExecutorServer.main
launch
AzkabanExecutorServer.start
insertExecutorEntryIntoDB
Web Server两个入口:
ExecuteFlowAction.doAction
ExecutorServlet.ajaxExecuteFlow
Web Server分配任务:
ExecutorManager.submitExecutableFlow
JdbcExecutorLoader.uploadExecutableFlow
INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)
ExecutorLoader.addActiveExecutableReference
INSERT INTO active_executing_flows (exec_id, update_time) values (?,?)
queuedFlows.enqueue
QueueProcessorThread.run
processQueuedFlows
ExecutorManager.selectExecutorAndDispatchFlow (get from queuedFlows)
selectExecutor
dispatch
JdbcExecutorLoader.assignExecutor
UPDATE execution_flows SET executor_id=? where exec_id=?
ExecutorApiGateway.callWithExecutable (调用Executor Server)
Executor Server执行任务:
ExecutorServlet.doGet
handleAjaxExecute
FlowRunnerManager.submitFlow
JdbcExecutorLoader.fetchExecutableFlow
SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?
FlowPreparer.setup
FlowRunner.run
setupFlowExecution
updateFlow
UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?
runFlow
progressGraph
runReadyJob
runExecutableNode
JobRunner.run
uploadExecutableNode
INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)
prepareJob
runJob
Job.run (ProcessJob, JavaJob)
Web Server轮询流程状态:
ExecutingManagerUpdaterThread.run
getFlowToExecutorMap
ExecutorApiGateway.callWithExecutionId
updateExecution
TriggerManager.start
loadTriggers
SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM triggers
TriggerScannerThread.start
checkAllTriggers
onTriggerTrigger
TriggerAction.doAction
ExecuteFlowAction.doAction
PS:还有另外一套彻底独立的定时任务逻辑,经过azkaban.server.schedule.enable_quartz控制(默认false),如下为register job到quartz:
ProjectManagerServlet.ajaxHandleUpload
SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true
ProjectManager.loadAllProjectFlows
SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?
FlowTriggerScheduler.scheduleAll
SELECT MAX(flow_version) FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?
SELECT flow_file FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=? AND flow_version=?
registerJob
如下为quartz job执行:
FlowTriggerQuartzJob.execute
FlowTriggerService.startTrigger
TriggerInstanceProcessor.processSucceed
TriggerInstanceProcessor.executeFlowAndUpdateExecID
ExecutorManager.submitExecutableFlow
Job是任务的核心接口,全部具体任务都是该接口的子类:
Job
AbstractJob
AbstractProcessJob
ProcessJob (Shell任务)
JavaProcessJob (Java任务)
JavaJob