MapReduce是一种可用于数据处理的编程模型(或计算模型),该模型能够比较简单,但想写出有用的程序却不太容易。MapReduce能将大型数据处理任务分解成不少单个的、能够在服务器集群中并行执行的任务,而这些任务的计算结果能够合并在一块儿计算最终的结果。最重要的是,MapReduce的优点在于易于编程且能在大型集群(上千节点)并行处理大规模数据集,以可靠,容错的方式部署在商用机器上。html
从MapReduce的全部长处来看,它基本上是一个批处理系统,并不适合交互式分析。不可能执行一条查询并在几秒内或更短的时间内获得结果。典型状况下,执行查询须要几分钟或更多时间。所以,MapReduce更适合那种没有用户在现场等待查询结果的离线使用场景。算法
在MapReduce整个过程能够归纳为如下过程:编程
input split --> map --> shuffle --> reduce --> output数组
下图是《Hadoop权威指南》给出的MapReduce运行过程:缓存
MapReduce运行过程图 服务器
MapReduce做业是客户端须要执行的一个工做单元:它包括输入数据、MapReduce程序和配置信息。Hadoop将做业分红若干个任务(task)来执行,其中包括两类任务:map任务和reduce任务。这些任务运行在集群的节点上,并经过YARN进行调度。若是一个任务失败,它将在另外一个不一样的节点上自动从新调度运行。网络
Hadoop将MapReduce的输入数据划分红等长的小数据块,称为输入分片(input split)或简称“分片”。Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map函数从而处理分片中的每条记录。数据结构
拥有许多分片,意味着处理每一个分片所须要的时间少于处理整个输入数据所花的时间。所以,若是咱们并行处理每一个分片,且每一个分片数据比较小,那么整个处理过程将得到更好的负载平衡,由于一台较快的计算机可以处理的数据分片比一台较慢的计算机更多,且成必定的比例。即便使用相同的机器,失败的进程或其余并发运行的做业可以实现满意的负载平衡,而且随着分片被切分得更细,负载平衡的质量会更高。另外一方面,若是分片切分得过小,那么管理分片得总时间和构建map任务得总时间将决定做业的整个执行时间。对于大多数做业来讲,一个合理的分片大小趋向于HDFS的一个块的大小,这样能够确保存储在单个节点上的最大输入块的大小。数据块默认是128MB,不过能够针对集群调整这个默认值,或在每一个文件建立时指定。并发
map任务会将集合中的元素从一种形式转化成另外一种形式,在这种状况下,输入的键值对会被转换成零到多个键值对输出。其中输入和输出的键必须彻底不一样,输入和输出的值则可能彻底不一样。app
Hadoop在存储有输入数据(HDFS中的数据)的节点上运行map任务,能够得到最佳性能,由于它无需使用宝贵的集群带宽资源。这就是所谓的“数据本地化优化”。可是,有时对于一个map任务的输入分片来讲,存储该分片的HDFS数据块复本的全部节点可能正在运行其余map任务,此时做业调度须要从某一数据块所在的机架中的一个节点上寻找一个空闲的map槽(slot)来运行该map任务分片。仅仅在很是偶然的状况下(该状况基本上不会发生),会使用其余机架中的节点运行该map任务,这将致使机架与机架之间的网络传输。下图显示了这三种可能性。
map任务的网络传输的三种可能性图
map任务的输出被称为中间键和中间值,会被发送到reducer作后续处理。但输出结果只写入本地硬盘,而非HDFS。这是为何?由于map的输出是中间结果:该中间结果由reduce任务处理后才产生最终输出结果,并且一旦做业完成,map的输出结果就能够删除。所以,若是把它存储在HDFS中并实现备份,不免有些小题大作。若是运行map任务的节点在将map中间结果传送给reduce任务以前失败,Hadoop将在另外一个节点上从新运行这个map任务以再次构建map中间结果。
shuffle和排序在MapReduce流程图中的执行过程
MapReduce确保每一个reducer的输入都是按键排序的。系统执行排序、将map输出做为输入传给reduce的过程称为shuffle。在此,咱们将学习shuffle是如何工做的,由于它有助于咱们理解工做机制(若是须要优化MapReduce程序)。shuffle属于不断被优化和改进的代码库的一部分,所以下面的描述有必要隐藏一些细节。从许多方面来看,shuffle是MapReduce的“心脏”,是奇迹发生的地方。
map端shuffle过程
1. 读取HDFS上的输入分片input split,每一行解析成一个<key, value>。每个键值对调用一次map函数。输入<0,helloyou>,<10,hello me>。
2. 覆盖map(),接收1中产生的<key, value>,而后进行处理,转换为新的<key, value>输出。每一个map任务都有一个环形内存缓存区,输出结果会暂且放在环形内存缓冲区中(该缓冲区的大小默认为100MB,由mapreduce.task.io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由mapreduce.map.sort.spill.percent属性控制),会由单独线程在本地文件系统中建立一个临时溢出文件(spill file),将该缓冲区中的数据写入这个文件,但若是再此期间缓冲区被填满,map会被堵塞直到写入过程完成。溢出写过程按轮询方式将缓冲区中的内容写到mapreduce.cluster.local.dir属性在做业特定子目录下指定的目录中。输出:<hello, 1>,<you, 1>,<hello, 1>,<me, 1>。
注:当缓冲区的数据值达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢出写线程启动,锁定这80MB的内存,执行溢出写过程。map任务的输出结果还能够往剩下的20MB内存中写,互不影响。
3. 对2输出的<key, value>进行分区,默认分为一个区,MapReduce提供Partitioner接口,做用就是根据key或value及reduce的数量来决定当前的输出数据最终应该交由哪一个reduce任务处理。默认对key hash后再以reduce任务数据取模。默认的取模方式只是为了平均reduce的处理能力,若是用户本身对Partitioner有需求,能够定制并设置到job上。
在写入磁盘以前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样作是为了不有些reduce任务分配到大量数据,而有些reduce任务却分到不多数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。接下来对不一样分区中的数据进行排序(按照key),也就是对这80MB空间内的key作排序(sort),这里的排序是对序列化的字节作的排序。若是此时设置了Combiner,将排序后的结果进行Combiner操做,若是至少存在3个溢出文件(经过mapreduce.map.combine.minspills属性设置)时,则combiner就会在输出文件写到磁盘以前再次运行,这样作的目的是让尽量减小数据写入到磁盘和传递给reduce的数据。排序后:<hello, 1>,<hello, 1>,<me, 1>,<you, 1>,Combiner后:<hello, {1, 1}>,<me, {1}>,<you, {1}>。
注:combiner能够在输入上反复运行,但并不影响最终结果。若是只有1或2个溢出文件,那么因为map输出规模减小,于是不值得调用combiner带来的开销,所以不会为该map输出再次运行combiner。
4. 当map任务输出最后一个记录时,可能会有不少的溢出文件,这时须要将这些文件合并(merge)。合并的过程当中会不断地进行排序和combiner操做,目的有两个:
① 尽可能减小每次写入磁盘的数据量;
② 尽可能减小下一复制阶段网络传输的数据量。
最后合并成了一个已分区且已排序的输出文件。配置属性mapreduce.task.io.sort.factor控制着一次最多能合并多少流,默认值是10。为了减小网络传输的数据量,节约磁盘空间和写磁盘的速度更快,这里能够将数据压缩,只要将mapreduce.map.output.compress设置为true就能够。数据压缩算法有DEFLATE、gzip、bzip二、LZO、LZ四、Snappy等,能够经过mapreduce.map.output.compress.codec配置压缩类型便可。
5. 将分区中的数据拷贝给相对应的reduce任务(可选)。reducer经过HTTP获得输出文件的分区。用于文件分区的工做线程的数量由任务的mapreduce.shuffle.max.threads属性控制,此设置针对的是每个节点管理器,而不是针对每一个map任务。默认值0将最大线程数设置为机器中处理器数量的两倍。
有人可能会问:分区中的数据怎么知道它对应的reduce是哪一个呢?其实map任务一直和其节点上的Application Master保持联系,而Application Master又一直和Application Manager保持心跳。因此Application Manager中保存了整个集群中的宏观信息。只要reduce任务向ApplicationManager获取对应的map输出位置就OK了。
至此,map端的全部工做已经结束了,最终生成的这个文件也存放在运行map任务的tasktracker的本地磁盘上(但reduce输出并不这样)。每一个reduce任务不断地经过RPC从JobTracker那获取map任务是否完成的信息,若是reduce任务获得通知,获知某台TaskTracker上的map任务执行完成,shuffle的后半段过程开始启动。
如今,tasktracker须要为分区文件运行reduce任务。下图是reduce端shuffle过程图:
reduce端shuffle过程图
1. copy过程,简单地拉取数据。reduce任务须要集群上若干个map任务的map输出做为其特殊的分区文件。每一个map任务的完成时间可能不一样,所以在每一个任务完成时,reduce任务就开始经过HTTP方式请求复制其输出。 reduce任务由少许复制线程,所以可以并行取得map输出。默认值是5个线程,但这个默认值能够修改设置mapreduce.reduce.shuffle.parallelcopies属性便可。
若是map输出至关小,会被复制到reduce任务JVM的内存(缓冲区大小由mapreduce.reduce.shuffle.input.buffer.percent属性控制,指定用于此用途的堆空间的百分比),不然,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapreduce.reduce.shuffle.merge.percent决定,默认是0.66)或达到map输出阈值(由mapreduce.reduce.merge.inmem.threshold控制),则合并后溢出写到磁盘中。若是指定combiner,则在合并期间运行它以下降写入硬盘的数据量。
随着磁盘上副本增多,后台线程会将它们合并为更大的、排好序的文件。这会为后面的合并节省一些时间。
2. merge阶段。从map端copy过来的数据会先放入JVM的内存缓冲区中,这里的缓冲区大小要比map端更为灵活,它基于JVM的heap size设置的,由于shuffle阶段reducer不运行,因此绝大部分的内存都给shuffle使用。这个merge阶段将合并map输出,维持其顺序排序。这是循环进行的。好比,若是由50个map输出,而合并因子是10(10为默认设置,由mapreduce.task.io.sort.factor属性设置,与map的合并相似),合并将进行5趟。每趟将10个文件合并成一个文件,所以最后有5个中间文件。
Merge有三种形式:一、内存到内存;二、内存到磁盘;三、磁盘到磁盘。默认状况下第一种形式是不启动的。当内存中的数据量到达必定阈值,就启动内存到磁盘的merge。与map端相似,这也是溢写的过程,在这个过程当中若是设置了combiner,也是会启动的,而后在磁盘中合并溢写文件。第二种merge方式一直再运行,直到没有map端的数据时才结束,而后启动第三种磁盘到磁盘的merge方式生成最终的输出文件。
3. reduce阶段。这是最后阶段了,直接把数据输入reduce函数,也就是对已排序输出中的每一个键调用reduce函数,从而省略了一次磁盘往返行程,并无将这5个文件合并成一个已排序的文件做为最后一趟。最后的合并能够来自内存和磁盘片断。此阶段的输出直接写到输出文件系统,通常为HDFS。若是采用HDFS,因为节点管理器(NodeManager)也运行数据节点(DataNode),因此第一个块复本将被写到本地磁盘。
reduce任务并不具有数据本地化的优点,单个reduce任务的输入一般来自于全部map任务的输出,或者接收到不一样map任务的输出。在本例中,咱们假设仅有一个reduce任务,其输入是全部map任务的输出。所以,排过序的map输出需经过网络传输发送到运行reduce任务的节点。数据在reduce端合并,而后由用户定义的reduce函数处理。reduce的输出一般存储在HDFS中以实现可靠存储。对于reduce输出的每一个HDFS块,第一个复本存储在本地节点上,其余复本出于可靠性考虑存储在其余机架的节点中。所以,将reduce的输出写入HDFS确实须要占用网络带宽,但这与正常的HDFS管线写入的消耗同样。
一个reduce任务的完整数据流如图所示。虚线框表示节点,虚线箭头表示节点内部的数据传输,而实线箭头表示不一样节点之间的数据传输。
一个reduce任务的MapReduce数据流图
reduce任务的数量并不是由输入数据的大小决定,相反是独立指定的。
若是有好多个reduce任务,每一个map任务就会针对输出进行分区(partition),即为每一个reduce任务建一个分区。每一个分区有许多键(及其对应的值),但每一个键对应的键-值对记录都在同一个分区中。分区可由用户定义的分区函数控制,但一般用默认的partitioner经过哈希函数来分区,很高效。
通常状况下,多个reduce任务的数据流以下图所示。该图很清晰地代表了为何map任务和reduce任务之间的数据流称为shuffle(混洗),由于每一个reduce任务的输入都来自许多map任务。shuffle通常比图中所示的更复杂(上下节已描述了大概),并且调整混洗参数对做业总执行时间的影响很是打。
多个reduce任务的数据流图
最后,当数据处理能够彻底并行(即无需混洗时),可能会出现无reduce任务的状况。在这种状况下,惟一的非本地节点数据传输是map任务将结果写入HDFS,参见下图所示。
无reduce任务的MapReduce数据流
Map的输出结果是由Collector处理的,每一个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽量多的数据。
这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,可是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分二者,分界点不是亘古不变的,而是每次Spill以后都会更新一次。初始的分界点是0,数据的存储方向是向上增加,索引数据的存储方向是向下增加,Kvbuffer的存放指针bufindex时指向数据区的,是一直闷着头地向上增加,好比bufindex初始值为0,一个Int型的key写完以后,bufindex增加为4,一个Int型的value写完以后,bufindex增加为8。
1 kvoffsets缓冲区:也叫偏移量索引数组,用于保存key/value信息在位置索引kvindices中的偏移量。当kvoffsets的使用率超过io.sort.spill.percent(默认为80%)后,便会触发一次SpillThread线程的“溢写”操做,也就是开始一次spill阶段的操做。
索引数据区域:存元数据信息,都是整数,只存储分区信息(整数)和kvbuffer在数组中的位置
2 kvindices缓冲区:也叫位置索引数组,用于保存key/value在数据缓冲区kvbuffer中的起始位置。
3 kvbuffer数据缓冲区:用于保存实际的key/value的值。默认状况下该缓冲区最多可使用io.sort.mb的95%,当kvbuffer使用率超过io.sort.spill.percent(默认80%)后,便会触发一次SpillThread线程的“溢写”操做,也就是开始一次spill阶段的操做。
整个过程描述以下图所示。在最高层,有如下5个独立的实体。
Hadoop运行MapReduce做业的工做原理图
1. 客户端提交一个MapReduce做业,Job的submit()方法建立一个内部的JobSummiter实例,而且调用其submitJobInternal()方法。提交做业后,waitForCompletion()每秒轮询做业的进度,若是发现自上次报告后有改变,便把进度报告到控制台。做业完成后,若是成功,就显示做业计数器;若是失败,则致使做业失败的错误被记录到控制台。
2. Job向资源管理器请求一个新应用ID,用于MapReduce做业ID。资源管理器检查做业的输出说明和计算做业的输入分片,若是没有指定输出目录,输出目录已存在或者分片没法计算,那么做业就不提交,错误抛回给MapReduce程序。
3. 将运行做业所须要的资源(包括做业JAR文件、配置文件和计算所得的输入分片)复制到一个以做业ID命名的目录下的共享文件系统中。做业JAR的复本较多(由mapreduce.client.submit.file.replication属性控制,默认值为10),所以在运行做业的任务时,集群中有不少个复本可供节点管理器访问。
4. 经过调用资源管理器的submitApplication()方法提交做业。
5. 资源管理器收到调用它的submitApplication()消息后,便将请求传递给YARN调度器(scheduler)。调度器分配一个容器,而后资源管理器在节点管理器的管理下在容器中启动application master的进程。
6. MapReduce做业的application master是一个Java应用程序,它的主类是MRAppMaster。因为将接受来自任务的进度和完成报告,所以application master对做业的初始化是经过建立多个薄记对象以保持对做业进度的跟踪来完成的。
7. 对每个分片建立一个map任务对象以及由mapreduce.job.reduces属性(经过做业的setNumReduceTasks()方法设置)肯定的多个reduce任务对象。任务ID在此时分配。
application master必须决定如何运行构成MapReduce做业的各个任务。若是做业很小,就选择和本身在同一个JVM上运行任务。与在一个节点上顺序运行这些任务相比,当application master判断在新的容器中分配和运行任务的开销大于并行运行它们的开销时,就会发生这种状况。这样的做业称为uberized,或者uber任务(小做业)运行。
默认状况下,小做业就是少于10个mapper且只有1个reducer且输入大小小于一个HDFS块的做业(经过设置mapreduce.job.ubertask.maxmaps、mapreduce.job.ubertask.maxreduces和mapreduce.job.ubertask.maxbytes能够改变这几个值)。必须明确启动uber任务(对于单个做业,或者是对整个集群),具体方法是将mapreduce.job.ubertask.enable设置为true。
最后,在任何任务运行以前,application master调用setupJob()方法设置OutputCommitter。FileOutputCommitter为默认值,表示将创建做业的最终输出目录及任务输出的临时工做空间。
8. 若是做业不适合做为uber任务运行,那么application master就会为该做业中的全部map任务和reduce任务向资源管理器请求容器。首先为Map任务发出请求,该请求优先级要高于reduce任务的请求,这是由于全部的map任务必须在reduce的排序阶段可以启动前完成。直到有5%的map任务已经完成时,为reduce任务的请求才会发出(慢启动reduce)。
reduce任务可以在集群中任意位置运行,但map任务的请求有着数据本地化局限,这也是调度器所关注的。map任务的三种状况的详见1、概念综述中的map。
请求也为任务指定了内存需求和CPU数。在默认状况下,每一个map任务和reduce任务都分配到1024MB的内存和一个虚拟的内核,这些值能够在每一个做业的基础上进行配置,配置参考以下表:
属性名称 | 类型 | 默认值 | 说明 |
mapreduce.map.memory.mb | int | 1024 | map容器所用的内存容量 |
mapreduce.reduce.memory.mb | int | 1024 | reduce容器所用的内存容量 |
mapreduce.map.cpu.vcores | int | 1 | map容器所用的虚拟内核 |
mapreduce.reduce.cpu.vcoresp.memory.mb | int | 1 | reduce容器所用的虚拟内核 |
9. 一旦资源管理器的调度器为任务分配了一个特定节点上的容器,application master就经过与节点管理器通讯来启动容器。该任务由主类为YarnChild的一个Java应用程序执行。
10. 在它运行任务以前,首先将任务须要的资源本地化,包括做业的配置、JAR文件和全部来自分布式缓存的文件。
11. 最后,运行map任务或reduce任务。
参考资料:《Hadoop权威指南(第四版)》
https://www.jianshu.com/p/1e542477b59a