概述:MapReduce是hadoop的核心组件之一,能够经过MapReduce很容易在hadoop平台上进行分布式的计算编程。本文组织结果以下:首先对MapReduce架构和基本原理进行概述,其次对整个MapReduce过程的生命周期进行详细讨论。 编程
参考文献:董西城的《Hadoop技术内幕》以及若干论坛文章,没法考证出处。 数组
MapReduce主要分为Map和Reduce两个过程,采用了M/S的设计架构。在1.0系列中,主要角色包括:Client, JobTracke, TaskTracker和Task 架构
Client:用户须要执行的Job在Client进行配置,例如编写MapReduce程序,指定输入输出路径,指定压缩比例等等,配置好后,由客户端提交给JobTracker。每一个做业Job会被分红若干个任务Task。 并发
在整个MapReduce执行过程当中,数据须要通过Mapper,中间过程shuffle,Reducer的处理。 app
Mapper主要执行MapTask,将分配给它的split解析为若干个K-V对做为map函数的输入。例如在Wordcount程序中,K=字符串偏移量,V=一行字符串。而后依次调用map()进行处理,输出仍为K-V对。此时<K=word,V=1>。 分布式
中间过程也分为Map端和Reduce端执行 函数
在mapper端,每一个Map Task都有一个内存缓冲区,存储着map函数的输出结果,当缓冲区快满的时候须要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个Map Task结束后再对磁盘中这个Map Task产生的全部临时文件作合并,生成最终的正式输出文件,而后等待reduce task来拉取数据。 oop
Partition spa
Partitioner根据map函数输出的K-V对以及Reduce Task的数量来决定当前这对Map输出的K-V由哪一个Reduce Task来处理。首先对key进行hash运算,再以Reduce task的数量对它取模;取模是为了平均Reduce的处理能力,也能够定制并设置到规定job上。 操作系统
Spill
对于map()函数的输出数据K-V对,要写入内存缓冲区,该内存缓冲区的做用是批量收集Map结果,减小IO的影响。再写入磁盘文件。将K-V对序列化为字节数组,而后将K-V对以及partition的分组结果写入缓冲区。该缓冲区是有大小默认限制为100M,因此当Map任务输出结果过多时,须要对缓冲进行刷新,将数据写入磁盘。向磁盘写数据的过程称为Spill,意为溢写,由单独的进程完成,不影响Map结果写入的线程操做。为了达到这个目的,经过设置溢写比例spill.percent(默认0.8)来实现,即当缓冲区中数据达到80%,就启动Spill进程,锁定这80%的缓冲,进行溢写过程;与此同时,其余线程能够继续将Map的输出结果写入到剩下20%的缓冲区中,互不影响。
Sort
Spill线程启动后,须要对将要写入磁盘的数据进行处理,对已经序列化为字节的key进行排序。因为Map任务的结果要交给不一样Reduce任务来处理,因此须要将交个同一个Reduce任务的数据合并在一块儿。这个合并过程在写入缓冲区时并未执行,而是由Spill进程在写入到磁盘时进行合并。若是有不少的K-V对须要提交给一个Reduce任务,那么应该将这些K-V进行拼接,减小与partitioner相关的索引记录。<K=word, V=1,V=1,V=1>
Merge
每次Spill进行溢写操做,都会在磁盘上产生一个溢写文件。若是缓冲区不够大或者Map输出结果很大,那么会屡次执行溢写文件。因此须要将这些溢写文件归并为一个文件,该过程称为Merge。Merge所作的操做就是未来自不一样map task结果中,key相同的K-V归并为组,造成K-[v1,v2,v…]。由于是将多个文件合并为一个文件,因此可能也有相同的Key存在,若是在client端设置过combiner,则会调用他来合并相同的Key。
至此,Map端的工做所有结束,最后这个文件放在TaskTracker可以获取到的本地目录内,每一个reduce task不断经过RPC从JobTracker处获取map任务是否完成的信息,若是获知某台Tasktracker上的map任务完成,则shuffle过程后半段开始启动。
在Reduce端的中间过程,就是在reduce执行以前所进行的工做,不断将各个map输出的最终结果进行拉取,而后进行merge操做。
Copy
简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),经过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。由于map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。
Merge
这里的merge如map端的merge动做,只是数组中存放的是不一样map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,由于Shuffle阶段Reducer不运行,因此应该把绝大部分的内存都给Shuffle用。这里须要强调的是,merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认状况下第一种形式不启用。一样的要在内存进行sort操做。当内存中的数据量到达必定阈值,就启动内存到磁盘的merge。与map 端相似,这也是溢写的过程,这个过程当中若是设置有Combiner,也是会启用的,而后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,而后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。
Reduce Task 读取中间过程shuffle产生并放在HDFS上的最终文件,不断地调用reduce()函数来处理输入数据,该输入数据格式为<K=word,V=n,V=m…>,最后输出到HDFS上。
本节对MapReduce做业的处理过程从提交到完成进行概述。做业处理的整个流程包括:
做业提交与初始化→任务调度与监控→运行环境准备→任务的执行→任务结束
用户须要执行的做业在客户端进行配置,例如编写MapReduce程序,指定输入输出路径,指定压缩比例等等,配置好后,由客户端提交给JobTracker。用户提交做业后,首先由JobClient 实例将做业相关信息(好比将程序jar 包、做业配置文件、分片元信息文件等)上传到HDFS上,其中,分片元信息文件记录了每一个输入分片的逻辑位置信息。而后JobClient经过RPC 通知JobTracker。JobTracker 收到新做业提交请求后,由做业调度模块对做业进行初始化:为做业建立一个JobInProgress 对象以跟踪做业运行情况,而JobInProgress 则会为每一个Task 建立一个TaskInProgress 对象以跟踪每一个任务的运行状态,TaskInProgress 可能须要管理多个"Task Attempt"
TaskTracker 周期性地经过Heartbeat 向JobTracker 汇报本节点的资源使用状况,一旦出现空闲资源,JobTracker 会按照必定的策略选择一个合适的任务使用该空闲资源,这由任务调度器完成。任务调度器是一个可插拔的独立模块,且为双层架构,即首先选择做业,而后从该做业中选择任务,其中,选择任务时须要重点考虑数据本地性。此外,JobTracker 跟踪做业的整个运行过程,并为做业的成功运行提供全方位的保障。首先,当TaskTracker 或者Task 失败时,转移计算任务;其次,当某个Task 执行进度远落后于同一做业的其余Task 时,为之启动一个相同Task,并选取计算快的Task 结果做为最终结果。
运行环境准备包括JVM 启动和资源隔离, 均由TaskTracker 实现。TaskTracker 为每一个Task 启动一个独立的JVM 以免不一样Task 在运行过程当中相互影响;同时,TaskTracker 使用了操做系统进程实现资源隔离以防止Task 滥用资源。
TaskTracker 为Task 准备好运行环境后,便会启动Task。在运行过程当中,每一个Task 的最新进度首先由Task 经过RPC 汇报给TaskTracker,再由TaskTracker汇报给JobTracker。
JobTracker在接受到最后一个任务运行完成后,会将任务标志为成功。此时会作删除中间结果等善后处理工做。
本文简单讨论总结了MapReduce的架构和做业的生命周期,若是有错误之处,还望指正。