Hadoop中的MapReduce的shuffle过程及调优

概述

Shuffle,即混洗、洗牌,顾名思义就是对数据打乱重新分配。Shuffle发生在Map输出至Reduce的输入过程之间。主要分为两部分

  1. Map任务输出的数据进行分组、合并、排序,并写入本地磁盘
  2. Reduce任务拉取数据进行合并、排序

 

Shuffle过程

 

Map端

  • Map端输出时,会先将数据写入内存的环形缓冲区,默认大小100M,可通过参数设置
  • 当缓冲区的内容大小达到阈值(默认0.8,可通过参数设置),便有一个后台线程将写入缓冲区的数据溢写到磁盘。溢写的过程中Map任务仍然可以写数据到缓冲区,一旦缓冲区写满,Map任务将会被阻塞,知道后台线程写磁盘结束
  • 数据溢写到磁盘时会计算输出key的分区,同一个分区的key分在一组并按照key排序,最后写入磁盘文件。如果设置了combiner,会在写磁盘前调用combiner函数进行聚合,目的是减少网络IO,且不会影响Reduce计算结果
  • 每一次溢写都会产生一个文件,Map结束后会产生多个文件,但最终会被合并成一个分区且有序的文件。合并文件时可通过参数设置合并的文件个数
  • 输出到磁盘的过程可以设置为压缩,默认不压缩,可通过参数设置

参数调优

参数名 默认值 说明
mapreduce.task.io.sort.mb

100MB

Map输出时所使用的内存缓冲区大小

mapreduce.map.sort.spill.percent

0.8

Map输出溢写到磁盘的内存阈值

mapreduce.task.io.sort.factor

10

排序文件时一次可以合并的流数
mapreduce.map.output.compress

false

Map输出是否压缩

mapredcue.map.output.compress.codec

Org.apache.hadoop.io.compress.DefaultCodec

Map输出压缩的编解码器

 

Reduce端

  • Reduce端主要涉及复制和排序。一个reduce任务需要从多个map端拉取对应分区的数据,只要map任务完成,reduce即可开始复制数据,可通过参数设置复制的并行度,默认为5
  • map任务完成后通过心跳统计欧诺个只Application Master,reduce端会有一个线程定期查询Application Master,以获取完成的map任务的位置,从而到对应位置的机器复制数据
  • reduce复制数据将数据写入内存,可通过参数设置可用的内存比
  • 如果数据大小达到一定阈值(可通过参数设置),或者复制的文件数达到阈值(可通过参数设置)则将内存的数据合并并溢写到磁盘。如果设置了combiner,则会在写入磁盘之前调用combiner函数进行聚合以减少写入磁盘的数据量
  • 复制结束后将会对数据进行排序,如果溢写产生的多个文件,将会进行文件合并,每次合并的文件数可通过参数设置。经过多次合并后最后一次合并排序将会直接作为reduce任务的输入

参数调优

参数 默认值 说明
mapreduce.reduce.shuffle.parallelcopies

5

并发复制的线程数
mapreduce.task.io.sort.factor

10

排序文件时一次可以合并的流数

mapreduce.reduce.shuffle.input.buffer.percent

0.7

shuffle的复制阶段,用来存放map输出缓冲区占reduce堆内存的百分比
mapreduce.reduce.shuffle.merge.percent

0.66

Reduce输出缓冲区的阈值,超过该比例将进行合并和溢写磁盘
mapreduce.reduce.merge.inmem.threshold

1000

阈值,当累计的map输出文件超过该值,进行合并和溢写磁盘,0或者复制意味着该参数无效,合并和溢写只由mapreduce.reduce.shuffle.merge.percent控制
mapreduce.reduce.input.buffer.percent

0.0

在reduce过程(开始reduce函数时),内存中保存map输出的空间占整个堆空间的比例。默认情况下,reduce任务开始前所有的map输出合并到磁盘。以便为reducer提供更多的内存,如果reducer需要的内存较少,可以增加此值以最小化磁盘访问次数

 

 

 

参考:

[1] http://www.javashuo.com/article/p-cbpyxgln-kb.html