【原创】大数据基础之Azkaban(1)简介、源代码解析

Azkaban3.45git

一 简介

1 官网

https://azkaban.github.io/github

Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.ajax

Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.sql

Azkaban是由LinkedIn为了解决Hadoop环境下任务依赖问题而开发的,LinkedIn团队有不少任务须要按照顺序运行,包括ETL任务以及数据分析任务;数据库

Azkaban一开始是单server方案,如今已经演化为一个更健壮的方案;(惋惜当前版本的WebServer仍是单点)json

Azkaban consists of 3 key components:oop

  • Relational Database (MySQL)
  • AzkabanWebServer
  • AzkabanExecutorServer

Azkaban有3个核心组件:Mysql、WebServer、ExecutorServer;fetch

2 部署

 

3 数据库表结构

 

projects:项目编码

project_flows:工做流定义spa

execution_flows:工做流实例

execution_jobs:任务实例

triggers:调度定义

ps:表中不少数据都是编码的,enc_type是编码类型(对应的枚举为EncodingType),2是gzip编码,其余为无编码,2须要调用GZIPUtils.transformBytesToObject解析获得原始字符串;

4 概念

l  Job:最小的执行单元,做为DAG的一个结点,即任务

l  Flow:由多个Job组成,并经过dependent配置Job的依赖属性,即工做流

l  Tirgger:根据指定Cron信息触发Flow,即调度

二 代码解析

1 启动过程

Web Server

AzkabanWebServer.main

         launch

                  prepareAndStartServer

                          configureRoutes

                                   TriggerManager.start

                          FlowTriggerService.start

                                   recoverIncompleteTriggerInstances

                                            SELECT %s FROM execution_dependencies WHERE trigger_instance_id in (SELECT trigger_instance_id FROM execution_dependencies WHERE dep_status = %s or dep_status = %s or (dep_status = %s and flow_exec_id = %s))

                          FlowTriggerScheduler.start

 

ExecutorManager

         setupExecutors

         loadRunningFlows

QueueProcessorThread.run

ExecutingManagerUpdaterThread.run

Executor Server

AzkabanExecutorServer.main

         launch

                  AzkabanExecutorServer.start

                          insertExecutorEntryIntoDB

 

2 工做流执行过程

Web Server两个入口:

ExecuteFlowAction.doAction

ExecutorServlet.ajaxExecuteFlow

 

Web Server分配任务:

ExecutorManager.submitExecutableFlow

         JdbcExecutorLoader.uploadExecutableFlow

                  INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)

         ExecutorLoader.addActiveExecutableReference

                  INSERT INTO active_executing_flows (exec_id, update_time) values (?,?)

         queuedFlows.enqueue

 

QueueProcessorThread.run

         processQueuedFlows

                  ExecutorManager.selectExecutorAndDispatchFlow (get from queuedFlows)

                          selectExecutor

                          dispatch

                                   JdbcExecutorLoader.assignExecutor

                                            UPDATE execution_flows SET executor_id=? where exec_id=?

                                   ExecutorApiGateway.callWithExecutable (调用Executor Server)

 

Executor Server执行任务:

ExecutorServlet.doGet

         handleAjaxExecute

                  FlowRunnerManager.submitFlow

                          JdbcExecutorLoader.fetchExecutableFlow

                                 SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?

                          FlowPreparer.setup

                          FlowRunner.run

                                   setupFlowExecution

                                   updateFlow

                                            UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?

                                   runFlow

                                            progressGraph

                                                     runReadyJob

                                                             runExecutableNode

                                                                      JobRunner.run

                                                                               uploadExecutableNode

                                                                                        INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)

                                                                               prepareJob

                                                                               runJob

                                                                                        Job.run (ProcessJob, JavaJob)

 

Web Server轮询流程状态:

ExecutingManagerUpdaterThread.run

         getFlowToExecutorMap

         ExecutorApiGateway.callWithExecutionId

         updateExecution

 

3 调度执行过程

TriggerManager.start

         loadTriggers

                  SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM triggers

         TriggerScannerThread.start

                  checkAllTriggers

                          onTriggerTrigger

                                   TriggerAction.doAction

                                            ExecuteFlowAction.doAction

 

PS:还有另外一套彻底独立的定时任务逻辑,经过azkaban.server.schedule.enable_quartz控制(默认false),如下为register job到quartz:

ProjectManagerServlet.ajaxHandleUpload

         SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true

         ProjectManager.loadAllProjectFlows

                  SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?

         FlowTriggerScheduler.scheduleAll

                  SELECT MAX(flow_version) FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?

                  SELECT flow_file FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=? AND flow_version=?

                  registerJob

 

如下为quartz job执行:

FlowTriggerQuartzJob.execute

         FlowTriggerService.startTrigger

                  TriggerInstanceProcessor.processSucceed

                          TriggerInstanceProcessor.executeFlowAndUpdateExecID

                                   ExecutorManager.submitExecutableFlow

 

4 任务执行过程

Job是任务的核心接口,全部具体任务都是该接口的子类:

Job

         AbstractJob

                  AbstractProcessJob

                          ProcessJob (Shell任务)

                                   JavaProcessJob (Java任务)

                                            JavaJob

相关文章
相关标签/搜索