Hadoop 2.7 Job Submission Explained (Part 1)

We will analyze the process using the WordCount example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author: LUGH1
 * @date: 2019-4-8
 * @description:
 */
public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.88.130:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WdMapper.class);
        job.setReducerClass(WdReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/test/word.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/test/output"));

        boolean result = job.waitForCompletion(true);
        System.out.println("good job");     // print before exiting; statements after System.exit() never run
        System.exit(result ? 0 : 1);
    }
}

class WdMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(" ");
        for (String word : split) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

class WdReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable i : values) {
            count += i.get();
        }
        context.write(key, new IntWritable(count));
    }
}

The code above is a simple WordCount program; we will not walk through it line by line. Let's start with the main method: it obtains a Job object, applies a series of settings, and finally calls waitForCompletion.

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
   //.... job setup omitted .....
   boolean result = job.waitForCompletion(true);  // submit the job via the waitForCompletion() method provided by the Job class
   System.exit(result ? 0 : 1);
}

Next, let's look at the Job class whose waitForCompletion method we just called (the class is large, so only the parts we need are shown):

public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);
  public static enum JobState {DEFINE, RUNNING};  // the two possible job states
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;  // refresh the cached status at most every 2000 ms
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  public static final String COMPLETION_POLL_INTERVAL_KEY = "mapreduce.client.completion.pollinterval";
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY = "mapreduce.client.progressmonitor.pollinterval";
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
  public static final String USED_GENERIC_PARSER = "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION = "mapreduce.client.submit.file.replication";
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  static {
    ConfigUtil.loadResources();  // load the configuration files
  }
  private JobState state = JobState.DEFINE;  // a newly created Job starts in the DEFINE state
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  // Methods we will be looking at (bodies omitted):
  boolean waitForCompletion(boolean verbose)
  submit()
  setUseNewAPI()
  connect()
  getJobSubmitter(FileSystem fs, ClientProtocol submitClient)
  isUber()                         // "uber" mode: the MapTasks and ReduceTasks run on the same node
  setPartitionerClass()            // Mapper output may be distributed to several Reducers by a Partitioner
  setMapSpeculativeExecution()     // whether speculative Mappers should be launched as backups
  setReduceSpeculativeExecution()  // whether speculative Reducers should be launched as backups
  setCacheFiles()
}

The Job class contains many static fields and a static initializer block. In Java these are initialized when the class is first loaded, so when main() calls Job job = Job.getInstance(conf), those static fields and blocks are processed first. They set a few parameters, such as the job's default DEFINE state, and load the configuration files. The method that loads the configuration files is shown below:

public static void loadResources() {
    addDeprecatedKeys();
    Configuration.addDefaultResource("mapred-default.xml");
    Configuration.addDefaultResource("mapred-site.xml");
    Configuration.addDefaultResource("yarn-default.xml");
    Configuration.addDefaultResource("yarn-site.xml");
  }
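A minimal sketch of the effect (assuming a stock mapred-default.xml on the classpath): MapReduce defaults only become visible to a Configuration once the Job class, and with it the static block above, has been loaded.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DefaultResourceDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Loading the Job class runs its static block, which calls ConfigUtil.loadResources()
    // and registers mapred-default.xml / mapred-site.xml as default resources.
    Job job = Job.getInstance(conf);
    // Defaults defined in mapred-default.xml now resolve, e.g. the default reducer count:
    System.out.println(job.getConfiguration().get("mapreduce.job.reduces"));  // typically "1"
  }
}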

Loading the configuration files simply means loading Hadoop's own configuration files, so by the time we call waitForCompletion they are already in place. Next, let's look at the waitForCompletion method:

// the Job class in org.apache.hadoop.mapreduce
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {  // only submit if the job is still in the DEFINE state, preventing duplicate submission
    submit();                      // submit the job
  }
  if (verbose) {                   // after submission, monitor the job until it finishes
    monitorAndPrintJob();          // periodically report the job's progress
  } else {                         // otherwise just poll periodically to see whether the job has completed
    // get the completion poll interval from the client.
    int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}

  

  From the perspective of the submission flow, this method could hardly be simpler: it is essentially a call to Job.submit(), preceded by a check that the job is still in the DEFINE state, which ensures a job cannot be submitted more than once. As noted above, JobState has only two values, DEFINE and RUNNING. The Job constructor sets the state to DEFINE, and once the job has been submitted successfully it is switched to RUNNING, which closes the door on further submissions.
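A minimal sketch of that guard in action (assuming a job configured as in the WordCount example): the second submit() is rejected because ensureState() no longer sees the DEFINE state.

job.submit();        // first submission succeeds; state flips from DEFINE to RUNNING
try {
  job.submit();      // second submission is rejected by ensureState(JobState.DEFINE)
} catch (IllegalStateException e) {
  System.out.println("already submitted: " + e.getMessage());
}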
  Under normal circumstances Job.submit() returns quickly, because its only task is to hand the job over; it does not wait for the job to run or finish. Job.waitForCompletion(), on the other hand, does not return until the job has completed. While waiting, if the verbose argument is true it periodically reports the job's progress; otherwise it just periodically checks whether the job has finished.
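In other words, with verbose set to false the same behaviour could be reproduced by hand. A rough sketch, assuming it runs in a method declaring the same checked exceptions as main() above; the 5-second sleep simply mirrors the default of mapreduce.client.completion.pollinterval:

job.submit();                        // returns as soon as the job has been handed over
while (!job.isComplete()) {          // poll the job status periodically
  System.out.printf("map %.0f%% reduce %.0f%%%n",
      job.mapProgress() * 100, job.reduceProgress() * 100);
  Thread.sleep(5000);                // comparable to mapreduce.client.completion.pollinterval
}
System.exit(job.isSuccessful() ? 0 : 1);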

So far, the job submission flow is:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()]

Now let's take a look at the submit method:

public void submit() throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);  // make sure the job is still in the DEFINE state
  setUseNewAPI();                // decide from the configuration whether to use the new API
  connect();                     // connect to the cluster: creates the Cluster instance "cluster"
  final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());  // obtain a JobSubmitter instance
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {  // ugi.doAs runs the action with the proper user credentials
    public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);  // this is where the job is actually submitted
    }
  });
  state = JobState.RUNNING;  // mark the job as RUNNING
  LOG.info("The url to track the job: " + getTrackingURL());
}

Let's look at the connect method first:

private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {  // if there is no Cluster instance yet, create one
    cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
      public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
        return new Cluster(getConfiguration());  // create the Cluster
      }
    });
  }
}

So connect() simply guarantees that this node has a Cluster object, creating one if it does not exist yet. Let's look at the Cluster class (partial listing):

public class Cluster {

  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};  // job tracker status

  private ClientProtocolProvider clientProtocolProvider;  // YarnClientProtocolProvider on a cluster, LocalClientProtocolProvider in local mode
  private ClientProtocol client;      // in cluster mode, the channel and rules for talking to the outside world
  private UserGroupInformation ugi;   // used for access control
  private Configuration conf;         // configuration
  private FileSystem fs = null;       // file system
  private Path sysDir = null;         // system directory
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;  // job history directory
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  // ServiceLoader<ClientProtocolProvider> is a ServiceLoader specialized for the ClientProtocolProvider
  // class, obtained via ServiceLoader.load(). ServiceLoader implements Iterable and provides an
  // iterator() method, so it can be used in a for loop; its load() method loads classes through a ClassLoader.
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    ConfigUtil.loadResources();  // load the configuration files
  }

  // constructor
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  // constructor
  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);  // call the initialize method
  }

  // the goal is to create the ClientProtocolProvider and the ClientProtocol
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
    synchronized (frameworkLoader) {  // lock so that multiple threads cannot enter this section at the same time
      for (ClientProtocolProvider provider : frameworkLoader) {  // iterate over frameworkLoader to get each provider
        LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
        ClientProtocol clientProtocol = null;
        try {
          if (jobTrackAddr == null) {
            // create the ClientProtocol via the provider's create method
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }
          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;  // the ClientProtocol has been created: a YARNRunner or a LocalJobRunner
            LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider");
            break;  // success, stop the loop
          } else {
            // failure, log it
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName() + " due to error: ", e);
        }
      }
    }
    if (null == clientProtocolProvider || null == client) {  // check whether the ClientProtocolProvider and ClientProtocol were created
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }

  So Job.connect() just makes sure there is a Cluster instance, and creates one through the Cluster constructor if there is not. Before the constructor runs, the class's static members are initialized: ConfigUtil.loadResources() loads the configuration and the static frameworkLoader field is populated; the constructor then always calls Cluster.initialize(). As for ClientProtocolProvider and ClientProtocol: the user submits a job to the RM node and asks the RM to schedule and run it, so the RM acts as a service provider and the user is the client. The two sides therefore need a protocol that defines how they interact and how the service is provided. In the Hadoop code this "protocol" is elevated all the way to the level of the compute framework; even the choice of whether to use YARN falls under it. That is exactly the role of ClientProtocol, while ClientProtocolProvider, as its name suggests, provides ClientProtocol instances and acts somewhat like a factory.

As for ServiceLoader<ClientProtocolProvider>, it is what loads the ClientProtocolProvider implementations.

Let's first look at the ClientProtocolProvider class. It is clearly an abstract class, which means only concrete classes that extend it can be instantiated:

public abstract class ClientProtocolProvider {

  public abstract ClientProtocol create(Configuration conf) throws IOException;

  public abstract ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException;

  public abstract void close(ClientProtocol clientProtocol) throws IOException;
}

Next, the two concrete subclasses of this abstract class, YarnClientProtocolProvider and LocalClientProtocolProvider:

package org.apache.hadoop.mapred;

public class YarnClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);  // YARNRunner implements the ClientProtocol interface
    }
    return null;
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
    return create(conf);
  }

  @Override
  public void close(ClientProtocol clientProtocol) throws IOException {
    if (clientProtocol instanceof YARNRunner) {
      ((YARNRunner) clientProtocol).close();
    }
  }
}

package org.apache.hadoop.mapred;

public class LocalClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    String framework = conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      return null;
    }
    conf.setInt(JobContext.NUM_MAPS, 1);  // use a single map
    return new LocalJobRunner(conf);      // LocalJobRunner implements the ClientProtocol interface
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
    return null;  // LocalJobRunner doesn't use a socket
  }

  @Override
  public void close(ClientProtocol clientProtocol) {
    // no clean up required
  }
}

Now let's come back to the Cluster.initialize() method:

  ServiceLoader implements Iterable and provides an iterator() method, so it can be used directly in a for loop; its load() method loads implementation classes through a ClassLoader. It also parses the provider configuration files on the classpath, so once frameworkLoader has been loaded as a ServiceLoader object, its internal LinkedHashMap holds the two provider entries mentioned above, and iterator() can walk through them in turn.
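This is the standard Java SPI mechanism. A minimal, self-contained sketch of how it behaves (assuming the MapReduce client jars, which ship META-INF/services entries for ClientProtocolProvider, are on the classpath):

import java.util.ServiceLoader;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

public class ProviderProbe {
  public static void main(String[] args) {
    ServiceLoader<ClientProtocolProvider> loader =
        ServiceLoader.load(ClientProtocolProvider.class);
    // Iterating instantiates each registered provider lazily,
    // just like the for loop in Cluster.initialize().
    for (ClientProtocolProvider provider : loader) {
      System.out.println("found provider: " + provider.getClass().getName());
    }
  }
}

On a typical classpath this prints both YarnClientProtocolProvider and LocalClientProtocolProvider; the order depends on the jars.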

  Then the Cluster constructor calls initialize(), whose purpose is to create the ClientProtocolProvider and the ClientProtocol.

  ClientProtocolProvider, however, is an abstract class, which means only concrete subclasses can be instantiated. There are exactly two such classes in the Hadoop source tree: LocalClientProtocolProvider and YarnClientProtocolProvider.

 

  As you might expect, the ClientProtocol provided by these two providers differs as well. ClientProtocol is in fact an interface, and it too has exactly two implementations, LocalJobRunner and YARNRunner, but only one of them can actually be in use.

  The for loop in initialize() is the iteration over the ServiceLoader described above, i.e. a loop over the two ClientProtocolProviders. Its goal is to create the ClientProtocol the user asked for via ClientProtocolProvider.create(), which can only be a LocalJobRunner or a YARNRunner. As soon as one creation succeeds the loop stops, because only one choice is possible; if both fail, the program cannot continue, because it has no way to obtain compute service from the RM. Whether a creation succeeds depends on the configuration mentioned earlier. Since ClientProtocolProvider is abstract, the classes actually tried in turn are LocalClientProtocolProvider and YarnClientProtocolProvider. If the first iteration tries the former, the flow is:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> LocalClientProtocolProvider.create()]

If it is the latter, the flow is:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create()]

Here we assume the job is submitted to YARN, so the flow is the second one.
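Which of the two providers wins is decided by mapreduce.framework.name (MRConfig.FRAMEWORK_NAME). A sketch of the settings that lead to the YARN path, shown programmatically here although they normally live in mapred-site.xml and yarn-site.xml (the addresses reuse the host from the WordCount example and are only assumptions):

Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "yarn");   // "yarn" -> YARNRunner; "local" or unset -> LocalJobRunner
conf.set("fs.defaultFS", "hdfs://192.168.88.130:9000");            // assumed NameNode address
conf.set("yarn.resourcemanager.address", "192.168.88.130:8032");   // assumed RM address (8032 is the default port)
Job job = Job.getInstance(conf);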

Through YarnClientProtocolProvider.create(), what is ultimately returned is a new YARNRunner(conf) object.

  Back to our Job.submit() method: at this point connect() has finished, and the next step is the call to getJobSubmitter(). This method creates a JobSubmitter object, and Job.submit() then calls its submitJobInternal() method to complete the submission. The two arguments used to construct the JobSubmitter are the two arguments passed to getJobSubmitter(), namely cluster.getFileSystem() and cluster.getClient(). cluster.getClient() returns the YARNRunner or LocalJobRunner; cluster.getFileSystem() returns, for YARNRunner, the URL of the file system on the RM side, and for LocalJobRunner, a local directory with the relative path "mapred/system".

  Next, a look at the JobSubmitter class (partial listing):

 

package org.apache.hadoop.mapreduce;

class JobSubmitter {
  protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
  private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";  // algorithm used to generate the shuffle secret
  private static final int SHUFFLE_KEY_LENGTH = 64;
  private FileSystem jtFs;
  private ClientProtocol submitClient;
  private String submitHostName;
  private String submitHostAddress;

  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)
      throws IOException {
    this.submitClient = submitClient;  // a YARNRunner when running on a cluster
    this.jtFs = submitFs;
  }

  // Methods of interest (bodies omitted):
  compareFs(FileSystem srcFs, FileSystem destFs)  // compare whether two file systems are the same
  getPathURI()
  checkSpecs()
  copyRemoteFiles()
  copyAndConfigureFiles()
  copyJar(Path originalJarPath, Path submitJarFile, short replication)
  addMRFrameworkToDistributedCache()
  submitJobInternal(Job job, Cluster cluster)  // submit the job to the cluster
  writeNewSplits(JobContext job, Path jobSubmitDir)
  getJobSubmitter(FileSystem fs, ClientProtocol submitClient)  // under the hood this just invokes the JobSubmitter constructor
}

 

Next, let's look at the submitJobInternal method:

JobStatus submitJobInternal(Job job, Cluster cluster) 
throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs: check the output format and related settings
  checkSpecs(job);

  Configuration conf = job.getConfiguration();  // get the configuration
  addMRFrameworkToDistributedCache(conf);       // add the MR framework to the distributed cache

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);  // get the staging directory path
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();  // the IP address of this node (the submitting host)
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();  // this node's IP address as a string
    submitHostName = ip.getHostName();        // this node's host name
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);  // record them in conf
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();  // obtain a new, unique job ID
  job.setJobID(jobId);                       // set the job's ID
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());  // the job's temporary subdirectory is named after the job ID
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());  // the user name
    conf.set("hadoop.http.filter.initializers", 
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");  // filter initializer for the HTTP interface
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());  // record the job submission directory
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
        + " as the submit dir");

    // get delegation token for the dir  /* prepare the access tokens */
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);  // obtain the tokens needed to talk to the NameNode
    
    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers: the key used for the data flowing between Mappers and Reducers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
              "data spill is enabled");
    }

    copyAndConfigureFiles(job, submitJobDir);  // copy the job's jar and other resources to HDFS; by default 10 replicas are kept, spread across different nodes

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);  // path of the job configuration file
    
    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);  // compute the splits, which determines the number of map tasks (how this number is derived deserves its own write-up)
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"  to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);  // the default scheduling queue is named "default"

    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());  // record the queue's ACLs

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);  // remove the cached token referrals

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids (only if the tracking mechanism is enabled)
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());  // collect the tracking id of each token
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));  // record the tracking ids
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);  // write the contents of conf into an .xml file
    
    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());

    // submit the job through YARNRunner.submitJob() or LocalJobRunner.submitJob()
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;  // return the job status
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);  // delete the temporary directory

    }
  }
}

From submitJobInternal we can see that two kinds of resources and information must be submitted along with the job:

  One kind must be handed to the ResourceManager (RM), for the RM to use when registering and scheduling the job;

  The other kind is not used by the RM directly but by the nodes that will actually run the computation. The former includes the submitting node's IP address and host name, the user name, the job ID, information about the MapReduce input files, and the tokens issued for the submission. This information is packaged and handed to the RM; that is job submission in the narrow sense and the backbone of the flow. The latter includes the job's executable jar, third-party libraries, and so on; if the input files are local, they belong here too. These resources do not need to be handed to the RM, because the RM itself has no use for them, but they must be copied or moved into the globally visible HDFS file system so that the nodes that actually run the tasks can fetch them.

  To upload these resources and this information, a directory is created for the job in HDFS. HDFS has a directory dedicated to job submission, called the staging directory, so the code first obtains its path via JobSubmissionFiles.getStagingDir(). It then creates a temporary subdirectory inside the staging directory named after the job ID; that is the submitJobDir in the code. From then on, every resource and piece of information related to this job is uploaded into that subdirectory.
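As a rough illustration of what those paths typically look like, assuming the default yarn.app.mapreduce.am.staging-dir of /tmp/hadoop-yarn/staging, a user named hadoop, and a made-up job ID:

// jobStagingArea  ->  /tmp/hadoop-yarn/staging/hadoop/.staging
// submitJobDir    ->  /tmp/hadoop-yarn/staging/hadoop/.staging/job_1554771234567_0001
// Under submitJobDir the client then writes job.jar, job.split, job.splitmetainfo and job.xml.
Path jobStagingArea = new Path("/tmp/hadoop-yarn/staging/hadoop/.staging");  // assumed user "hadoop"
Path submitJobDir = new Path(jobStagingArea, "job_1554771234567_0001");      // assumed job ID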

  This method also takes care of things like setting the number of maps and the execution queue, and finally calls the submitJob method of the YARNRunner (or LocalJobRunner) object created in connect(). With that, the job has been submitted to the RM, and the flow is:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.submitJobInternal() -> YARNRunner.submitJob()]

Continue with: Hadoop 2.7 Job Submission Explained (Part 2)
