经过本章节,您能够学习到:前端
1)job提交流程源码详解 waitForCompletion() submit(); // 1创建链接 connect(); // 1)建立提交job的代理 new Cluster(getConfiguration()); // (1)判断是本地yarn仍是远程 initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster) // 1)建立给集群提交数据的Stag路径 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2)获取jobid ,并建立job路径 JobID jobId = submitClient.getNewJobID(); // 3)拷贝jar包到集群 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)计算切片,生成切片规划文件 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5)向Stag路径写xml配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); // 6)提交job,返回提交状态 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
注意以上代码只是大过程的提取,并非连续的某处的代码。要了解详细的过程,能够经过编译器打断点了解。java
红色划分是均分方式,这种方式比较低下。apache
而当前采用的是蓝色方式,以一个块为一个切片。大体流程以下:oop
block是HDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。源码分析
经过如下的学习,咱们能够总结出如下三个结论:学习
举个例子加入咱们有如下两个文件this
file1.txt 320M file2.txt 10M
通过FileInputFormat的切片机制运算后,默认配置下造成的切片信息以下:debug
file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M
经过分析源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat
,咱们先来看看他的父类InputFormat代理
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.apache.hadoop.mapreduce; import java.io.IOException; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; @Public @Stable public abstract class InputFormat<K, V> { public InputFormat() { } public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException; public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException; }
父类规定了两个抽象方法getSplits以及RecordReader。code
再来看看FileInputFormat计算分片大小的相关代码:
public List<InputSplit> getSplits(JobContext job) throws IOException { StopWatch sw = (new StopWatch()).start(); long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); List<InputSplit> splits = new ArrayList(); List<FileStatus> files = this.listStatus(job); Iterator var9 = files.iterator(); while(true) { while(true) { while(var9.hasNext()) { FileStatus file = (FileStatus)var9.next(); Path path = file.getPath(); long length = file.getLen(); if (length != 0L) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus)file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0L, length); } if (this.isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining; int blkIndex; for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } if (bytesRemaining != 0L) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { splits.add(this.makeSplit(path, 0L, length, new String[0])); } } job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits; } } }
从中咱们能够了解到,计算分片大小的逻辑为
// 初始化值 long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); ... // 计算分片大小 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); ... protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); } ... // minSize默认值为1L protected long getFormatMinSplitSize() { return 1L; }
也就说,切片主要由这几个值来运算决定
mapreduce.input.fileinputformat.split.minsize=1 默认值为1 mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
所以,默认状况下,切片大小=blocksize。咱们不可贵到,要想修改分片的大小,彻底能够经过配置文件的mapreduce.input.fileinputformat.split.minsize
以及mapreduce.input.fileinputformat.split.maxsize
进行配置:
FileInputFormat有多个底层实现,2.7版本的jdk具备以下的继承树
默认状况下Job任务使用的是
// 根据文件类型获取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 获取切片的文件名称 String name = inputSplit.getPath().getName();
默认状况下TextInputformat对任务的切片机制是按文件规划切片,无论文件多小,都会是一个单独的切片,都会交给一个maptask,这样若是有大量小文件,就会产生大量的maptask,处理效率极其低下。最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,再上传到HDFS作后续分析。
若是已是大量小文件在HDFS中了,可使用另外一种InputFormat来作切片(CombineTextInputFormat),它的切片逻辑跟TextFileInputFormat不一样:它能够将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就能够交给一个maptask。
优先知足最小切片大小,不超过最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
若是不设置InputFormat,它默认用的是TextInputFormat.class,所以咱们须要手动指定InputFormat类型,在执行job以前指定:
job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
经过此设置以后,分片会变得更少一些,不会像以前同样,一个文件造成一个分片(文件太小的状况尤为浪费)。