当map task开始运算,并产生中间数据时,其产生的中间结果并不是直接就简单的写入磁盘。这中间的过程比较复杂,而且利用到了内存buffer来进行已经产生的部分结果的缓存,并在内存buffer中进行一些预排序来优化整个map的性能。如上图所示,每个map都会对应存在一个内存buffer(MapOutputBuffer,即上图的buffer in memory),map会将已经产生的部分结果先写入到该buffer中,这个buffer默认是100MB大小,可是这个大小是能够根据job提交时的参数设定来调整的,该参数即为:io.sort.mb。当map的产生数据很是大时,而且把io.sort.mb调大,那么map在整个计算过程当中spill的次数就势必会下降,map task对磁盘的操做就会变少,若是map tasks的瓶颈在磁盘上,这样调整就会大大提升map的计算性能。map作sort和spill的内存结构以下如所示:java
map在运行过程当中,不停的向该buffer中写入已有的计算结果,可是该buffer并不必定能将所有的map输出缓存下来,当map输出超出必定阈值(好比100M),那么map就必须将该buffer中的数据写入到磁盘中去,这个过程在mapreduce中叫作spill。map并非要等到将该buffer所有写满时才进行spill,由于若是所有写满了再去写spill,势必会形成map的计算部分等待buffer释放空间的状况。因此,map实际上是当buffer被写满到必定程度(好比80%)时,就开始进行spill。这个阈值也是由一个job的配置参数来控制,即io.sort.spill.percent,默认为0.80或80%。这个参数一样也是影响spill频繁程度,进而影响map task运行周期对磁盘的读写频率的。但非特殊状况下,一般不须要人为的调整。调整io.sort.mb对用户来讲更加方便。算法
当map task的计算部分所有完成后,若是map有输出,就会生成一个或者多个spill文件,这些文件就是map的输出结果。map在正常退出以前,须要将这些spill合并(merge)成一个,因此map在结束以前还有一个merge的过程。merge的过程当中,有一个参数能够调整这个过程的行为,该参数为:io.sort.factor。该参数默认为10。它表示当merge spill文件时,最多能有多少并行的stream向merge文件中写入。好比若是map产生的数据很是的大,产生的spill文件大于10,而io.sort.factor使用的是默认的10,那么当map计算完成作merge时,就没有办法一次将全部的spill文件merge成一个,而是会分屡次,每次最多10个stream。这也就是说,当map的中间结果很是大,调大io.sort.factor,有利于减小merge次数,进而减小map对磁盘的读写频率,有可能达到优化做业的目的。apache
当job指定了combiner的时候,咱们都知道map介绍后会在map端根据combiner定义的函数将map结果进行合并。运行combiner函数的时机有可能会是merge完成以前,或者以后,这个时机能够由一个参数控制,即min.num.spill.for.combine(default 3),当job中设定了combiner,而且spill数最少有3个的时候,那么combiner函数就会在merge产生结果文件以前运行。经过这样的方式,就能够在spill很是多须要merge,而且不少数据须要作conbine的时候,减小写入到磁盘文件的数据数量,一样是为了减小对磁盘的读写频率,有可能达到优化做业的目的。缓存
减小中间结果读写进出磁盘的方法不止这些,还有就是压缩。也就是说map的中间,不管是spill的时候,仍是最后merge产生的结果文件,都是能够压缩的。压缩的好处在于,经过压缩减小写入读出磁盘的数据量。对中间结果很是大,磁盘速度成为map执行瓶颈的job,尤为有用。控制map中间结果是否使用压缩的参数为:mapred.compress.map.output(true/false)。将这个参数设置为true时,那么map在写中间结果时,就会将数据压缩后再写入磁盘,读结果时也会采用先解压后读取数据。这样作的后果就是:写入磁盘的中间结果数据量会变少,可是cpu会消耗一些用来压缩和解压。因此这种方式一般适合job中间结果很是大,瓶颈不在cpu,而是在磁盘的读写的状况。说的直白一些就是用cpu换IO。根据观察,一般大部分的做业cpu都不是瓶颈,除非运算逻辑异常复杂。因此对中间结果采用压缩一般来讲是有收益的。如下是一个wordcount中间结果采用压缩和不采用压缩产生的map中间结果本地磁盘读写的数据量对比:网络
map中间结果不压缩:并发
map中间结果压缩:ide
能够看出,一样的job,一样的数据,在采用压缩的状况下,map中间结果能缩小将近10倍,若是map的瓶颈在磁盘,那么job的性能提高将会很是可观。函数
当采用map中间结果压缩的状况下,用户还能够选择压缩时采用哪一种压缩格式进行压缩,如今hadoop支持的压缩格式有:GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等压缩格式。一般来讲,想要达到比较平衡的cpu和磁盘压缩比,LzoCodec比较适合。但也要取决于job的具体状况。用户若想要自行选择中间结果的压缩算法,能够设置配置参数:mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec或者其余用户自行选择的压缩方式。oop
选项 | 类型 | 默认值 | 描述 |
io.sort.mb | int | 100 | 缓存map中间结果的buffer大小(in MB) |
io.sort.record.percent | float | 0.05 | io.sort.mb中用来保存map output记录边界的百分比,其余缓存用来保存数据 |
io.sort.spill.percent | float | 0.80 | map开始作spill操做的阈值 |
io.sort.factor | int | 10 | 作merge操做时同时操做的stream数上限。 |
min.num.spill.for.combine | int | 3 | combiner函数运行的最小spill数 |
mapred.compress.map.output | boolean | false | map中间结果是否采用压缩 |
mapred.map.output.compression.codec | class name | org.apache.hadoop.io. compress.DefaultCodec性能 |
map中间结果的压缩格式 |
reduce的运行是分红三个阶段的。分别为copy->sort->reduce。因为job的每个map都会根据reduce(n)数将数据分红map 输出结果分红n个partition,因此map的中间结果中是有可能包含每个reduce须要处理的部分数据的。因此,为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,全部的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据。这个过程就是一般所说的shuffle,也就是copy过程。
Reduce task在作shuffle时,实际上就是从不一样的已经完成的map上去下载属于本身这个reduce的部分数据,因为map一般有许多个,因此对一个reduce来讲,下载也能够是并行的从多个map下载,这个并行度是能够调整的,调整参数为:mapred.reduce.parallel.copies(default 5)。默认状况下,每一个只会有5个并行的下载线程在从map下数据,若是一个时间段内job完成的map有100个或者更多,那么reduce也最多只能同时下载5个map的数据,因此这个参数比较适合map不少而且完成的比较快的job的状况下调大,有利于reduce更快的获取属于本身部分的数据。
reduce的每个下载线程在下载某个map数据的时候,有可能由于那个map中间结果所在机器发生错误,或者中间结果的文件丢失,或者网络瞬断等等状况,这样reduce的下载就有可能失败,因此reduce的下载线程并不会无休止的等待下去,当必定时间后下载仍然失败,那么下载线程就会放弃此次下载,并在随后尝试从另外的地方下载(由于这段时间map可能重跑)。因此reduce下载线程的这个最大的下载时间段是能够调整的,调整参数为:mapred.reduce.copy.backoff(default 300秒)。若是集群环境的网络自己是瓶颈,那么用户能够经过调大这个参数来避免reduce下载线程被误判为失败的状况。不过在网络环境比较好的状况下,没有必要调整。一般来讲专业的集群网络不该该有太大问题,因此这个参数须要调整的状况很少。
Reduce将map结果下载到本地时,一样也是须要进行merge的,因此io.sort.factor的配置选项一样会影响reduce进行merge时的行为,该参数的详细介绍上文已经提到,当发现reduce在shuffle阶段iowait很是的高的时候,就有可能经过调大这个参数来加大一次merge时的并发吞吐,优化reduce效率。
Reduce在shuffle阶段对下载来的map数据,并非马上就写入磁盘的,而是会先缓存在内存中,而后当使用内存达到必定量的时候才刷入磁盘。这个内存大小的控制就不像map同样能够经过io.sort.mb来设定了,而是经过另一个参数来设置:mapred.job.shuffle.input.buffer.percent(default 0.7),这个参数实际上是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。也就是说,若是该reduce task的最大heap使用量(一般经过mapred.child.java.opts来设置,好比设置为-Xmx1024m)的必定比例用来缓存数据。默认状况下,reduce会使用其heapsize的70%来在内存中缓存数据。若是reduce的heap因为业务缘由调整的比较大,相应的缓存大小也会变大,这也是为何reduce用来作缓存的参数是一个百分比,而不是一个固定的值了。
假设mapred.job.shuffle.input.buffer.percent为0.7,reduce task的max heapsize为1G,那么用来作下载数据缓存的内存就为大概700MB左右,这700M的内存,跟map端同样,也不是要等到所有写满才会往磁盘刷的,而是当这700M中被使用到了必定的限度(一般是一个百分比),就会开始往磁盘刷。这个限度阈值也是能够经过job参数来设定的,设定参数为:mapred.job.shuffle.merge.percent(default 0.66)。若是下载速度很快,很容易就把内存缓存撑大,那么调整一下这个参数有可能会对reduce的性能有所帮助。
当reduce将全部的map上对应本身partition的数据下载完成后,就会开始真正的reduce计算阶段(中间有个sort阶段一般时间很是短,几秒钟就完成了,由于整个下载阶段就已是边下载边sort,而后边merge的)。当reduce task真正进入reduce函数的计算阶段的时候,有一个参数也是能够调整reduce的计算行为。也就是:mapred.job.reduce.input.buffer.percent(default 0.0)。因为reduce计算时确定也是须要消耗内存的,而在读取reduce须要的数据时,一样是须要内存做为buffer,这个参数是控制,须要多少的内存百分比来做为reduce读已经sort好的数据的buffer百分比。默认状况下为0,也就是说,默认状况下,reduce是所有从磁盘开始读处理数据。若是这个参数大于0,那么就会有必定量的数据被缓存在内存并输送给reduce,当reduce计算逻辑消耗内存很小时,能够分一部份内存用来缓存数据,反正reduce的内存闲着也是闲着。
选项 | 类型 | 默认值 | 描述 |
mapred.reduce.parallel.copies | int | 5 | 每一个reduce并行下载map结果的最大线程数 |
mapred.reduce.copy.backoff | int | 300 | reduce下载线程最大等待时间(in sec) |
io.sort.factor | int | 10 | 同上 |
mapred.job.shuffle.input.buffer.percent | float | 0.7 | 用来缓存shuffle数据的reduce task heap百分比 |
mapred.job.shuffle.merge.percent | float | 0.66 | 缓存的内存中多少百分比后开始作merge操做 |
mapred.job.reduce.input.buffer.percent | float | 0.0 | sort完成后reduce计算阶段用来缓存数据的百分比 |