Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架。java
Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。算法
设想一个海量数据场景下的wordcount需求:编程
单机版:内存受限,磁盘受限,运算能力受限
分布式:
一、文件分布式存储(HDFS)
二、运算逻辑须要至少分红2个阶段(一个阶段独立并发,一个阶段汇聚)
三、运算程序如何分发
四、程序如何分配运算任务(切片)
五、两阶段的程序如何启动?如何协调?
六、整个程序运行过程当中的监控?容错?重试?
可见在程序由单机版扩成分布式时,会引入大量的复杂工做。为了提升开发效率,能够将分布式程序中的公共功能封装成框架,让开发人员能够将精力集中于业务逻辑。缓存
首先要说明的是Hadoop2.0以前和Hadoop2.0以后的区别:网络
从图上的user program开始,user program连接了MapReduce库,实现了最基本的Map函数和Reduce函数。并发
一、MapReduce库把的输入文件划分为M份,即如图左所示分红了split0-4的分片,而后使用fork将用户进程copy到集群内其它机器上。 二、被分配了Map做业的Worker,开始读取对应分片的输入数据,Map做业从输入数据中抽取出键值对,map()函数产生的中间键值对被缓存在内存中。 三、缓存的中间键值对会按期写入本地磁盘,这些中间键值对的位置会被通报给Master,Master负责将信息转发给Reduce Worker。 四、Reduce Worker将分配好的Reduce做业的中间键值对读取后,并对它们进行排序,并将相同key的键值对汇集在一块儿。 五、Reduce Worker遍历排序好的键值并传递给reduce() 函数,经reduce() 函数计算后产生的输出会添加到这个分区的输出文件中。
一个Map/Reduce做业的输入和输出类型以下所示:app
(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
一、Input阶段 数据以必定的格式传递给Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可使用,在Job.setInputFormat能够设置,也能够自定义分片函数。 二、map阶段 对输入的(key,value)进行处理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass进行设置。 三、Sort阶段 对于Mapper的输出进行排序,使用Job.setOutputKeyComparatorClass进行设置,而后定义排序规则。 四、Combine阶段 这个阶段对于Sort以后,对相同key的结果进行合并,使用Job.setCombinerClass进行设置,也能够自定义Combine Class类。 五、Partition阶段 将Mapper的中间结果按照key的范围划分为R份(Reduce做业的个数),默认使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也能够自定义划分的函数,使用Job.setPartitionClass设置。 六、Reduce阶段 对于Mapper阶段的结果进行进一步处理,Job.setReducerClass进行设置自定义的Reduce类。 七、Output阶段 Reducer输出数据的格式。
一、结构 一个完整的MapReduce程序是这样一个分布式程序的通用框架,其应对以上问题的总体结构以下:框架
二、MapReduce运行流程解析
1) 一个MapReduce程序启动的时候,最早启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出须要的MapTask实例数量,而后向集群申请机器启动相应数量的MapTask进程。
2)MapTask进程启动以后,根据给定的数据切片范围进行数据处理,主体流程为:分布式
3) MRAppMaster监控到全部MapTask进程任务完成以后,会根据客户指定的参数启动相应数量的ReduceTask进程,并告知ReduceTask进程要处理的数据范围(数据分区)
4)ReduceTask进程启动以后,根据MRAppMaster告知的待处理数据所在位置,从若干台MapTask运行所在机器上获取到若干个MapTask输出结果文件,并在本地进行从新归并排序,而后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,而后调用客户指定的outputformat将结果数据输出到外部存储。函数
三、MapTask并行度决定机制
MapTask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度。那么,MapTask并行实例是否越多越好呢?其并行度又是如何决定呢?
3.一、MapTask并行度的决定机制
一个job的map阶段并行度由客户端在提交job时决定,而客户端对map阶段并行度的规划的基本逻辑为: 将待处理数据执行逻辑切片,而后每个split分配一个mapTask并行实例处理,这段逻辑及造成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成
3.二、ReduceTask并行度的决定
ReduceTask的并行度一样影响整个job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不一样,ReduceTask数量的决定是能够直接手动设置:
//默认值是1
手动设置为4 job.setNumReduceTasks(4);
若是数据分布不均匀,就有可能在reduce阶段产生数据倾斜。
(注意: ReduceTask数量并非任意设置,还要考虑业务逻辑需求,有些状况下,须要计算全局汇总结果,就只能有1个reducetask 。尽可能不要运行太多的ReduceTask,对大多数job来讲,最好reduce的个数最多和集群中的reduce持平,或者比集群的reduce slots小)
3.三、mapreduce的shuffle机制
1)概述 MapReduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle。 shuffle的核心机制:数据分区,排序,缓存。具体来讲:就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程当中,对数据按key进行了分区和排序。
分区partition(肯定哪一个数据进入哪一个reduce)
Sort根据key排序
Combiner进行局部value的合并
2)详细流程
一、 MapReduce收集咱们的map()方法输出的kv对,放到内存缓冲区中 二、 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 三、 多个溢出文件会被合并成大的溢出文件 四、 在溢出过程当中,及合并的过程当中,都要调用partitoner进行分组和针对key进行排序 五、 reducetask根据本身的分区号,去各个maptask机器上取相应的结果分区数据 六、 reducetask会取到同一个分区的来自不一样maptask的结果文件,reducetask会将 这些文件再进行合并(归并排序) 七、 合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)
Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快 。缓冲区的大小能够经过参数调整, 参数:io.sort.mb 默认100M。
一个mapreduce做业的执行流程是:做业提交->做业初始化->任务分配->任务执行->更新任务执行进度和状态->做业完成。
一个完整的mapreduce做业流程,包括4个独立的实体:
客户端:client,编写mapreduce程序,配置做业,提交做业。
JobTracker:协调这个做业的运行,分配做业,初始化做业,与
TaskTracker进行通讯。
TaskTracker:负责运行做业,保持与JobTracker进行通讯。
HDFS:分布式文件系统,保持做业的数据和结果。
一、提交做业
JobClient使用runjob方法建立一个JobClient实例,而后调用submitJob()方法进行做业的提交,提交做业的具体过程以下:
1) 经过调用JobTracker对象的getNewJobId()方法从JobTracker处得到一个做业ID。 2) 检查做业的相关路径。若是输出路径存在,做业将不会被提交(保护上一个做业运行结果)。 3) 计算做业的输入分片,若是没法计算,例如输入路径不存在,做业将不被提交,错误返回给mapreduce程序。 4) 将运行做业所需资源(做业jar文件,配置文件和计算获得的分片)复制到HDFS上。 5) 告知JobTracker做业准备执行(使用JobTracker对象的submitJob()方法来真正提交做业)。
二、做业初始化
三、任务的分配
TaskTracker和JobTracker之间的通讯和任务分配是经过心跳机制完成的。TaskTracker做为一个单独的JVM,它执行一个简单的循环,主要实现每隔一段时间向JobTracker发送心跳,告诉JobTracker此TaskTracker是否存活,是否准备执行新的任务。若是有待分配的任务,它就会为TaskTracker分配一个任务。
四、任务的执行
五、更新任务的执行进度和状态
六、任务完成 当Job完成后,JobTracker会收一个Job Complete的通知,并将当前的Job状态更新为successful。同时JobClient也会轮循获知提交的Job已经完成,将信息显示给用户。最后,JobTracker会清理和回收该Job的相关资源,并通知TaskTracker进行相同的操做(好比删除中间结果文件)