Hadoop Source Code: Job Submission

The JobSubmitter.submitJobInternal() method

The job submission process involves:

  1. Checking the input and output specifications of the job.
  2. Computing the InputSplit values for the job.
  3. Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
  4. Copying the job's jar and configuration to the MapReduce system directory on the FileSystem.
  5. Submitting the job to the ResourceManager and optionally monitoring its status.

Excerpts from the method:

//validate the jobs output specs
   checkSpecs(job);

   Configuration conf = job.getConfiguration();
   addMRFrameworkToDistributedCache(conf);

   Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

Judging from the source, adding resources to the DistributedCache actually happens as the second step.

1 Checking the job's output path

/**
 * Check for validity of the output-specification for the job.
 *  
 * <p>This is to validate the output specification for the job when it is
 * a job is submitted.  Typically checks that it does not already exist,
 * throwing an exception when it already exists, so that output is not
 * overwritten.</p>
 *
 * @param ignored
 * @param job job configuration.
 * @throws IOException when output should not be attempted
 */
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

As the source comment makes clear, to prevent data loss an exception is thrown here if the output path already exists.
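The same guard can be sketched outside Hadoop. This is a minimal illustration of the idea using java.nio.file instead of Hadoop's FileSystem; the class and method names are hypothetical, not part of Hadoop:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class OutputSpecDemo {
    // Mirrors the intent of checkOutputSpecs: refuse to run if the output
    // directory already exists, so earlier results are never overwritten.
    static void checkOutputDir(Path out) throws IOException {
        if (Files.exists(out)) {
            throw new IOException("Output directory " + out + " already exists");
        }
    }

    public static void main(String[] args) throws IOException {
        Path existing = Files.createTempDirectory("job-output");
        try {
            checkOutputDir(existing);
            System.out.println("no exception");
        } catch (IOException e) {
            System.out.println("rejected: existing output");
        }
    }
}
```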

2 Checking the staging directory and its permissions

/**
   * Initializes the staging directory and returns the path. It also
   * keeps track of all necessary ownership & permissions
   * @param cluster
   * @param conf
   */
  public static Path getStagingDir(Cluster cluster, Configuration conf)
  throws IOException,InterruptedException {

Later on, the path returned by this method is validated once more:

/**
 * Make sure that a path specifies a FileSystem.
 * @param path to use
 */
public Path makeQualified(Path path) {
  checkPath(path);
  return path.makeQualified(this.getUri(), this.getWorkingDirectory());
}

3 Computing the input splits

private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}

In the new API, input splitting is driven by the InputFormat. Look at the getSplits method in the source:

/**
 * Logically split the set of input files for the job.  
 *
 * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
 * for processing.</p>
 *
 * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
 * input files are not physically split into chunks. For e.g. a split could
 * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
 * also creates the {@link RecordReader} to read the {@link InputSplit}.
 *
 * @param context job configuration.
 * @return an array of {@link InputSplit}s for the job.
 */
public abstract
  List<InputSplit> getSplits(JobContext context
                             ) throws IOException, InterruptedException;

The most important note in that comment: this is not a physical split of the files. MapReduce programs use TextInputFormat by default; TextInputFormat extends FileInputFormat, a base class that provides the getSplits() implementation.

/**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {

Next, look at how the split size is computed:

protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

maxSize is the value of the user-settable property mapreduce.input.fileinputformat.split.maxsize; it defaults to Long.MAX_VALUE.
minSize can be controlled via the property mapreduce.input.fileinputformat.split.minsize; it defaults to 1L.
So by default, the split size equals blockSize.
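Plugging in the defaults shows why: with minSize = 1 and maxSize = Long.MAX_VALUE, the formula collapses to blockSize. A self-contained check of the same formula (the class name SplitSizeDemo is just for illustration):

```java
public class SplitSizeDemo {
    // Same formula as FileInputFormat.computeSplitSize().
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // 128 MB HDFS block

        // Defaults: minSize = 1, maxSize = Long.MAX_VALUE -> split equals block size.
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));

        // Lowering maxSize below the block size shrinks the split.
        System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));

        // Raising minSize above the block size enlarges the split.
        System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE));
    }
}
```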

Back to the main thread (InputFormat will be analyzed in detail later).
Once the List<InputSplit> is obtained, it is sorted by size, from largest to smallest. The comparator is as follows:

private static class SplitComparator implements Comparator<InputSplit> {
   @Override
   public int compare(InputSplit o1, InputSplit o2) {
     try {
       long len1 = o1.getLength();
       long len2 = o2.getLength();
       if (len1 < len2) {
         return 1;
       } else if (len1 == len2) {
         return 0;
       } else {
         return -1;
       }
     } catch (IOException ie) {
       throw new RuntimeException("exception in compare", ie);
     } catch (InterruptedException ie) {
       throw new RuntimeException("exception in compare", ie);
     }
   }
 }
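The comparator above orders splits from largest to smallest. The same descending ordering can be checked with plain longs standing in for split lengths (a standalone sketch, not Hadoop code):

```java
import java.util.Arrays;

public class DescendingSortDemo {
    public static void main(String[] args) {
        // Pretend these are split lengths in bytes.
        Long[] lengths = {64L, 256L, 128L};

        // Same ordering rule as SplitComparator: bigger lengths sort first.
        Arrays.sort(lengths, (a, b) -> Long.compare(b, a));

        System.out.println(Arrays.toString(lengths)); // [256, 128, 64]
    }
}
```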

The old API

//method to write splits for old api mapper.
 private int writeOldSplits(JobConf job, Path jobSubmitDir)
 throws IOException {
   org.apache.hadoop.mapred.InputSplit[] splits =
   job.getInputFormat().getSplits(job, job.getNumMapTasks());
   // sort the splits into order based on size, so that the biggest
   // go first
   Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
     public int compare(org.apache.hadoop.mapred.InputSplit a,
                        org.apache.hadoop.mapred.InputSplit b) {
       try {
         long left = a.getLength();
         long right = b.getLength();
         if (left == right) {
           return 0;
         } else if (left < right) {
           return 1;
         } else {
           return -1;
         }
       } catch (IOException ie) {
         throw new RuntimeException("Problem getting input split size", ie);
       }
     }
   });
   JobSplitWriter.createSplitFiles(jobSubmitDir, job,
       jobSubmitDir.getFileSystem(job), splits);
   return splits.length;
 }

The old-API implementation is similar.

4 Submitting the job

// Write job file to submit dir
   writeConf(conf, submitJobFile);

   //
   // Now, actually submit the job (using the submit name)
   //
   printTokens(jobId, job.getCredentials());
   status = submitClient.submitJob(
       jobId, submitJobDir.toString(), job.getCredentials());

Wait, isn't something missing? Where does the job jar get submitted? The files were in fact already uploaded before the maps were computed. The order of the steps in the Javadoc is not necessarily the order of the code!

/**
 * configure the jobconf of the user with the command line options of
 * -libjars, -files, -archives.
 * @param job
 * @throws IOException
 */
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
throws IOException {
  JobResourceUploader rUploader = new JobResourceUploader(jtFs);
  rUploader.uploadFiles(job, jobSubmitDir);

  // Set the working directory
  if (job.getWorkingDirectory() == null) {
    job.setWorkingDirectory(jtFs.getWorkingDirectory());
  }
}

Incidentally, here are the five states of a Hadoop job:

RUNNING(1),
SUCCEEDED(2),
FAILED(3),
PREP(4),
KILLED(5);