MapReduce 的核心流程

下文中新旧的意思分别表明 Hadoop 0.20 先后。缘由是 MapReduce 在这个版本进行了一次大改,主要的特色就是划分了新旧两个包名。新版的特色是使用了抽象类代替一些可扩展的接口,以及增长了 Context 的概念。好比说,MapContext中就封装了获取切片、读取 Record 等功能,而MapContextImpl就是对 reader 和 writer 逻辑的封装。java

有时候,咱们会不当心配置错误,好比把mapreduce.job.combine.class配置到了mapred.combiner.class,就可能致使启用了旧的模块。node

org.apache.hadoop.mapred.TaskStatus.Phase这个枚举中能够看到 task 定义的全部阶段:STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP。固然咱们在理解的时候可能加入一些本身定义的阶段(根据动做),好比 Split。算法

Split

整个分片在客户端进行。Split 实际上是是逻辑上的分片,但其大小依赖于底层的文件块。这个公式搞 Hadoop 的人都很熟悉:apache

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
}
复制代码

它其实就是保证了minSize \le goalSize \le blockSize。它是FileInputFormat里专门设置的规则,且这个并非固定的,而是针对每一个文件(由于 MapReduce 支持传入路径,并且是多个路径)。这里你就能够看到一个能够调优的点了:显然这里须要进行和 namenode 的通讯,这样就须要容许设置mapreduce.input.fileinputformat.list-status.num-threads增长遍历文件信息时的线程数,这很合理(固然,略有一些奇怪的地方是这个配置实际上影响的是客户端)。数据结构

Split的核心类是InputFormat。这个接口必须实现getSplitsgetRecordReader方法,这对应着将输入的文件划分为分片并映射到 Map 的输入。通常来讲,咱们默认使用的是TextInputFormat,它使用了LineRecordReader读取模式和FileInputFormat的按块切分。其余的模式也很重要,好比CombineFileInputFormat(能够读取若干小文件,这个和 SequenceFile 合在一块儿不一样,只是打包了元数据),好比KeyValueTextInputFormat能够读取键值对,好比NLineInputFormat容许指定切分的行数(其实我以为应该实如今TextInputFormat里)。LineRecordReader还有一个细节是它实现了对压缩文件的自解释。app

这里仍是有一些值得细想的地方。好比,当获取了文件的分块信息以后,如何对任务的划分进行优化(本地化)?若是是本地文件,如何写入 HDFS 并补全元信息?svn

显然,本地化已经涉及到做业调度,因此这里的逻辑应该在 AM 处理。所以这里JobSubmitter会将数据进行序列化(具体能够参考writeJobSplitMetaInfo),以后在JobImpl.InitTransitioncreateSplits方法能够看到它读取了一组TaskSplitMetaInfo信息。JobImpl中只会建立 Map 和 Reduce 任务,具体的调度就要参考调度器的实现了。函数

listStatus调用getFileSystem后,会根据 schema 获得对应的FileSystem实现类,本地就是LocalFileSystem。这里并无作特殊的处理,而是将这个位置也做为元信息传递了。oop

Map

Mapper里的MapContext会从InputFormat里取 RecordReader 做为记录读取的方法。这里的实现比较简单并且不过重要,重点是理解 Mapper 的run方法:优化

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    } finally {
    cleanup(context);
    }
}
复制代码

其中setupcleanup只调用一次,都是平时 MR 常见的回调点。

咱们知道在 Map 中会将结果分区写入不一样文件。这里提供了一个环形缓冲区(相似于其余的缓冲设计,好比消息队列)MapOutputBuffer来平滑输出,它继承了MapOutputCollector。在每次缓冲区溢出(spill,其实阈值为80%)后,都会写入一个临时文件。以后,Map 会合并临时文件到最终的分区文件。这个过程有大量的配置优化点,好比mapreduce.task.io.sort.mb(控制缓冲区大小),mapreduce.task.io.sort.factor(控制进行 merge 的文件数,另外这个也针对 Reduce),mapreduce.map.output.compress(进行 map 结果压缩)等等。

注意,分区不表明写入不一样文件,而是文件的不一样位置

Spill

spill 过程是由独立的线程执行的,不过为何不能和 Reducer 同样多个线程写呢?主要是这种多写者问题几乎必然是有锁的(无锁算法必须使用其余特定数据结构),效率并不比单写者高。

在写入磁盘前,Map 会进行排序(sort),这点比较容易忽略。你能够这样记住它:MapOutputBuffer实现了IndexedSortable(固然,其实原本是 MapReduce 提到了老是排序的好处)。这个接口颇有意思,这里默认实现是 Hadoop 本身实现的快速排序,它会对分区先进行排序,然后保持分区内有序(这个也能够经过map.sort.class设置)。至于写到什么文件里,你能够在getSpillFileForWrite里找到(通常都是 out 后缀),好比%s_spill_%d.out(前面是做业号,后面是溢出的次数)。

在网上流传着一种说法:Combiner 应该是一个纯函数。不过,其实咱们知道,Google 在论文里就提到了 Map 和 Reduce 也应该是纯函数(不过也能够在必定范围内违反)。固然这个传言是有缘由的,由于 Combine 被屡次调用了:在sortAndSpill的最后,以及你们都知道的,在整个 Map 阶段的最后,它会调用mergeParts方法。

注意Combine 和 Merge 的含义是不一样的。Combine 是按照原文教义的,是一种 accelerate;Merge 是处理临时文件的。不管是 Map 仍是 Reduce 阶段都是缓冲区再加文件合并。那么能不能直接写到结果文件呢?其实理论上能够,可是没有意义,由于咱们每次有新的溢出时,都要和旧的结果文件进行合并。这样作也拖慢了 Map 的输出。

Spill 以后可选的步骤是合并(combine)和压缩。Map 的压缩不只仅是最后的输出,它在输出临时文件时就会进行了,因此能够极大提升传输的效率(可是以 CPU 占用为代价)。

Spill 生成的全部分区都在一个文件里。所以,它须要元信息来标记分区的范围,这就是SpillRecord,在 Map Task 里有一个对应的 ArrayList 集合indexCacheList,它存储着全部 Map 临时文件 的元信息。不过,若是它的大小超过了mapreduce.task.index.cache.limit.bytes,那么就会溢写到磁盘,因此这里也是一个能够调优的地方。

Merge 过程能够经过修改mapreduce.task.io.sort.factor来增长一次合并的数量(多路归并),不然的话就会增长循环次数。不过,这里略微比咱们想的要复杂:整个文件列表首先会被排序,这以后会取出要归并的文件,组成小根堆,而后迭代堆的值合并——固然这也会生成临时文件,且临时文件会二分搜索后插入到当前排序列表。最终,全部文件会合并为一个。但这个代码看起来很是容易让人迷惑:这个堆MergeQueue继承了 Hadoop 本身实现的PriorityQueue类。它自己的泛型参数是K 和 V,而继承的PriorityQueueSegment<K,V>,也就是待合并的文件。而这个类又包含了排序的文件列表segments

Hadoop 实际上是利用这个堆找全局最小值,方法是它将 Segment 的最小值(也就是第一个值)做为排序的 key。这样,就能够实现一个相似于单出队的多路归并效果。要注意的是,取出来的 key 以后就要被更新。这一点咱们能够从adjustPriorityQueue里看到:

private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
    long startPos = reader.getReader().bytesRead;
    boolean hasNext = reader.nextRawKey();
    long endPos = reader.getReader().bytesRead;
    totalBytesProcessed += endPos - startPos;
    mergeProgress.set(Math.min(1.0f, totalBytesProcessed * progPerByte));
    if (hasNext) {
    adjustTop();
    } else {
    pop();
    reader.close();
    }
}
复制代码

其中nextRawKey会把 key 设置为下一个 segment 里键的位置(细节能够看一下IFilepositionToNextRecord方法)。调整以后就能够顺利的继续进行堆的调整了。

最后,就是在MergerwriteFile方法里,会对RawKeyValueIterator进行调用,这里是直接传的this。不过我看了一下,Merger自己在Merge时就会写文件,而以后咱们在输出时仍然会写 Map 的结果文件,感受这样不是重复写了两次?虽说Merger是 Map 和 Reduce 复用的,但这样看起来也并不妥。

Partition

Google 的论文也提到,有些场景很适合自定义分区器,他举的例子是 IP 地址。自定义分区器很大的意义之一在于防止数据倾斜——虽说哈希是最不容易倾斜的了。好比,Hadoop 提供了TotalOrderPartitioner,它的额外做用是能够有效分区。这个分区器颇有意思,它原本是实现全序排序的,也就是说它会保存每一个分区的最大最小值。这样一来就构成了一个典型的二分搜索树结构——不过,这里考虑得更细一些,即对于可进行逐字节比较的类型(实现BinaryComparable)好比Text,还能够经过 trie 树搜索。

还有一种KeyFieldBasedPartitioner能够基于某些域来进行哈希。

Shuffle

也能够说是 Copy,从 Map 端拉取(pull)数据为 Reducer 提供输入。

getProgress().setStatus("reduce"); 
setPhase(TaskStatus.Phase.SHUFFLE);  
复制代码

在 Reducer 初始化的时候,会设置当前 Task 状态为 SHUFFLE。Shuffle 这个名字也不是没有体现,好比在 Reducer 获取到 Map 的输出之后,会对数据的顺序进行 Shuffle,防止热点。

这里 Shuffle 阶段的代码都放在了Shuffle这个类里,它实现了ShuffleConsumerPlugin接口。这个名字看起来有点奇怪,由于它是为了实现Shuffle的服务化(可由第三方服务提供)。这个能够查看对应的 commit:

svn.apache.org/viewvc?view…

这里 Shuffle 会启动一个事件接收器EventFetcher的独立线程来处理 map 完成的事件。这个接收器内部使用了TaskUmbilicalProtocol 协议。咱们没必要去深究下层 RPC 通讯的逻辑,只须要知道它包含了getMapCompletionEvents方法。另外,它的实现类TaskAttemptListenerImpl实际上是用来监听心跳的(实现了TaskAttemptListener)。

当收到了完成的事件以后,这些消息会被ShuffleScheduler解析。具体的实如今addKnownMapOutput方法中,它会把解析到的mapHost,也就是持有 map 输出的节点放到pendingHosts里。mapHost有这样几种状态:

  • IDLE:表示 Map 尚未完成
  • PENDING:已经完成等待处理
  • BUSY:表示正被拷贝
  • PENALIZED:错误,拷贝失败

不过,这个pendingHosts并无暴露出来,Shuffle是不可见的。调度器ShuffleScheduler只暴露一个getHost的接口给Fetcher线程。这个和EventFetcher同样,都是独立的线程,并且这里还能够用mapreduce.reduce.shuffle.parallelcopies配置来增长并行数。

ShuffleScheduler的实现ShuffleSchedulerImpl是使用synchronized上锁,诸如addKnownMapOutput最后会进行notifyAll,而getHost会在pendingHosts不足时等待。Fetcherrun代码很是简洁:

while (!stopped && !Thread.currentThread().isInterrupted()) {
  MapHost host = null;
  try {
    // If merge is on, block
    merger.waitForResource();

    // Get a host to shuffle from
    host = scheduler.getHost();
    metrics.threadBusy();

    // Shuffle
    copyFromHost(host);
  } finally {
    if (host != null) {
    scheduler.freeHost(host);
    metrics.threadFree();            
    }
  }
}
复制代码

这里若是有内存 merge 正在运行,会阻塞当前拷贝数据,由于后面的copyFromHost有可能会触发归并操做。

和 Map 不同的是,这个Merger的类型是MergeManager。它是对 Shuffle 阶段归并的抽象,把归并划分到了OnDiskMergerinMemoryMerger两个单独的线程。它提供了三个方法:waitForResourcereserveclose

其中reserve会判断当前是否溢出,并建立一个临时的拷贝文件(也可能在内存)。这个阈值是根据mapreduce.reduce.memory.totalbytesmapreduce.reduce.shuffle.memory.limit.percent(默认是0.25)的乘积。若是小于阈值,且总内存使用没有溢出,就生成InMemoryMapOutput,反之生成OnDiskMapOutput。它们都实现了MapOutput接口,当整个拷贝完成的时候会调用这个接口的commit方法,将这个输入加入到finishedMaps里。

InMemoryMapOutputcommit很特殊,它会判断是否超过了缓冲区阈值(默认是总内存的0.66)或者文件数是否超过mapreduce.reduce.merge.memtomem.threshold(默认是ioSortFactor),若是超过了就会调inMemoryMerger;同理,这个OnDiskMapOutput也会根据文件的数量来进行归并(默认是2 * ioSortFactor - 1)。这里调用是基于一个链表pendingToBeMerged的对象锁,也就是 notify。

Sort

最后在close的时候,会把内存和磁盘的临时文件都合并一次(有可能没到阈值)。当完成了全部的前期工做后,会调用finalMerge方法,这个方法的核心就是Merger.merge,也就是 Map 里相同的归并流程。这样,就能够造成一个全局排序的输出文件。由于核心过程都在前面提到了,这里再也不赘述。

copyFromHost是从HttpURLConnection里获取数据流。

Reduce

Reduce 和 Map 同样,能够定制OutputFormat的格式,不过它没有如何分片。整个代码都很简单,reducerContext会把 Map 的键值对合并,其实就是遍历键值对,一直到下一个不一样的键(更准确的来讲,是在根据groupComparator来肯定下一个键,因此它有可能会把几个 key 值放在一块儿,但默认是RawComparator,也就是不分组)为止,而后将全部的值合在一块儿输出。

Reduce 必须等待 Shuffle 完成才开始执行,所以有可能会致使 Slot Hoarding 问题。


其实整个 MapReduce 的代码有很是多值得看一看的地方,好比,Uber 是如何实现的? 和 AM 通讯是如何实现的?但每每时间所限,咱们只能观其大略,而后实际用到了再翻一翻细节。董西成大佬很早就出了两本关于 Hadoop 源码的书,不过惋惜只对 MapReduce 1.x 有比较详细的描述,而 YARN 的篇幅较少,许多细节已经与如今比较主流的 Hadoop 版本不太同样了,固然仍是值得一看的。

To be continued...

相关文章
相关标签/搜索