Because of the sort step, MapReduce's MapTask and ReduceTask form a workflow-style architecture rather than a dataflow-style one. Before a MapTask finishes and its output has been sorted and merged, the ReduceTask has no data to consume, so even if the ReduceTask has already been created it can only sleep and wait for the MapTask to finish, after which it can pull data from the MapTask's node. A MapTask's final output is a single merged spill file, which can be fetched via a web address. For this reason a ReduceTask is normally started only when the MapTasks are close to completion; starting it earlier just wastes container resources.
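The point at which ReduceTasks are allowed to start is itself configurable. A minimal sketch, assuming a Hadoop 2.x Configuration object named conf (the 0.8 value is only an example; the default is 0.05):
// Schedule reduce tasks only after this fraction of the map tasks has completed.
conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.8f);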
ReduceTask runs as a thread inside the YarnChild JVM. We begin reading the Reduce phase from ReduceTask.run.
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) {
/* Register the phases the reduce task will pass through, so the TaskTracker can be told how far it has progressed */
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
TaskReporter reporter = startReporter(umbilical);
// set up and start the reporter thread used to communicate with the TaskTracker
boolean useNewApi = job.getUseNewReducer();
//when the job is initialized on the job client the new API is used by default; see Job.setUseNewAPI()
initialize(job, getJobID(), reporter, useNewApi);
/* Initializes the task: mainly output-related setup such as creating the committer and setting the working directory */
// check if it is a cleanupJobTask
/* The following if statements branch on the task type; the methods they call are all defined on Task, so they apply whether the task is a MapTask or a ReduceTask */
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;//this task exists only for job cleanup, so it stops once that is done
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
//mainly creates the FileSystem object for the working directory
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
//sets the task's current phase to cleanup and deletes the working directory
}
Only from this point on does the task really act as a reducer.
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
//create the combineCollector if a combiner is configured
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
//looked up from the config key mapreduce.job.reduce.shuffle.consumer.plugin.class; the default is Shuffle.class
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
//instantiate the shuffle plugin object
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
//build the ShuffleConsumerPlugin.Context object
shuffleConsumerPlugin.init(shuffleContext);
//what this actually calls is Shuffle's init function; its key parts are excerpted below
this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
//create the scheduler that shuffle needs
merger = createMergeManager(context);
//create shuffle's internal merge manager; inside createMergeManager the source is:
return new MergeManagerImpl(reduceId, jobConf, context.getLocalFS(),
context.getLocalDirAllocator(), reporter, context.getCodec(),
context.getCombinerClass(), context.getCombineCollector(),
context.getSpilledRecordsCounter(),
context.getReduceCombineInputCounter(),
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
context.getMapOutputFile());
//creates the MergeManagerImpl object and the merge threads
rIter = shuffleConsumerPlugin.run();
//copy the output files from every Mapper, merge-sort them, and block until that has finished
// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete();
//the sort phase is complete
setPhase(TaskStatus.Phase.REDUCE);
//enter the reduce phase
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
//The last stage of the reduce task. It prepares the map output keyClass ("mapred.output.key.class" or "mapred.mapoutput.key.class"), valueClass ("mapred.mapoutput.value.class" or "mapred.output.value.class") and the grouping Comparator ("mapred.output.value.groupfn.class" or "mapred.output.key.comparator.class")
if (useNewApi) {
//decide from useNewApi whether to call runNewReducer or runOldReducer; we follow runNewReducer
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
//runNewReducer: writes some status to the reporter; obtains a TaskAttemptContext and uses it to create the reducer, the output, the RecordWriter that tracks output statistics, and finally the Context that collects the reduce results; then reducer.run(reducerContext) starts the reduce
} else {//old API
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
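A side note before the summary: the grouping comparator fetched above with job.getOutputValueGroupingComparator() is normally configured when the job is set up. A minimal sketch, where MyGroupingComparator is a hypothetical RawComparator implementation and not part of the source analysed here:
// New API (org.apache.hadoop.mapreduce.Job):
job.setGroupingComparatorClass(MyGroupingComparator.class);
// Old API equivalent on JobConf:
// jobConf.setOutputValueGroupingComparator(MyGroupingComparator.class);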
(1) reduce has three phases (copy pulls the Map output data from remote nodes, sort orders all of the data, and reduce does the aggregation, i.e. the reducer we write ourselves); a Progress is registered for each of the three so that status can be reported to the TaskTracker.
(2) Lines 15-40 of the code above are essentially the same as the corresponding part of the source-level analysis of how the MapTask runs, and can be read there;
(3) codec = initCodec() checks whether the map output is compressed; if so it returns the codec instance, otherwise null. Here we discuss the uncompressed case (a sketch of enabling compression follows this list);
(4) We discuss fully distributed Hadoop, i.e. isLocal == false; a ReduceCopier object reduceCopier is constructed and reduceCopier.fetchOutputs() is called to copy every Mapper's output to the local node;
(5) When the copy phase completes, the next phase is set to sort and the status information is updated;
(6) The KV iterator is chosen according to isLocal; the fully distributed case uses reduceCopier.createKVIterator(job, rfs, reporter) as the KV iterator;
(7) When the sort phase completes, the next phase is set to reduce and the status information is updated;
(8) Some configuration is then read, and the processing path depends on whether the new API is in use; here it is the new API, so runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass) is called to execute the reducer;
(9) done(umbilical, reporter) performs the final cleanup: it updates the counters with updateCounters(); if the task needs to commit, it sets the task state to COMMIT_PENDING, reports completion through TaskUmbilicalProtocol, waits for permission and then calls commit to commit the task; it sets the task-finished flag, stops the Reporter thread, sends the last statistics update (via sendLastUpdate), and reports the final state through TaskUmbilicalProtocol (via sendDone).
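As item (3) notes, initCodec only returns a codec when map output compression is switched on. A minimal sketch of enabling it, assuming a Configuration object conf (Snappy is just one possible codec choice):
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
org.apache.hadoop.io.compress.SnappyCodec.class,
org.apache.hadoop.io.compress.CompressionCodec.class);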
Some people split the Reduce Task into five phases: 1. the shuffle phase, also called the copy phase, which pulls a slice of data from each MapTask over the network, writing it to disk if it exceeds a certain threshold and keeping it in memory otherwise; 2. the merge phase: while data is still being copied, the Reduce Task runs two background threads that merge the in-memory and on-disk files, to keep memory usage and the number of disk files under control; 3. the sort phase: the input to the user's reduce method is grouped by key, so the copied data has to be sorted, and merge sort is used because every MapTask's output is already sorted; 4. the reduce phase, which hands each group of data in turn to the user's reduce method; 5. the write phase, which writes the results to HDFS.
That five-way split is rather fine-grained; the code itself uses three phases, copy, sort and reduce, and the reduce-side percentage shown in the console when running an MR job in Eclipse is likewise split into these three phases at 33.3% each.
The shuffleConsumerPlugin here is an instance of some class implementing ShuffleConsumerPlugin. It can be chosen through the config key mapreduce.job.reduce.shuffle.consumer.plugin.class; by default Shuffle is used. As we saw in the code, shuffleConsumerPlugin.run, normally Shuffle.run, has to finish first, because it is this step that transfers the Mappers' merged spill files to the Reducer side over HTTP. Only once the data has arrived can runNewReducer or runOldReducer run. You can think of the shuffle object as the MapTasks' porter, and it does not hand data to the Reducer piece by piece as it arrives: it first pulls over all of the MapTask data and merge-sorts it, and only then does it feed the result to the corresponding Reducer.
In general, MapTasks and ReduceTasks are in a many-to-many relationship. Suppose there are M Mappers and N Reducers. The N Reducers correspond to N partitions, so every Mapper's output is divided into N partitions and each Reducer is responsible for one of them. Each Reducer therefore fetches its own slice from every Mapper, ending up with M pieces of data from M different Mappers; merging those M pieces yields one complete partition, which is sorted where necessary, and only then does it become the Reducer's actual input. This process of moving and reassembling data is called the shuffle. Shuffle is expensive: it consumes a lot of network bandwidth because large volumes of data are transferred, and it also adds latency, because the M Mappers finish at different speeds while shuffle can only start after all of them are done, and Reduce in turn must wait for shuffle to finish. Strictly speaking this latency is not caused by shuffle itself: if a Reducer did not need every partition's data to be in place and sorted before starting, it would not have to stay in step with the slowest Mapper. That is the price paid for sorting.
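To make the M-to-N relationship concrete, here is a minimal sketch of how a record is assigned to one of the N partitions. It mirrors the default hash-based scheme; the class name is made up for illustration and this is not the shuffle code itself:
public class SketchPartitioner<K, V> extends org.apache.hadoop.mapreduce.Partitioner<K, V> {
@Override
public int getPartition(K key, V value, int numReduceTasks) {
// Clear the sign bit, then take the key's hash modulo N:
// equal keys always land in the same partition, i.e. the same Reducer.
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
Every Mapper applies the same function, which is why the slices carrying the same partition number, gathered from all M Mappers, together form one Reducer's complete input.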
Shuffle therefore plays a very important role in the MapReduce framework. Let's first look at an outline of the Shuffle class:
public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter
private ShuffleConsumerPlugin.Context context;
private TaskAttemptID reduceId;
private JobConf jobConf;
private TaskUmbilicalProtocol umbilical;
private ShuffleSchedulerImpl scheduler;
private MergeManager<K, V> merger;
private Task reduceTask; //Used for status updates
private Map<TaskAttemptID, MapOutputFile> localMapFiles;
public void init(ShuffleConsumerPlugin.Context context)
public RawKeyValueIterator run() throws IOException, InterruptedException
In ReduceTask.run we saw the call to shuffle.init; inside it the ShuffleSchedulerImpl and MergeManagerImpl objects are created. What they are used for is explained later.
After that comes the call to shuffle.run. Although Shuffle has a run method, it is not a thread; it merely shares the name.
Let's follow ReduceTask.run -> Shuffle.run:
public RawKeyValueIterator run() throws IOException, InterruptedException {
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher eventFetcher =
new EventFetcher(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
//EventFetcher extends Thread, so it is a thread
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher[] fetchers = new Fetcher[numFetchers];
//create what is effectively a pool of fetcher threads
if (isLocal) {
//if the Mapper and the Reducer are on the same machine, fetch locally
fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
//LocalFetcher extends Fetcher and is also a thread
fetchers[0].start();//there is only one local fetcher
} else {
//the Mappers and the Reducer are not on the same machine; fetching has to go across nodes
for (int i=0; i < numFetchers; ++i) {
//start every fetcher
fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
//create a Fetcher thread
fetchers[i].start();
//several cross-node fetchers are needed, and each one is started
}
}
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
//wait for all fetchers to finish; on every timeout of the wait, report progress
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
//shut down the eventFetcher: the shuffle copy is done and all of the MapTasks' data has been pulled over
// Stop the map-output fetcher threads
for (Fetcher fetcher : fetchers) {
fetcher.shutDown();//shut down every fetcher
}
// stop the scheduler
scheduler.close();
//the shuffle scheduler is no longer needed either, so close it
copyPhase.complete(); // copy is already complete
//the copy phase is finished
What follows is the merge-sort part of the Reduce phase:
taskStatus.setPhase(TaskStatus.Phase.SORT);
//mark the task as being in the sort phase
reduceTask.statusUpdate(umbilical);
//report to the MRAppMaster through the umbilical and update the status
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
//final merge and sort; when it finishes it returns the iterator kvIter
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
There are only two ways data could move from a MapTask to a ReduceTask: the MapTask pushes it, or the ReduceTask pulls it. Hadoop uses the second way, i.e. copying files. Before Shuffle.run is entered, ReduceTask.run has already called its init function, shuffleConsumerPlugin.init(shuffleContext), which created the scheduler and the merge manager used for merge-sorting; once inside run, the EventFetcher thread and a number of Fetcher threads are created. A Fetcher's job is to fetch, i.e. to pull data from MapTask nodes. Note, however, that although EventFetcher is also a fetcher by name, what it fetches are events, not the data itself; you can regard it as the event-driven control around the fetch process.
The number of Fetcher threads is not fixed either. In Uber mode the MapTask and the ReduceTask run on the same node and there is only one MapTask, so a single Fetcher is enough, and that Fetcher is a LocalFetcher. Outside Uber mode there may be many MapTasks, generally not on the same node as the ReduceTask; in that case the number of Fetchers is configurable and defaults to 5 (see the sketch below). The fetchers array effectively acts as a pool of Fetcher threads.
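The fetcher count mentioned above comes from MRJobConfig.SHUFFLE_PARALLEL_COPIES. A minimal sketch of raising it, assuming a Configuration object conf (10 is only an example value; the default is 5):
// Number of parallel copy threads a ReduceTask uses during shuffle.
conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);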
After the EventFetcher and the Fetcher pool have been created, the code enters the while loop, which does nothing but wait, so all of the real work happens in the threads, that is, in the EventFetcher and the Fetchers. The EventFetcher plays the crucial role of the hub.
Here is an excerpt of the EventFetcher source, keeping only the essential parts:
class EventFetcher extends Thread {
private final TaskAttemptID reduce;
private final TaskUmbilicalProtocol umbilical;
private final ShuffleScheduler scheduler;
private final int maxEventsToFetch;
public void run() {
int failures = 0;
LOG.info(reduce + " Thread started: " + getName());
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {//as long as the thread has not been stopped or interrupted
try {
int numNewMaps = getMapCompletionEvents();
//fetch the map-completion events; next we look at the getMapCompletionEvents source:
protected int getMapCompletionEvents()
throws IOException, InterruptedException {
int numNewMaps = 0;
TaskCompletionEvent events[] = null;
do {
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(
(org.apache.hadoop.mapred.JobID)reduce.getJobID(),
fromEventIdx,
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID)reduce);
//ask the MRAppMaster, through the umbilical, for reports of map-completion events
events = update.getMapTaskCompletionEvents();
//get the details of which MapTask attempts have finished running
LOG.debug("Got " + events.length + " map completion events from " +
fromEventIdx);
assert !update.shouldReset() : "Unexpected legacy state";
//an assertion
// Update the last seen event ID
fromEventIdx += events.length;
// Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
// fetching from those maps.
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their
// outputs at all.
for (TaskCompletionEvent event : events) {
//for each event report received
scheduler.resolve(event);
//this calls ShuffleSchedulerImpl.resolve, whose source is:
public void resolve(TaskCompletionEvent event) {
switch (event.getTaskStatus()) {
case SUCCEEDED://the map attempt succeeded
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());//build its URI
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
//record the host of this MapTask so the Fetchers can use it later; the getBaseURI source is:
static URI getBaseURI(TaskAttemptID reduceId, String url) {
StringBuffer baseUrl = new StringBuffer(url);
if (!url.endsWith("/")) {
baseUrl.append("/");
}
baseUrl.append("mapOutput?job=");
baseUrl.append(reduceId.getJobID());
baseUrl.append("&reduce=");
baseUrl.append(reduceId.getTaskID().getId());
baseUrl.append("&map=");
URI u = URI.create(baseUrl.toString());
return u;
The various pieces of information are appended, and the result becomes the URI object.
}
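As an illustration only (the host name and IDs below are made up, and 13562 is merely the usual default shuffle port), the base URL assembled here looks roughly like
http://node1:13562/mapOutput?job=job_1500000000000_0001&reduce=0&map=
and getMapOutputURL, shown later, appends the comma-separated map attempt IDs after map=.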
Back to resolve:
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
//track the longest map runtime seen so far
break;
case FAILED:
case KILLED:
case OBSOLETE://the map attempt failed or its output is obsolete
obsoleteMapOutput(event.getTaskAttemptId());//mark this attempt's output as obsolete
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'");//log it
break;
case TIPFAILED://the whole task (every attempt) failed
tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'");//log it
break;
}
}
Back to getMapCompletionEvents:
if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {//if the event reports success
++numNewMaps;//one more completed map
}
}
} while (events.length == maxEventsToFetch);
return numNewMaps;
}
Back to EventFetcher.run:
failures = 0;
if (numNewMaps > 0) {
LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
}
LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME);
}
} catch (InterruptedException e) {
LOG.info("EventFetcher is interrupted.. Returning");
return;
} catch (IOException ie) {
LOG.info("Exception in getting events", ie);
// check to see whether to abort
if (++failures >= MAX_RETRIES) {
throw new IOException("too many failures downloading events", ie);//失败数量大于重试的数量
}
// sleep for a bit
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(RETRY_PERIOD);
}
}
}
} catch (InterruptedException e) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
return;
}
}
A MapTask has no direct relationship with the ReduceTasks and does not know which nodes they run on; it only reports its progress and events to the MRAppMaster. The ReduceTask uses the "umbilical" to call getMapCompletionEvents and ask the MRAppMaster for reports of MapTasks that have finished running. A few MapTask attempts may fail, but most succeed, and for every successful one a Fetcher is sent to collect its output; this bookkeeping is done through the scheduler, i.e. the ShuffleSchedulerImpl object, which is just an ordinary object, not a thread.
fetchers behaves like a thread pool holding several threads (5 by default); these threads wait on notifications driven by the EventFetcher, and as soon as a MapTask completes they go and fetch its data.
Now let's look at the Fetcher thread's run method:
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
//get, from the scheduler, a host whose MapTask output is ready to copy
metrics.threadBusy();
//mark this thread as busy
// Shuffle
copyFromHost(host);
//start copying this host's data
} finally {
if (host != null) {//if a host was taken from the scheduler
scheduler.freeHost(host);
//hand the host back to the scheduler and mark it idle again
metrics.threadFree();
}
}
}
} catch (InterruptedException ie) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
}
}
The key part here is copyFromHost, the function that actually fetches the data.
protected void copyFromHost(MapHost host) throws IOException {
// reset retryStartTime for a new host
//this runs on the ReduceTask's node
retryStartTime = 0;
// Get completed maps on 'host'
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
//get the set of completed MapTasks on the target host
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
if (maps.size() == 0) {
return;//nothing completed to fetch from this host, so just return
}
if(LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+ maps);
}
// List of maps to be fetched yet
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
//the set of MapTasks that have completed and are still waiting to be shuffled
// Construct the url and connect
DataInputStream input = null;
URL url = getMapOutputURL(host, maps);
//build the URL of the node hosting the MapTask output; next, the getMapOutputURL source:
private URL getMapOutputURL(MapHost host, Collection<TaskAttemptID> maps
) throws MalformedURLException {
// Get the base url
StringBuffer url = new StringBuffer(host.getBaseUrl());
boolean first = true;
for (TaskAttemptID mapId : maps) {
if (!first) {
url.append(",");
}
url.append(mapId);//append the mapId to the URL
first = false;
}
LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
//log it
return new URL(url.toString());
//return the URL
}
Back to copyFromHost:
try {
setupConnectionsWithRetry(host, remaining, url);
//establish an HTTP connection to the remote host; setupConnectionsWithRetry uses openConnectionWithRetry to open the link:
openConnectionWithRetry(host, remaining, url);
That code in turn uses openConnection(url); we keep following it.
The main part of opening the connection is:
protected synchronized void openConnection(URL url)
throws IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
//the connection is an HttpURLConnection
if (sslShuffle) {//if shuffle runs over SSL
HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
//cast conn to HttpsURLConnection
try {
httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());//install the SSL socket factory
} catch (GeneralSecurityException ex) {
throw new IOException(ex);
}
httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
}
connection = conn;
}
Continuing inside setupConnectionsWithRetry:
setupShuffleConnection(encHash);
//set up the shuffle connection
connect(connection, connectionTimeout);
// verify that the thread wasn't stopped during calls to connect
if (stopped) {
return;
}
verifyConnection(url, msgToEncode, encHash);
}
//at this point the connection has been verified
if (stopped) {
abortConnect(host, remaining);
//this aborts the connection and puts the remaining map outputs back so they can be fetched later; step inside to see the details
return;
}
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
" map outputs", ie);
Back to copyFromHost:
input = new DataInputStream(connection.getInputStream());
//wrap the connection's input stream in a DataInputStream
try {
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
//while the list of maps still to be fetched is not empty and no task has failed yet
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
//copy one map output; the copyMapOutput source is:
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
mapId = TaskAttemptID.forName(header.mapId);
//read the mapId from the shuffle header
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
//Don't know which one was bad, so consider all of them as bad
return remaining.toArray(new TaskAttemptID[remaining.size()]);
}
InputStream is = input;
is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
compressedLength -= CryptoUtils.cryptoPadding(jobConf);
decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
//if decryption is needed, the stream is wrapped and the lengths adjusted for crypto padding
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
return new TaskAttemptID[] {mapId};
}
if(LOG.isDebugEnabled()) {
LOG.debug("header: " + mapId + ", len: " + compressedLength +
", decomp len: " + decompressedLength);
}
try {
mapOutput = merger.reserve(mapId, decompressedLength, id);
//reserve a MapOutput for the merge: either in memory or on disk
} catch (IOException ioe) {
// kill this reduce attempt
ioErrs.increment(1);
scheduler.reportLocalError(ioe);
//report the error
return EMPTY_ATTEMPT_ID_ARRAY;
}
// Check if we can shuffle *now* ...
if (mapOutput == null) {
LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
//Not an error but wait to process data.
return EMPTY_ATTEMPT_ID_ARRAY;
}
// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
// on decompression failures. Catching and re-throwing as IOException
// to allow fetch failure logic to be processed
try {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map "
+ mapOutput.getMapId() + " decomp: " + decompressedLength
+ " len: " + compressedLength + " to " + mapOutput.getDescription());
mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter);
//copy the Mapper's file contents across the network into the Reducer's memory or onto its disk
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
throw new IOException(e);
}
// Inform the shuffle scheduler
long endTime = Time.monotonicNow();
// Reset retryStartTime as map task make progress if retried before.
retryStartTime = 0;
scheduler.copySucceeded(mapId, host, compressedLength,
startTime, endTime, mapOutput);
//tell the scheduler that the copy of one map output has completed
remaining.remove(mapId);
//this MapTask's output has now been shuffled
metrics.successFetch();
return null; // the failure-handling code that follows is omitted here
The mapOutput here is the space that will hold the MapTask's output; depending on the size of that output and on how much memory is available, it can be an in-memory output or an on-disk output. If it is in memory it has to be reserved, because more than one Fetcher is running. We take InMemoryMapOutput as the example.
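A minimal, self-contained sketch of the kind of decision merger.reserve has to make; the names and thresholds are paraphrased for illustration and this is not the MergeManagerImpl source itself:
enum ShuffleTarget { MEMORY, DISK, WAIT }
static ShuffleTarget chooseTarget(long requestedSize, long usedMemory,
long memoryLimit, long maxSingleShuffleLimit) {
if (requestedSize > maxSingleShuffleLimit) {
return ShuffleTarget.DISK; // a single map output too large for the in-memory buffer
}
if (usedMemory + requestedSize > memoryLimit) {
return ShuffleTarget.WAIT; // reserve would return null and the Fetcher retries later
}
return ShuffleTarget.MEMORY; // becomes an InMemoryMapOutput backed by a byte[] in the heap
}
In the in-memory case the reservation matters precisely because several Fetchers compete for the same buffer, which is the point made above.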
The call chain is:
Fetcher.run-->copyFromHost-->copyMapOutput-->merger.reserve(MergeManagerImpl.reserve)-->InMemoryMapOutput.shuffle
public void shuffle(MapHost host, InputStream input,
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
//copy the spill data belonging to this partition from the Mapper across the network
IFileInputStream checksumIn =
new IFileInputStream(input, compressedLength, conf);
//an input stream that verifies the checksum
input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
//if compression is involved
decompressor.reset();
//reset the decompressor
input = codec.createInputStream(input, decompressor);
//the input stream wrapped with the decompressor
}
try {
IOUtils.readFully(input, memory, 0, memory.length);
//read this partition's data from the Mapper side into the Reducer's in-memory buffer
metrics.inputBytes(memory.length);
reporter.progress();//report progress
LOG.info("Read " + memory.length + " bytes from map-output for " +
getMapId());
/**
* We've gotten the amount of data we were expecting. Verify the
* decompressor has nothing more to offer. This action also forces the
* decompressor to read any trailing bytes that weren't critical
* for decompression, which is necessary to keep the stream
* in sync.
*/
if (input.read() >= 0 ) {
throw new IOException("Unexpected extra bytes from input stream for " +
getMapId());
}
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input);
// Re-throw
throw ioe;
} finally {
CodecPool.returnDecompressor(decompressor);
//return the decompressor to the pool
}
}
This copies over the data belonging to this partition from the remote spill file. Back in copyFromHost, scheduler.copySucceeded tells the scheduler about the success, the MapTask's ID is removed from the remaining set, and the loop moves on to copy the next MapTask's data, until all data belonging to this partition has been copied.
That is the Fetcher side of the Reducer: it sends HTTP GET requests to the Mapper side and downloads the files. On the MapTask side there is a corresponding server; we will not dig into the source of this network protocol here, but feel free to study it on your own.