1、MapReduce模型框架css
MapReduce是一个用于大规模数据处理的分布式计算模型,最初由Google工程师设计并实现的,Google已经将完整的MapReduce论文公开发布了。其中的定义是,MapReduce是一个编程模型,是一个用于处理和生成大规模数据集的相关的实现。用户定义一个map函数来处理一个Key-Value对以生成一批中间的Key-Value对,再定义一个reduce函数将全部这些中间的有相同Key的Value合并起来。不少现实世界中的任务均可用这个模型来表达。html
一、MapReduce模型node

源数据 中间数据 结果数据算法
MapReduce模型如上图所示,Hadoop MapReduce模型主要有Mapper和Reducer两个抽象类。Mapper端主要负责对数据的分析处理,最终转化为Key-Value的数据结构;Reducer端主要是获取Mapper出来的结果,对结果进行统计。编程
二、MapReduce框架windows

整个过程如上图所示,包含4个独立的实体,以下所示:数组
- client:提交MapReduce做业,好比,写的MR程序,还有CLI执行的命令等。
- jobtracker:协调做业的运行,就是一个管理者。
- tasktracker:运行做业划分后的任务,就是一个执行者。
- hdfs:用来在集群间共享存储的一种抽象的文件系统。
说明:
其实,还有namenode就是一个元数据仓库,就像windows中的注册表同样。secondarynamenode能够当作namenode的备份。datanode能够当作是用来存储做业划分后的任务。在DRCP中,master是namenode,secondarynamenode,jobtracker,其它的3台slaver都是tasktracker,datanode,且tasktracker都须要运行在HDFS的datanode上面。
MapReduce框架中组成部分及它们之间的关系,以下所示:
运行在Hadoop上的MapReduce应用程序最基本的组成部分包括:一是Mapper抽象类,一是Reducer抽象类,一是建立JobConf的执行程序。
JobTracker是一个master服务,软件启动以后JobTracker接收Job,负责调度Job的每个子任务Task运行于TaskTracker上,而且监控它们的运行,若是发现有失败的Task就从新运行它,通常状况下应该把JobTracker部署在单独的机器上。
TaskTracker是运行在多个节点上的slaver服务。TaskTracker主动与JobTracker通讯(与DataNode和NameNode类似,经过心跳来实现)接收做业,并负责直接执行每个任务。
每个Job都会在用户端经过JobClient类将应用程序以及配置参数Configuration打包成JAR文件存储在HDFS中,并把路径提交到JobTracker的master服务,而后由master建立每个Task(即MapTask和ReduceTask)将它们分发到各个TaskTracker服务中去执行。
JobClient提交Job后,JobTracker会建立一个JobInProgress来跟踪和调度这个Job,并把它添加到Job队列之中。JobInProgress会根据提交的任务JAR中定义的输入数据集(已分解成FileSplit)建立对应的一批TaskInProgress用于监控和调度MapTask,同时建立指定书目的TaskInProgress用于监控和调度ReduceTask,默认为1个ReduceTask。
JobTracker启动任务时经过每个TaskInProgress来运行Task,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会建立对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,做用相似)用于监控和调度该Task。启动具体的Task进程经过TaskInProgress管理,经过TaskRunner对象来运行。TaskRunner会自动装载任务JAR文件并设置好环境变量后,启动一个独立的Java Child进程来执行Task,即MapTask或者ReduceTask,但它们不必定运行在同一个TaskTracker中。
一个完整的Job会自动依次执行Mapper、Combiner(在JobConf指定Combiner时执行)和Reducer,其中Mapper和Combiner是由MapTask调用执行,Reduce则由ReduceTask调用,Combiner实际也是Reducer接口类的实现。Mapper会根据Job JAR中定义的输入数据集<key1, value1>对读入,处理完成生成临时的<key2, value2>对,若是定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同Key的值作合并处理,以减小输出结果集。MapTask的任务所有完成后,交给ReduceTask进程调用Reducer处理,生成最终结果<Key3, value3>对。
2、MapReduce工做原理

一、做业的提交
JobClient的submitJob()方法实现的做业提交过程,以下所示:
- 经过JobTracker的getNewJobId()方法,向jobtracker请求一个新的做业ID。参见步骤2。
- 检查做业的输出说明,也就是说要指定输出目录的路径,可是输出目录还不能存在(防止覆盖输出结果),若是不知足条件,就会将错误抛给MapReduce程序。
- 检查做业的输入说明,也就是说若是输入路径不存在,做业也无法提交,若是不知足条件,就会将错误抛给MapReduce程序。
- 将做业运行所需的资源,好比做业JAR文件、配置文件等复制到HDFS中。参见步骤3。
- 经过JobTracker的submitJob()方法,告诉jobtracker做业准备执行。参见步骤4。
二、做业的初始化
- JobTracker接收到对其submitJob()方法调用以后,就会把此调用放入一个内部队列当中,交由做业调度器进行调度。(说明:Hadoop做业的调度器常见的有3个:先进先出调度器;容量调度器;公平调度器。Hadoop做业调度器采用的是插件机制,即做业调度器是动态加载的、可插拔的,同时第三方能够开发本身的做业调度器,参考资料”大规模分布式系统架构与设计实战”)。参见步骤5。
- 初始化包括建立一个表示正在运行做业的对象——封装任务的记录信息,以便跟踪任务的状态和进程。参见步骤5。
- 接下来要建立运行任务列表,做业调度器首先从共享文件系统中获取JobClient已计算好的输入分片信息,而后为每一个分片建立一个map任务(也就是说mapper的个数与分片的数目相同)。参见步骤6。(建立reduce任务的数量由JobConf的mapred.reduce.task属性决定,它是用setNumReduceTasks()方法来设置的,而后调度器建立相应数量的要运行的reduce任务,默认状况只有一个reducer)
三、任务的分配
- tasktracker自己运行一个简单的循环来按期发送”心跳(heartbeat)”给jobtracker。什么是心跳呢?就是tasktracker告诉jobtracker它是否还活着,同时心跳也充当二者之间的消息通讯,好比tasktracker会指明它是否已经作好准备来运行新的任务了,若是是,管理者jobtracker就会给执行者tasktracker分配一个任务。参见步骤7。
- 固然,在管理者jobtracker为执行者tasktracker选择任务以前,jobtracker必须先选定任务所在的做业。一旦选择好做业,jobtracker就能够给tasktracker选定一个任务。如何选择一个做业呢?固然是Hadoop做业的调度器了,它就像是Hadoop的中枢神经系统同样,默认的方法是简单维护一个做业优先级列表。(对于调度算法的更深理解能够学习操做系统的做业调度算法,进程调度算法,好比先来先服务(FCFS)调度算法,短做业优先(SJF)调度算法,优先级调度算法,高响应比优先调度算法,时间片轮转调度算法,多级反馈队列调度算法等。若是从更高的角度来看调度算法,实际上是一种控制和决策的策略选择。)
四、任务的执行
- 做业选择好了,任务也选择好了,接下来要作的事情就是任务的运行了。首先,从HDFS中把做业的JAR文件复制到tasktracker所在的文件系统,同时,tasktracker将应用程序所须要的所有文件从分布式缓存复制到本地磁盘,也就是从HDFS文件系统复制到ext4等文件系统之中。参见步骤8。
- tasktracker为任务新建一个本地工做目录,并把JAR文件中的内容解压到这个文件夹中,新建一个TaskRunner实例来运行该任务。
- TaskRunner启动一个新的JVM(参见步骤9)来运行每一个任务(参见步骤10),以便用户定义的map和reduce函数的任何缺陷都不会影响TaskTracker守护进程(好比致使它崩溃或者挂起)。须要说明一点的是,对于map和reduce任务,tasktracker有固定数量的任务槽,准确数量由tasktracker核的数量和内存大小来决定,好比一个tasktracker可能同时运行两个map任务和reduce任务。map任务和reduce任务中关于数据本地化部分再也不讲解,由于DRCP没有用到,只要理解本地数据级别就能够了,好比node-local,rack-local,off-switch。
- 子进程经过umbilical接口与父进程进行通讯,任务的子进程每隔几秒便告诉父进程它的进度,直到任务完成。
五、进度和状态的更新

- MapReduce是Hadoop的一个离线计算框架,运行时间范围从数秒到数小时,所以,对于咱们而言直到做业进展是很重要的。
- 一个做业和每一个任务都有一个状态信息,包括做业或任务的运行状态(好比,运行状态,成功完成,失败状态)、Map和Reduce的进度、计数器值、状态消息和描述(能够由用户代码来设置)等。
- 这些消息经过必定的时间间隔由Child JVM—>TaskTracker—>JobTracker汇聚。JobTracker将产生一个代表全部运行做业及其任务状态的全局视图。能够经过Web UI查看。同时JobClient经过每秒查询JobTracker来得到最新状态,输出到控制台上。
- 如今可能会有一个疑问,这些状态信息在做业执行期间不断变化,它们是如何与客户端进行通讯的呢?详细细节不在讲解,参考资料《Hadoop权威指南》。
六、做业的完成
- 当jobtracker收到做业最后一个任务已完成的通知后,便把做业的状态设置为”成功”。而后,在JobClient查询状态时,便知道做业已成功完成,因而JobClient打印一条消息告知用户,最后从runJob()方法返回。
说明:
MapReduce容错,即做业失败状况再也不讲解,参考资料《Hadoop权威指南》。
3、Shuffle阶段和Sort阶段
若是说以上是从物理实体的角度来说解MapReduce的工做原理,那么以上即是从逻辑实体的角度来说解MapReduce的工做原理,以下所示:
- 输入分片: 在进行map计算以前,mapreduce会根据输入文件计算输入分片,每一个输入分片针对一个map任务,输入分片存储的并不是数据自己,而是一个分片长度和一个记录数据位置的数组,输入分片每每和hdfs的block关系很密切。假如咱们设定hdfs块的大小是64MB,若是咱们有三个输入文件,大小分别是3MB、65MB和127MB,那么mapreduce会把3MB文件分为一个输入分片,65MB则是两个输入分片,而127MB也是两个输入分片,就会有5个map任务将执行。
- map阶段: 就是编写好的map函数,并且通常map操做都是本地化操做,也就是在数据存储节点上进行。
- combiner阶段: combiner阶段是能够选择的,combiner本质也是一种reduce操做。Combiner是一个本地化的reduce操做,它是map运算的后续操做,主要是在map计算出中间文件后作一个简单的合并重复key值的操做,好比,咱们对文件里的单词频率作统计,若是map计算时候碰到一个hadoop单词就会记录为1,这篇文章里hadoop可能会出现屡次,那么map输出文件冗余就会不少,所以在reduce计算前对相同的key作一个合并操做,文件就会变小,这样就提升了宽带的传输效率。可是combiner操做是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终结果,好比:若是计算只是求总数,最大值,最小值可使用combiner,可是若是作平均值计算使用combiner,那么最终的reduce计算结果就会出错。
- shuffle阶段: 将map的输出做为reduce输入的过程就是shuffle。通常mapreduce计算的都是海量数据,map输出的时候不可能把全部文件都放到内存中进行操做,所以map写入磁盘的过程十分的复杂,更况且map输出的时候要对结果进行排序,内存开销是很大的。map在作输出的时候会在内存里开启一个环形内存缓冲区,这个缓冲区是专门用来输出的,默认大小是100MB,而且在配置文件里为这个缓冲区设定了一个阀值,默认是0.80(这个大小和阀值都是能够在配置文件里进行配置的),同时map还会为输出操做启动一个守护线程,若是缓冲区的内存达到了阀值的80%时候,这个守护线程就会把内容写到磁盘上,这个过程叫spill。另外的20%内存能够继续写入要写进磁盘的数据,写出磁盘和写入内存操做是互不干扰的,若是缓存区被填满了,那么map就会阻塞写入内存的操做,让写出磁盘操做完成后再继续执行写入内存操做。写出磁盘前会有个排序操做,这个是在写出磁盘操做的时候进行的,不是在写入内存的时候进行的,若是还定义了combiner函数,那么排序后还会执行combiner操做。每次spill操做也就是写出磁盘操做的时候就会写一个溢出文件,即在作map输出的时候有几回spill操做就会产生多少个溢出文件。这个过程里还会有一个partitioner操做,其实partitioner操做和map阶段的输入分片很像,一个partitioner对应一个reduce做业,若是mapreduce操做只有一个reduce操做,那么partitioner就只有一个。若是有多个reduce操做,那么partitioner对应的就会有多个。所以,能够把partitioner看做reduce的输入分片。到了reduce阶段就是合并map输出文件,partitioner会找到对应的map输出文件,而后进行复制操做,复制操做时reduce会开启几个复制线程,这些线程默认个数是5个(也能够在配置文件中更改复制线程的个数),这个复制过程和map写出磁盘的过程相似,也有阀值和内存大小,阀值同样能够在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制的时候reduce还会进行排序操做和合并文件操做,这些操做完毕以后就会进行reduce计算。
- reduce阶段: 和map函数同样,是编写好的reduce函数,最终结果是存储在hdfs上的。
参考文献:缓存
[1] MapReduce编程模型的要点: http://blog.sina.com.cn/s/blog_4a1f59bf0100tgqj.htmlmarkdown
[2] Hadoop权威指南(第三版)数据结构
[3] Hadoop应用开发技术详解
[4] mapreduce中reducers个数设置: http://www.2cto.com/os/201312/263998.html
[5] 操做系统典型调度算法: http://see.xidian.edu.cn/cpp/html/2595.html
[6] MapReduce框架结构: http://www.cppblog.com/javenstudio/articles/43073.html
[7] MapReduce框架详解: http://www.cnblogs.com/sharpxiajun/p/3151395.html