一个MR做业一般会把输入的数据集切分为若干独立的数据块,先由Map任务并行处理,而后MR框架对Map的输出先进行排序,而后把结果做为Reduce任务的输入。MR框架是一种主从框架,由一个单独的JobTracker节点和多个TaskTracker节点组成。(JobTracker至关于Master,负责做业任务的调度,TaskTracker至关于Slave,负责执行Master指派的任务)缓存
如上图所示,具体MR的具体步骤可描述以下:并发
步骤1:app
把输入文件分红M块(每一块大小Hadoop默认是64M,可修改)。框架
步骤2:函数
master选择空闲的执行者worker节点,把总共M个Map任务和R个Reduce任务分配给他们,如上图中(2)所示。oop
步骤3:spa
一个分配了Map任务的worker读取并处理输入数据块。从数据块中解析出key/value键值对,把他们传递给用户自定义的Map函数,由Map函数生成并输出中间key/value键值对,暂时缓存在内存中,如上图中(3)所示。命令行
步骤4:线程
缓存中的key/value键值对经过分区函数分红R个区域,以后周期性地写入本地磁盘上。并把本地磁盘上的存储位置回传给master,由master负责把这些存储位置传送给Reduce worker,如上图中(4)所示。对象
步骤5:
当Reduce worker接收到master发来的存储位置后,使用RPC协议从Map worker所在主机的磁盘上读取数据。在获取全部中间数据后,经过对key排序使得相同具备key的数据汇集在一块儿。如上图中(5)所示。
步骤6:
Reduce worker程序对排序后的中间数据进行遍历,对每个惟一的中间key,Reduce worker程序都会将这个key对应的中间value值的集合传递给用户自定义的Reduce函数,完成计算后输出文件(每一个Reduce任务产生一个输出文件)。如上图中(6)所示。
步骤1:命令行提交。用户使用Hadoop命令行脚本提交MR程序到集群。
步骤2:做业上传。这一步骤包括了不少初始化工做,如获取用户做业的JobId,建立HDFS目录,上传做业、相关依赖库等到HDFS上。
步骤3:产生切分文件。
步骤4:提交做业到JobTracker。
Mapper的输入文件位于HDFS上,先对输入数据切分,每个split分块对应一个Mapper任务,经过RecordReader对象从输入分块中读取并生成键值对,而后执行Map函数,输出的中间键值对被partion()函数区分并写入缓冲区,同时调用sort()进行排序。
Reducer主要有三个阶段:Shuffle、Sort、Reduce
1. Shuffle阶段:
Reducer的输入就是Mapper阶段已经排好序的输出。在这个阶段,框架为每一个Reducer任务得到全部Mapper输出中与之相关的分块,把Map端的输出结果传送到Reduce端,大量操做是数据复制(所以也称数据复制阶段)。
2. Sort阶段:
框架按照key对Reducer的输入进行分组(Mapper阶段时每个Map任务对于它自己的输出结果会有一个排序分组,而不一样Map任务的输出中可能会有相同的key,所以要再一次分组)。Shuffle和Sort是同时进行的,Map的输出也是一边被取回一边被合并。排序是基于内存和磁盘的混合模式进行,通过屡次Merge才能完成排序。(PS:若是两次排序分组规则须要不一样,能够指定一个Comparator比较器来控制分组规则)。
3. Reduce阶段:
经过Shuffle和Sort操做后获得的<key, (list of values)>被送到Reducer的reduce()函数中执行,针对每个<key, (list of values)>会调用一次reduce()函数。
步骤1:
输入文件从HDFS到Mapper节点。通常状况下,存储数据的节点就是Mapper运行节点,避免数据在节点之间的传输(让存储靠近计算)。但总会有节点之间的数据传输存在,这时Hadoop会从离计算节点更近的存储节点上传输数据。
步骤2:
Mapper输出到内存缓冲区。Mapper的输出不是直接写到本地文件系统,而是先写入内存缓冲区,缓冲区达到必定阈值后以临时文件的形式写入本地磁盘,当整个map任务结束后把全部临时文件合并,产生最终的输出文件。(Partioner就发生在该阶段,写入缓冲区的同时执行Partioner对文件进行分区)。
步骤3:
从内存缓冲区到本地磁盘。缓冲区大小默认是100M,须要在必定条件下将缓冲区中的数据临时写入磁盘,而后从新利用这块缓冲区,这个过程称做spill(溢写),由单独线程来完成,且不影响往缓冲区写map结果的线程。溢写线程启动时不该该阻止map的结果输出,因此整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还能够往剩下的20MB内存中写,互不影响。 当溢写线程启动后,须要对这80MB空间内的key作排序(Sort)。
步骤4:
从Mapper端的本地文件系统到Reduce端。也就是Shuffle阶段,分三种状况:
步骤5:
从Reduce端内存缓冲区流向Reduce端的本地磁盘。这个过程就是Merge和Sort阶段。Merge包括内存文件的合并(5-1)以及磁盘文件合并(5-2),同时Sort以key为键进行排序分组,产生输出文件。
步骤6:
Merge和Sort以后直接流向Reduce函数进行归约处理。
步骤7:
根据用户指定的输出类型写入HDFS中,产生part-*形式的输出文件。
步骤1:
JobClient类中会把用户应用程序的Mapper类、Reducer类以及配置文件JobConf打包成一个JAR文件保存到HDFS中(上图b所示),JobClient在提交做业的同时把这个JAR文件的路径一块儿提交到JobTracker的master服务(做业调度器),如上图1所示。
步骤2:
JobClient提交Job后,JobTracker会建立一个JobInProgress来跟踪和调度这个Job做业,并将其添加到调度器的做业队列中,如上图2所示。
步骤3:
JobInProgress会根据提交的做业JAR文件中定义的输入数据集建立相应数量的TaskInProgress用于监控和调度MapTask,同时建立指定数量的TaskInProgress用于监控和调度ReduceTask,默认为1个ReduceTask,如上图3所示。
步骤4:
JobTracker经过TaskInProgress来启动做业任务(如上图4),这时会把Task对象(MapTask和ReduceTask)序列化写入相应的TaskTracker服务中(如上图5)。
步骤5:
TaskTracker收到后建立对应的TaskInProgress(不是JobTracker中的TaskInProgress,但做用相似)来监控和调度运行该Task,如上图中6所示。
步骤6:
启动具体的Task进程,TaskTracker经过TaskInProgress管理的TaskRunner对象来运行Task,如图中7所示。
步骤7:
TaskRunner自动装载用户做业JAR文件,启动一个独立Java子进程来执行Task,TaskRunner会先执行MapTask,如上图中8所示。
步骤8:
MapTask先调用Mapper,生成中间键值对,若是用户还定义了Combiner,再调用Combiner,把相同key的value作归约处理,减小Map输出的键值对集合。
步骤9:
MapTask的任务所有完成后,TaskRunner再调用ReduceTask进行来启动Reducer(如上图中11),注意MapTask和ReduceTask不必定运行在同一个TaskTracker节点中。
步骤10:
ReduceTask直接调用Reducer类处理Mapper的输出结果,生成最终结果,写入HDFS中。
Mapper的输出是写入本地磁盘的,而且是按照partion进行分区的,Reduce的输入是集群上多个Mapper输出数据中同一partion段的数据,Map任务可能会在不一样时间完成,只要其中一个Map任务完成了,ReduceTask任务就开始复制它的输出,这个阶段称为Shuffle阶段或者Copy阶段。Reduce任务能够由多个线程并发复制,默认是5个线程,能够经过参数改变。
ReduceTask如何知道从哪些TaskTrackers中获取Mapper 的输出?经过心跳检测机制,当Mapper完成任务后会通知TaskTracker,而后TaskTracker经过心跳机制将任务完成状态和结果数据传输给JobTracker,JobTracker会保存一个Mapper输出和TaskTrackers的映射关系表,Reducer中有一个线程会间歇地向JobTracker询问Mapper输出的地址,直到全部数据都取完。取完后TaskTrackers不会当即删除这些数据,由于Reducer可能会失败,在整个做业完成后JobTracker告知它们要删除的时候才去删除。