深度分析如何在Hadoop中控制Map的数量

深度分析如何在Hadoop中控制Map的数量

guibin.beijing@gmail.comhtml


不少文档中描述,Mapper的数量在默认状况下不可直接控制干预,由于Mapper的数量由输入的大小和个数决定。在默认状况下,最终input 占据了多少block,就应该启动多少个Mapper。若是输入的文件数量巨大,可是每一个文件的size都小于HDFS的blockSize,那么会形成 启动的Mapper等于文件的数量(即每一个文件都占据了一个block),那么极可能形成启动的Mapper数量超出限制而致使崩溃。这些逻辑确实是正确 的,但都是在默认状况下的逻辑。其实若是进行一些客户化的设置,就能够控制了。
java

在Hadoop中,设置Map task的数量不像设置Reduce task数量那样直接,即:不可以经过API直接精确的告诉Hadoop应该启动多少个Map task。node

你也许奇怪了,在API中不是提供了接口 org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)吗?这个值难道不能够设置Map task的数量吗?这个API的确没错,在文档上解释”Note: This is only a hint to the framework.“,即这个值对Hadoop的框架来讲仅仅是个提示,不起决定性的做用。也就是说,即使你设置了,也不必定获得你想要的效果。数据库


1. InputFormat介绍

在具体设置Map task数量以前,很是有必要了解一下与Map-Reduce输入相关的基础知识。apache

这个接口(org.apache.hadoop.mapred.InputFormat)描述了Map-Reduce job的输入规格说明(input-specification),它将全部的输入文件分割成逻辑上的InputSplit,每个InputSplit将会分给一个单独的mapper;它还提供RecordReader的具体实现,这个Reader从逻辑的InputSplit上获取input records并传给Mapper处理。app

InputFormat有多种具体实现,诸如FileInputFormat(处理基于文件的输入的基础抽象类), DBInputFormat(处理基于数据库的输入,数据来自于一个能用SQL查询的表),KeyValueTextInputFormat(特 殊的FineInputFormat,处理Plain Text File,文件由回车或者回车换行符分割成行,每一行由key.value.separator.in.input.line分割成Key和 Value),CompositeInputFormat,DelegatingInputFormat等。在绝大多数应用场景中都会使用 FileInputFormat及其子类型。框架

经过以上的简单介绍,咱们知道InputFormat决定着InputSplit,每一个InputSplit会分配给一个单独的Mapper,所以InputFormat决定了具体的Map task数量svn


2. FileInputFormat中影响Map数量的因素

在平常使用中,FileInputFormat是最经常使用的InputFormat,它有不少具体的实现。如下分析的影响Map数量的因素仅对 FileInputFormat及其子类有效,其余非FileInputFormat能够去查看相应的 getSplits(JobConf job, int numSplits) 具体实现便可。函数

请看以下代码段(摘抄自org.apache.hadoop.mapred.FileInputFormat.getSplits,hadoop-0.20.205.0源代码):oop

[java] view plain copy
  1. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);  
  2. long minSize = Math.max(job.getLong("mapred.min.split.size", 1), minSplitSize);  
  3.   
  4. for (FileStatus file: files) {  
  5.   Path path = file.getPath();  
  6.   FileSystem fs = path.getFileSystem(job);  
  7.   if ((length != 0) && isSplitable(fs, path)) {   
  8.     long blockSize = file.getBlockSize();  
  9.     long splitSize = computeSplitSize(goalSize, minSize, blockSize);  
  10.       
  11.     long bytesRemaining = length;  
  12.     while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
  13.       String[] splitHosts = getSplitHosts(blkLocations,length-bytesRemaining, splitSize, clusterMap);  
  14.       splits.add(new FileSplit(path, length-bytesRemaining, splitSize, splitHosts));  
  15.       bytesRemaining -= splitSize;  
  16.     }  
  17.   
  18.     if (bytesRemaining != 0) {  
  19.       splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));  
  20.     }  
  21.   } else if (length != 0) {  
  22.     String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);  
  23.     splits.add(new FileSplit(path, 0, length, splitHosts));  
  24.   } else {   
  25.     //Create empty hosts array for zero length files  
  26.     splits.add(new FileSplit(path, 0, length, new String[0]));  
  27.   }  
  28. }  
  29.   
  30. return splits.toArray(new FileSplit[splits.size()]);  
  31.   
  32. protected long computeSplitSize(long goalSize, long minSize, long blockSize) {  
  33.     return Math.max(minSize, Math.min(goalSize, blockSize));  
  34. }  

totalSize:是整个Map-Reduce job全部输入的总大小。

numSplits:来自job.getNumMapTasks(),即在job启动时用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)设置的值,给M-R框架的Map数量的提示。

goalSize:是输入总大小与提示Map task数量的比值,即指望每一个Mapper处理多少的数据,仅仅是指望,具体处理的数据数由下面的computeSplitSize决定。

minSplitSize:默认为1,可由子类复写函数protected void setMinSplitSize(long minSplitSize) 从新设置。通常状况下,都为1,特殊状况除外

minSize:取的1和mapred.min.split.size中较大的一个。

blockSize:HDFS的块大小,默认为64M,通常大的HDFS都设置成128M。

splitSize:就是最终每一个Split的大小,那么Map的数量基本上就是totalSize/splitSize。

接下来看看computeSplitSize的逻辑:首先在goalSize(指望每一个Mapper处理的数据量)和HDFS的block size中取较小的,而后与mapred.min.split.size相比取较大的


3. 如何调整Map的数量

有了2的分析,下面调整Map的数量就很容易了。


3.1 减少Map-Reduce job 启动时建立的Mapper数量

当处理大批量的大数据时,一种常见的状况是job启动的mapper数量太多而超出了系统限制,致使Hadoop抛出异常终止执行。解决这种异常的思路是减小mapper的数量。具体以下:

3.1.1 输入文件size巨大,但不是小文件

这种状况能够经过增大每一个mapper的input size,即增大minSize或者增大blockSize来减小所需的mapper的数量。增大blockSize一般不可行,由于当HDFS被 hadoop namenode -format以后,blockSize就已经肯定了(由格式化时dfs.block.size决定),若是要更改blockSize,须要从新格式化 HDFS,这样固然会丢失已有的数据。因此一般状况下只能经过增大minSize,即增大mapred.min.split.size的值。


3.1.2 输入文件数量巨大,且都是小文件

所谓小文件,就是单个文件的size小于blockSize。这种状况经过增大mapred.min.split.size不可行,须要使用 FileInputFormat衍生的CombineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减小mapper的数量。具体细节稍后会更新并展开。


3.2 增长Map-Reduce job 启动时建立的Mapper数量

增长mapper的数量,能够经过减少每一个mapper的输入作到,即减少blockSize或者减少mapred.min.split.size的值。


参考资料

http://yaseminavcular.blogspot.com/2011/06/how-to-set-number-of-maps-with-hadoop.html

http://svn.apache.org/repos/asf/hadoop/common/tags/release-0.20.205.0

相关文章
相关标签/搜索