The JobSubmitter.submitJobInternal() method
The job submission process involves the following steps. A partial excerpt of the method:
//validate the jobs output specs
checkSpecs(job);

Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
Judging from the source, adding resources to the distributed cache is actually the second step.
1 Check the job output path
/**
 * Check for validity of the output-specification for the job.
 *
 * <p>This is to validate the output specification for the job when it is
 * a job is submitted.  Typically checks that it does not already exist,
 * throwing an exception when it already exists, so that output is not
 * overwritten.</p>
 *
 * @param ignored
 * @param job job configuration.
 * @throws IOException when output should not be attempted
 */
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
As the source comment indicates, an exception is thrown here if the output path already exists, so that existing data is not lost.
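If overwriting the output is genuinely intended, a common driver-side workaround is to remove the output path before submission. A minimal sketch under that assumption (the OutputPathCleaner class and its hard-coded recursive delete are illustrative, not part of the source above):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputPathCleaner {
    // Delete the output directory up front so the output-spec check
    // does not fail with "output directory already exists".
    public static void deleteIfExists(Configuration conf, String dir) throws IOException {
        Path out = new Path(dir);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true); // recursive delete
        }
    }
}

Use this only when losing whatever is in that directory is acceptable; the check exists precisely to prevent accidental overwrites.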
2 Check resource permissions
/**
 * Initializes the staging directory and returns the path. It also
 * keeps track of all necessary ownership & permissions
 * @param cluster
 * @param conf
 */
public static Path getStagingDir(Cluster cluster, Configuration conf)
    throws IOException, InterruptedException {
The path returned by this method is validated again in a later method:
/**
 * Make sure that a path specifies a FileSystem.
 * @param path to use
 */
public Path makeQualified(Path path) {
  checkPath(path);
  return path.makeQualified(this.getUri(), this.getWorkingDirectory());
}
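To illustrate what makeQualified produces, here is a hypothetical snippet (the staging path and the hdfs://nn:8020 authority are made-up examples): a scheme-less path gets resolved against the FileSystem's URI and working directory.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class QualifyDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assuming fs.defaultFS points at something like hdfs://nn:8020
        FileSystem fs = FileSystem.get(conf);
        Path staging = new Path("/tmp/hadoop-yarn/staging");
        // A scheme-less path becomes a fully qualified one,
        // e.g. hdfs://nn:8020/tmp/hadoop-yarn/staging
        System.out.println(fs.makeQualified(staging));
    }
}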
3 Compute the input splits
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
In the new API, splitting is driven by the InputFormat; look at the getSplits method in the source:
/**
 * Logically split the set of input files for the job.
 *
 * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
 * for processing.</p>
 *
 * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
 * input files are not physically split into chunks. For e.g. a split could
 * be <i><input-file-path, start, offset></i> tuple. The InputFormat
 * also creates the {@link RecordReader} to read the {@link InputSplit}.
 *
 * @param context job configuration.
 * @return an array of {@link InputSplit}s for the job.
 */
public abstract List<InputSplit> getSplits(JobContext context)
    throws IOException, InterruptedException;
The most important point in the comment is that this is a logical split, not a physical one. A MapReduce program uses TextInputFormat by default; TextInputFormat extends FileInputFormat, a base class that provides the getSplits() implementation.
/**
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
Next, look at how the split size is computed:
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
maxSize comes from the user-settable property mapreduce.input.fileinputformat.split.maxsize and defaults to Long.MAX_VALUE.
minSize can be controlled through the property mapreduce.input.fileinputformat.split.minsize and defaults to 1L.
So, by default, the split size equals blockSize.
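To get a different split size, you normally tune those two properties rather than the block size. A minimal sketch using the static helpers on FileInputFormat, which set the split.maxsize/split.minsize properties mentioned above (the 64 MB and 256 MB figures are just examples, assuming a 128 MB block size):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split-size-demo");

        // Cap each split at 64 MB:
        // computeSplitSize(128MB, 1, 64MB) = max(1, min(64MB, 128MB)) = 64MB
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

        // Or force splits of at least 256 MB despite a 128 MB block:
        // computeSplitSize(128MB, 256MB, Long.MAX_VALUE) = 256MB
        // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
    }
}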
Back to the main thread; InputFormat will be analyzed in detail later.
Once the List<InputSplit> has been obtained, the splits are sorted by size, largest first:
private static class SplitComparator implements Comparator<InputSplit> {
  @Override
  public int compare(InputSplit o1, InputSplit o2) {
    try {
      long len1 = o1.getLength();
      long len2 = o2.getLength();
      if (len1 < len2) {
        return 1;
      } else if (len1 == len2) {
        return 0;
      } else {
        return -1;
      }
    } catch (IOException ie) {
      throw new RuntimeException("exception in compare", ie);
    } catch (InterruptedException ie) {
      throw new RuntimeException("exception in compare", ie);
    }
  }
}
The old API:
//method to write splits for old api mapper.
private int writeOldSplits(JobConf job, Path jobSubmitDir) throws IOException {
  org.apache.hadoop.mapred.InputSplit[] splits =
      job.getInputFormat().getSplits(job, job.getNumMapTasks());
  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
    public int compare(org.apache.hadoop.mapred.InputSplit a,
                       org.apache.hadoop.mapred.InputSplit b) {
      try {
        long left = a.getLength();
        long right = b.getLength();
        if (left == right) {
          return 0;
        } else if (left < right) {
          return 1;
        } else {
          return -1;
        }
      } catch (IOException ie) {
        throw new RuntimeException("Problem getting input split size", ie);
      }
    }
  });
  JobSplitWriter.createSplitFiles(jobSubmitDir, job,
      jobSubmitDir.getFileSystem(job), splits);
  return splits.length;
}
The old-API implementation is similar.
4 Submit the job
// Write job file to submit dir
writeConf(conf, submitJobFile);

//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
    jobId, submitJobDir.toString(), job.getCredentials());
Wait, is something missing? Where does the job jar get submitted? The files were already uploaded before the number of maps was computed; the order of the numbered comments is not necessarily the order of the code!
/**
 * configure the jobconf of the user with the command line options of
 * -libjars, -files, -archives.
 * @param job
 * @throws IOException
 */
private void copyAndConfigureFiles(Job job, Path jobSubmitDir) throws IOException {
  JobResourceUploader rUploader = new JobResourceUploader(jtFs);
  rUploader.uploadFiles(job, jobSubmitDir);

  // Set the working directory
  if (job.getWorkingDirectory() == null) {
    job.setWorkingDirectory(jtFs.getWorkingDirectory());
  }
}
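Note that -libjars, -files and -archives are only recognized when the driver passes its arguments through GenericOptionsParser, which ToolRunner does automatically. A minimal driver sketch (MyDriver and the job name are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() already reflects whatever -libjars/-files/-archives
        // ToolRunner parsed from the command line.
        Job job = Job.getInstance(getConf(), "my-job");
        job.setJarByClass(MyDriver.class);
        // ... set mapper/reducer, input/output paths ...
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MyDriver(), args));
    }
}

Launched as, say, hadoop jar myjob.jar MyDriver -files lookup.txt input output, the extra resources end up being uploaded by the JobResourceUploader shown above.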
By the way, here are the five Hadoop job states:
RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5);
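These are the values of the JobStatus.State enum, and a client can read them after submission. A minimal polling sketch (assuming a Job that has already been submitted, e.g. via job.submit()):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;

public class JobStateWatcher {
    // Poll the submitted job until it is complete, printing its state.
    public static void waitAndReport(Job job) throws Exception {
        while (!job.isComplete()) {
            JobStatus.State state = job.getStatus().getState();
            System.out.println("current state: " + state);
            Thread.sleep(5000);
        }
        System.out.println("final state: " + job.getStatus().getState());
    }
}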