hadoop之mapreduce详解(优化篇)

1、概述

     优化前咱们须要知道hadoop适合干什么活,适合什么场景,在工做中,咱们要知道业务是怎样的,能才结合平台资源达到最有优化。除了这些咱们固然还要知道mapreduce的执行过程,好比从文件的读取,map处理,shuffle过程,reduce处理,文件的输出或者存储。在工做中,每每平台的参数都是固定的,不可能为了某一个做业去修改整个平台的参数,因此在做业的执行过程当中,须要对做业进行单独的设定,这样既不会对其余做业产生影响,也能很好的提升做业的性能,提升优化的灵活性。html

如今回顾下hadoop的优点(适用场景):
一、可构建在廉价机器上,设备成本相对低
二、高容错性,HDFS将数据自动保存多个副本,副本丢失后,自动恢复,防止数据丢失或损坏
三、适合批处理,HDFS适合一次写入、屡次查询(读取)的状况,适合在已有的数据进行屡次分析,稳定性好
四、适合存储大文件,其中的大表示能够存储单个大文件,由于是分块存储,以及表示存储大量的数据算法

2、小文件优化

从概述中咱们知道,很明显hadoop适合大文件的处理和存储,那为何不适合小文件呢?sql

一、从存储方面来讲:hadoop的存储每一个文件都会在NameNode上记录元数据,若是一样大小的文件,文件很小的话,就会产生不少文件,形成NameNode的压力。
二、从读取方面来讲:一样大小的文件分为不少小文件的话,会增长磁盘寻址次数,下降性能
三、从计算方面来讲:咱们知道一个map默认处理一个分片或者一个小文件,若是map的启动时间都比数据处理的时间还要长,那么就会形成性能低,并且在map端溢写磁盘的时候每个map最终会产生reduce数量个数的中间结果,若是map数量特别多,就会形成临时文件不少,并且在reduce拉取数据的时候增长磁盘的IO。apache

好,咱们明白小文件形成的弊端以后,那咱们应该怎么处理这些小文件呢?网络

一、从源头干掉,也就是在hdfs上咱们不存储小文件,也就是数据上传hdfs的时候咱们就合并小文件
二、在FileInputFormat读取入数据的时候咱们使用实现类CombineFileInputFormat读取数据,在读取数据的时候进行合并。数据结构

3、数据倾斜问题优化

咱们都知道mapreduce是一个并行处理,那么处理的时间确定是做业中全部任务最慢的那个了,可谓木桶效应?为何会这样呢?app

一、数据倾斜,每一个reduce处理的数据量不是同一个级别的,全部致使有些已经跑完了,而有些跑的很慢。
二、还有可能就是某些做业所在的NodeManager有问题或者container有问题,致使做业执行缓慢。函数

那么为何会产生数据倾斜呢?oop

数据自己就不平衡,因此在默认的hashpartition时形成分区数据不一致问题,还有就是代码设计不合理等。post

那如何解决数据倾斜的问题呢?

一、既然默认的是hash算法进行分区,那咱们自定义分区,修改分区实现逻辑,结合业务特色,使得每一个分区数据基本平衡
二、既然有默认的分区算法,那么咱们能够修改分区的键,让其符合hash分区,而且使得最后的分区平衡,好比在key前加随机数n-key。
三、既然reduce处理慢,咱们能够增长reduce的内存和vcore呀,这样挺高性能就快了,虽然没从根本上解决问题,可是还有效果
四、既然一个reduce处理慢,那咱们能够增长reduce的个数来分摊一些压力呀,也不能根本解决问题,仍是有必定的效果。

那么若是不是数据倾斜带来的问题,而是节点服务有问题形成某些map和reduce执行缓慢呢?

那么咱们可使用推测执行呀,你跑的慢,咱们能够找个其余的节点重启同样的任务竞争,谁快谁为准。推测执行时以空间换时间的优化。会带来集群资源的浪费,会给集群增长压力,因此我司集群的推测执行都是关闭的。其实在做业执行的时候能够偷偷开启的呀

推测执行参数控制:

mapreduce.map.speculative
mapreduce.reduce.speculative

4、mapreduce过程优化

4.一、map端

上面咱们从hadoop的特性场景等聊了下mapreduce的优化,接下来咱们从mapreduce的执行过程进行优化。

好吧,咱们就从源头开始说,从数据的读取以及map数的肯定:

    在前面咱们聊太小文件的问题,因此在数据的读取这里也能够作优化,因此选择一个合适数据的文件的读取类(FIleInputFormat的实现类)也很重要咱们在做业提交的过程当中,会把jar,分片信息,资源信息提交到hdfs的临时目录,默认会有10个复本,经过参数mapreduce.client.submit.file.replication控制后期做业执行都会去下载这些东西到本地,中间会产生磁盘IO,因此若是集群很大的时候,能够增长该值,提升下载的效率。

分片的计算公式:

计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize))
minSize的默认值是1,而maxSize的默认值是long类型的最大值,便可得切片的默认大小是blockSize(128M)
maxSize参数若是调得比blocksize小,则会让切片变小,并且就等于配置的这个参数的值
minSize参数调的比blockSize大,则可让切片变得比blocksize还大

     由于map数没有具体的参数指定,因此咱们能够经过如上的公式调整切片的大小,这样咱们就能够设置map数了,那么问题来了,map数该如何设置呢?

这些东西必定要结合业务,map数太多,会产生不少中间结果,致使reduce拉取数据变慢,太少,每一个map处理的时间又很长,结合数据的需求,能够把map的执行时间调至到一分钟左右比较合适,那若是数据量就是很大呢,咱们有时候仍是须要控制map的数量,这个时候每一个map的执行时间就比较长了,那么咱们能够调整每一个map的资源来提高map的处理能力呀,我司就调整了mapreduce.map.memory.mb=3G(默认1G)mapreduce.map.cpu.vcores=1(默认也是1)

从源头上咱们肯定好map以后。那么接下来看map的具体执行过程咯。

首先写环形换冲区,那为啥要写环形换冲区呢,而不是直接写磁盘呢?这样的目的主要是为了减小磁盘i/o。

每一个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽量多的数据。执行流程是,该缓冲默认100M(mapreduce.task.io.sort.mb参数控制),当到达80%(mapreduce.map.sort.spill.percent参数控制)时就会溢写磁盘。每达到80%都会重写溢写到一个新的文件。那么,咱们彻底能够根据机器的配置和数据来两种这两个参数,当内存足够,咱们增大mapreduce.task.io.sort.mb彻底会提升溢写的过程,并且会减小中间结果的文件数量。我司调整mapreduce.task.io.sort.mb=512。当文件溢写完后,会对这些文件进行合并,默认每次合并10(mapreduce.task.io.sort.factor参数控制)个溢写的文件,我司调整mapreduce.task.io.sort.factor=64。这样能够提升合并的并行度,减小合并的次数,下降对磁盘操做的次数。

mapreduce.shuffle.max.threads(默认为0,表示可用处理器的两倍),该参数表示每一个节点管理器的工做线程,用于map输出到reduce。

那么map算是完整了,在reduce拉取数据以前,咱们彻底还能够combiner呀(不影响最终结果的状况下),此时会根据Combiner定义的函数对map的结果进行合并这样就能够减小数据的传输,下降磁盘io,提升性能了。

终于走到了map到reduce的数据传输过程了:
这中间主要的影响无非就是磁盘IO,网络IO,数据量的大小了(是否压缩),其实减小数据量的大小,就能够作到优化了,因此咱们能够选择性压缩数据,这样在传输的过程当中
就能够下降磁盘IO,网络IO等。能够经过mapreduce.map.output.compress(default:false)设置为true进行压缩,数据会被压缩写入磁盘,读数据读的是压缩数据须要解压,在实际经验中Hive在Hadoop的运行的瓶颈通常都是IO而不是CPU,压缩通常能够10倍的减小IO操做,压缩的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一种比较平衡选择,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)参数设置。我司使用org.apache.hadoop.io.compress.SnappyCodec算法,但这个过程会消耗CPU,适合IO瓶颈比较大。

mapreduce.task.io.sort.mb        #排序map输出所须要使用内存缓冲的大小,以兆为单位, 默认为100
mapreduce.map.sort.spill.percent #map输出缓冲和用来磁盘溢写过程的记录边界索引,这二者使用的阈值,默认0.8
mapreduce.task.io.sort.factor    #排序文件时,一次最多合并的文件数,默认10
mapreduce.map.output.compress    #在map溢写磁盘的过程是否使用压缩,默认false
org.apache.hadoop.io.compress.SnappyCodec  #map溢写磁盘的压缩算法,默认org.apache.hadoop.io.compress.DefaultCodec
mapreduce.shuffle.max.threads    #该参数表示每一个节点管理器的工做线程,用于map输出到reduce,默认为0,表示可用处理器的两倍

4.一、reduce端

接下来就是reduce了,首先咱们能够经过参数设置合理的reduce个数(mapreduce.job.reduces参数控制),以及经过参数设置每一个reduce的资源,mapreduce.reduce.memory.mb=5G(默认1G)
mapreduce.reduce.cpu.vcores=1(默认为1)。

reduce在copy的过程当中默认使用5(mapreduce.reduce.shuffle.parallelcopies参数控制)个并行度进行复制数据,我司调了mapreduce.reduce.shuffle.parallelcopies=100.reduce的每个下载线程在下载某个map数据的时候,有可能由于那个map中间结果所在机器发生错误,或者中间结果的文件丢失,或者网络瞬断等等状况,这样reduce的下载就有可能失败,因此reduce的下载线程并不会无休止的等待下去,当必定时间后下载仍然失败,那么下载线程就会放弃此次下载,并在随后尝试从另外的地方下载(由于这段时间map可能重跑)。reduce下载线程的这个最大的下载时间段是能够经过mapreduce.reduce.shuffle.read.timeout(default180000秒)调整的。

Copy过来的数据会先放入内存缓冲区中,而后当使用内存达到必定量的时候才spill磁盘。这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置。这个内存大小的控制就不像map同样能够经过io.sort.mb来设定了,而是经过另一个参数 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7)控制的。意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task,内存到磁盘merge的启动门限能够经过mapreduce.reduce.shuffle.merge.percent(default0.66)配置。

copy完成后,reduce进入归并排序阶段,合并因子默认为10(mapreduce.task.io.sort.factor参数控制),若是map输出不少,则须要合并不少趟,因此能够提升此参数来减小合并次数。

mapreduce.reduce.shuffle.parallelcopies #把map输出复制到reduce的线程数,默认5
mapreduce.task.io.sort.factor  #排序文件时一次最多合并文件的个数
mapreduce.reduce.shuffle.input.buffer.percent #在shuffle的复制阶段,分配给map输出缓冲区占堆内存的百分比,默认0.7
mapreduce.reduce.shuffle.merge.percent #map输出缓冲区的阈值,用于启动合并输出和磁盘溢写的过程

 

更多hadoop生态文章见: hadoop生态系列

相关文章
相关标签/搜索