下文中新旧的意思分别表明 Hadoop 0.20 先后。缘由是 MapReduce 在这个版本进行了一次大改,主要的特色就是划分了新旧两个包名。新版的特色是使用了抽象类代替一些可扩展的接口,以及增长了 Context 的概念。好比说,MapContext
中就封装了获取切片、读取 Record 等功能,而MapContextImpl
就是对 reader 和 writer 逻辑的封装。java
有时候,咱们会不当心配置错误,好比把mapreduce.job.combine.class
配置到了mapred.combiner.class
,就可能致使启用了旧的模块。node
在org.apache.hadoop.mapred.TaskStatus.Phase
这个枚举中能够看到 task 定义的全部阶段:STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP
。固然咱们在理解的时候可能加入一些本身定义的阶段(根据动做),好比 Split。算法
整个分片在客户端进行。Split 实际上是是逻辑上的分片,但其大小依赖于底层的文件块。这个公式搞 Hadoop 的人都很熟悉:apache
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
复制代码
它其实就是保证了。它是
FileInputFormat
里专门设置的规则,且这个并非固定的,而是针对每一个文件(由于 MapReduce 支持传入路径,并且是多个路径)。这里你就能够看到一个能够调优的点了:显然这里须要进行和 namenode 的通讯,这样就须要容许设置mapreduce.input.fileinputformat.list-status.num-threads
增长遍历文件信息时的线程数,这很合理(固然,略有一些奇怪的地方是这个配置实际上影响的是客户端)。数据结构
Split的核心类是InputFormat
。这个接口必须实现getSplits
和getRecordReader
方法,这对应着将输入的文件划分为分片并映射到 Map 的输入。通常来讲,咱们默认使用的是TextInputFormat
,它使用了LineRecordReader
读取模式和FileInputFormat
的按块切分。其余的模式也很重要,好比CombineFileInputFormat
(能够读取若干小文件,这个和 SequenceFile 合在一块儿不一样,只是打包了元数据),好比KeyValueTextInputFormat
能够读取键值对,好比NLineInputFormat
容许指定切分的行数(其实我以为应该实如今TextInputFormat
里)。LineRecordReader
还有一个细节是它实现了对压缩文件的自解释。app
这里仍是有一些值得细想的地方。好比,当获取了文件的分块信息以后,如何对任务的划分进行优化(本地化)?若是是本地文件,如何写入 HDFS 并补全元信息?svn
显然,本地化已经涉及到做业调度,因此这里的逻辑应该在 AM 处理。所以这里JobSubmitter
会将数据进行序列化(具体能够参考writeJobSplitMetaInfo
),以后在JobImpl.InitTransition
的createSplits
方法能够看到它读取了一组TaskSplitMetaInfo
信息。JobImpl
中只会建立 Map 和 Reduce 任务,具体的调度就要参考调度器的实现了。函数
在listStatus
调用getFileSystem
后,会根据 schema 获得对应的FileSystem
实现类,本地就是LocalFileSystem
。这里并无作特殊的处理,而是将这个位置也做为元信息传递了。oop
Mapper
里的MapContext
会从InputFormat
里取 RecordReader 做为记录读取的方法。这里的实现比较简单并且不过重要,重点是理解 Mapper 的run
方法:优化
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
复制代码
其中setup
和cleanup
只调用一次,都是平时 MR 常见的回调点。
咱们知道在 Map 中会将结果分区写入不一样文件。这里提供了一个环形缓冲区(相似于其余的缓冲设计,好比消息队列)MapOutputBuffer
来平滑输出,它继承了MapOutputCollector
。在每次缓冲区溢出(spill,其实阈值为80%)后,都会写入一个临时文件。以后,Map 会合并临时文件到最终的分区文件。这个过程有大量的配置优化点,好比mapreduce.task.io.sort.mb
(控制缓冲区大小),mapreduce.task.io.sort.factor
(控制进行 merge 的文件数,另外这个也针对 Reduce),mapreduce.map.output.compress
(进行 map 结果压缩)等等。
注意,分区不表明写入不一样文件,而是文件的不一样位置。
spill 过程是由独立的线程执行的,不过为何不能和 Reducer 同样多个线程写呢?主要是这种多写者问题几乎必然是有锁的(无锁算法必须使用其余特定数据结构),效率并不比单写者高。
在写入磁盘前,Map 会进行排序(sort),这点比较容易忽略。你能够这样记住它:MapOutputBuffer
实现了IndexedSortable
(固然,其实原本是 MapReduce 提到了老是排序的好处)。这个接口颇有意思,这里默认实现是 Hadoop 本身实现的快速排序,它会对分区先进行排序,然后保持分区内有序(这个也能够经过map.sort.class
设置)。至于写到什么文件里,你能够在getSpillFileForWrite
里找到(通常都是 out 后缀),好比%s_spill_%d.out
(前面是做业号,后面是溢出的次数)。
在网上流传着一种说法:Combiner 应该是一个纯函数。不过,其实咱们知道,Google 在论文里就提到了 Map 和 Reduce 也应该是纯函数(不过也能够在必定范围内违反)。固然这个传言是有缘由的,由于 Combine 被屡次调用了:在sortAndSpill
的最后,以及你们都知道的,在整个 Map 阶段的最后,它会调用mergeParts
方法。
注意Combine 和 Merge 的含义是不一样的。Combine 是按照原文教义的,是一种 accelerate;Merge 是处理临时文件的。不管是 Map 仍是 Reduce 阶段都是缓冲区再加文件合并。那么能不能直接写到结果文件呢?其实理论上能够,可是没有意义,由于咱们每次有新的溢出时,都要和旧的结果文件进行合并。这样作也拖慢了 Map 的输出。
Spill 以后可选的步骤是合并(combine)和压缩。Map 的压缩不只仅是最后的输出,它在输出临时文件时就会进行了,因此能够极大提升传输的效率(可是以 CPU 占用为代价)。
Spill 生成的全部分区都在一个文件里。所以,它须要元信息来标记分区的范围,这就是SpillRecord
,在 Map Task 里有一个对应的 ArrayList 集合indexCacheList
,它存储着全部 Map 临时文件 的元信息。不过,若是它的大小超过了mapreduce.task.index.cache.limit.bytes
,那么就会溢写到磁盘,因此这里也是一个能够调优的地方。
Merge 过程能够经过修改mapreduce.task.io.sort.factor
来增长一次合并的数量(多路归并),不然的话就会增长循环次数。不过,这里略微比咱们想的要复杂:整个文件列表首先会被排序,这以后会取出要归并的文件,组成小根堆,而后迭代堆的值合并——固然这也会生成临时文件,且临时文件会二分搜索后插入到当前排序列表。最终,全部文件会合并为一个。但这个代码看起来很是容易让人迷惑:这个堆MergeQueue
继承了 Hadoop 本身实现的PriorityQueue
类。它自己的泛型参数是K 和 V,而继承的PriorityQueue
是Segment<K,V>
,也就是待合并的文件。而这个类又包含了排序的文件列表segments
。
Hadoop 实际上是利用这个堆找全局最小值,方法是它将 Segment 的最小值(也就是第一个值)做为排序的 key。这样,就能够实现一个相似于单出队的多路归并效果。要注意的是,取出来的 key 以后就要被更新。这一点咱们能够从adjustPriorityQueue
里看到:
private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
long startPos = reader.getReader().bytesRead;
boolean hasNext = reader.nextRawKey();
long endPos = reader.getReader().bytesRead;
totalBytesProcessed += endPos - startPos;
mergeProgress.set(Math.min(1.0f, totalBytesProcessed * progPerByte));
if (hasNext) {
adjustTop();
} else {
pop();
reader.close();
}
}
复制代码
其中nextRawKey
会把 key 设置为下一个 segment 里键的位置(细节能够看一下IFile
的positionToNextRecord
方法)。调整以后就能够顺利的继续进行堆的调整了。
最后,就是在Merger
的writeFile
方法里,会对RawKeyValueIterator
进行调用,这里是直接传的this
。不过我看了一下,Merger
自己在Merge
时就会写文件,而以后咱们在输出时仍然会写 Map 的结果文件,感受这样不是重复写了两次?虽说Merger
是 Map 和 Reduce 复用的,但这样看起来也并不妥。
Google 的论文也提到,有些场景很适合自定义分区器,他举的例子是 IP 地址。自定义分区器很大的意义之一在于防止数据倾斜——虽说哈希是最不容易倾斜的了。好比,Hadoop 提供了TotalOrderPartitioner
,它的额外做用是能够有效分区。这个分区器颇有意思,它原本是实现全序排序的,也就是说它会保存每一个分区的最大最小值。这样一来就构成了一个典型的二分搜索树结构——不过,这里考虑得更细一些,即对于可进行逐字节比较的类型(实现BinaryComparable
)好比Text
,还能够经过 trie 树搜索。
还有一种KeyFieldBasedPartitioner
能够基于某些域来进行哈希。
也能够说是 Copy,从 Map 端拉取(pull)数据为 Reducer 提供输入。
getProgress().setStatus("reduce");
setPhase(TaskStatus.Phase.SHUFFLE);
复制代码
在 Reducer 初始化的时候,会设置当前 Task 状态为 SHUFFLE。Shuffle 这个名字也不是没有体现,好比在 Reducer 获取到 Map 的输出之后,会对数据的顺序进行 Shuffle,防止热点。
这里 Shuffle 阶段的代码都放在了Shuffle
这个类里,它实现了ShuffleConsumerPlugin
接口。这个名字看起来有点奇怪,由于它是为了实现Shuffle
的服务化(可由第三方服务提供)。这个能够查看对应的 commit:
这里 Shuffle 会启动一个事件接收器EventFetcher
的独立线程来处理 map 完成的事件。这个接收器内部使用了TaskUmbilicalProtocol
协议。咱们没必要去深究下层 RPC 通讯的逻辑,只须要知道它包含了getMapCompletionEvents
方法。另外,它的实现类TaskAttemptListenerImpl
实际上是用来监听心跳的(实现了TaskAttemptListener
)。
当收到了完成的事件以后,这些消息会被ShuffleScheduler
解析。具体的实如今addKnownMapOutput
方法中,它会把解析到的mapHost
,也就是持有 map 输出的节点放到pendingHosts
里。mapHost
有这样几种状态:
不过,这个pendingHosts
并无暴露出来,Shuffle
是不可见的。调度器ShuffleScheduler
只暴露一个getHost
的接口给Fetcher
线程。这个和EventFetcher
同样,都是独立的线程,并且这里还能够用mapreduce.reduce.shuffle.parallelcopies
配置来增长并行数。
ShuffleScheduler
的实现ShuffleSchedulerImpl
是使用synchronized
上锁,诸如addKnownMapOutput
最后会进行notifyAll
,而getHost
会在pendingHosts
不足时等待。Fetcher
的run
代码很是简洁:
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
metrics.threadBusy();
// Shuffle
copyFromHost(host);
} finally {
if (host != null) {
scheduler.freeHost(host);
metrics.threadFree();
}
}
}
复制代码
这里若是有内存 merge 正在运行,会阻塞当前拷贝数据,由于后面的copyFromHost
有可能会触发归并操做。
和 Map 不同的是,这个Merger
的类型是MergeManager
。它是对 Shuffle 阶段归并的抽象,把归并划分到了OnDiskMerger
和inMemoryMerger
两个单独的线程。它提供了三个方法:waitForResource
,reserve
和close
。
其中reserve
会判断当前是否溢出,并建立一个临时的拷贝文件(也可能在内存)。这个阈值是根据mapreduce.reduce.memory.totalbytes
和mapreduce.reduce.shuffle.memory.limit.percent
(默认是0.25)的乘积。若是小于阈值,且总内存使用没有溢出,就生成InMemoryMapOutput
,反之生成OnDiskMapOutput
。它们都实现了MapOutput
接口,当整个拷贝完成的时候会调用这个接口的commit
方法,将这个输入加入到finishedMaps
里。
InMemoryMapOutput
的commit
很特殊,它会判断是否超过了缓冲区阈值(默认是总内存的0.66)或者文件数是否超过mapreduce.reduce.merge.memtomem.threshold
(默认是ioSortFactor
),若是超过了就会调inMemoryMerger
;同理,这个OnDiskMapOutput
也会根据文件的数量来进行归并(默认是2 * ioSortFactor - 1
)。这里调用是基于一个链表pendingToBeMerged
的对象锁,也就是 notify。
最后在close
的时候,会把内存和磁盘的临时文件都合并一次(有可能没到阈值)。当完成了全部的前期工做后,会调用finalMerge
方法,这个方法的核心就是Merger.merge
,也就是 Map 里相同的归并流程。这样,就能够造成一个全局排序的输出文件。由于核心过程都在前面提到了,这里再也不赘述。
copyFromHost
是从HttpURLConnection
里获取数据流。
Reduce 和 Map 同样,能够定制OutputFormat
的格式,不过它没有如何分片。整个代码都很简单,reducerContext
会把 Map 的键值对合并,其实就是遍历键值对,一直到下一个不一样的键(更准确的来讲,是在根据groupComparator
来肯定下一个键,因此它有可能会把几个 key 值放在一块儿,但默认是RawComparator
,也就是不分组)为止,而后将全部的值合在一块儿输出。
Reduce 必须等待 Shuffle 完成才开始执行,所以有可能会致使 Slot Hoarding 问题。
其实整个 MapReduce 的代码有很是多值得看一看的地方,好比,Uber 是如何实现的? 和 AM 通讯是如何实现的?但每每时间所限,咱们只能观其大略,而后实际用到了再翻一翻细节。董西成大佬很早就出了两本关于 Hadoop 源码的书,不过惋惜只对 MapReduce 1.x 有比较详细的描述,而 YARN 的篇幅较少,许多细节已经与如今比较主流的 Hadoop 版本不太同样了,固然仍是值得一看的。
To be continued...