(二)Hadoop之MapReduce原理分析

简介

Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架。java

Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。算法

为何要MapReduce

  1. 海量数据在单机上处理由于硬件资源限制,没法胜任
  2. 而一旦将单机版程序扩展到集群来分布式运行,将极大增长程序的复杂度和开发难度
  3. 引入MapReduce框架后,开发人员能够将绝大部分工做集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理

设想一个海量数据场景下的wordcount需求:编程

单机版:内存受限,磁盘受限,运算能力受限
分布式:
一、文件分布式存储(HDFS)
二、运算逻辑须要至少分红2个阶段(一个阶段独立并发,一个阶段汇聚)
三、运算程序如何分发
四、程序如何分配运算任务(切片)
五、两阶段的程序如何启动?如何协调?
六、整个程序运行过程当中的监控?容错?重试?

可见在程序由单机版扩成分布式时,会引入大量的复杂工做。为了提升开发效率,能够将分布式程序中的公共功能封装成框架,让开发人员能够将精力集中于业务逻辑。缓存

MapReduce并行处理的基本过程

首先要说明的是Hadoop2.0以前和Hadoop2.0以后的区别:网络

  • 2.0以前只有MapReduce的运行框架,它里面只有两种节点,一是master,二是worker。master既作资源调度又作程序调度,worker只是用来参与计算的。
  • 但在2.0以后加入了Yarn集群,Yarn集群的主节点承担了资源调度,Yarn集群的从节点中会挑选出一个节点(由RedourceManager决定)来进行应用程序的资源调度,做用相似于2.0以前的master的工做。(资源调度: 处理程序所须要的cpu、内存资源,以及存储数据所须要的硬盘资源)

从图上的user program开始,user program连接了MapReduce库,实现了最基本的Map函数和Reduce函数。
image.png并发

一、MapReduce库把的输入文件划分为M份,即如图左所示分红了split0-4的分片,而后使用fork将用户进程copy到集群内其它机器上。
二、被分配了Map做业的Worker,开始读取对应分片的输入数据,Map做业从输入数据中抽取出键值对,map()函数产生的中间键值对被缓存在内存中。
三、缓存的中间键值对会按期写入本地磁盘,这些中间键值对的位置会被通报给Master,Master负责将信息转发给Reduce Worker。
四、Reduce Worker将分配好的Reduce做业的中间键值对读取后,并对它们进行排序,并将相同key的键值对汇集在一块儿。
五、Reduce Worker遍历排序好的键值并传递给reduce() 函数,经reduce() 函数计算后产生的输出会添加到这个分区的输出文件中。

MapRrduce输入与输出问题

image.png

  • Map/Reduce框架运转在<key, value>键值对上,也就是说,框架把做业的输入看为是一组<key, value>键值对,一样也产出一组 <key, value>键值对作为做业的输出,这两组键值对的类型可能不一样。
  • 框架须要对key和value的类进行序列化,所以这些类都须要实现Writable接口(Writable接口是一个实现列化协议的序列化对象,序列化最主要的做用就是持久化存储或者是用于网络传输)。另外,为了方便框架执行排序操做,key类必须实现WritableComparable接口。

一个Map/Reduce做业的输入和输出类型以下所示:app

(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

MapReduce实际处理流程

  • MapReduce 能够当作是分治算法的一种体现。所谓分治算法就是“就是分而治之 ,将大的问题分解为相同类型的子问题(最好具备相同的规模),对子问题进行求解,而后合并成大问题的解。
  • MapReduce 就是分治法的一种,将输入进行分片,而后交给不一样的task进行处理,而后合并成最终的解。
  • MapReduce 实际的处理过程能够理解为Input->Map->Sort->Combine->Partition->Reduce->Output。
一、Input阶段
数据以必定的格式传递给Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可使用,在Job.setInputFormat能够设置,也能够自定义分片函数。
二、map阶段
对输入的(key,value)进行处理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass进行设置。
三、Sort阶段
对于Mapper的输出进行排序,使用Job.setOutputKeyComparatorClass进行设置,而后定义排序规则。
四、Combine阶段
这个阶段对于Sort以后,对相同key的结果进行合并,使用Job.setCombinerClass进行设置,也能够自定义Combine Class类。
五、Partition阶段
将Mapper的中间结果按照key的范围划分为R份(Reduce做业的个数),默认使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也能够自定义划分的函数,使用Job.setPartitionClass设置。
六、Reduce阶段
对于Mapper阶段的结果进行进一步处理,Job.setReducerClass进行设置自定义的Reduce类。
七、Output阶段
Reducer输出数据的格式。

MapReduce框架结构及核心运行机制

一、结构 一个完整的MapReduce程序是这样一个分布式程序的通用框架,其应对以上问题的总体结构以下:框架

  • MRAppMaster:负责整个程序的过程调度及状态协调(Hadoop2.0以后就不同了)
  • MapTask:负责map阶段的整个数据处理流程
  • ReduceTask:负责reduce阶段的整个数据处理流程

二、MapReduce运行流程解析
1) 一个MapReduce程序启动的时候,最早启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出须要的MapTask实例数量,而后向集群申请机器启动相应数量的MapTask进程。
2)MapTask进程启动以后,根据给定的数据切片范围进行数据处理,主体流程为:分布式

  • 利用客户指定的inputformat来获取RecordReader读取数据,造成输入KV对
  • 将输入KV对传递给客户定义的map()方法,作逻辑运算,并将map()方法输出的KV对收集到缓存
  • 将缓存中的KV对按照Key分区排序后不断溢写到磁盘文件

3) MRAppMaster监控到全部MapTask进程任务完成以后,会根据客户指定的参数启动相应数量的ReduceTask进程,并告知ReduceTask进程要处理的数据范围(数据分区)
4)ReduceTask进程启动以后,根据MRAppMaster告知的待处理数据所在位置,从若干台MapTask运行所在机器上获取到若干个MapTask输出结果文件,并在本地进行从新归并排序,而后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,而后调用客户指定的outputformat将结果数据输出到外部存储。函数

三、MapTask并行度决定机制
MapTask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度。那么,MapTask并行实例是否越多越好呢?其并行度又是如何决定呢?

3.一、MapTask并行度的决定机制
一个job的map阶段并行度由客户端在提交job时决定,而客户端对map阶段并行度的规划的基本逻辑为: 将待处理数据执行逻辑切片,而后每个split分配一个mapTask并行实例处理,这段逻辑及造成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成

3.二、ReduceTask并行度的决定
ReduceTask的并行度一样影响整个job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不一样,ReduceTask数量的决定是能够直接手动设置:

//默认值是1
手动设置为4 job.setNumReduceTasks(4);

若是数据分布不均匀,就有可能在reduce阶段产生数据倾斜。
(注意: ReduceTask数量并非任意设置,还要考虑业务逻辑需求,有些状况下,须要计算全局汇总结果,就只能有1个reducetask 。尽可能不要运行太多的ReduceTask,对大多数job来讲,最好reduce的个数最多和集群中的reduce持平,或者比集群的reduce slots小)

3.三、mapreduce的shuffle机制
image.png

1)概述 MapReduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle。 shuffle的核心机制:数据分区,排序,缓存。具体来讲:就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程当中,对数据按key进行了分区和排序

分区partition(肯定哪一个数据进入哪一个reduce)
Sort根据key排序
Combiner进行局部value的合并

2)详细流程

一、 MapReduce收集咱们的map()方法输出的kv对,放到内存缓冲区中 二、 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 
三、 多个溢出文件会被合并成大的溢出文件 
四、 在溢出过程当中,及合并的过程当中,都要调用partitoner进行分组和针对key进行排序 
五、 reducetask根据本身的分区号,去各个maptask机器上取相应的结果分区数据 
六、 reducetask会取到同一个分区的来自不一样maptask的结果文件,reducetask会将 这些文件再进行合并(归并排序) 
七、 合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)

Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快 。缓冲区的大小能够经过参数调整, 参数:io.sort.mb 默认100M。

一个job的运行流程

一个mapreduce做业的执行流程是:做业提交->做业初始化->任务分配->任务执行->更新任务执行进度和状态->做业完成。

一个完整的mapreduce做业流程,包括4个独立的实体:

客户端:client,编写mapreduce程序,配置做业,提交做业。
JobTracker:协调这个做业的运行,分配做业,初始化做业,与
TaskTracker进行通讯。
TaskTracker:负责运行做业,保持与JobTracker进行通讯。
HDFS:分布式文件系统,保持做业的数据和结果。

image.png

一、提交做业
JobClient使用runjob方法建立一个JobClient实例,而后调用submitJob()方法进行做业的提交,提交做业的具体过程以下:

1) 经过调用JobTracker对象的getNewJobId()方法从JobTracker处得到一个做业ID。 
2) 检查做业的相关路径。若是输出路径存在,做业将不会被提交(保护上一个做业运行结果)。 
3) 计算做业的输入分片,若是没法计算,例如输入路径不存在,做业将不被提交,错误返回给mapreduce程序。 
4) 将运行做业所需资源(做业jar文件,配置文件和计算获得的分片)复制到HDFS上。 
5) 告知JobTracker做业准备执行(使用JobTracker对象的submitJob()方法来真正提交做业)。

二、做业初始化

  • 当JobTracker收到Job提交的请求后,将Job保存在一个内部队列,并让Job Scheduler(做业调度器)处理并初始化。
  • 初始化涉及到建立一个封装了其tasks的job对象,并保持对task的状态和进度的跟踪(步骤5)。
  • 当建立要运行的一系列task对象后,Job Scheduler首先开始从文件系统中获取由JobClient计算的input splits(步骤6),而后再为每一个split建立map task。

三、任务的分配
TaskTracker和JobTracker之间的通讯和任务分配是经过心跳机制完成的。TaskTracker做为一个单独的JVM,它执行一个简单的循环,主要实现每隔一段时间向JobTracker发送心跳,告诉JobTracker此TaskTracker是否存活,是否准备执行新的任务。若是有待分配的任务,它就会为TaskTracker分配一个任务。

四、任务的执行

  • TaskTracker申请到新的任务以后,就要在本地运行了。首先,是将任务本地化(包括运行任务所需的数据、配置信息、代码等),即从HDFS复制到本地,调用localizeJob()完成的。
  • 对于使用Streaming和Pipes建立Map或者Reduce程序的任务,Java会把key/value传递给外部进程,而后经过用户自定义的Map或者Reduce进行处理,而后把key/value传回到java中。其中就好像是TaskTracker的子进程在处理Map和Reduce代码同样。

五、更新任务的执行进度和状态

  • 进度和状态是经过heartbeat(心跳机制)来更新和维护的。
  • 对于Map Task,进度就是已处理数据和全部输入数据的比例。
  • 对于Reduce Task,状况就有点复杂,包括3部分,拷贝中间结果文件、排序、reduce调用,每部分占1/3。

六、任务完成 当Job完成后,JobTracker会收一个Job Complete的通知,并将当前的Job状态更新为successful。同时JobClient也会轮循获知提交的Job已经完成,将信息显示给用户。最后,JobTracker会清理和回收该Job的相关资源,并通知TaskTracker进行相同的操做(好比删除中间结果文件)

相关文章
相关标签/搜索