MapOutputBuffer has a member variable called mapOutputFile. In the sortAndSpill method (invoked from flush), this variable is used to obtain the spill file path and write out the intermediate results; that method calls writer.append(key, value), discussed below, to write the data. There does not appear to be any encryption step.
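To make the spill-write pattern concrete, here is a minimal standalone sketch: records are appended one by one to an output stream, with no encryption applied. The SpillWriter class and its length-prefixed record layout are hypothetical simplifications, not the actual IFile.Writer format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical simplification of the writer.append(key, value) call:
// each record is written as (key length, value length, key bytes, value bytes).
public class SpillWriter {
    private final DataOutputStream out;
    private int numRecords = 0;

    public SpillWriter(ByteArrayOutputStream sink) {
        this.out = new DataOutputStream(sink);
    }

    // Analogous to writer.append(key, value): lengths first, then raw bytes.
    public void append(byte[] key, byte[] value) throws IOException {
        out.writeInt(key.length);
        out.writeInt(value.length);
        out.write(key);
        out.write(value);
        numRecords++;
    }

    public int getNumRecords() {
        return numRecords;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        SpillWriter w = new SpillWriter(sink);
        w.append("k1".getBytes(), "v1".getBytes());
        w.append("k2".getBytes(), "v2".getBytes());
        // Each record: 4 + 4 length bytes + 2-byte key + 2-byte value = 12 bytes
        System.out.println(sink.size()); // 24
    }
}
```

The point of the sketch is that the bytes land in the stream exactly as given: nothing in this path transforms the payload.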
When shuffle.run() executes, the map outputs are fetched and merged, and merger.close() is invoked. This actually resolves to the close method of MergeManagerImpl, shown below:
```java
@Override
public RawKeyValueIterator close() throws Throwable {
  // Wait for on-going merges to complete
  if (memToMemMerger != null) {
    memToMemMerger.close();
  }
  inMemoryMerger.close();
  onDiskMerger.close();

  List<InMemoryMapOutput<K, V>> memory =
      new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
  inMemoryMergedMapOutputs.clear();
  memory.addAll(inMemoryMapOutputs);
  inMemoryMapOutputs.clear();
  List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
  onDiskMapOutputs.clear();
  return finalMerge(jobConf, rfs, memory, disk);
}
```
Here we see three different mergers, memToMemMerger, inMemoryMerger, and onDiskMerger, defined as follows:
```java
private IntermediateMemoryToMemoryMerger memToMemMerger;
private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
private final OnDiskMerger onDiskMerger;
```
IntermediateMemoryToMemoryMerger extends MergeThread<InMemoryMapOutput<K, V>, K, V>, whose close and run methods are as follows:
```java
public synchronized void close() throws InterruptedException {
  closed = true;
  waitForMerge();
  interrupt();
}

public void run() {
  while (true) {
    List<T> inputs = null;
    try {
      // Wait for notification to start the merge...
      synchronized (pendingToBeMerged) {
        while (pendingToBeMerged.size() <= 0) {
          pendingToBeMerged.wait();
        }
        // Pickup the inputs to merge.
        inputs = pendingToBeMerged.removeFirst();
      }
      // Merge
      merge(inputs);
    } catch (InterruptedException ie) {
      numPending.set(0);
      return;
    } catch (Throwable t) {
      numPending.set(0);
      reporter.reportException(t);
      return;
    } finally {
      synchronized (this) {
        numPending.decrementAndGet();
        notifyAll();
      }
    }
  }
}
```
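The wait/notify pattern in this run loop is a classic single-consumer work queue, and it is easier to see in a self-contained form. The sketch below keeps only that skeleton: a worker thread blocks on a pending list via wait(), wakes when inputs arrive, "merges" them (here, just summing integers), and exits on interrupt(). The class and method names other than the wait/notify skeleton are our own, not Hadoop's.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal stand-in for MergeThread: a worker that consumes batches of
// inputs from pendingToBeMerged and signals completion via notifyAll().
public class MiniMergeThread extends Thread {
    private final LinkedList<List<Integer>> pendingToBeMerged = new LinkedList<>();
    private final AtomicInteger numPending = new AtomicInteger(0);
    public final List<Integer> results = new ArrayList<>();

    // Producer side: enqueue a batch and wake the merge loop.
    public void startMerge(List<Integer> inputs) {
        synchronized (pendingToBeMerged) {
            pendingToBeMerged.addLast(inputs);
            numPending.incrementAndGet();
            pendingToBeMerged.notifyAll();
        }
    }

    // Like MergeThread.close(): drain pending work, then stop the thread.
    public synchronized void close() throws InterruptedException {
        waitForMerge();
        interrupt();   // makes the blocked wait() throw InterruptedException
        join();
    }

    public synchronized void waitForMerge() throws InterruptedException {
        while (numPending.get() > 0) {
            wait();    // released by notifyAll() in the run loop
        }
    }

    @Override
    public void run() {
        while (true) {
            List<Integer> inputs;
            try {
                synchronized (pendingToBeMerged) {
                    while (pendingToBeMerged.isEmpty()) {
                        pendingToBeMerged.wait();   // blocks until startMerge()
                    }
                    inputs = pendingToBeMerged.removeFirst();
                }
                int sum = 0;                        // stand-in for merge(inputs)
                for (int v : inputs) sum += v;
                synchronized (this) {
                    results.add(sum);
                    numPending.decrementAndGet();
                    notifyAll();                    // unblocks waitForMerge()
                }
            } catch (InterruptedException ie) {
                return;                             // clean shutdown path
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniMergeThread t = new MiniMergeThread();
        t.start();
        t.startMerge(List.of(1, 2, 3));
        t.close();
        System.out.println(t.results); // [6]
    }
}
```

Note how close() mirrors the real code: it first waits for in-flight merges to finish, and only then interrupts the thread, which is why the InterruptedException catch in run() can simply return.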
inMemoryMerger is created by the createInMemoryMerger method and is in fact an instance of InMemoryMerger. All three mergers create a Writer in their merge method and call Merger.writeFile(iter, writer, reporter, jobConf), then call writer.close() to finish. The close method is implemented as follows:
```java
public void close() throws IOException {
  // When IFile writer is created by BackupStore, we do not have
  // Key and Value classes set. So, check before closing the
  // serializers
  if (keyClass != null) {
    keySerializer.close();
    valueSerializer.close();
  }

  // Write EOF_MARKER for key/value length
  WritableUtils.writeVInt(out, EOF_MARKER);
  WritableUtils.writeVInt(out, EOF_MARKER);
  decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);

  // Flush the stream
  out.flush();

  if (compressOutput) {
    // Flush
    compressedOut.finish();
    compressedOut.resetState();
  }

  // Close the underlying stream iff we own it...
  if (ownOutputStream) {
    out.close();
  } else {
    // Write the checksum
    checksumOut.finish();
  }

  compressedBytesWritten = rawOut.getPos() - start;

  if (compressOutput) {
    // Return back the compressor
    CodecPool.returnCompressor(compressor);
    compressor = null;
  }

  out = null;
  if (writtenRecordsCounter != null) {
    writtenRecordsCounter.increment(numRecordsWritten);
  }
}
```
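The end-of-stream convention above is worth spelling out: two variable-length ints with value EOF_MARKER (-1) terminate the record stream, occupying the slots where a key length and value length would otherwise go. The sketch below reproduces only the single-byte case of Hadoop's VInt encoding (values in [-112, 127] are stored as one byte, which covers EOF_MARKER); the multi-byte cases and the exact getVIntSize logic are simplified.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Demonstrates why decompressedBytesWritten grows by exactly 2 bytes in
// close(): EOF_MARKER fits the one-byte VInt encoding.
public class EofMarkerDemo {
    static final int EOF_MARKER = -1;

    // Single-byte branch of the Hadoop VInt format (assumption: callers
    // only pass values in [-112, 127]; larger values are not sketched).
    static void writeVInt(DataOutputStream out, int i) throws IOException {
        if (i < -112 || i > 127) {
            throw new IllegalArgumentException("multi-byte VInts not sketched");
        }
        out.writeByte(i);
    }

    // Simplified size function: 1 byte for the single-byte range.
    static int getVIntSize(int i) {
        if (i < -112 || i > 127) {
            throw new IllegalArgumentException("multi-byte VInts not sketched");
        }
        return 1;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);

        // Mirror of the close() logic: one marker for the key-length slot,
        // one for the value-length slot.
        writeVInt(out, EOF_MARKER);
        writeVInt(out, EOF_MARKER);
        int decompressedBytesWritten = 2 * getVIntSize(EOF_MARKER);

        System.out.println(decompressedBytesWritten); // 2
        System.out.println(buf.size());               // 2
    }
}
```

A reader scanning the file knows the records have ended when it decodes a negative key length, which is what makes -1 a safe sentinel: real lengths are never negative.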
The key piece here is out, which is created as follows:
```java
if (codec != null) {
  this.compressor = CodecPool.getCompressor(codec);
  if (this.compressor != null) {
    this.compressor.reset();
    this.compressedOut = codec.createOutputStream(checksumOut, compressor);
    this.out = new FSDataOutputStream(this.compressedOut, null);
    this.compressOutput = true;
  } else {
    LOG.warn("Could not obtain compressor from CodecPool");
    this.out = new FSDataOutputStream(checksumOut, null);
  }
} else {
  this.out = new FSDataOutputStream(checksumOut, null);
}
```
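The branching logic is: wrap the checksummed stream in a codec stream only when a codec and a pooled compressor are both available, otherwise write plainly. The standalone analogue below uses java.util.zip instead of Hadoop's CompressionCodec/CodecPool, so the names here are not Hadoop APIs, but the stream-wrapping decision is the same shape.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Analogue of the Writer constructor branch: compressed output if a
// "compressor" is supplied, raw output otherwise.
public class CodecWrapDemo {

    static OutputStream openOut(ByteArrayOutputStream sink, Deflater compressor) {
        if (compressor != null) {
            compressor.reset();                              // like compressor.reset()
            return new DeflaterOutputStream(sink, compressor); // like codec.createOutputStream(...)
        }
        return sink;                                         // plain, uncompressed path
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "intermediate map output ".repeat(50).getBytes();

        ByteArrayOutputStream compressedSink = new ByteArrayOutputStream();
        OutputStream out = openOut(compressedSink, new Deflater());
        out.write(data);
        out.close();

        // Round-trip to show the original bytes are fully recoverable.
        byte[] back = new InflaterInputStream(
                new ByteArrayInputStream(compressedSink.toByteArray())).readAllBytes();

        System.out.println(compressedSink.size() < data.length); // true
        System.out.println(back.length == data.length);          // true
    }
}
```

Because the compressed stream wraps (rather than replaces) the underlying stream, everything downstream of out, including writer.append and the EOF markers, is compressed transparently, which is exactly how the intermediate results pick up compression without any change to the record-writing code.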
This explains how the intermediate results are compressed when a compression codec is configured.
A few conclusions: