MapReduce的一点理解

对于MapReduce编程,大几率的流程用过的人或多或少都清楚,可是归结到细节上,就有的地方不清楚了,下面根据本身的疑问,加上从网上各处,找到的被人的描述,最本身的疑问作出回答。算法

1. MapReduce 和 HDFS有什么关系?

  首先,HDFS和MapReduce是Hadoop最核心的设计;编程

  对于HDFS,即Hadoop Distributed File System,它是Hadoop的存储基础,是数据层面的,提供海量的数据存储;而MapReduce,则是一种引擎或者编程模型,能够理解为数据的上一层,咱们能够经过编写MapReduce程序,对海量的数据进行计算处理。这就相似于咱们经过 检索(MapReduce)全部文件(HDFS),找到咱们想要的结果。数组

  其次,MapReduce中JobTracker和TaskTracker分别对应于HDFS中的NameNode和DataNode网络

 

2.  MapReduce处理中,数据的流程是什么?

在客户端、JobTracker、TaskTracker的层次来分析MapReduce的工做原理的:  app

  a. 在客户端启动一个做业。函数

  b. 向JobTracker请求一个Job ID。oop

  c. 将运行做业所须要的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息。这些文件都存放在JobTracker专门为该做业建立的文件夹中。文件夹名为该做业的Job ID。JAR文件默认会有10个副本(mapred.submit.replication属性控制);输入划分信息告诉了JobTracker应该为这个做业启动多少个map任务等信息。线程

  d. JobTracker接收到做业后,将其放在一个做业队列里,等待做业调度器对其进行调度(这里是否是很像微机中的进程调度呢,呵呵),看成业调度器根据本身的调度算法调度到该做业时,会根据输入划分信息为每一个划分建立一个map任务,并将map任务分配给TaskTracker执行。对于map和reduce任务,TaskTracker根据主机核的数量和内存的大小有固定数量的map槽和reduce槽。这里须要强调的是:map任务不是随随便便地分配给某个TaskTracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的数据块的TaskTracker上,同时将程序JAR包复制到该TaskTracker上来运行,这叫“运算移动,数据不移动”。而分配reduce任务时并不考虑数据本地化。设计

  e. TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着不少的信息,好比当前map任务完成的进度等信息。当JobTracker收到做业的最后一个任务完成信息时,便把该做业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户。blog

     

若是具体从map端和reduce端分析,能够参考上面的图片,具体以下:

Map端: 

  a. 每一个输入分片会让一个map任务来处理,默认状况下,以HDFS的一个块的大小(默认为64M)为一个分片,固然咱们也能够设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中建立一个溢出文件,将该缓冲区中的数据写入这个文件。
  b. 在写入磁盘以前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样作是为了不有些reduce任务分配到大量数据,而有些reduce任务却分到不多数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。而后对每一个分区中的数据进行排序,若是此时设置了Combiner,将排序后的结果进行Combia操做,这样作的目的是让尽量少的数据写入到磁盘。
  c. 当map任务输出最后一个记录时,可能会有不少的溢出文件,这时须要将这些文件合并。合并的过程当中会不断地进行排序和combia操做,目的有两个:1.尽可能减小每次写入磁盘的数据量;2.尽可能减小下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减小网络传输的数据量,这里能够将数据压缩,只要将mapred.compress.map.out设置为true就能够了。
  d. 将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪一个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。因此JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。

  到这里,map端就分析完了。那到底什么是Shuffle呢?Shuffle的中文意思是“洗牌”,其实Shuffle也是一个很复杂的过程,这里能够参照下面的第三个问题。

Reduce端: 

  a.Reduce会接收到不一样map任务传来的数据,而且每一个map传来的数据都是有序的。若是reduce端接受的数据量至关小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用做此用途的堆空间的百分比),若是数据量超过了该缓冲区大小的必定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中。
  b.随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样作是为了给后面的合并节省时间。其实无论在map端仍是reduce端,MapReduce都是反复地执行排序,合并操做,如今终于明白了有些人为何会说:排序是hadoop的灵魂。
  c.合并的过程当中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽量地少,而且最后一次合并的结果并无写入磁盘,而是直接输入到reduce函数。

 

3.   Map处理数据后,到Reduce获得数据以前,数据的流程是什么?

   其实,将map处理的结果,传输到reduce上的过程,在MapReduce中,能够看作shuffle的过程。

  在map端,每一个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候须要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的全部临时文件作合并,生成最终的正式输出文件,而后等待reduce task来拉数据。 

  a.  在map task执行时,它的输入数据来源于HDFS的block,固然在MapReduce概念中,map task只读取split。Split与block的对应关系多是多对一,默认是一对一。

    b. 在通过mapper的运行后,咱们得知mapper的输出是这样一个key/value对。到底当前的key应该交由哪一个reduce去作呢,是须要如今决定的。 MapReduce提供Partitioner接口,它的做用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪一个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,若是用户本身对Partitioner有需求,能够订制并设置到job上。接下来,须要将数据写入内存缓冲区中,缓冲区的做用是批量收集map结果,减小磁盘IO的影响。咱们的key/value对以及Partition的结果都会被写入缓冲区。固然写入以前,key与value值都会被序列化成字节数组。

  c.  这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果不少时,就可能会撑爆内存,因此须要在必定条件下将缓冲区中的数据临时写入磁盘,而后从新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写,字面意思很直观。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不该该阻止map的结果输出,因此整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还能够往剩下的20MB内存中写,互不影响。 
        当溢写线程启动后,须要对这80MB空间内的key作排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节作的排序。 存缓冲区没有对将发送到相同reduce端的数据作合并,那么这种合并应该是体现是磁盘文件中的。从官方图上也能够看到写到磁盘中的溢写文件是对不一样的reduce端的数值作过合并。因此溢写过程一个很重要的细节在于,若是有不少个key/value对须要发送到某个reduce端去,那么须要将这些key/value值拼接到一块,减小与partition相关的索引记录。 

    在map端的过程,可参考下图: 

       

至此,map端的全部工做都已结束,最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每一个reduce task不断地经过RPC从JobTracker那里获取map task是否完成的信息,若是reduce task获得通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。 

        简单地说,reduce task在执行以前的工做就是不断地拉取当前job里每一个map task的最终结果,而后对从不一样地方拉取过来的数据不断地作merge,也最终造成一个文件做为reduce task的输入文件。见下图:

                 

   a. Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),经过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。由于map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。 
   b. Merge阶段。这里的merge如map端的merge动做,只是数组中存放的是不一样map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,由于Shuffle阶段Reducer不运行,因此应该把绝大部分的内存都给Shuffle用。这里须要强调的是,merge有三种形式:1)内存到内存  2)内存到磁盘  3)磁盘到磁盘。默认状况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达必定阈值,就启动内存到磁盘的merge。与map 端相似,这也是溢写的过程,这个过程当中若是你设置有Combiner,也是会启用的,而后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,而后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。 
     c. Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为何加引号?由于这个文件可能存在于磁盘上,也可能存在于内存中。对咱们来讲,固然但愿它存放于内存中,直接做为Reducer的输入,但默认状况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。而后就是Reducer执行,把结果放到HDFS上。

    

上面的文章,部分是本身写的,一部分摘自别人通俗易懂的理解,具体可参考:

  http://weixiaolu.iteye.com/blog/1474172

  http://langyu.iteye.com/blog/992916

相关文章
相关标签/搜索