[源码解析]Oozie前因后果以内部执行

[源码解析]Oozie前因后果以内部执行

0x00 摘要

Oozie由Cloudera公司贡献给Apache的基于工做流引擎的开源框架,是用于Hadoop平台的开源的工做流调度引擎,用来管理Hadoop做业,进行。本文是系列的第二篇,介绍Oozie的内部执行阶段。html

前文[源码解析]Oozie的前因后果 --- (1)提交任务阶段 已经为你们展现了用户提交一个Oozie Job以后作了什么,本文将沿着一个Workflow的执行流程为你们继续剖析Oozie接下来作什么。java

大体以下:shell

  • 在Oozie中准备Yarn Application Master
  • 介绍新旧两版本的Yarn Application Master区别
  • 介绍Hive on Yarn
  • Tez是如何乱入到这个流程中的
  • Java on Yarn会是如何执行
  • Yarn Job结束以后如何返回Oozie

0x01 Oozie阶段

1.1 ActionStartXCommand

咱们假设Workflow在start以后,就进入到了一个Hive命令。数据库

ActionStartXCommand的主要做用就是和Yarn交互,最后提交一个Yarn Application Masterapache

ActionStartXCommand是 WorkflowXCommand的子类。重点函数仍是loadState和execute。api

public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> {
    private String jobId = null;
    protected String actionId = null;
    protected WorkflowJobBean wfJob = null;
    protected WorkflowActionBean wfAction = null;
    private JPAService jpaService = null;
    private ActionExecutor executor = null;
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
    private List<JsonBean> insertList = new ArrayList<JsonBean>();
    protected ActionExecutorContext context = null;  
}

loadState 的做用就是从数据库中获取 WorkflowJobBean 和 WorkflowActionBean 信息session

protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            if (wfJob == null) {
                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
            }
            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
        }
    }
}

execute函数以下。其主要业务就是executor.start(context, wfAction); 这里的executor是HiveActionExecutor。并发

@Override
protected ActionExecutorContext execute() throws CommandException {
    Configuration conf = wfJob.getWorkflowInstance().getConf();
    try {
        if(!caught) {
            // 这里是业务重点,就是启动任务
            executor.start(context, wfAction);
          
            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                } else {
                    wfAction.setPending();
                    if (!(executor instanceof ControlNodeActionExecutor)) {
                        queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
                    }
                    else {
                        execSynchronous = true;
                    }
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
        }
    }
    finally {
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
            ......
            if (execSynchronous) {
                // Changing to synchronous call from asynchronous queuing to prevent
                // undue delay from ::start:: to action due to queuing
                callActionEnd();
            }
        }
    }
    return null;
}

ActionExecutor.start是异步的,还须要检查Action执行状态来推动流程,oozie经过两种方式来检查任务是否完成。app

  • 回调:当一个任务和一个计算被启动后,会为任务提供一个回调url,该任务执行完成后,会执行回调来通知oozie框架

  • 轮询:在任务执行回调失败的状况下,不管任何缘由,都支持以轮询的方式进行查询。

oozie提供这两种方式来控制任务。后续咱们会再提到。

1.2 HiveActionExecutor

上面代码中 executor.start(context, wfAction); 就是启动任务。

HiveActionExecutor继承 ScriptLanguageActionExecutor,ScriptLanguageActionExecutor继承 JavaActionExecutor,因此后续不少函数执行的是JavaActionExecutor中的函数。

public class HiveActionExecutor extends ScriptLanguageActionExecutor {}

ActionExecutor.start就是执行的JavaActionExecutor.start()。

其会检查文件系统,好比hdfs是否是支持,Action Dir是否ready,而后会submitLauncher。

public void start(Context context, WorkflowAction action) throws ActionExecutorException {
        FileSystem actionFs = context.getAppFileSystem();
        prepareActionDir(actionFs, context);
        submitLauncher(actionFs, context, action); // 这里是业务
        check(context, action);
}

submitLauncher主要功能是:

  • 1)对于某些类型job,调用injectActionCallback配置回调Action
  • 2)配置 action job
  • 3)调用createLauncherConf配置LauncherAM, 即Application Master
    • 3.1)配置回调conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback);
    • 3.2)设置"launcher Main Class"。LauncherHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
  • 4)调用HadoopAccessorService.createYarnClient来建立一个YarnClient
  • 5)调用UserGroupInformation继续配置
  • 6)调用yarnClient.createApplication建立一个YarnClientApplication
  • 7)记录ApplicationId
  • 8)调用createAppSubmissionContext创建Yarn App的执行环境
    • 8.1)appContext.setApplicationType("Oozie Launcher");
    • 8.2)设置容器信息 ContainerLaunchContext
    • 8.3)vargs.add(LauncherAM.class.getCanonicalName()); 好比设置AM启动类
    • 8.4)return appContext;
  • 9)提交App,yarnClient.submitApplication(appContext); appContext就是前面return的。

具体代码以下:

public void submitLauncher(final FileSystem actionFs, final Context context, final WorkflowAction action)throws ActionExecutorException {
    YarnClient yarnClient = null;
    try {
        // action job configuration
        Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
        setupActionConf(actionConf, context, actionXml, appPathRoot);
        addAppNameContext(context, action);
        setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
				// 配置回调Action
        injectActionCallback(context, actionConf);

        Configuration launcherConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
        yarnClient = createYarnClient(context, launcherConf);
      
        //继续配置各类Credentials
        if (UserGroupInformation.isSecurityEnabled()) {
           ......
        }

        if (alreadyRunning && !isUserRetry) {
          ......
        }
        else {
            YarnClientApplication newApp = yarnClient.createApplication();
            ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
            ApplicationSubmissionContext appContext =
                    createAppSubmissionContext(appId, launcherConf, context, actionConf, action, credentials, actionXml);
            // 这里正式与 Yarn 交互。
            yarnClient.submitApplication(appContext);

            launcherId = appId.toString();
            ApplicationReport appReport = yarnClient.getApplicationReport(appId);
            consoleUrl = appReport.getTrackingUrl();
        }

        String jobTracker = launcherConf.get(HADOOP_YARN_RM);
        context.setStartData(launcherId, jobTracker, consoleUrl);
    }
}

protected YarnClient createYarnClient(Context context, Configuration jobConf) throws HadoopAccessorException {
        String user = context.getWorkflow().getUser();
        return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
}

0x2 旧版本LauncherMapper

这里咱们有必要提一下旧版本的实现:LauncherMapper。

网上关于Oozie的文章不少都是基于旧版本,因此基本都提到了 LauncherMapper,好比:

Oozie本质就是一个做业协调工具(底层原理是经过将xml语言转换成mapreduce程序来作,但只是在集中map端作处理,避免shuffle的过程)。

Oozie执行Action时,即ActionExecutor(最主要的子类是JavaActionExecutor,hive、spark等action都是这个类的子类),JavaActionExecutor首先会提交一个LauncherMapper(map任务)到yarn,其中会执行LauncherMain(具体的action是其子类,好比JavaMain、SparkMain等),spark任务会执行SparkMain,在SparkMain中会调用org.apache.spark.deploy.SparkSubmit来提交任务。其实诉个人map任务就是识别你是什么样的任务(hive,shell,spark等),并经过该任务来启动任务所须要的环境来提交任务。提供了提交任务的接口(如hive任务,启动hive客户端或beeline等)

从文档看,OOZIE-2918 Delete LauncherMapper and its test (asasvari via pbacsko) 这时候被移除了。

咱们从旧版本代码中大体看看LauncherMapper的实现。

LauncherMapper继承了 import org.apache.hadoop.mapred.Mapper;,实现了 map 函数。其内部就是调用用户代码的主函数。

import org.apache.hadoop.mapred.Mapper;

public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
   @Override
    public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
        SecurityManager initialSecurityManager = System.getSecurityManager();
        try {
            else {
                String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);

                    new LauncherSecurityManager();
                    setupHeartBeater(reporter);
                    setupMainConfiguration();
                    // Propagating the conf to use by child job.
                    propagateToHadoopConf();

                    executePrepare();
                    Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
                    Method mainMethod = klass.getMethod("main", String[].class);
                    mainMethod.invoke(null, (Object) args);
             }
        }
    }
}

在LauncherMapperHelper中,会设置LauncherMapper为启动函数。

public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir, String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
        launcherConf.setMapperClass(LauncherMapper.class);
}

在 JavaActionExecutor 中有 org.apache.hadoop.mapred.JobClient

import org.apache.hadoop.mapred.JobClient;

public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
            jobClient = createJobClient(context, launcherJobConf);
            LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, prepareXML);

            // Set the launcher Main Class
            LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); 
            LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
            ......
  
            runningJob = jobClient.submitJob(launcherJobConf);  // 这里进行了提交
}

综上所述,旧版本 LauncherMapper 实现了一个 import org.apache.hadoop.mapred.Mapper;,具体是org.apache.hadoop.mapred.JobClient 负责与hadoop交互

0x3 新版本Yarn Application Master

新版本的Oozie是和Yarn深度绑定的,因此咱们须要先介绍Yarn。

3. 1 YARN简介

YARN 是 Hadoop 2.0 中的资源管理系统,它的基本设计思想是将 MRv1 中的 JobTracker拆分红了两个独立的服务:一个全局的资源管理器 ResourceManager 和每一个应用程序特有的ApplicationMaster。 其中 ResourceManager 负责整个系统的资源管理和分配, 而 ApplicationMaster负责单个应用程序的管理。

YARN 整体上仍然是 Master/Slave 结构,在整个资源管理框架中,ResourceManager 为Master,NodeManager 为 Slave,ResourceManager 负责对各个 NodeManager 上的资源进行统一管理和调度。

当用户提交一个应用程序时,须要提供一个用以跟踪和管理这个程序的ApplicationMaster, 它负责向 ResourceManager 申请资源,并要求 NodeManager 启动能够占用必定资源的任务。 因为不一样的ApplicationMaster 被分布到不一样的节点上,所以它们之间不会相互影响。

3.2 ApplicationMaster

用户提交的每一个应用程序均包含一个 AM,主要功能包括:

  • 与 RM 调度器协商以获取资源(用 Container 表示);
  • 将获得的任务进一步分配给内部的任务;
  • 与 NM 通讯以启动 / 中止任务;
  • 监控全部任务运行状态,并在任务运行失败时从新为任务申请资源以重启任务。

当用户向 YARN 中提交一个应用程序后, YARN 将分两个阶段运行该应用程序 :

  • 第一个阶段是启动 ApplicationMaster ;
  • 第二个阶段是由 ApplicationMaster 建立应用程序, 为它申请资源, 并监控它的整个运行过程, 直到运行完成。

工做流程分为如下几个步骤:

  1. 用 户 向 YARN 中 提 交 应 用 程 序, 其 中 包 括 ApplicationMaster 程 序、 启 动ApplicationMaster 的命令、 用户程序等。
  2. ResourceManager 为 该 应 用程 序 分 配 第 一 个 Container, 并 与 对应 的 NodeManager 通讯,要求它在这个 Container 中启动应用程序的 ApplicationMaster。
  3. ApplicationMaster 首 先 向 ResourceManager 注 册, 这 样 用 户 可 以 直 接 通 过ResourceManage 查看应用程序的运行状态, 而后它将为各个任务申请资源, 并监控它的运行状态, 直到运行结束, 即重复步骤 4~7。
  4. ApplicationMaster 采用轮询的方式经过 RPC 协议向 ResourceManager 申请和领取资源。
  5. 一旦 ApplicationMaster 申请到资源后, 便与对应的 NodeManager 通讯, 要求它启动任务。
  6. NodeManager 为任务设置好运行环境(包括环境变量、 JAR 包、 二进制程序等) 后, 将任务启动命令写到一个脚本中, 并经过运行该脚本启动任务。
  7. 各个任务经过某个 RPC 协议向 ApplicationMaster 汇报本身的状态和进度, 以让 ApplicationMaster 随时掌握各个任务的运行状态,从而能够在任务失败时从新启动任务。在应用程序运行过程当中,用户可随时经过RPC向ApplicationMaster查询应用程序的当前运行状态。
  8. 应用程序运行完成后,ApplicationMaster 向 ResourceManager 注销并关闭本身。

3.3 LauncherAM

LauncherAM就是Oozie的ApplicationMaster实现。LauncherAM.main就是Yarn调用之处。

public class LauncherAM {
  
    public static void main(String[] args) throws Exception {
        final LocalFsOperations localFsOperations = new LocalFsOperations();
        final Configuration launcherConf = readLauncherConfiguration(localFsOperations);
        UserGroupInformation.setConfiguration(launcherConf);
        // MRAppMaster adds this call as well, but it's included only in Hadoop 2.9+
        // SecurityUtil.setConfiguration(launcherConf);
        UserGroupInformation ugi = getUserGroupInformation(launcherConf);
        // Executing code inside a doAs with an ugi equipped with correct tokens.
        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                  LauncherAM launcher = new LauncherAM(new AMRMClientAsyncFactory(),
                        new AMRMCallBackHandler(),
                        new HdfsOperations(new SequenceFileWriterFactory()),
                        new LocalFsOperations(),
                        new PrepareActionsHandler(new LauncherURIHandlerFactory(null)),
                        new LauncherAMCallbackNotifierFactory(),
                        new LauncherSecurityManager(),
                        sysenv.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()),
                        launcherConf);
                    launcher.run();
                    return null;
            }
        });
    }  
}

launcher.run主要完成

经过registerWithRM调用AMRMClientAsync来注册到Resource Manager

  • executePrepare / setupMainConfiguration 完成初始化,准备和配置
  • runActionMain会根据配置调用具体的main函数,好比HiveMain
    • Class<?> klass = launcherConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null);
    • Method mainMethod = klass.getMethod("main", String[].class);
    • mainMethod.invoke(null, (Object) mainArgs);
  • 调用uploadActionDataToHDFS同步HDFS
  • 调用unregisterWithRM从RM解绑
  • 调用LauncherAMCallbackNotifier.notifyURL通知Oozie

具体代码以下:

public void run() throws Exception {
    try {
        actionDir = new Path(launcherConf.get(OOZIE_ACTION_DIR_PATH));
        registerWithRM(amrmCallBackHandler);
        // Run user code without the AM_RM_TOKEN so users can't request containers
        UserGroupInformation ugi = getUserGroupInformation(launcherConf, AMRMTokenIdentifier.KIND_NAME);

        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                executePrepare(errorHolder);
                setupMainConfiguration();
                runActionMain(errorHolder); // 会根据配置调用具体的main函数,好比HiveMain
                return null;
            }
        });
    } 
    finally {
        try {
            actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString());
            hdfsOperations.uploadActionDataToHDFS(launcherConf, actionDir, actionData);
        } finally {
            try {
                unregisterWithRM(actionResult, errorHolder.getErrorMessage());
            } finally {
                LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
                cn.notifyURL(actionResult);
            }
        }
    }
}

可是你会发现,对比以前所说的ApplicationMaster应该实现的功能,LauncherAM 作得恁少了点,这是个疑问! 咱们在后续研究中会为你们揭开这个秘密。

0x4 Hive on Yarn

上文提到,runActionMain会根据配置调用具体的main函数。咱们假设是hive action,则对应的是HiveMain。

Hive job的入口函数是在HIVE_MAIN_CLASS_NAME配置的。

public class HiveActionExecutor extends ScriptLanguageActionExecutor {
    private static final String HIVE_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.HiveMain";

	  @Override
    public List<Class<?>> getLauncherClasses() {
        List<Class<?>> classes = new ArrayList<Class<?>>();
        classes.add(Class.forName(HIVE_MAIN_CLASS_NAME)); // 这里配置了 HiveMain
        return classes;
    }  
}

HiveMain后续调用以下

HiveMain.main ----> run ----> runHive ----> CliDriver.main(args);

最后调用 org.apache.hadoop.hive.cli.CliDriver 完成了hive操做,大体有:

  • 设定参数;
  • 若是有脚本,则设定脚本路径;
  • 若是有以前的yarn child jobs,杀掉;
  • 执行hive;
  • 写log;

具体以下:

public class HiveMain extends LauncherMain {
    public static void main(String[] args) throws Exception {
        run(HiveMain.class, args);
    }
  
   @Override
    protected void run(String[] args) throws Exception {
        Configuration hiveConf = setUpHiveSite();
        List<String> arguments = new ArrayList<String>();

        String logFile = setUpHiveLog4J(hiveConf);
        arguments.add("--hiveconf");
        arguments.add("hive.log4j.file=" + new File(HIVE_L4J_PROPS).getAbsolutePath());
        arguments.add("--hiveconf");
        arguments.add("hive.exec.log4j.file=" + new File(HIVE_EXEC_L4J_PROPS).getAbsolutePath());

        //setting oozie workflow id as caller context id for hive
        String callerId = "oozie:" + System.getProperty(LauncherAM.OOZIE_JOB_ID);
        arguments.add("--hiveconf");
        arguments.add("hive.log.trace.id=" + callerId);

        String scriptPath = hiveConf.get(HiveActionExecutor.HIVE_SCRIPT);
        String query = hiveConf.get(HiveActionExecutor.HIVE_QUERY);
        if (scriptPath != null) {
            ......
            // print out current directory & its contents
            File localDir = new File("dummy").getAbsoluteFile().getParentFile();
            String[] files = localDir.list();

            // Prepare the Hive Script
            String script = readStringFromFile(scriptPath);
            arguments.add("-f");
            arguments.add(scriptPath);
        } else if (query != null) {
            String filename = createScriptFile(query);
            arguments.add("-f");
            arguments.add(filename);
        } 

        // Pass any parameters to Hive via arguments
        ......
        String[] hiveArgs = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS);
        for (String hiveArg : hiveArgs) {
            arguments.add(hiveArg);
        }
        LauncherMain.killChildYarnJobs(hiveConf);

        try {
            runHive(arguments.toArray(new String[arguments.size()]));
        }
        finally {
            writeExternalChildIDs(logFile, HIVE_JOB_IDS_PATTERNS, "Hive");
        }
    }  
}

所以咱们能看到,Oozie ApplicationMaster 在被Yarn调用以后,就是经过org.apache.hadoop.hive.cli.CliDriver 给Hive发送命令让其执行,没有什么再和ResourceManager / NodeManager 交互的过程,这真的很奇怪。这个秘密要由下面的Tez来解答。

0x5 Tez计算框架

Tez是Apache开源的支持DAG做业的计算框架,它直接源于MapReduce框架,核心思想是将Map和Reduce两个操做进一步拆分,即Map被拆分红Input、Processor、Sort、Merge和Output, Reduce被拆分红Input、Shuffle、Sort、Merge、Processor和Output等,这样,这些分解后的元操做能够任意灵活组合,产生新的操做,这些操做通过一些控制程序组装后,可造成一个大的DAG做业。

Tez有如下特色:

  • Apache二级开源项目
  • 运行在YARN之上
  • 适用于DAG(有向图)应用(同Impala、Dremel和Drill同样,可用于替换Hive/Pig等)

能够看到,Tez也是和Yarn深度绑定的。

5.1 DAGAppMaster

首先咱们就找到了Tez对应的Application Master,即Tez DAG Application Master

public class DAGAppMaster extends AbstractService {
  public String submitDAGToAppMaster(DAGPlan dagPlan,
      Map<String, LocalResource> additionalResources) throws TezException {
      startDAG(dagPlan, additionalResources);
    }
  }  
}

咱们能看到提交Application Master代码。

public class TezYarnClient extends FrameworkClient {
  @Override
  public ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext)
      throws YarnException, IOException, TezException {
   	ApplicationId appId= yarnClient.submitApplication(appSubmissionContext);
    ApplicationReport appReport = getApplicationReport(appId);
    return appId;
  }
}

这里是创建Application Master context 代码,设置了Application Maste类和Container。

public static ApplicationSubmissionContext createApplicationSubmissionContext(
      ApplicationId appId, DAG dag, String amName,
      AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
      Credentials sessionCreds, boolean tezLrsAsArchive,
      TezApiVersionInfo apiVersionInfo,
      ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker)
      throws IOException, YarnException {

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");

    String amOpts = constructAMLaunchOpts(amConfig.getTezConfiguration(), capability);
    vargs.add(amOpts);

    // 这里设置了 Application Master
    vargs.add(TezConstants.TEZ_APPLICATION_MASTER_CLASS);

    // 这里设置了命令行参数 
    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    // 设置了container
    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(amLocalResources, environment,
            vargsFinal, serviceData, securityTokens, acls);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = Records
        .newRecord(ApplicationSubmissionContext.class);

    appContext.setAMContainerSpec(amContainer);

    return appContext;
}

5.2 与Resource Manager交互

这里只摘要部分代码,能看到Tez实现了与Yarn Resource Manager交互

YarnTaskSchedulerService实现了AMRMClientAsync.CallbackHandler,其功能是处理由Resource Manager收到的消息,其实现了方法

import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class YarnTaskSchedulerService extends TaskScheduler
                             implements AMRMClientAsync.CallbackHandler {
  @Override
  public void onContainersAllocated(List<Container> containers) {
      if (!shouldReuseContainers) {
        List<Container> modifiableContainerList = Lists.newLinkedList(containers);
        assignedContainers = assignNewlyAllocatedContainers(
            modifiableContainerList);
      } 
    }
    // upcall to app must be outside locks
    informAppAboutAssignments(assignedContainers);
  }

  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    synchronized (this) {
      for(ContainerStatus containerStatus : statuses) {
        ContainerId completedId = containerStatus.getContainerId();
        HeldContainer delayedContainer = heldContainers.get(completedId);

        Object task = releasedContainers.remove(completedId);
        appContainerStatus.put(task, containerStatus);
        continue;
       }

        // not found in released containers. check currently allocated containers
        // no need to release this container as the RM has already completed it
        task = unAssignContainer(completedId, false);
        if (delayedContainer != null) {
          heldContainers.remove(completedId);
          Resources.subtract(allocatedResources, delayedContainer.getContainer().getResource());
        } 
        if(task != null) {
          // completion of a container we have allocated currently
          // an allocated container completed. notify app. This will cause attempt to get killed
          appContainerStatus.put(task, containerStatus);
          continue;
        }
      }
    }

    // upcall to app must be outside locks
    for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
      getContext().containerCompleted(entry.getKey(), entry.getValue());
    }
  }
}
  • onContainersAllocated : 当有新的Container 可使用。这里时启动container 的代码。
  • onContainersCompleted 是Container 运行结束。 在onContainersCompleted 中,若是是失败的Container,咱们须要从新申请并启动Container,成功的将作记录既能够。

由此咱们能够看到,Oozie是一个甩手掌柜,他只管启动Hive,具体后续如何与RM交互,则彻底由Tez搞定。这就解答了以前咱们全部疑惑

最后总结下新流程:

  1. Oozie提交LauncherAM到Yarn;
  2. LauncherAM运行HiveMain,其调用CliDriver.main给Hive提交任务;
  3. Hive on Tez,因此Tez准备DAGAppMaster;
  4. Yarn与Tez交互:Tez提交DAGAppMaster到Yarn,Tez解析运行Hive命令;
  5. Hive运行结束后,调用回调 url 通知Oozie;

原谅我用这种办法画图,由于我最讨厌看到一篇好文,结果发现图没了......

+---------+                       +----------+                       +-----------+
|         | 1-submit LauncherAM   |          | 2.CliDriver.main      |           |  
|         |---------------------->| HiveMain |---------------------> |           |
|         |                       |          |                       |           |--+
| [Oozie] |                       |  [Yarn]  |                       |   [Hive]  |  | 3.Run 
|         |                       |          |                       |           |  | Hive     
|         | 5-notifyURL of Oozie  |          | 4-submit DAGAppMaster |           |<-+
|         |<----------------------|          | <-------------------->|    Tez    |
|         |                       |          |                       |           |
+---------+                       +----------+                       +-----------+

0x6 Java on Yarn

下面咱们看看若是Oozie执行一个Java程序,是如何进行的。

Java程序的主执行函数是 JavaMain,这个就简单多了,就是直接调用用户的Java主函数。

public class JavaMain extends LauncherMain {
    public static final String JAVA_MAIN_CLASS = "oozie.action.java.main";

   /**
    * @param args Invoked from LauncherAM:run()
    * @throws Exception in case of error when running the application
    */
    public static void main(String[] args) throws Exception {
        run(JavaMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {

        Configuration actionConf = loadActionConf();
        setYarnTag(actionConf);
        setApplicationTags(actionConf, TEZ_APPLICATION_TAGS);
        setApplicationTags(actionConf, SPARK_YARN_TAGS);

        LauncherMain.killChildYarnJobs(actionConf);

        Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
        Method mainMethod = klass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) args);
    }
}

0x7 Yarn job 执行结束

7.1 检查任务机制

前面提到,ActionExecutor.start是异步的,还须要检查Action执行状态来推动流程,oozie经过两种方式来检查任务是否完成。

  • 回调:当一个任务和一个计算被启动后,会为任务提供一个回调url,该任务执行完成后,会执行回调来通知oozie
  • 轮询:在任务执行回调失败的状况下,不管任何缘由,都支持以轮询的方式进行查询。

oozie提供这两种方式来控制任务。

7.2 回调机制

LauncherAM 在用户程序执行完成以后,会作以下调用,以通知Oozie。这就用到了“回调”机制。

LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
                cn.notifyURL(actionResult);

Oozie的CallbackServlet会响应这个调用。能够看到,DagEngine.processCallback是Oozie处理程序结束之处。

public class CallbackServlet extends JsonRestServlet {
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String queryString = request.getQueryString();
        CallbackService callbackService = Services.get().get(CallbackService.class);

        String actionId = callbackService.getActionId(queryString);

        DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();

        dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
        }
    }
}

DagEngine.processCallback主要是使用CompletedActionXCommand来进行。能够看到这个命令是放到 CallableQueueService 的 queue中,因此下面咱们须要介绍 CallableQueueService

public void processCallback(String actionId, String externalStatus, Properties actionData)
          throws DagEngineException {
      XCallable<Void> command = new CompletedActionXCommand(actionId, externalStatus,
      actionData, HIGH_PRIORITY);
      if (!Services.get().get(CallableQueueService.class).queue(command)) {
          LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
      }
}

7.3 异步执行

7.3.1 CallableQueueService

Oozie 使用 CallableQueueService 来异步执行操做;

public class CallableQueueService implements Service, Instrumentable {
    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
    private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
    private Set<String> interruptTypes;
    private int interruptMapMaxSize;
    private int maxCallableConcurrency;
    private int queueAwaitTerminationTimeoutSeconds;
    private int queueSize;
    private PriorityDelayQueue<CallableWrapper<?>> queue;
    private ThreadPoolExecutor executor;
    private Instrumentation instrumentation;
    private boolean newImpl = false;
    private AsyncXCommandExecutor asyncXCommandExecutor; 
  
    public void init(Services services) {
          queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
                    MAX_CALLABLE_WAITTIME_MS,
                    TimeUnit.MILLISECONDS,
                    queueSize) {
                @Override
                protected boolean eligibleToPoll(QueueElement<?> element) {
                    if (element != null) {
                        CallableWrapper wrapper = (CallableWrapper) element;
                        if (element.getElement() != null) {
                            return callableReachMaxConcurrency(wrapper.getElement());
                        }
                    }
                    return false;
                }
            };  
    }
}

特色:

  • 加入执行队列的任务多是能够当即被吊起的,也多是将来某个时间才触发的。
  • 执行线程池根据 任务的执行时间和任务的优先级别来选取任务吊起。
  • 执行线程池的任务队列大小可配置,当到达队列最大值,线程池将再也不接收任务。

7.3.3 PriorityDelayQueue

线程池选取的队列是oozie自定义的队列 PriorityDelayQueue:

特色:

根据队列中元素的延时时间以及其执行优先级出队列:

实现策略:

PriorityDelayQueue 中为每一个优先级别的任务设置一个 延时队列 DelayQueue
由于使用的是jdk自带的延时队列 DelayQueue,能够保证的是若是任务在该队列中的延时时间知足条件,咱们
经过poll()方法便可获得知足延时条件的任务,若是 poll()获得的是null,说明该队列的中任务没有知足时间条件的任务。

如何编排多个优先级的队列:
每次从PriorityDelayQueue去选取任务,都优先从最高优先级的队列来poll出任务,若是最高的优先级队列中没有知足条件的任务,则次优先级队列poll出任务,若是仍未获取
将按照队列优先等级以此类推。
饿死现象:假如高优先级中的任务在每次获取的时候都知足条件,这样容易将低优先级的队列中知足条件的任务活活饿死,为了防止这种状况的产生,在每次选取任务以前,遍历
低优先级队列任务,若是任务早已经知足出队列条件,若是超时时间超过了咱们设定的最大值,咱们会为这个任务提升优先级,将这个任务优先级加一,添加到上个优先级队列中进行
排队。

7.3.3 PollablePriorityDelayQueue

特色:

在从队列中选取任务的时候,先判断知足时间的任务是否知足并发等限制,若是知足再从队列中取出,而不是像PriorityDelayQueue那样,先取出若是不知足并发等限制,再将该任务从新放置回去。

任务类型:

使用线程池异步执行任务,任务和任务之间是无序的,针对具体的业务场景,可能执行的单元是须要串序执行的。oozie中封装了 CompositeCallable 和 通常的 XCallable的任务类型,前者是XCallable的一个集合,它能保证的是这个集合里面的XCallable是顺序执行的。

7.4 跳转下一个操做

CompletedActionXCommand 当Workflow command结束时候会执行,且只执行一次。对于程序结束,会在异步队列中加入一个 ActionCheckXCommand。

public class CompletedActionXCommand extends WorkflowXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {
        if (this.wfactionBean.getStatus() == WorkflowActionBean.Status.PREP) {
           .....
        } else {    // RUNNING
            ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(this.wfactionBean.getType());
            // this is done because oozie notifications (of sub-wfs) is send
            // every status change, not only on completion.
            if (executor.isCompleted(externalStatus)) {
                queue(new ActionCheckXCommand(this.wfactionBean.getId(), getPriority(), -1));
            }
        }
        return null;
    }  
}

异步调用到ActionCheckXCommand,其主要做用是:

  • 若是有重试机制,则作相应配置
  • 调用 executor.check(context, wfAction); 来检查环境信息
  • 更新数据库中的任务信息
  • 由于已经结束了,因此用ActionEndXCommand来执行结束
public class ActionCheckXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {

        ActionExecutorContext context = null;
        boolean execSynchronous = false;
        try {
            boolean isRetry = false; // 若是有重试机制,则作相应配置
            if (wfAction.getRetries() > 0) {
                isRetry = true;
            }
            boolean isUserRetry = false;
            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
          
            executor.check(context, wfAction); // 检查环境信息

            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                    generateEvent = true;
                } else {
                    wfAction.setPending();
                    execSynchronous = true;
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
            updateList.add(new UpdateEntry<WorkflowJobQuery> (WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
                    wfJob));
        }
        finally {
                // 更新数据库中的任务信息
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
                if (generateEvent && EventHandlerService.isEnabled()) {
                    generateEvent(wfAction, wfJob.getUser());
                }
                if (execSynchronous) {
                    // 用ActionEndXCommand来执行结束
                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
                }
        }
        return null;
    }
}

调用到 JavaActionExecutor.check

  • 根据配置信息创建 yarnClient = createYarnClient(context, jobConf);
  • 获取程序报告信息 ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
  • 获取程序数据 Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
  • 设置各类信息
@Override
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
    boolean fallback = false;
    YarnClient yarnClient = null;
    try {
        Element actionXml = XmlUtils.parseXml(action.getConf());
        Configuration jobConf = createBaseHadoopConf(context, actionXml);
        FileSystem actionFs = context.getAppFileSystem();
        yarnClient = createYarnClient(context, jobConf); // 根据配置信息创建
        FinalApplicationStatus appStatus = null;
        try {
            final String effectiveApplicationId = findYarnApplicationId(context, action);
            final ApplicationId applicationId = ConverterUtils.toApplicationId(effectiveApplicationId);
            final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId); // 获取程序报告信息
            final YarnApplicationState appState = appReport.getYarnApplicationState();
            if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
                    || appState == YarnApplicationState.KILLED) {
                appStatus = appReport.getFinalApplicationStatus();
            }
        } 
        if (appStatus != null || fallback) {
            Path actionDir = context.getActionDir();
            // load sequence file into object
            Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);   // 获取程序数据
            if (fallback) {
                String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
                if (finalStatus != null) {
                    appStatus = FinalApplicationStatus.valueOf(finalStatus);
                } else {
                    context.setExecutionData(FAILED, null);
                }
            }

            String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID);  // MapReduce was launched
            if (externalID != null) {
                context.setExternalChildIDs(externalID);
             }

           // Multiple child IDs - Pig or Hive action
            String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
            if (externalIDs != null) {
                context.setExternalChildIDs(externalIDs);
             }

            // 设置各类信息
            context.setExecutionData(appStatus.toString(), null);
            if (appStatus == FinalApplicationStatus.SUCCEEDED) {
                if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) {
                    context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
                            .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
                }
                else {
                    context.setExecutionData(SUCCEEDED, null);
                }
                if (LauncherHelper.hasStatsData(actionData)) {
                    context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
                }
                getActionData(actionFs, action, context);
            }
            else {
                ......
                context.setExecutionData(FAILED_KILLED, null);
            }
        }
    }
    finally {
        if (yarnClient != null) {
            IOUtils.closeQuietly(yarnClient);
        }
    }
}

ActionEndXCommand会进行结束和跳转:

  • 调用Executor来完成结束操做 executor.end(context, wfAction);
  • 更新数据库的job信息 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete
  • 用 SignalXCommand 来进行跳转,进行下一个Action的执行
public class ActionEndXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {

        Configuration conf = wfJob.getWorkflowInstance().getConf();

        if (!(executor instanceof ControlNodeActionExecutor)) {
            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
        }

        executor.setMaxRetries(maxRetries);
        executor.setRetryInterval(retryInterval);

        boolean isRetry = false;
        if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
            isRetry = true;
        }
        boolean isUserRetry = false;
        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
        try {
          
            executor.end(context, wfAction); // 调用Executor来完成结束操做

            if (!context.isEnded()) {
                failJob(context);
            } else {
                wfAction.setRetries(0);
                wfAction.setEndTime(new Date());

                boolean shouldHandleUserRetry = false;
                Status slaStatus = null;
                switch (wfAction.getStatus()) {
                    case OK:
                        slaStatus = Status.SUCCEEDED;
                        break;
                    ......
                }
                if (!shouldHandleUserRetry || !handleUserRetry(context, wfAction)) {
                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus,
                            SlaAppType.WORKFLOW_ACTION);
                    if(slaEvent != null) {
                        insertList.add(slaEvent);
                    }
                }
            }
            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
            DagELFunctions.setActionInfo(wfInstance, wfAction);
            wfJob.setWorkflowInstance(wfInstance);

            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
            wfJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
        }
        finally {
            try { 
                // 更新数据库的job信息
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
            }
            if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                generateEvent(wfAction, wfJob.getUser());
            }
            new SignalXCommand(jobId, actionId).call(); // 进行跳转,进行下一个Action的执行
        }
        return null;
    }  
}

0xFF 参考

大数据之Oozie——源码分析(一)程序入口

什么是Oozie——大数据任务调度框架

Oozie基础小结

【原创】大数据基础之Oozie(1)简介、源代码解析

【原创】大叔经验分享(6)Oozie如何查看提交到Yarn上的任务日志

Oozie和Azkaban的技术选型和对比

Oozie-TransitionXCommand

Oozie-Service-CallableQueueService

YARN基本框架分析

Oozie任务调度阻塞及内存优化方法

相关文章
相关标签/搜索