Oozie is an open-source workflow-engine framework contributed to Apache by Cloudera. It is the workflow scheduling engine for the Hadoop platform, used to manage Hadoop jobs. This article is the first in the series and covers Oozie's job-submission phase.
We will reason backwards from requirements: if we had to implement a workflow engine from scratch, which parts would we need to build? That gives us a series of questions to take into the Oozie source code.
Which parts does a workflow engine need to implement? We sketched out a rough list. Since space and energy are limited, we cannot study all of the source code or answer every question, so we first collected a subset of the questions and will answer them one by one in the Oozie source-code analysis that follows.
Oozie consists of two components, the Oozie client and the Oozie Server. The Oozie Server is a web application running in a Java servlet container (Tomcat). The Oozie client is used to submit jobs to the Oozie Server, and it submits them via HTTP requests.
In effect the Oozie Server is just another Hadoop client: when a user needs to run several related MR jobs, they only have to write the MR execution order into workflow.xml and submit the job to the Oozie Server, which then manages the whole job flow.
What the Oozie Server concretely operates on is the workflow; its main duty is driving workflow execution, i.e., chaining the Actions inside a workflow and handling the transitions between them.
The execution of each concrete Action is delegated to Yarn, which assigns the Action to a node with sufficient resources. Actions run asynchronously, so when an Action finishes it reports its result back to Oozie through a callback; Oozie also polls for the Action result as a backup (to improve reliability).
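To illustrate that reliability pattern in isolation, here is a generic sketch of ours (not Oozie code; every name in it is hypothetical): completion is normally reported by a callback, while a periodic poll acts as a backup in case the callback is lost.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Generic "callback plus backup polling" sketch, not Oozie code.
class ActionWatcher {
    private final ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();

    void watch(String actionId) {
        // Backup polling: ask the cluster for the action status every minute.
        poller.scheduleAtFixedRate(() -> checkStatus(actionId), 1, 1, TimeUnit.MINUTES);
    }

    // Invoked by the callback endpoint (CallbackServlet plays this role in Oozie).
    void onCallback(String actionId, String status) {
        poller.shutdown();          // callback arrived; polling no longer needed
        handleCompletion(actionId, status);
    }

    void checkStatus(String actionId) { /* query the resource manager */ }
    void handleCompletion(String actionId, String status) { /* advance the workflow */ }
}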
The overall submission flow is roughly:
Oozie client ------> Oozie Server -------> Yarn ------> Hadoop
Oozie offers a number of features and is built from several functional modules; let us now watch one Workflow in action.
Starting from nothing, let's follow a Workflow from submission to the end of its run. Assume that after this workflow starts it enters a hive action, and that the hive action itself is configured to run on the Tez engine. A simplified version of the definition follows.
<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive-wf">
    <start to="hive-node"/>
    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.5">
            <script>hive.sql</script>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Hive failed, error message</message>
    </kill>
    <end name="end"/>
</workflow-app>
The Oozie client is the channel through which users submit jobs to the Oozie Server; it can submit, start, and stop jobs, and query their execution status. For example, starting a job looks like this:
oozie job -oozie oozie_url -config job.properties_address -run
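For reference, a minimal job.properties for the hive workflow above might look like the following sketch; the host names and paths are hypothetical placeholders, and oozie.wf.application.path must point at the HDFS directory holding workflow.xml:

# hypothetical cluster endpoints
nameNode=hdfs://namenode-host:8020
jobTracker=resourcemanager-host:8032
queueName=default
# HDFS directory containing workflow.xml
oozie.wf.application.path=${nameNode}/user/${user.name}/apps/hive-wf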
Since there is a launch script, let's go straight into it to find the program entry point.
${JAVA_BIN} ${OOZIE_CLIENT_OPTS} -cp ${OOZIECPPATH} org.apache.oozie.cli.OozieCLI "${@}"
This reveals the client's entry class; let's take a look.
public class OozieCLI {
    public static void main(String[] args) {
        if (!System.getProperties().containsKey(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP)) {
            System.setProperty(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP, "true");
        }
        System.exit(new OozieCLI().run(args));
    }
}
As we can see, after this setup main hands off directly to run().
public class OozieCLI {
    public synchronized int run(String[] args) {
        final CLIParser parser = getCLIParser();
        try {
            final CLIParser.Command command = parser.parse(args);
            String doAsUser = command.getCommandLine().getOptionValue(DO_AS_OPTION);
            if (doAsUser != null) {
                OozieClient.doAs(doAsUser, new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        processCommand(parser, command);
                        return null;
                    }
                });
            } else {
                processCommand(parser, command);
            }
            return 0;
        } // catch blocks elided in this excerpt
    }
}
The main work happens in processCommand, which dispatches to the handler for each command based on command.getName(). From it we can clearly see which kinds of commands Oozie currently supports, e.g. JOB_CMD, JOBS_CMD, PIG_CMD, SQOOP_CMD, MR_CMD.
public void processCommand(CLIParser parser, CLIParser.Command command) throws Exception {
    switch (command.getName()) {
        case JOB_CMD:
            jobCommand(command.getCommandLine());
            break;
        case JOBS_CMD:
            jobsCommand(command.getCommandLine());
            break;
        case HIVE_CMD:
            scriptLanguageCommand(command.getCommandLine(), HIVE_CMD);
            break;
        ......
        default:
            parser.showHelp(command.getCommandLine());
    }
}
Let's take Hive as an example of the handling: the Hive command simply calls scriptLanguageCommand.
private void scriptLanguageCommand(CommandLine commandLine, String jobType) {
    List<String> args = commandLine.getArgList();
    try {
        XOozieClient wc = createXOozieClient(commandLine);
        Properties conf = getConfiguration(wc, commandLine);
        String script = commandLine.getOptionValue(SCRIPTFILE_OPTION);
        List<String> paramsList = new ArrayList<>();
        ......
        System.out.println(JOB_ID_PREFIX + wc.submitScriptLanguage(conf, script,
                args.toArray(new String[args.size()]),
                paramsList.toArray(new String[paramsList.size()]), jobType));
    }
}
The key call here is wc.submitScriptLanguage, so we need to look at XOozieClient.submitScriptLanguage. Its comment states that it submits a Pig or Hive job via HTTP.
public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType)
        throws IOException, OozieClientException {
    switch (jobType) {
        case OozieCLI.HIVE_CMD:
            script = XOozieClient.HIVE_SCRIPT;
            options = XOozieClient.HIVE_OPTIONS;
            scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS;
            break;
        case OozieCLI.PIG_CMD:
            ......
    }
    conf.setProperty(script, readScript(scriptFile));
    setStrings(conf, options, args);
    setStrings(conf, scriptParams, params);
    return (new HttpJobSubmit(conf, jobType)).call();
}
HttpJobSubmit is what posts the job to the Oozie Server, so our investigation ultimately has to move to the Oozie Server.
private class HttpJobSubmit extends ClientCallable<String> {
    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        writeToXml(conf, conn.getOutputStream());
        if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
            JSONObject json = (JSONObject) JSONValue.parse(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
            return (String) json.get(JsonTags.JOB_ID);
        }
        return null;
    }
}
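On the wire this is simply Oozie's web-services API: the client POSTs an XML configuration to the jobs endpoint and, on HTTP 201, reads the job id out of the JSON response. Assuming Oozie's default port 11000, a hand-rolled submission would look roughly like this (job-config.xml is a hypothetical file holding the XML configuration):

curl -X POST -H "Content-Type: application/xml;charset=UTF-8" \
     --data-binary @job-config.xml \
     "http://oozie-host:11000/oozie/v1/jobs?action=start"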
As mentioned earlier, the Oozie Server is a web application running in a Java servlet container (Tomcat), so the startup and related configuration lives in web.xml. It has been a long time since I last looked at a web.xml; it suddenly feels unfamiliar, heh.
<!-- Servlets -->
<servlet>
    <servlet-name>callback</servlet-name>
    <display-name>Callback Notification</display-name>
    <servlet-class>org.apache.oozie.servlet.CallbackServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
</servlet>
<servlet>
    <servlet-name>v1jobs</servlet-name>
    <display-name>WS API for Workflow Jobs</display-name>
    <servlet-class>org.apache.oozie.servlet.V1JobsServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
</servlet>
......
Much of Oozie's groundwork is done by Services, each of which is a singleton. These services are configured in oozie-default.xml:
<property>
    <name>oozie.services</name>
    <value>
        org.apache.oozie.service.HadoopAccessorService,
        org.apache.oozie.service.LiteWorkflowAppService,
        org.apache.oozie.service.JPAService,
        org.apache.oozie.service.DBLiteWorkflowStoreService,
        org.apache.oozie.service.CallbackService,
        org.apache.oozie.service.ActionService,
        org.apache.oozie.service.CallableQueueService,
        org.apache.oozie.service.CoordinatorEngineService,
        org.apache.oozie.service.BundleEngineService,
        org.apache.oozie.service.DagEngineService,
        ......
    </value>
</property>
The ServicesLoader class is used at startup to load all of the configured services.
public class ServicesLoader implements ServletContextListener {
    private static Services services;

    /**
     * Initialize Oozie services.
     */
    public void contextInitialized(ServletContextEvent event) {
        services = new Services();
        services.init();
    }
}
The init function initializes all the configured services; if two services implement the same interface, the later one wins and is the one stored.
public class Services {
    public void init() throws ServiceException {
        loadServices();
    }

    private void loadServices() throws ServiceException {
        try {
            Map<Class<?>, Service> map = new LinkedHashMap<Class<?>, Service>();
            Class<?>[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES);
            Class<?>[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES);
            List<Service> list = new ArrayList<Service>();
            loadServices(classes, list);
            loadServices(classesExt, list);
            // removing duplicate services, strategy: last one wins
            for (Service service : list) {
                if (map.containsKey(service.getInterface())) {
                    // a later entry replaces the earlier service of the same interface
                    // (the original logs service.getClass() here; logging call abridged)
                }
                map.put(service.getInterface(), service);
            }
            for (Map.Entry<Class<?>, Service> entry : map.entrySet()) {
                setService(entry.getValue().getClass());
            }
        }
    }
}
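Note CONF_SERVICE_EXT_CLASSES above: it reads the oozie.services.ext property, which lets a deployment add services without copying the whole oozie.services list, and the "last one wins" rule means an entry here replaces a default service with the same interface. A sketch (the ZKLocksService shown is just an illustrative candidate):

<!-- oozie-site.xml: extend or override the default service list;
     per loadServices() above, later entries win -->
<property>
    <name>oozie.services.ext</name>
    <value>
        org.apache.oozie.service.ZKLocksService
    </value>
</property>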
After the user submits a job through the oozie script, execution enters org.apache.oozie.cli.OozieCLI. An OozieClient is created, and a job command posts the submission information to the doPost interface of V1JobsServlet; in doPost, Oozie calls the submitJob() method. A DAG object is then created and DAG.submitJob(JobConf, startJob) is invoked.
Let's start from V1JobsServlet.doPost, which lives in the base class:
public abstract class BaseJobsServlet extends JsonRestServlet {
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        JSONObject json = submitJob(request, conf);
    }
}
Then we come back to V1JobsServlet.submitJob:
@Override
protected JSONObject submitJob(HttpServletRequest request, Configuration conf)
        throws XServletException, IOException {
    String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
    if (jobType == null) {
        String wfPath = conf.get(OozieClient.APP_PATH);
        if (wfPath != null) {
            json = submitWorkflowJob(request, conf); // our target is here
        } else if (coordPath != null) {
            json = submitCoordinatorJob(request, conf);
        } else {
            json = submitBundleJob(request, conf);
        }
    } else { // This is a http submission job
        ......
    }
    return json;
}
This then calls into DagEngine.submitJob. As its comment says, "The DagEngine provides all the DAG engine functionality for WS calls." With this we have formally entered the world of the DAG.
private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
    try {
        String action = request.getParameter(RestConstants.ACTION_PARAM);
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
        if (action != null) {
            dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
        }
        if (dryrun) {
            id = dagEngine.dryRunSubmit(conf);
        } else {
            id = dagEngine.submitJob(conf, startJob); // we are here
        }
        json.put(JsonTags.JOB_ID, id);
    }
    return json;
}
Oozie has three core engines, all of which extend the abstract class BaseEngine: DagEngine handles workflow jobs, CoordinatorEngine handles coordinator jobs, and BundleEngine handles bundle jobs, matching the DagEngineService, CoordinatorEngineService, and BundleEngineService entries in the service list above.
As mentioned before, these belong to the system Services; all are singletons, added to Services when Oozie starts, and fetched with get() when needed.
public class Services {
    private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>();

    public <T extends Service> T get(Class<T> serviceKlass) {
        return (T) services.get(serviceKlass);
    }
}
A concrete call in V1JobsServlet looks like this:
String user = conf.get(OozieClient.USER_NAME);
DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
Oozie abstracts all of its operations into Commands, so internally the program is driven forward by Commands, much like a message-driven design.
Commands come in synchronous and asynchronous variants; both derive from the base class XCommand, which provides the following template:
public abstract class XCommand<T> implements XCallable<T> {
    ......
    private String key;
    private String name;
    private String type;
    private AtomicBoolean used = new AtomicBoolean(false);
    private Map<Long, List<XCommand<?>>> commandQueue;
    protected Instrumentation instrumentation;
    ......
}
XCommand's parent interface, XCallable, extends java.util.concurrent.Callable; the ultimate purpose is that when commands execute asynchronously, their execution can be ordered by priority.
So the key methods of an XCommand are queue, call, and execute: queue() defers related commands for asynchronous execution, call() is the (synchronous) entry point, and execute() carries the subclass's actual business logic; see the sketch below.
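To make the division of labor concrete, here is a minimal sketch of this template as we understand it; it is our own simplification, not Oozie's actual XCommand code, and the lock and queue bodies are stand-ins:

import java.util.concurrent.Callable;

// Simplified sketch of the XCommand template: call() wraps state loading,
// precondition checks and locking around the subclass-specific execute().
public abstract class SketchCommand<T> implements Callable<T> {

    protected abstract void loadState();                            // e.g. read beans via JPA
    protected abstract void verifyPrecondition() throws Exception;  // e.g. job must be in PREP
    protected abstract T execute() throws Exception;                // the business logic

    public final T call() throws Exception {
        acquireLock();           // serialize commands that touch the same job
        try {
            loadState();
            verifyPrecondition();
            return execute();
        } finally {
            releaseLock();
        }
    }

    // queue() would hand a follow-up command to CallableQueueService for
    // asynchronous, priority-ordered execution; modeled here as a no-op.
    protected void queue(SketchCommand<?> command) { }

    protected void acquireLock() { }
    protected void releaseLock() { }
}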
Looking at the SubmitXCommand we encounter most often, the inheritance chain is:
public class SubmitXCommand extends WorkflowXCommand<String>
public abstract class WorkflowXCommand<T> extends XCommand<T>
public abstract class XCommand<T> implements XCallable<T>
public interface XCallable<T> extends Callable<T>
And, for another example, the inheritance chain of TransitionXCommand:
abstract class TransitionXCommand<T> extends XCommand<T>
public abstract class SubmitTransitionXCommand extends TransitionXCommand<String>
As the earlier components show, a job carries the notion of a state machine: prepared, started, running, failed/finished, and so on. Commands that operate on a job therefore also have to handle state-machine transitions, so every Oozie command that manipulates a job must extend the abstract class TransitionXCommand, whose parent in turn is XCommand.
As noted above, doPost eventually calls id = dagEngine.submitJob(conf, startJob);
Let's see how DagEngine handles the submitted job.
First it runs SubmitXCommand's call() directly to submit the job:
public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
    validateSubmitConfiguration(conf);
    try {
        String jobId;
        SubmitXCommand submit = new SubmitXCommand(conf);
        jobId = submit.call();
        if (startJob) {
            start(jobId);
        }
        return jobId;
    }
}
It then starts the job with StartXCommand. As the comment shows, this is still executed synchronously (by invoking call() directly):
public void start(String jobId) throws DagEngineException {
    // Changing to synchronous call from asynchronous queuing to prevent the
    // loss of command if the queue is full or the queue is lost in case of
    // failure.
    new StartXCommand(jobId).call();
}
SubmitXCommand handles the submission itself: it parses the job the user submitted and writes it to the database. The main logic lives in execute(); an abridged version follows.
protected String execute() throws CommandException {
    WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
    try {
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        FileSystem fs = has.createFileSystem(user, uri, fsConf);

        // parse the configuration and obtain the WorkflowApp
        WorkflowApp app = wps.parseDef(conf, defaultConf);

        // create the WorkflowInstance
        WorkflowInstance wfInstance;
        wfInstance = workflowLib.createInstance(app, conf);

        // build the WorkflowJobBean
        WorkflowJobBean workflow = new WorkflowJobBean();
        workflow.setId(wfInstance.getId());
        workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
        workflow.setAppPath(conf.get(OozieClient.APP_PATH));
        workflow.setConf(XmlUtils.prettyPrint(conf).toString());
        ......
        workflow.setWorkflowInstance(wfInstance);
        workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));

        if (!dryrun) {
            workflow.setSlaXml(jobSlaXml);
            // add to the temporary insert list
            insertList.add(workflow);
            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                // persist the WorkflowJobBean into WF_JOBS
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
            }
        }
        return workflow.getId();
    }
}
Here insertList temporarily holds the WorkflowJobBean to be persisted:
private List<JsonBean> insertList = new ArrayList<JsonBean>();
WorkflowJobBean maps to the WF_JOBS table in the database:
public class WorkflowJobBean implements Writable, WorkflowJob, JsonBean {
    ...... // other fields omitted
    @Transient
    private List<WorkflowActionBean> actions;
}
To manage user-defined Actions and Workflows conveniently, Oozie uses JPA underneath to store this data in a database. Concretely, executeBatchInsertUpdateDelete is called to insert the records via JPA:
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
The relevant BatchQueryExecutor code:
public class BatchQueryExecutor {
    public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList,
            Collection<UpdateEntry> updateList, Collection<JsonBean> deleteList) {
        List<QueryEntry> queryList = new ArrayList<QueryEntry>();
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();
        if (updateList != null) {
            for (UpdateEntry entry : updateList) {
                Query query = null;
                JsonBean bean = entry.getBean();
                if (bean instanceof WorkflowJobBean) { // our code takes this branch
                    query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery(
                            (WorkflowJobQuery) entry.getQueryName(), (WorkflowJobBean) entry.getBean(), em);
                } else if (bean instanceof WorkflowActionBean) {
                    query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery(
                            (WorkflowActionQuery) entry.getQueryName(), (WorkflowActionBean) entry.getBean(), em);
                } else {
                    // many other bean types omitted here
                }
                queryList.add(new QueryEntry(entry.getQueryName(), query));
            }
        }
        // the actual database write happens here
        jpaService.executeBatchInsertUpdateDelete(insertList, queryList, deleteList, em);
    }
}
An excerpt of the JPA service:
public class JPAService implements Service, Instrumentable {
    private OperationRetryHandler retryHandler;

    public void executeBatchInsertUpdateDelete(final Collection<JsonBean> insertBeans,
            final List<QueryEntry> updateQueryList, final Collection<JsonBean> deleteBeans,
            final EntityManager em) {
        try {
            retryHandler.executeWithRetry(new Callable<Void>() {
                public Void call() throws Exception {
                    ......
                    if (CollectionUtils.isNotEmpty(insertBeans)) {
                        for (final JsonBean bean : insertBeans) {
                            em.persist(bean);
                        }
                    }
                    ......
                }
            });
        }
    }
}
With that, a Workflow Job has been stored in the database.
First, a quick introduction to the workflow lifecycle; the code we are about to read uses the PREP state.
prep: a workflow job is in the prep state when it is first created, meaning the job exists but has not yet run.
running: once a created workflow job starts executing it enters the running state; it stays there until it finishes, fails, or is suspended.
suspended: a running workflow job can be moved to the suspended state, and it remains there until it is resumed or killed.
killed: when a workflow job in the prep, running, or suspended state is killed, its state becomes killed.
failed: when a workflow job terminates because of an unexpected error, its state becomes failed.
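To summarize the transitions just described, here is a small illustrative enum; it is our own reading aid, not a type from the Oozie code base, and it adds the terminal SUCCEEDED state for completeness:

// Hypothetical summary of the workflow-job lifecycle described above.
public enum WfJobStatus {
    PREP, RUNNING, SUSPENDED, SUCCEEDED, KILLED, FAILED;

    boolean canTransitionTo(WfJobStatus next) {
        switch (this) {
            case PREP:      return next == RUNNING || next == KILLED;
            case RUNNING:   return next == SUCCEEDED || next == SUSPENDED
                                 || next == KILLED || next == FAILED;
            case SUSPENDED: return next == RUNNING || next == KILLED;
            default:        return false; // SUCCEEDED/KILLED/FAILED are terminal
        }
    }
}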
After SubmitXCommand is processed, the Oozie Server immediately processes StartXCommand.
StartXCommand's job is to start the workflow. It extends SignalXCommand, so StartXCommand(jobId).call() lands in SignalXCommand's call():
public class StartXCommand extends SignalXCommand
The relevant code follows.
First, StartXCommand calls the base-class constructor:
public StartXCommand(String id) {
    super("start", 1, id);
    InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
}
SignalXCommand then receives the jobId, which is exactly the one SubmitXCommand generated and returned earlier:
public SignalXCommand(String name, int priority, String jobId) {
    super(name, name, priority);
    this.jobId = ParamChecker.notEmpty(jobId, "jobId");
}
call() first reaches SignalXCommand.loadState, which reads the workflow job information from the database by jobId:
protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
            if (actionId != null) {
                this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_SIGNAL, actionId);
            }
        }
    }
}
The backing SQL (a JPA named query) is:
@NamedQuery(name = "GET_WORKFLOW", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),
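For readers less familiar with JPA, running such a named query just means binding the :id parameter and fetching a single result. A generic usage sketch (not the exact WorkflowJobQueryExecutor code; it relies on Oozie's WorkflowJobBean entity):

import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;

// Generic JPA sketch: how a named query such as GET_WORKFLOW is typically run.
class WorkflowLookup {
    WorkflowJobBean findJob(EntityManager em, String jobId) {
        TypedQuery<WorkflowJobBean> q = em.createNamedQuery("GET_WORKFLOW", WorkflowJobBean.class);
        q.setParameter("id", jobId);    // binds the :id placeholder in the query
        return q.getSingleResult();
    }
}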
call() then invokes SignalXCommand.execute(). For a freshly submitted job it concretely: 1) starts the WorkflowInstance and moves the job to RUNNING; 2) collects the actions to start from the workflow instance; 3) batch-writes the new WorkflowActionBean records; 4) synchronously calls ActionStartXCommand for the first action, which right after submission is always sync. The numbered comments in the code below match these steps:
protected Void execute() throws CommandException {
    WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
    workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
    WorkflowJob.Status prevStatus = wfJob.getStatus();
    WorkflowActionBean syncAction = null;
    List<WorkflowActionBean> workflowActionBeanListForForked = new ArrayList<WorkflowActionBean>();

    if (wfAction == null) {
        if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
            // step 1) above
            completed = workflowInstance.start();
            wfJob.setStatus(WorkflowJob.Status.RUNNING);
            wfJob.setStartTime(new Date());
            wfJob.setWorkflowInstance(workflowInstance);
            generateEvent = true;
            queue(new WorkflowNotificationXCommand(wfJob));
        }
    } else {
        ......
    }

    if (completed) {
        ......
    } else {
        // step 2) above
        for (WorkflowActionBean newAction : WorkflowStoreService.getActionsToStart(workflowInstance)) {
            insertList.add(newAction);
            if (wfAction != null) { // null during wf job submit
                // per the comment, this branch is not taken during wf job submit
                .....
            } else {
                syncAction = newAction; // first action after wf submit should always be sync
            }
        }
    }

    // write the WorkflowActionBeans: step 3) above
    BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);

    if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
        ......
    } else if (syncAction != null) {
        // invoke call() directly: step 4) above
        new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();
    }
}
The per-action database record is WorkflowActionBean. The field to highlight here is transition, which Oozie uses to record where this Action should jump to next. An excerpt of WorkflowActionBean:
public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean {
    @Id
    private String id;

    @Basic
    @Index
    @Column(name = "wf_id")
    private String wfId = null;

    @Basic
    @Index
    @Column(name = "status")
    private String statusStr = WorkflowAction.Status.PREP.toString();

    @Basic
    @Column(name = "execution_path", length = 1024)
    private String executionPath = null;

    @Basic
    @Column(name = "transition")
    private String transition = null;

    @Basic
    @Column(name = "data")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob data;
}
Synchronous transitions are realized mainly in LiteWorkflowInstance.signal: when a command finishes and finds further synchronous transitions behind it, it keeps executing them.
public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
    // signal all new synch transitions
    for (String pathToStart : pathsToStart) {
        signal(pathToStart, "::synch::");
    }
}
At this point, job submission is complete. What follows is the job's execution inside Oozie, which begins with ActionStartXCommand.