本篇文章主要从mapreduce运行做业的过程,shuffle,以及mapreduce做业失败的容错几个方面进行详解。html
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的状况下,将本身的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证全部映射的键值对中的每个共享相同的键组。 ---来源于百度百科java
MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)
MapReduce是一个并行计算与运行软件框架(Software Framework)
MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)node
mapreduce是hadoop中一个批量计算的框架,在整个mapreduce做业的过程当中,包括从数据的输入,数据的处理,数据的数据输入这些部分,而其中数据的处理部分就要map,reduce,combiner等操做组成。在一个mapreduce的做业中一定会涉及到以下一些组件:算法
一、客户端,提交mapreduce做业
二、yarn资源管理器,负责集群上计算资源的协调
三、yarn节点管理器,负责启动和监控集群中机器上的计算容器(container)
四、mapreduce的application master,负责协调运行mapreduce的做业
五、hdfs,分布式文件系统,负责与其余实体共享做业文件sql
做业的运行过程主要包括以下几个步骤:编程
1、做业的提交 2、做业的初始化 3、做业任务的分配 4、做业任务的执行 5、做业执行状态更新 6、做业完成
具体做业执行过程的流程图以下图所示:缓存
做业提交源码分析详情见:hadoop2.7之做业提交详解(上) hadoop2.7之做业提交详解(下)网络
在MR的代码中调用waitForCompletion()方法,里面封装了Job.submit()方法,而Job.submit()方法里面会建立一个JobSubmmiter对象。当咱们在waitForCompletion(true)时,则waitForCompletion方法会每秒轮询做业的执行进度,若是发现与上次查询到的状态有差异,则将详情打印到控制台。若是做业执行成功,就显示做业计数器,不然将致使做业失败的记录输出到控制台。并发
其中JobSubmmiter实现的大概过程以下:
一、向资源管理器resourcemanager提交申请,用于一个mapreduce做业ID,如图步骤2所示
二、检查做业的输出配置,判断目录是否已经存在等信息
三、计算做业的输入分片的大小
四、将运行做业的jar,配置文件,输入分片的计算资源复制到一个以做业ID命名的hdfs临时目录下,做业jar的复本比较多,默认为10个(经过参数mapreduce.client.submit.file.replication控制),
五、经过资源管理器的submitApplication方法提交做业app
一、当资源管理器经过方法submitApplication方法被调用后,便将请求传给了yarn的调度器,而后调度器在一个节点管理器上分配一个容器(container0)用来启动application master(主类是MRAppMaster)进程。该进程一旦启动就会向resourcemanager注册并报告本身的信息,application master而且能够监控map和reduce的运行状态。所以application master对做业的初始化是经过建立多个薄记对象以保持对做业进度的跟踪。
二、application master接收做业提交时的hdfs临时共享目录中的资源文件,jar,分片信息,配置信息等。并对每个分片建立一个map对象,以及经过mapreduce.job.reduces参数(做业经过setNumReduceTasks()方法设定)肯定reduce的数量。
三、application master会判断是否使用uber(做业与application master在同一个jvm运行,也就是maptask和reducetask运行在同一个节点上)模式运行做业,uber模式运行条件:map数量小于10个,1个reduce,且输入数据小于一个hdfs块
能够经过参数:
mapreduce.job.ubertask.enable #是否启用uber模式
mapreduce.job.ubertask.maxmaps #ubertask的最大map数
mapreduce.job.ubertask.maxreduces #ubertask的最大reduce数
mapreduce.job.ubertask.maxbytes #ubertask最大做业大小
四、application master调用setupJob方法设置OutputCommiter,FileOutputCommiter为默认值,表示创建作的最终输出目录和任务输出的临时工做空间
一、在application master判断做业不符合uber模式的状况下,那么application master则会向资源管理器为map和reduce任务申请资源容器。
二、首先就是为map任务发出资源申请请求,直到有5%的map任务完成时,才会为reduce任务所需资源申请发出请求。
三、在任务的分配过程当中,reduce任务能够在任何的datanode节点运行,可是map任务执行的时候须要考虑到数据本地化的机制,在给任务指定资源的时候每一个map和reduce默认为1G内存,能够经过以下参数配置:
mapreduce.map.memory.mb
mapreduce.map.cpu.vcores
mapreduce.reduce.memory.mb
mapreduce.reduce.cpu.vcores
application master提交申请后,资源管理器为其按需分配资源,这时,application master就与节点管理器通讯来启动容器。该任务由主类YarnChild的一个java应用程序执行。在运行任务以前,首先将所需的资源进行本地化,包括做业的配置,jar文件等。接下来就是运行map和reduce任务。YarnChild在单独的JVM中运行。
每一个做业和它的每一个任务都有一个状态:做业或者任务的状态(运行中,成功,失败等),map和reduce的进度,做业计数器的值,状态消息或描述看成业处于正在运行中的时候,客户端能够直接与application master通讯,每秒(能够经过参数mapreduce.client.progressmonitor.pollinterval设置)轮询做业的执行状态,进度等信息。
当application master收到最后一个任务已完成的通知,便把做业的状态设置为成功。
在job轮询做业状态时,知道任务已经完成,而后打印消息告知用户,并从waitForCompletion()方法返回。
看成业完成时,application master和container会清理中间数据结果等临时问题。OutputCommiter的commitJob()方法被调用,做业信息由做业历史服务存档,以便用户往后查询。
mapreduce确保每一个reduce的输入都是按照键值排序的,系统执行排序,将map的输入做为reduce的输入过程称之为shuffle过程。shuffle也是咱们优化的重点部分。shuffle流程图以下图所示:
在生成map以前,会计算文件分片的大小:计算源码详见:hadoop2.7做业提交详解之文件分片
而后会根据分片的大小计算map的个数,对每个分片都会产生一个map做业,或者是一个文件(小于分片大小*1.1)生成一个map做业,而后经过自定的map方法进行自定义的逻辑计算,计算完毕后会写到本地磁盘。
在这里不是直接写入磁盘,为了保证IO效率,采用了先写入内存的环形缓冲区,并作一次预排序(快速排序)。缓冲区的大小默认为100MB(可经过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存缓冲区的大小到达必定比例时,默认为80%(可经过mapreduce.map.sort.spill.percent配置项修改),将启动一个溢写线程将内存缓冲区的内容溢写到磁盘(spill to disk),这个溢写线程是独立的,不影响map向缓冲区写结果的线程,在溢写到磁盘的过程当中,map继续输入到缓冲中,若是期间缓冲区被填满,则map写会被阻塞到溢写磁盘过程完成。溢写是经过轮询的方式将缓冲区中的内存写入到本地mapreduce.cluster.local.dir目录下。在溢写到磁盘以前,咱们会知道reduce的数量,而后会根据reduce的数量划分分区,默认根据hashpartition对溢写的数据写入到相对应的分区。在每一个分区中,后台线程会根据key进行排序,因此溢写到磁盘的文件是分区且排序的。若是有combiner函数,它在排序后的输出运行,使得map输出更紧凑。减小写到磁盘的数据和传输给reduce的数据。
每次环形换冲区的内存达到阈值时,就会溢写到一个新的文件,所以当一个map溢写完以后,本地会存在多个分区切排序的文件。在map完成以前会把这些文件合并成一个分区且排序(归并排序)的文件,能够经过参数mapreduce.task.io.sort.factor控制每次能够合并多少个文件。
在map溢写磁盘的过程当中,对数据进行压缩能够提交速度的传输,减小磁盘io,减小存储。默认状况下不压缩,使用参数mapreduce.map.output.compress控制,压缩算法使用mapreduce.map.output.compress.codec参数控制。
map任务完成后,监控做业状态的application master便知道map的执行状况,并启动reduce任务,application master而且知道map输出和主机之间的对应映射关系,reduce轮询application master便知道主机所要复制的数据。
一个Map任务的输出,可能被多个Reduce任务抓取。每一个Reduce任务可能须要多个Map任务的输出做为其特殊的输入文件,而每一个Map任务的完成时间可能不一样,当有一个Map任务完成时,Reduce任务就开始运行。Reduce任务根据分区号在多个Map输出中抓取(fetch)对应分区的数据,这个过程也就是Shuffle的copy过程。。reduce有少许的复制线程,所以可以并行的复制map的输出,默认为5个线程。能够经过参数mapreduce.reduce.shuffle.parallelcopies控制。
这个复制过程和map写入磁盘过程相似,也有阀值和内存大小,阀值同样能够在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制时候reduce还会进行排序操做和合并文件操做。
若是map输出很小,则会被复制到Reducer所在节点的内存缓冲区,缓冲区的大小能够经过mapred-site.xml文件中的mapreduce.reduce.shuffle.input.buffer.percent指定。一旦Reducer所在节点的内存缓冲区达到阀值,或者缓冲区中的文件数达到阀值,则合并溢写到磁盘。
若是map输出较大,则直接被复制到Reducer所在节点的磁盘中。随着Reducer所在节点的磁盘中溢写文件增多,后台线程会将它们合并为更大且有序的文件。当完成复制map输出,进入sort阶段。这个阶段经过归并排序逐步将多个map输出小文件合并成大文件。最后几个经过归并合并成的大文件做为reduce的输出
当Reducer的输入文件肯定后,整个Shuffle操做才最终结束。以后就是Reducer的执行了,最后Reducer会把结果存到HDFS上。
在Hadoop集群环境中,大部分map 任务与reduce任务的执行是在不一样的节点上。固然不少状况下Reduce执行时须要跨节点去拉取其它节点上的map任务结果。若是集群正在运行的job有不少,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,咱们不能限制,能作的就是最大化地减小没必要要的消耗。还有在节点内,相比于内存,磁盘IO对job完成时间的影响也是可观的。从最基本的要求来讲,咱们对Shuffle过程的指望能够有:
一、完整地从map task端拉取数据到reduce 端。
二、在跨节点拉取数据时,尽量地减小对带宽的没必要要消耗。
三、减小磁盘IO对task执行的影响。
在MapReduce计算框架中,主要用到两种排序算法:快速排序和归并排序。在Map任务发生了2次排序,Reduce任务发生一次排序:
一、第1次排序发生在Map输出的内存环形缓冲区,使用快速排序。当缓冲区达到阀值时,在溢写到磁盘以前,后台线程会将缓冲区的数据划分红相应分区,在每一个分区中按照键值进行内排序。
二、第2次排序是在Map任务输出的磁盘空间上将多个溢写文件归并成一个已分区且有序的输出文件。因为溢写文件已经通过一次排序,因此合并溢写文件时只需一次归并排序便可使输出文件总体有序。
三、第3次排序发生在Shuffle阶段,将多个复制过来的Map输出文件进行归并,一样通过一次归并排序便可获得有序文件。
既然有做业的运行,确定会有做业的失败,做业的失败(不考虑硬件,平台缘由引发的失败)可能会存在不一样的问题,以下:
用户代码抛出异常(代码没写好):这种状况任务JVM会在退出以前向application master发送错误报告,并记录进用户日志,application master对该做业标记为failed,并释放掉占有的资源容器。
另外一种就是JVM忽然退出,这种状况节点管理器会注意到进程已经退出,并通知application master将此任务标记为失败,若是是由于推测执行而致使任务被终止,则不会被被标记为失败。而任务挂起又不一样,一旦application master注意到有一段时间没有收到进度更新,便会把任务标记为失败,默认为10分钟,参数mapreduce.task.timeout控制application master被告知一个任务失败,将会从新调度该任务执行(会在与以前失败的不一样节点上运行),默认重试4次,若是四次都失败,则做业断定为失败,参数控制为:
mapreduce.map.maxattempts
mapreduce.reduce.maxattempts
AM也可能因为各类缘由(如网络问题或者硬件故障)失效,Yarn一样会尝试重启AM
能够为每一个做业单独配置AM的尝试重启次数:mapreduce.am.max-attempts,默认值为2
Yarn中的上限一块儿提升:yarn.resourcemanager.am.nax-attempts,默认为2,单个应用程序不能够超过这个限制,除非同时修改这两个参数。
恢复过程:application master向资源管理器发送周期性的心跳。当application master失败时,资源管理器会检测到该失败,并在一个新的容器中启动application master,并使用做业历史来恢复失败的应用程序中的运行任务状态,使其没必要从新运行,默认状况下恢复功能是开启的,yarn.app.mapreduce.am.job.recovery.enable控制客户端向application master轮询做业状态时,若是application master运行失败了,则客户端会向资源管理器resourcemanager询问和缓存application master地址。
若是节点管理器崩溃或者运行很是缓慢,则就会中止向资源管理器发送心跳信息,若是10分钟(能够经过参数yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms设置)资源管理器没有收到一条心跳信息,则资源管理器将会通知中止发送心跳的节点管理器,并将其从本身的资源池中移除该节点管理器,在该节点上的application master和任务的失败,都经过如上两种恢复机制进行恢复。
资源管理器失败时一个很严重的问题,全部的任务将不能被分配资源,做业和容器都没法启动,那么整个经过yarn控制资源的集群都处于瘫痪状态。
容错机制:resourcemanager HA 详情见:hadoop高可用安装和原理详解
更多hadoop生态文章见: hadoop生态系列
参考:
《Hadoop权威指南 大数据的存储与分析 第四版》