Yarn在Shuffle阶段内存不足问题(error in shuffle in fetcher)

最近在使用MR跑一个任务的时候shuffle阶段出现OOM,这个问题以前历来没有遇到过,上网找了一下,发现网友也遇到过想似的问题,如下是转载的该问题的解决方法:java

原文地址:http://blog.csdn.net/bigdatahappy/article/details/39295657shell

=====================================================================apache

在Hadoop集群(CDH4.4, Mv2即Yarn框架)使用过程当中,发现处理大数据集时程序报出以下错误:bash

2016-12-15 08:10:57,726 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#18
	at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
	at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
	at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
	at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:305)
	at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:295)
	at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:514)
	at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:336)
	at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:193)

Google一番后竟然无果!程序等着运行,老板催着要结果,没有大师协助,只能开始艰难地自救了!认真分析,求助于源代码! 首先发现的一点是:map任务百分比一直在递增,出现reduce任务以后,每隔一段时间报一个相似上面的错误,reduce从0%从新开始,而Map任务继续前进,reduce处理一段后再报,再从0开始。累计到第四个报错后即整个Application宣布Fail。 根据这一点,大体能够得出这样的结论: reduce任务每次尝试都失败了,失败后从新开始; reduce任务失败累计4次后整个Application退出,应该是设置了最大重试次数之类的配置项。 map任务与reduce任务是隔离的,之间不会干扰。这个从map、reduce任务原理也能够了解到。 基于这一点,首先查询到map-site.xml中的配置项mapreduce.reduce.maxattempts,表示Reduce Task最大失败尝试次数,这个配置默认是4,调整到400后接着尝试。 mapreduce.reduce.maxattempts起了做用,可是报错依然不断,不过不会4次报错就结束了,map进度一直向前,map到达100%后,reduce依然重复报错的节奏。是时候查查这里报错的类究竟在作啥了。app

org.apache.hadoop.mapreduce.task.reduce.Fetcher类位于hadoop-mapreduce-client-core-2.0.0-cdh4.4.0.jar包中,Maven的话在pom.xml添加以下配置,能够获取该包以及源码:框架

<dependency>
    <groupId >org.apache.hadoop</ groupId>
    <artifactId >hadoop-mapreduce -client-core</ artifactId>
    <version >2.0.0-cdh4.4.0</ version>
</dependency>

问题的入口是run中的:函数

// Shuffleoop

copyFromHost(host);测试

跟踪到copyMapOutput,是要准备从Map节点本地拷贝map的output进行shuffle。其中出错点:fetch

// Get the location for the map output – either in-memory or on-disk

mapOutput = merger.reserve(mapId, decompressedLength, id );

merger指向了MergeManagerImpl对象,调用其reserve函数,而这个函数中定义了shuffle的处理方式,是将output塞入内存(InMemoryMapOutput)仍是放在磁盘上慢慢作(OnDiskMapOutput)? 从咱们这边的出错信息,显然能够看到任务选择了InMemoryMapOutput,在检查为何做出这样的选择前,咱们看看map的输出结果到底有多大:
 

shell>cd /data/1/mrlocal/yarn/local/usercache/hdfs/appcache/application_1385983958793_0001/output 

shell>du -sh * | grep _r_ 7.3G attempt_1385983958793_0001_r_000000_1

6.5G attempt_1385983958793_0001_r_000000_12

5.2G attempt_1385983958793_0001_r_000000_5

5.8G attempt_1385983958793_0001_r_000000_7

这样大的输出放到内存里,显然要OOM了,能够有两种选择,它为何不选择OnDiskMapOutput呢?

以下这段很显然是关键所在:
 

if (!canShuffleToMemory(requestedSize)) { 

    LOG.info(mapId + “: Shuffling to disk since ” + requestedSize + ” is greater than maxSingleShuffleLimit (” + maxSingleShuffleLimit + “)” ); 

    return new OnDiskMapOutput(mapId, reduceId, this , requestedSize, jobConf, mapOutputFile , fetcher, true); 

}

再看canShuffleToMemory:

private boolean canShuffleToMemory( long requestedSize) { return (requestedSize < maxSingleShuffleLimit); }

requestedSize从源码上并不能清楚了解其真实含义,问题最终落在maxSingleShuffleLimit这个参数的含义和来源上,进一步细查能够发现其来源:

this.maxSingleShuffleLimit = (long)( memoryLimit * singleShuffleMemoryLimitPercent);

两个变量的取值:

// Allow unit tests to fix Runtime memory this.

memoryLimit = (long)(jobConf.getLong(MRJobConfig. REDUCE_MEMORY_TOTAL_BYTES, Math. min(Runtime.getRuntime ().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse); 

final float singleShuffleMemoryLimitPercent = jobConf.getFloat(MRJobConfig. SHUFFLE_MEMORY_LIMIT_PERCENT, DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT );

singleShuffleMemoryLimitPercent 取的是mapreduce.reduce.shuffle.memory.limit.percent这个配置的取值,官网给出的解释是:

Expert: Maximum percentage of the in-memory limit that a single shuffle can consume

单个shuffle可以消耗的内存占reduce全部内存的比例,默认值为0.25。Expert”专家模式”,说的很唬人。。

那么下降mapreduce.reduce.shuffle.memory.limit.percent这个参数应该可使得程序选择OnDiskMapout而不是选择InMemory,调低至0.06在测试,顺利执行,再也不报错。

收获:选择了最新的框架,意味着会遇到最新的问题。无助时,了解原理,查询源码,总能找到想要的答案。

遗留:

1.查看源码,不少不清晰的地方都略过了,其中memoryLimit的取值,即reduce全部可以使用的内存,实际取值如何肯定,须要进一步找寻答案。

2.如何控制mapreduce.reduce.shuffle.memory.limit.percent使得咱们可以使用合理的配置来最大化的使用内存,待续。

相关文章
相关标签/搜索