Oozie is an open-source workflow-engine framework contributed to Apache by Cloudera. It is an open-source workflow scheduling engine for the Hadoop platform, used to manage Hadoop jobs. This article, the second in the series, covers Oozie's internal execution phases.
The previous article, "[源码解析] Oozie的前因后果 --- (1) 提交任务阶段" ([Source Code Analysis] The Ins and Outs of Oozie --- (1) The Job Submission Phase), showed what happens after a user submits an Oozie job. This article follows the execution flow of a Workflow to analyze what Oozie does next.
The overall flow is roughly as follows. We will assume that, after the start node, the Workflow enters a Hive action.
The main job of ActionStartXCommand is to interact with Yarn and ultimately submit a Yarn Application Master.
ActionStartXCommand is a subclass of WorkflowXCommand. The key methods are loadState and execute.
```java
public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> {
    private String jobId = null;
    protected String actionId = null;
    protected WorkflowJobBean wfJob = null;
    protected WorkflowActionBean wfAction = null;
    private JPAService jpaService = null;
    private ActionExecutor executor = null;
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
    private List<JsonBean> insertList = new ArrayList<JsonBean>();
    protected ActionExecutorContext context = null;
}
```
loadState retrieves the WorkflowJobBean and WorkflowActionBean information from the database:
```java
protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            if (wfJob == null) {
                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
            }
            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
        }
    } // exception handling omitted in this excerpt
}
```
The execute method is shown below. Its core business logic is the call to executor.start(context, wfAction); in our case the executor is a HiveActionExecutor.
```java
@Override
protected ActionExecutorContext execute() throws CommandException {
    Configuration conf = wfJob.getWorkflowInstance().getConf();
    try {
        if (!caught) {
            // The core business logic: start the action
            executor.start(context, wfAction);
            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                }
                else {
                    wfAction.setPending();
                    if (!(executor instanceof ControlNodeActionExecutor)) {
                        queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
                    }
                    else {
                        execSynchronous = true;
                    }
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
        }
    }
    finally {
        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
        ......
        if (execSynchronous) {
            // Changing to synchronous call from asynchronous queuing to prevent
            // undue delay from ::start:: to action due to queuing
            callActionEnd();
        }
    }
    return null;
}
```
ActionExecutor.start is asynchronous, so Oozie still has to check the action's execution status to push the workflow forward. Oozie checks whether a task has finished in two ways:

Callback: when a task or computation is launched, it is given a callback URL; when the task finishes, it invokes that URL to notify Oozie.

Polling: if the callback fails for whatever reason, Oozie falls back to querying the task's status by polling.

Oozie uses these two mechanisms to keep tasks under control; we will come back to them later. A minimal polling sketch follows.
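To make the polling side concrete, here is a minimal sketch (illustrative, not Oozie's actual code; the class and method names are my own) that asks Yarn for an application's report, the same kind of query Oozie's check logic performs later in this article:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.ConverterUtils;

// A minimal sketch, assuming 'appIdStr' holds the launcher's Yarn application id.
public class PollingSketch {
    public static FinalApplicationStatus poll(String appIdStr) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new Configuration());
        yarnClient.start();
        try {
            ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
            // Ask the ResourceManager for the application's current report
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            return report.getFinalApplicationStatus(); // stays UNDEFINED until the app ends
        } finally {
            yarnClient.stop();
        }
    }
}
```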
In the code above, executor.start(context, wfAction); is what actually starts the task.
HiveActionExecutor extends ScriptLanguageActionExecutor, which in turn extends JavaActionExecutor, so many of the methods executed later are actually implemented in JavaActionExecutor.
```java
public class HiveActionExecutor extends ScriptLanguageActionExecutor {}
```
So ActionExecutor.start here resolves to JavaActionExecutor.start(). It checks the file system (for example, whether HDFS is supported and whether the action directory is ready) and then calls submitLauncher.
```java
public void start(Context context, WorkflowAction action) throws ActionExecutorException {
    FileSystem actionFs = context.getAppFileSystem();
    prepareActionDir(actionFs, context);
    submitLauncher(actionFs, context, action); // the core business logic
    check(context, action);
}
```
submitLauncher's main responsibilities are: building the action configuration (loading Hadoop defaults, setting up lib files and archives), injecting the action callback, creating the launcher configuration and a YarnClient, building the ApplicationSubmissionContext, submitting the application (the launcher AM) to Yarn, and recording the launcher id and console URL into the context.

The code is as follows:
```java
public void submitLauncher(final FileSystem actionFs, final Context context, final WorkflowAction action)
        throws ActionExecutorException {
    YarnClient yarnClient = null;
    try {
        // action job configuration
        Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
        setupActionConf(actionConf, context, actionXml, appPathRoot);
        addAppNameContext(context, action);
        setLibFilesArchives(context, actionXml, appPathRoot, actionConf);

        // configure the action callback
        injectActionCallback(context, actionConf);

        Configuration launcherConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
        yarnClient = createYarnClient(context, launcherConf);

        // set up the various credentials
        if (UserGroupInformation.isSecurityEnabled()) {
            ......
        }

        if (alreadyRunning && !isUserRetry) {
            ......
        }
        else {
            YarnClientApplication newApp = yarnClient.createApplication();
            ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
            ApplicationSubmissionContext appContext = createAppSubmissionContext(appId, launcherConf, context,
                    actionConf, action, credentials, actionXml);
            // This is where we actually interact with Yarn
            yarnClient.submitApplication(appContext);
            launcherId = appId.toString();
            ApplicationReport appReport = yarnClient.getApplicationReport(appId);
            consoleUrl = appReport.getTrackingUrl();
        }

        String jobTracker = launcherConf.get(HADOOP_YARN_RM);
        context.setStartData(launcherId, jobTracker, consoleUrl);
    }
}

protected YarnClient createYarnClient(Context context, Configuration jobConf) throws HadoopAccessorException {
    String user = context.getWorkflow().getUser();
    return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
}
```
It is worth mentioning the old implementation here: LauncherMapper. Many articles about Oozie online are based on older versions, so most of them mention LauncherMapper. For example:
Oozie is essentially a job-coordination tool (under the hood it turned the XML description into a MapReduce program, but one that does all its work on the map side, avoiding the shuffle phase).
When Oozie executes an Action, an ActionExecutor is used (the most important subclass is JavaActionExecutor; the hive, spark, and other actions are all handled by subclasses of it). JavaActionExecutor first submits a LauncherMapper (a map task) to Yarn, which runs LauncherMain (each concrete action has its own subclass, such as JavaMain or SparkMain); a spark task runs SparkMain, which calls org.apache.spark.deploy.SparkSubmit to submit the job. In other words, the map task's job is to identify what kind of task you have (hive, shell, spark, etc.), set up the environment that task needs, and submit it. It provides the interface for submitting the task (for a hive task, for instance, launching the hive client or beeline).
According to the changelog, LauncherMapper was removed in OOZIE-2918 "Delete LauncherMapper and its test (asasvari via pbacsko)".
Let's take a quick look at the LauncherMapper implementation in the old code.
LauncherMapper implements org.apache.hadoop.mapred.Mapper and provides a map function whose body simply invokes the main method of the user's code.
```java
import org.apache.hadoop.mapred.Mapper;

public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {

    @Override
    public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
        SecurityManager initialSecurityManager = System.getSecurityManager();
        try {
            // ... (preceding branch elided in this excerpt)
            else {
                String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);
                new LauncherSecurityManager();
                setupHeartBeater(reporter);
                setupMainConfiguration();

                // Propagating the conf to use by child job.
                propagateToHadoopConf();
                executePrepare();

                Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
                Method mainMethod = klass.getMethod("main", String[].class);
                mainMethod.invoke(null, (Object) args);
            }
        }
    }
}
```
In LauncherMapperHelper, LauncherMapper is set as the mapper class:
```java
public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
        String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
    launcherConf.setMapperClass(LauncherMapper.class);
}
```
And JavaActionExecutor used org.apache.hadoop.mapred.JobClient:
```java
import org.apache.hadoop.mapred.JobClient;

public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
    jobClient = createJobClient(context, launcherJobConf);
    LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, prepareXML);

    // Set the launcher Main Class
    LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
    LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
    ......
    runningJob = jobClient.submitJob(launcherJobConf); // the job is submitted here
}
```
In short, the old LauncherMapper implemented an org.apache.hadoop.mapred.Mapper, and org.apache.hadoop.mapred.JobClient was responsible for interacting with Hadoop.
The new version of Oozie is deeply tied to Yarn, so we need to introduce Yarn first.
YARN is the resource management system in Hadoop 2.0. Its basic design idea is to split the JobTracker of MRv1 into two independent services: a global ResourceManager and a per-application ApplicationMaster. The ResourceManager handles resource management and allocation for the whole system, while an ApplicationMaster manages a single application.

YARN overall remains a Master/Slave architecture. Within the resource management framework, the ResourceManager is the master and the NodeManagers are the slaves; the ResourceManager centrally manages and schedules the resources on each NodeManager.

When a user submits an application, an ApplicationMaster must be provided to track and manage the program. It requests resources from the ResourceManager and asks NodeManagers to start tasks that occupy a certain amount of resources. Since different ApplicationMasters are distributed across different nodes, they do not interfere with one another.
Every application a user submits includes an AM, whose main functions are: negotiating with the ResourceManager's scheduler to obtain resources (Containers); further assigning the obtained resources to its internal tasks; communicating with NodeManagers to start or stop tasks; and monitoring the status of all its tasks, re-requesting resources to restart any task that fails.

After a user submits an application to YARN, YARN runs it in two phases: the first phase starts the ApplicationMaster; in the second phase, the ApplicationMaster requests resources for the application and monitors the entire run until it completes.

The workflow breaks down into the following steps: the client submits the application, including the AM program and the command to launch it; the ResourceManager allocates the first Container for the application and asks the corresponding NodeManager to start the AM in it; the AM registers with the ResourceManager and then requests resources for its tasks; once resources are granted, the AM asks NodeManagers to launch tasks in the allocated Containers; the running tasks report status and progress back to the AM; when the application finishes, the AM unregisters from the ResourceManager and shuts down. A minimal client-side sketch of the submission step follows.
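As a reference point for these steps, here is a minimal, hedged sketch of the client side of the submission (illustrative names, not Oozie code), using the same YarnClient API that Oozie's submitLauncher relies on:

```java
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class SubmitSketch {
    public static ApplicationId submit() throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        // Step 1: ask the RM for a new application id
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();

        // Describe how to launch the AM container (the command and class are made up)
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList("$JAVA_HOME/bin/java com.example.MyAppMaster"));
        appContext.setAMContainerSpec(amContainer);
        appContext.setApplicationName("sketch-app");
        appContext.setResource(Resource.newInstance(1024, 1)); // 1 GB, 1 vcore for the AM

        // Hand the application over to the ResourceManager
        return yarnClient.submitApplication(appContext);
    }
}
```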
LauncherAM is Oozie's ApplicationMaster implementation, and LauncherAM.main is the entry point that Yarn invokes.
```java
public class LauncherAM {
    public static void main(String[] args) throws Exception {
        final LocalFsOperations localFsOperations = new LocalFsOperations();
        final Configuration launcherConf = readLauncherConfiguration(localFsOperations);
        UserGroupInformation.setConfiguration(launcherConf);
        // MRAppMaster adds this call as well, but it's included only in Hadoop 2.9+
        // SecurityUtil.setConfiguration(launcherConf);
        UserGroupInformation ugi = getUserGroupInformation(launcherConf);

        // Executing code inside a doAs with an ugi equipped with correct tokens.
        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                LauncherAM launcher = new LauncherAM(new AMRMClientAsyncFactory(),
                        new AMRMCallBackHandler(),
                        new HdfsOperations(new SequenceFileWriterFactory()),
                        new LocalFsOperations(),
                        new PrepareActionsHandler(new LauncherURIHandlerFactory(null)),
                        new LauncherAMCallbackNotifierFactory(),
                        new LauncherSecurityManager(),
                        sysenv.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()),
                        launcherConf);
                launcher.run();
                return null;
            }
        });
    }
}
```
launcher.run mainly does the following: it registers with the ResourceManager via registerWithRM (which uses AMRMClientAsync), runs the prepare actions, sets up the main configuration, invokes the action's actual main class, uploads the action data to HDFS, unregisters from the RM, and finally notifies Oozie through the callback URL. The code is as follows:
```java
public void run() throws Exception {
    try {
        actionDir = new Path(launcherConf.get(OOZIE_ACTION_DIR_PATH));

        registerWithRM(amrmCallBackHandler);

        // Run user code without the AM_RM_TOKEN so users can't request containers
        UserGroupInformation ugi = getUserGroupInformation(launcherConf, AMRMTokenIdentifier.KIND_NAME);
        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                executePrepare(errorHolder);
                setupMainConfiguration();
                runActionMain(errorHolder); // invokes the concrete main class per configuration, e.g. HiveMain
                return null;
            }
        });
    }
    finally {
        try {
            actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString());
            hdfsOperations.uploadActionDataToHDFS(launcherConf, actionDir, actionData);
        }
        finally {
            try {
                unregisterWithRM(actionResult, errorHolder.getErrorMessage());
            }
            finally {
                LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
                cn.notifyURL(actionResult);
            }
        }
    }
}
```
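For comparison, registering with the ResourceManager via AMRMClientAsync boils down to something like the following sketch (illustrative, not Oozie's exact code):

```java
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RegisterSketch {
    public static AMRMClientAsync<?> register(AMRMClientAsync.CallbackHandler callbackHandler) throws Exception {
        // Heartbeat every 1000 ms; allocation/completion callbacks arrive on the handler
        AMRMClientAsync<?> amrmClient = AMRMClientAsync.createAMRMClientAsync(1000, callbackHandler);
        amrmClient.init(new YarnConfiguration());
        amrmClient.start();
        // Host/port/tracking-URL of the AM; empty values suffice for a launcher-style AM
        amrmClient.registerApplicationMaster("", 0, "");
        return amrmClient;
    }
}
```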
But you will notice that, compared with what an ApplicationMaster is supposed to do, LauncherAM does surprisingly little. That is a puzzle! We will unravel this mystery as we go on.
As mentioned above, runActionMain invokes the concrete main class per the configuration. Assuming a hive action, that class is HiveMain. The entry class for a Hive job is configured via HIVE_MAIN_CLASS_NAME:
```java
public class HiveActionExecutor extends ScriptLanguageActionExecutor {
    private static final String HIVE_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.HiveMain";

    @Override
    public List<Class<?>> getLauncherClasses() {
        List<Class<?>> classes = new ArrayList<Class<?>>();
        classes.add(Class.forName(HIVE_MAIN_CLASS_NAME)); // HiveMain is configured here
        return classes;
    }
}
```
The subsequent call chain inside HiveMain is:

HiveMain.main ----> run ----> runHive ----> CliDriver.main(args);
Finally, org.apache.hadoop.hive.cli.CliDriver carries out the Hive work. Roughly, HiveMain: sets up hive-site and the log4j configuration files, assembles the Hive command-line arguments (a script file, or the inline query written out to a generated script file, plus any extra Hive arguments), kills any leftover child Yarn jobs, runs Hive via CliDriver, and finally records the external child job IDs. The details:
```java
public class HiveMain extends LauncherMain {
    public static void main(String[] args) throws Exception {
        run(HiveMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {
        Configuration hiveConf = setUpHiveSite();
        List<String> arguments = new ArrayList<String>();

        String logFile = setUpHiveLog4J(hiveConf);
        arguments.add("--hiveconf");
        arguments.add("hive.log4j.file=" + new File(HIVE_L4J_PROPS).getAbsolutePath());
        arguments.add("--hiveconf");
        arguments.add("hive.exec.log4j.file=" + new File(HIVE_EXEC_L4J_PROPS).getAbsolutePath());

        // setting oozie workflow id as caller context id for hive
        String callerId = "oozie:" + System.getProperty(LauncherAM.OOZIE_JOB_ID);
        arguments.add("--hiveconf");
        arguments.add("hive.log.trace.id=" + callerId);

        String scriptPath = hiveConf.get(HiveActionExecutor.HIVE_SCRIPT);
        String query = hiveConf.get(HiveActionExecutor.HIVE_QUERY);
        if (scriptPath != null) {
            ......
            // print out current directory & its contents
            File localDir = new File("dummy").getAbsoluteFile().getParentFile();
            String[] files = localDir.list();
            // Prepare the Hive Script
            String script = readStringFromFile(scriptPath);
            arguments.add("-f");
            arguments.add(scriptPath);
        } else if (query != null) {
            String filename = createScriptFile(query);
            arguments.add("-f");
            arguments.add(filename);
        }

        // Pass any parameters to Hive via arguments
        ......
        String[] hiveArgs = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS);
        for (String hiveArg : hiveArgs) {
            arguments.add(hiveArg);
        }

        LauncherMain.killChildYarnJobs(hiveConf);

        try {
            runHive(arguments.toArray(new String[arguments.size()]));
        }
        finally {
            writeExternalChildIDs(logFile, HIVE_JOB_IDS_PATTERNS, "Hive");
        }
    }
}
```
So we can see that after Yarn invokes the Oozie ApplicationMaster, it simply sends commands to Hive through org.apache.hadoop.hive.cli.CliDriver, with no further interaction with the ResourceManager or NodeManagers. That is genuinely odd, and the mystery is resolved by Tez, covered next.
Tez is an Apache open-source computing framework that supports DAG jobs. It derives directly from the MapReduce framework; the core idea is to split the Map and Reduce operations further: Map is decomposed into Input, Processor, Sort, Merge, and Output, while Reduce is decomposed into Input, Shuffle, Sort, Merge, Processor, and Output. These decomposed primitive operations can then be combined flexibly into new operators which, assembled by some control logic, form one large DAG job.
Tez has the following characteristics: it is an Apache open-source project; it runs on top of YARN; and it targets DAG-shaped jobs, with optimizations such as container reuse to improve performance.
As you can see, Tez is also deeply tied to Yarn. First, we locate Tez's corresponding ApplicationMaster: the Tez DAG ApplicationMaster.
```java
public class DAGAppMaster extends AbstractService {
    public String submitDAGToAppMaster(DAGPlan dagPlan, Map<String, LocalResource> additionalResources)
            throws TezException {
        startDAG(dagPlan, additionalResources);
    }
}
```
We can see the code that submits the ApplicationMaster:
```java
public class TezYarnClient extends FrameworkClient {
    @Override
    public ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext)
            throws YarnException, IOException, TezException {
        ApplicationId appId = yarnClient.submitApplication(appSubmissionContext);
        ApplicationReport appReport = getApplicationReport(appId);
        return appId;
    }
}
```
Here is the code that builds the ApplicationSubmissionContext; it sets the ApplicationMaster class and the container:
```java
public static ApplicationSubmissionContext createApplicationSubmissionContext(
        ApplicationId appId, DAG dag, String amName, AMConfiguration amConfig,
        Map<String, LocalResource> tezJarResources, Credentials sessionCreds,
        boolean tezLrsAsArchive, TezApiVersionInfo apiVersionInfo,
        ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker)
        throws IOException, YarnException {

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");

    String amOpts = constructAMLaunchOpts(amConfig.getTezConfiguration(), capability);
    vargs.add(amOpts);

    // The ApplicationMaster class is set here
    vargs.add(TezConstants.TEZ_APPLICATION_MASTER_CLASS);

    // The command-line arguments are set here
    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
        mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    // The container is set up here
    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
            amLocalResources, environment, vargsFinal, serviceData, securityTokens, acls);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
    appContext.setAMContainerSpec(amContainer);

    return appContext;
}
```
Only excerpts are shown here, but you can see that Tez implements the interaction with the Yarn ResourceManager. YarnTaskSchedulerService implements AMRMClientAsync.CallbackHandler; its job is to handle messages received from the ResourceManager, and it implements methods such as onContainersAllocated and onContainersCompleted:
```java
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class YarnTaskSchedulerService extends TaskScheduler implements AMRMClientAsync.CallbackHandler {

    @Override
    public void onContainersAllocated(List<Container> containers) {
        if (!shouldReuseContainers) {
            List<Container> modifiableContainerList = Lists.newLinkedList(containers);
            assignedContainers = assignNewlyAllocatedContainers(modifiableContainerList);
        }
        // upcall to app must be outside locks
        informAppAboutAssignments(assignedContainers);
    }

    @Override
    public void onContainersCompleted(List<ContainerStatus> statuses) {
        synchronized (this) {
            for (ContainerStatus containerStatus : statuses) {
                ContainerId completedId = containerStatus.getContainerId();
                HeldContainer delayedContainer = heldContainers.get(completedId);
                Object task = releasedContainers.remove(completedId);
                if (task != null) {
                    appContainerStatus.put(task, containerStatus);
                    continue;
                }
                // not found in released containers. check currently allocated containers
                // no need to release this container as the RM has already completed it
                task = unAssignContainer(completedId, false);
                if (delayedContainer != null) {
                    heldContainers.remove(completedId);
                    Resources.subtract(allocatedResources, delayedContainer.getContainer().getResource());
                }
                if (task != null) {
                    // completion of a container we have allocated currently
                    // an allocated container completed. notify app. This will cause attempt to get killed
                    appContainerStatus.put(task, containerStatus);
                    continue;
                }
            }
        }
        // upcall to app must be outside locks
        for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
            getContext().containerCompleted(entry.getKey(), entry.getValue());
        }
    }
}
```
From this we can see that Oozie is a hands-off boss: it only launches Hive, and all subsequent interaction with the RM is handled entirely by Tez. This resolves all of our earlier puzzlement.
Finally, a summary of the new flow. Forgive me for drawing the diagram this way; there is nothing I hate more than reading a good article only to find the images are gone......
```
+---------+                        +----------+                       +-----------+
|         | 1-submit LauncherAM    |          |  2.CliDriver.main     |           |
|         |----------------------->| HiveMain |---------------------->|           |
|         |                        |          |                       |           |--+
| [Oozie] |                        |  [Yarn]  |                       |  [Hive]   |  | 3.Run
|         |                        |          |                       |           |  |   Hive
|         | 5-notifyURL of Oozie   |          | 4-submit DAGAppMaster |           |<-+
|         |<-----------------------|          |<--------------------->|    Tez    |
|         |                        |          |                       |           |
+---------+                        +----------+                       +-----------+
```
Now let's see what happens when Oozie executes a Java program. The main execution class for a Java program is JavaMain, which is much simpler: it directly invokes the user's Java main method.
```java
public class JavaMain extends LauncherMain {
    public static final String JAVA_MAIN_CLASS = "oozie.action.java.main";

    /**
     * @param args Invoked from LauncherAM:run()
     * @throws Exception in case of error when running the application
     */
    public static void main(String[] args) throws Exception {
        run(JavaMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {
        Configuration actionConf = loadActionConf();
        setYarnTag(actionConf);
        setApplicationTags(actionConf, TEZ_APPLICATION_TAGS);
        setApplicationTags(actionConf, SPARK_YARN_TAGS);

        LauncherMain.killChildYarnJobs(actionConf);

        Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
        Method mainMethod = klass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) args);
    }
}
```
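The reflection contract here is just a public static main(String[]) method. A hypothetical user class that JavaMain could invoke this way (the class name is made up; its fully-qualified name would be supplied through the oozie.action.java.main property):

```java
// Hypothetical user class; its fully-qualified name would be the value of
// the 'oozie.action.java.main' property in the action configuration.
public class MyJavaAction {
    public static void main(String[] args) {
        // Arbitrary user logic runs here, inside the LauncherAM container
        System.out.println("running with " + args.length + " argument(s)");
    }
}
```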
As mentioned earlier, ActionExecutor.start is asynchronous, so Oozie still has to check the action's execution status to push the workflow forward, and it does so through the two mechanisms introduced above: callback and polling.
After the user program finishes, LauncherAM makes the following call to notify Oozie. This is the "callback" mechanism in action:
```java
LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
cn.notifyURL(actionResult);
```
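Conceptually, the notifier issues an HTTP request against the callback URL that Oozie injected into the launcher configuration. A minimal sketch of the idea (illustrative only; the real notifier also handles retries and error cases):

```java
import java.net.HttpURLConnection;
import java.net.URL;

// A minimal sketch, assuming 'callbackUrl' already carries the action id and status
// (e.g. ".../callback?id=<actionId>&status=SUCCEEDED").
public class CallbackSketch {
    public static void notify(String callbackUrl) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(callbackUrl).openConnection();
        conn.setRequestMethod("GET");
        int rc = conn.getResponseCode(); // Oozie's CallbackServlet handles this request
        if (rc != HttpURLConnection.HTTP_OK) {
            throw new IllegalStateException("callback failed with HTTP " + rc);
        }
    }
}
```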
Oozie's CallbackServlet responds to this call. As you can see, DagEngine.processCallback is where Oozie handles the end of the program:
```java
public class CallbackServlet extends JsonRestServlet {
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        String queryString = request.getQueryString();
        CallbackService callbackService = Services.get().get(CallbackService.class);
        String actionId = callbackService.getActionId(queryString);
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
        dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
    }
}
```
DagEngine.processCallback mainly works through CompletedActionXCommand. As you can see, the command is placed on the CallableQueueService queue, so we need to introduce CallableQueueService next.
```java
public void processCallback(String actionId, String externalStatus, Properties actionData)
        throws DagEngineException {
    XCallable<Void> command = new CompletedActionXCommand(actionId, externalStatus, actionData, HIGH_PRIORITY);
    if (!Services.get().get(CallableQueueService.class).queue(command)) {
        LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
    }
}
```
Oozie uses CallableQueueService to execute operations asynchronously:
```java
public class CallableQueueService implements Service, Instrumentable {
    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
    private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
    private Set<String> interruptTypes;
    private int interruptMapMaxSize;
    private int maxCallableConcurrency;
    private int queueAwaitTerminationTimeoutSeconds;
    private int queueSize;
    private PriorityDelayQueue<CallableWrapper<?>> queue;
    private ThreadPoolExecutor executor;
    private Instrumentation instrumentation;
    private boolean newImpl = false;
    private AsyncXCommandExecutor asyncXCommandExecutor;

    public void init(Services services) {
        queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
                MAX_CALLABLE_WAITTIME_MS, TimeUnit.MILLISECONDS, queueSize) {
            @Override
            protected boolean eligibleToPoll(QueueElement<?> element) {
                if (element != null) {
                    CallableWrapper wrapper = (CallableWrapper) element;
                    if (element.getElement() != null) {
                        return callableReachMaxConcurrency(wrapper.getElement());
                    }
                }
                return false;
            }
        };
    }
}
```
The key point: the queue the thread pool draws from is Oozie's custom PriorityDelayQueue.
PriorityDelayQueue's characteristic is that elements leave the queue according to both their delay time and their execution priority. The implementation strategy:

PriorityDelayQueue keeps one delay queue per priority level. Because these are the JDK's built-in DelayQueue, a poll() on a queue returns a task only once that task's delay in the queue has expired; if poll() returns null, that queue holds no task whose time condition is satisfied.

How the multiple priority queues are arranged: every time a task is taken from the PriorityDelayQueue, the highest-priority queue is polled first; if it yields no eligible task, the next-highest-priority queue is polled, and so on down the priority levels.

Starvation: if the high-priority queues always have eligible tasks, eligible tasks in the low-priority queues could starve to death. To prevent this, before each selection the low-priority queues are scanned; if a task has long satisfied the dequeue condition and its wait exceeds the configured maximum, its priority is raised by one and it is moved into the next-higher-priority queue to wait there instead. A sketch of this strategy follows.
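Here is a minimal, hedged sketch of that multi-level delay-queue strategy (names and the promotion rule are simplified from the description above; this is not Oozie's PriorityDelayQueue):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Illustrative element type: becomes ready once its delay expires.
class DelayedTask implements Delayed {
    final String name;
    final long readyAtMs;
    final long enqueuedAtMs = System.currentTimeMillis();

    DelayedTask(String name, long delayMs) {
        this.name = name;
        this.readyAtMs = System.currentTimeMillis() + delayMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

// One DelayQueue per priority; poll from highest to lowest, promoting long-waiting tasks.
class PriorityDelaySketch {
    private final DelayQueue<DelayedTask>[] queues;
    private final long maxWaitMs;

    @SuppressWarnings("unchecked")
    PriorityDelaySketch(int priorities, long maxWaitMs) {
        this.queues = new DelayQueue[priorities];
        for (int i = 0; i < priorities; i++) {
            queues[i] = new DelayQueue<>();
        }
        this.maxWaitMs = maxWaitMs;
    }

    void offer(DelayedTask task, int priority) {
        queues[priority].offer(task);
    }

    DelayedTask poll() {
        promoteStarvedTasks();
        // Highest-priority queue is checked first
        for (int p = queues.length - 1; p >= 0; p--) {
            DelayedTask t = queues[p].poll();
            if (t != null) {
                return t; // a ready task was found at this priority
            }
        }
        return null; // nothing ready anywhere
    }

    // Anti-starvation: bump a ready-but-long-waiting task one priority level up.
    private void promoteStarvedTasks() {
        long now = System.currentTimeMillis();
        for (int p = 0; p < queues.length - 1; p++) {
            for (DelayedTask t : queues[p]) {
                if (t.getDelay(TimeUnit.MILLISECONDS) <= 0 && now - t.enqueuedAtMs > maxWaitMs) {
                    if (queues[p].remove(t)) {
                        queues[p + 1].offer(t);
                    }
                }
            }
        }
    }
}
```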
PollablePriorityDelayQueue's distinguishing feature: when selecting a task, it first checks whether a time-eligible task also satisfies the concurrency and other limits, and only then removes it from the queue, rather than doing what PriorityDelayQueue does, which is to take the task out, discover the concurrency limits are not met, and put it back.
Task types: the thread pool executes tasks asynchronously and in no particular order, yet some business scenarios need units of work to run serially. Oozie therefore wraps two task types, CompositeCallable and the ordinary XCallable; the former is a collection of XCallables, and it guarantees that the XCallables inside the collection execute in order, as the sketch below illustrates.
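A minimal sketch of the CompositeCallable idea (illustrative, not Oozie's actual class):

```java
import java.util.List;
import java.util.concurrent.Callable;

// Runs a list of callables strictly in order on whichever pool thread picks it up,
// so the members are serialized relative to each other.
class CompositeCallableSketch implements Callable<Void> {
    private final List<Callable<Void>> members;

    CompositeCallableSketch(List<Callable<Void>> members) {
        this.members = members;
    }

    @Override
    public Void call() throws Exception {
        for (Callable<Void> member : members) {
            member.call(); // sequential: the next member starts only after this one returns
        }
        return null;
    }
}
```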
CompletedActionXCommand executes when the workflow's external task finishes, and it executes only once. For the program's completion, it adds an ActionCheckXCommand to the asynchronous queue:
```java
public class CompletedActionXCommand extends WorkflowXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {
        if (this.wfactionBean.getStatus() == WorkflowActionBean.Status.PREP) {
            .....
        } else { // RUNNING
            ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(this.wfactionBean.getType());
            // this is done because oozie notifications (of sub-wfs) is send
            // every status change, not only on completion.
            if (executor.isCompleted(externalStatus)) {
                queue(new ActionCheckXCommand(this.wfactionBean.getId(), getPriority(), -1));
            }
        }
        return null;
    }
}
```
The asynchronous call reaches ActionCheckXCommand, whose main tasks are: calling executor.check to refresh the action's status, updating the action and job records in the database, and, if the action's execution is complete, synchronously invoking ActionEndXCommand.
```java
public class ActionCheckXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {
        ActionExecutorContext context = null;
        boolean execSynchronous = false;
        try {
            boolean isRetry = false;
            // if a retry mechanism applies, configure it accordingly
            if (wfAction.getRetries() > 0) {
                isRetry = true;
            }
            boolean isUserRetry = false;
            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
            executor.check(context, wfAction); // check the action's status

            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                    generateEvent = true;
                } else {
                    wfAction.setPending();
                    execSynchronous = true;
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
        }
        finally {
            // update the task information in the database
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
            if (generateEvent && EventHandlerService.isEnabled()) {
                generateEvent(wfAction, wfJob.getUser());
            }
            if (execSynchronous) {
                // finish via ActionEndXCommand
                new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
            }
        }
        return null;
    }
}
```
This calls into JavaActionExecutor.check:
```java
@Override
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
    boolean fallback = false;
    YarnClient yarnClient = null;
    try {
        Element actionXml = XmlUtils.parseXml(action.getConf());
        Configuration jobConf = createBaseHadoopConf(context, actionXml);
        FileSystem actionFs = context.getAppFileSystem();
        yarnClient = createYarnClient(context, jobConf); // built from the configuration

        FinalApplicationStatus appStatus = null;
        try {
            final String effectiveApplicationId = findYarnApplicationId(context, action);
            final ApplicationId applicationId = ConverterUtils.toApplicationId(effectiveApplicationId);
            final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId); // fetch the application report
            final YarnApplicationState appState = appReport.getYarnApplicationState();
            if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
                    || appState == YarnApplicationState.KILLED) {
                appStatus = appReport.getFinalApplicationStatus();
            }
        }

        if (appStatus != null || fallback) {
            Path actionDir = context.getActionDir();
            // load sequence file into object
            Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf); // load the action data
            if (fallback) {
                String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
                if (finalStatus != null) {
                    appStatus = FinalApplicationStatus.valueOf(finalStatus);
                } else {
                    context.setExecutionData(FAILED, null);
                }
            }

            String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID); // MapReduce was launched
            if (externalID != null) {
                context.setExternalChildIDs(externalID);
            }

            // Multiple child IDs - Pig or Hive action
            String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
            if (externalIDs != null) {
                context.setExternalChildIDs(externalIDs);
            }

            // record the various pieces of execution information
            context.setExecutionData(appStatus.toString(), null);
            if (appStatus == FinalApplicationStatus.SUCCEEDED) {
                if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) {
                    context.setExecutionData(SUCCEEDED,
                            PropertiesUtils.stringToProperties(actionData.get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
                } else {
                    context.setExecutionData(SUCCEEDED, null);
                }
                if (LauncherHelper.hasStatsData(actionData)) {
                    context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
                }
                getActionData(actionFs, action, context);
            } else {
                ......
                context.setExecutionData(FAILED_KILLED, null);
            }
        }
    }
    finally {
        if (yarnClient != null) {
            IOUtils.closeQuietly(yarnClient);
        }
    }
}
```
ActionEndXCommand performs the finish-up and the transition to the next node:
```java
public class ActionEndXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {
        Configuration conf = wfJob.getWorkflowInstance().getConf();

        if (!(executor instanceof ControlNodeActionExecutor)) {
            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
        }
        executor.setMaxRetries(maxRetries);
        executor.setRetryInterval(retryInterval);

        boolean isRetry = false;
        if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
            isRetry = true;
        }
        boolean isUserRetry = false;
        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
        try {
            executor.end(context, wfAction); // ask the executor to finish the action

            if (!context.isEnded()) {
                failJob(context);
            } else {
                wfAction.setRetries(0);
                wfAction.setEndTime(new Date());

                boolean shouldHandleUserRetry = false;
                Status slaStatus = null;
                switch (wfAction.getStatus()) {
                    case OK:
                        slaStatus = Status.SUCCEEDED;
                        break;
                    ......
                }
                if (!shouldHandleUserRetry || !handleUserRetry(context, wfAction)) {
                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(),
                            wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
                    if (slaEvent != null) {
                        insertList.add(slaEvent);
                    }
                }
            }

            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
            DagELFunctions.setActionInfo(wfInstance, wfAction);
            wfJob.setWorkflowInstance(wfInstance);

            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END, wfAction));
            wfJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
        }
        finally {
            try {
                // update the job info in the database
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
            }
            if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                generateEvent(wfAction, wfJob.getUser());
            }
            new SignalXCommand(jobId, actionId).call(); // transition: trigger execution of the next action
        }
        return null;
    }
}
```