默认一个block对应一个分片,对应一个map taskjava
map业务处理算法
map 输出的数据并非直接写入磁盘的,而是会预先存储在一个预约义的buffer中apache
对map的输出结果进行分区,按照key进行排序和分组缓存
至关于本地端的reduce网络
将最终的数据进行merge以后输出到磁盘中,等待shuffleapp
对map结果进行了一次reduce操做,减小了map的输出结果和reduce的远程拷贝数据量,使得Map task和 Reduce task的执行时间缩短负载均衡
好比,要处理整型类型数据时,输出类型IntWritable就比Text高效(须要转换)
若是输出整数的大部分能够用一个或者两个字节保存,可直接采用VIntWritable,VLongWritable,采用变长整型的编码方式,减小数据输出量。socket
map的第一步,从磁盘读取数据并切片。当读取海量的小文件时,会启动大量的map task,效率很是慢,能够经过CombineInputFormat 自定义分片策略对小文件进行合并,从而减小map task的数量,减小map 时间,此外:ide
当mapred.min.split.size小于dfs.block.size的时候,一个block会被分为多个分片,也就是对应多个map task。函数
当mapred.min.split.size大于dfs.block.size的时候,一个分片可能对应多个block,也就是一个map task读取多个block数据。
集群的网络、IO等性能很好的时候,建议调高dfs.block.size。
根据数据源的特性,主要调整mapred.min.split.size来控制map task的数量
设置Buffer话能够减小map任务的IO开销,从而提升性能。
首先将map结果输出到buffer,当缓存的使用量超过80%的时候,开始溢出到磁盘(splll),buffer默认100M,能够经过io.sort.mb设置。
可是若是将io.sort.mb调的很是大的时候,对机器的配置要求就很是高,由于占用内存过大,因此须要根据状况进行配置。
map并非等buffer写满才spill,经过io.sort.spill.percent能够调整。这个会影响spill的频繁程度,进而影响map task
该阶段是map产生spill以后,对spill进行处理的过程,经过对其进行配置也能够达到优化IO开销的目的。
merge过程是并行处理spill的,每次并行多少个spill是由参数io.sort.factor指定的,默认为10个
若是产生的spill很是多,merge的时候每次只能处理10个spill,那么仍是会形成频繁的IO处理
适当的调大每次并行处理的spill数有利于减小merge数所以能够影响map的性能
可是若是调整的数值过大,并行处理spill的进程过多会对机器形成很大压力
咱们知道若是map side设置了Combiner,那么会根据设定的函数对map输出的数据进行一次类reduce的预处理
可是和分组、排序分组不同的是,combine发生的阶段多是在merge以前,也多是在merge以后
这个时机能够由一个参数控制:min.num.spill.for.combine,默认值为3 当job中设定了combiner,而且spill数最少有3个的时候,那么combiner函数就会在merge产生结果文件以前运行
例如,产生的spill很是多,虽然咱们能够经过merge阶段的io.sort.factor进行优化配置,可是在此以前咱们还能够经过先执行combine对结果进行处理以后再对数据进行merge 这样一来,到merge阶段的数据量将会进一步减小,IO开销也会被降到最低
其实不管是spill的时候,仍是最后merge产生的结果文件,都是能够压缩的,控制输出是否使用压缩的参数是mapred.compress.map.output,值为true或者false.启用压缩以后,会牺牲CPU的一些计算资源,可是能够节省IO开销,很是适合IO密集型的做业(若是是CPU密集型的做业不建议设置)
设置压缩的时候,咱们能够选择不一样的压缩算法 Hadoop默认提供了GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等压缩格式
一般来讲,想要达到比较平衡的cpu和磁盘压缩比,LzoCodec比较合适,但也要取决于job的具体状况
若是想要自行选择中间结果的压缩算法,能够设置配置参数:
mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec
//或者其余用户自行选择的压缩方式
==map端的性能瓶颈都是频繁的IO操做形成的==,全部的优化也都是针对IO进行的,而优化的瓶颈又很大程度上被机器的配置等外部因素所限制
选项 | 类型 | 默认值 | 描述 |
---|---|---|---|
mapred.min.split.size | int | 1 | Input Split的最小值 |
mapred.max.split.size | int | . | Input Split的最大值 |
io.sort.mb | int | 100 | map缓冲区大小 |
io.sort.spill.percent | float | 0.8 | 缓冲区阈值 |
io.sort.factor | int | 10 | 并行处理spill的个数 |
min.num.spill.for.combine | int | 3 | 最少有多少个spill的时候combine在merge以前进行 |
mapred.compress.map.output | boolean | false | map中间数据是否采用压缩 |
mapred.map.output.compression.codec | String | . | 压缩算法 |
每个job都会将map输出结果根据reduce(n)分红n个partition。因此,为了节省时间,==在第一个map结束后,全部reduce就开始尝试从完成的map中下载该reduce对应的partition==
在这个shuffle过程当中,因为map的数量一般是不少个的,而每一个map中又都有可能包含每一个reduce所须要的数据
因此对于每一个reduce来讲,去各个map中拿数据也是并行的,能够经过mapred.reduce.parallel.copies这个参数来调整,默认为5
当map数量不少的时候,就能够适当调大这个值,减小shuffle过程使用的时间 还有一种状况是:reduce从map中拿数据的时候,有可能由于中间结果丢失、网络等其余缘由致使map任务失败
而reduce不会由于map失败就永无止境的等待下去,它会尝试去别的地方得到本身的数据(这段时间失败的map可能会被重跑) 因此设置reduce获取数据的超时时间能够避免一些由于网络很差致使没法得到数据的状况
mapred.reduce.copy.backoff,默认300s,通常状况下不用调整这个值,由于生产环境的网络都是很流畅的
因为reduce是并行将map结果下载到本地,因此也是须要进行merge的,因此io.sort.factor的配置选项一样会影响reduce进行merge时的行为
和map同样,reduce下载过来的数据也是存入一个buffer中而不是立刻写入磁盘的,因此咱们一样能够控制这个值来减小IO开销 控制该值的参数为: mapred.job.shuffle.input.buffer.percent,默认0.7,这是一个百分比,意思是reduce的可用内存中拿出70%做为buffer存放数据
reduce的可用内存经过mapred.child.Java.opts来设置,好比置为-Xmx1024m,该参数是同时设定map和reduce task的可用内存,通常为map buffer大小的两倍左右
设置了reduce端的buffer大小,咱们一样能够经过一个参数来控制buffer中的数据达到一个阈值的时候开始往磁盘写数据:mapred.job.shuffle.merge.percent,默认为0.66
sort的过程通常很是短,由于是边copy边merge边sort的,后面就直接进入真正的reduce计算阶段了
以前咱们说过reduc端的buffer,默认状况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,而后reduce会从磁盘中得到全部的数据
也就是说,buffer和reduce是没有直接关联的,中间多个一个写磁盘->读磁盘的过程,既然有这个弊端,那么就能够经过参数来配置使得buffer中的一部分数据能够直接输送到reduce,从而减小IO开销:==mapred.job.reduce.input.buffer.percent==,默认为0.0
当值大于0的时候,会保留指定比例的内存读buffer中的数据直接拿给reduce使用 这样一来,设置buffer须要内存,读取数据须要内存,reduce计算也要内存,因此要根据做业的运行状况进行调整
和map阶段差很少,reduce节点的调优也是主要集中在加大内存使用量,减小IO,增大并行数
reduce调优主要参数:
选项 | 类型 | 默认值 | 描述 |
---|---|---|---|
mapred.reduce.parallel.copies | int | 5 | 每一个reduce去map中拿数据的并行数 |
mapred.reduce.copy.backoff | int | 300 | 获取map数据最大超时时间 |
mapred.job.shuffle.input.buffer.percent | float | 0.7 | buffer大小占reduce可用内存的比例 |
mapred.child.java.opts | String | . | -Xmx1024m设置reduce可用内存为1g |
mapred.job.shuffle.merge.percent | float | 0.66 | buffer中的数据达到多少比例开始写入磁盘 |
mapred.job.reduce.input.buffer.percent | float | 0.0 | 指定多少比例的内存用来存放buffer中的数据 |
Map Task和Reduce Task调优的一个原则就是
减小数据的传输量
尽可能使用内存
减小磁盘IO的次数
增大任务并行数
除此以外还有根据本身集群及网络的实际状况来调优
每一个做业启动的mapper由输入的分片数决定,每一个节点启动的mapper数应该是在10-100之间,且最好每一个map的执行时间至少一分钟 若是输入的文件巨大,会产生无数个mapper的状况,应该使用mapred.tasktracker.map.tasks.maximum参数肯定每一个tasktracker可以启动的最大mapper数,默认只有2 以避免同时启动过多的mapper
reducer的启动数量官方建议是0.95或者1.75节点数每一个节点的container数 使用0.95的时候reduce只须要一轮就能够完成 使用1.75的时候完成较快的reducer会进行第二轮计算,并进行负载均衡
增长reducer的数量会增长集群的负担,可是会获得较好的负载均衡结果和减低失败成本
一些详细的参数:
选项 | 类型 | 默认值 | 描述 |
---|---|---|---|
mapred.reduce.tasks | int | 1 | reduce task数量 |
mapred.tasktracker.map.tasks.maximum | int | 2 | 每一个节点上可以启动map task的最大数量 |
mapred.tasktracker.reduce.tasks.maximum | int | 2 | 每一个节点上可以启动reduce task的最大数量 |
mapred.reduce.slowstart.completed.maps | float | 0.05 | map阶段完成5%的时候开始进行reduce计算 |
map和reduce task是同时启动的,很长一段时间是并存的,共存的时间取决于mapred.reduce.slowstart.completed.maps的设置,若是设置为0.6.那么reduce将在map完成60%后进入运行态
若是设置的map和reduce参数都很大,势必形成map和reduce争抢资源,形成有些进程饥饿,超时出错,最大的可能就是socket.timeout的出错
reduce是在33%的时候完成shuffle过程,因此==确保reduce进行到33%的时候map任务所有完成==,能够经过观察任务界面的完成度进行调整。当reduce到达33%的时候,map刚好达到100%设置最佳的比例,可让map先完成,可是不要让reduce等待计算资源