学习Hadoop搞明白Shuffle的原理是很是重要的,然而相信不少人看了《Hadoop权威指南4》好几遍,也没有真正搞明白它真正的原理。看完这篇文章,相信会对你理解Shuffle有很大的帮助。java
官方给的定义:系统执行排序、将map输出做为输入传给reducer的过程称为Shuffle。(看完是否是一脸懵逼)
通俗来说,就是从map产生输出开始到reduce消化输入的整个过程称为Shuffle。以下图用黑线框出的部分:apache
圆形缓冲区介绍:app
每个map任务都会有一个圆形缓冲区。默认大小100MB(io.sort.mb属性)阈值0.8也就是80MB(io.sort.spill.percent属性指定) ,ide
一旦达到阈值一个后台线程开始把内容写到(spill)磁盘的指定目录mapred.local.dir下的新建的一个溢出写文件。写入磁盘前先partition、sort、[combiner]。一个map task任务可能产生N个磁盘文件。
map task运算完以后,产生了N个文件,而后将这些文件merge合成一个文件。
若是N < 3,合成的新文件写入磁盘前只通过patition(分区)和sort(排序)过程,不会执行combiner合并(不管是否指定combiner类),以下图所示:oop
若是N>=3,合成的新文件写入磁盘前通过patition(分区)、sort(排序)过和combiner合并(前提是指定了combiner类),以下图所示:学习
思考:为何只有当N>=3时,合成文件才会执行combiner呢?
这是由于若是N< 3时,执行combiner虽然减小了文件的大小,可是同时产生了必定的系统开销。因为减小的文件大小不大,权衡利弊后,肯定N< 2时不在执行combiner操做。
当该map task所有执行完以后,对应的reduce task将会拷贝对应分区的数据(该过程称为fetch),以下图所示:fetch
其它的map task任务完成后,对应的reduce task也一样执行fetch操做,以下图所示:优化
每一个map任务的完成时间可能不一样,所以只要有一个任务完成,reduce任务就开始复制其输出。该阶段被称为reduce的复制阶段。reduce任务有少许复制线程,所以可以并行取得map输出。默认值是5个线程,但这个默认值能够经过设置mapred.reduce.parallel.copies属性改变。线程
复制完全部map输出后,reduce任务进入合并阶段,该阶段将合并map输出,并维持其顺序排序(至关于执行了sort),若是指定了combiner,在写入磁盘前还会执行combiner操做。3d
那么具体是如何合并的呢?
合并因子默认是10,能够经过io.sort.factor属性设置。合并过程是循环进行了,可能叫通过多趟合并。目标是合并最小数量的文件以便知足最后一趟的合并系数。假设有40个文件,咱们不会在四趟中每趟合并10个文件从而获得4个文件。相反,第一趟只合并4个文件,随后的三趟分别合并10个文件。再最后一趟中4个已合并的文件和余下的6个(未合并的)文件合计10个文件。具体流程以下图所示:
注意:这并无改变合并次数,它只是一个优化措施,目的是尽可能减小写到磁盘的数据量,由于最后一趟老是直接合并到reduce。
看到这里您是否理解了Shuffle的具体原理呢,若是没有,也没有关系,接下来咱们经过一个wordcount案例再将整个流程梳理一遍。
首先map任务的代码以下:
public class WCMapper extends Mapper< LongWritable, Text, Text, LongWritable> { public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException { String line = ivalue.toString(); String words[] = line.split(" "); for (String word : words) { context.write(new Text(word), new LongWritable(1)); } } }
在分区(分区规则:按首字母分四个区,分别为a-i,j-q,r-z,其它)的过程当中,会将相同的单词合并到一块儿,将出现次数用逗号隔开,如上图所示。注意此时尚未排序。分区代码以下:
package cn.geekmooc; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class WCPatitioner extends Partitioner<Text, LongWritable> { @Override public int getPartition(Text key, LongWritable value, int numPartitions) { int first_char = key.charAt(0); if(first_char>=97&&first_char<=105){ return 0; }else if(first_char>=106&&first_char<=113){ return 1; }else if(first_char>=114&&first_char<=122){ return 2; }else{ return 3; } } }
接着执行排序操做,默认排序规则是按照key的字典升序排序,固然你也能够指定排序规则,排序后以下图所示:
接下来执行combiner操做,将每一个单词后续的1求和,WCCombiner类代码以下:
package cn.geekmooc; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCCombiner extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { System.out.println(key.toString()+":"+values.toString()); long count = 0; Iterator<LongWritable> iter = values.iterator(); while(iter.hasNext()){ count += iter.next().get(); } context.write(key, new LongWritable(count)); } }
combiner的结果以下图所示
map任务执行完,产生N个spill文件,接着对N个文件进行合并,分如下两种状况:
1.N < 3,不管是否指定combiner类,合并文件时都不会执行combiner
2.N>=3,若是指定了combiner类将执行combiner操做,以下图:
接下来进入fetch(或copy)阶段
而后在reduce端进行合并
而后执行最后一趟合并,并将结果直接传给reduce
reduce类代码以下:
package cn.geekmooc; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { System.out.println(key.toString()+":"+values.toString()); long count = 0; for (LongWritable val : values) { count += val.get(); } context.write(key, new LongWritable(count)); } }
reduce task执行后,输出结果: