正常状况下,一个spark task要处理一个partition即128M的数据,因处理过程较耗时而成为任务瓶颈。算法
大的方向是进行任务拆分,增大并行度。apache
优势:RDD中定义的算子,能够直接使用
缺点:使用以上算子来增大并行度,必定会进行shuffle操做
结论:测试发现,虽然增大了业务处理的并行度,但shuffle操做的开销比较大,所以总体的耗时没有明显减小。bash
初始化SparkSession时进行以下代码设置:oop
.config("mapreduce.input.fileinputformat.split.minsize","67108864") // 即为想设置的分片大小:64M
.config("mapreduce.job.maps","1000") // 确保分片足够大
复制代码
用以实现spark读取hive时,一个task处理一个64M的数据块。
优势:理论来讲,并行度扩大一倍,耗时将减小一半。
结论:测试发下,耗时确实大幅度降低。源码分析
调用链: HadoopTableReader#createHadoopRdd性能
HadoopRDD#getPartitions
FileInputFormat#getSplits
FileInputFormat#computeSplitSize测试
核心代码片断ui
private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
math.max(hadoopConf.getInt("mapreduce.job.maps", 1),
sparkSession.sparkContext.defaultMinPartitions)
}
复制代码
val rdd = new HadoopRDD(
sparkSession.sparkContext,
_broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
Some(initializeJobConfFunc),
inputFormatClass,
classOf[Writable],
classOf[Writable],
_minSplitsPerRDD)
复制代码
由HadoopTableReader生成HadoopRDD,参数:_minSplitsPerRDD在非local模式下可经过mapreduce.job.maps设置spa
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
// 多处省略
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
return splits.toArray(new FileSplit[splits.size()]);
}
复制代码
protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
复制代码
public static final String SPLIT_MINSIZE =
"mapreduce.input.fileinputformat.split.minsize";
复制代码
最终数据分片的大小由Math.max(minSize, Math.min(goalSize, blockSize))
计算获得,根据源码可知:code
blockSize:hdfs实际存储的blockSize,128M不可变
goalSize:totalSize / (numSplits == 0 ? 1 : numSplits)
numSplits local模式下为0;其余模式可经过:mapreduce.job.maps 配置
minSize:SPLIT_MINSIZE与minSplitSize的最大值
SPLIT_MINSIZE:默认为1,可经过mapreduce.input.fileinputformat.split.minsize 配置
minSplitSize:默认为1
复制代码
默认状况下, 返回结果为128M。为了让计算结果为减少,好比64M,只须要 minSize
为64M,Math.min(goalSize, blockSize)
足够小便可,即:
Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize)
,便可实现 minSize 为64M结合spark分片机制进行参数设置,既提升任务并行度又避免shuffle的性能损耗。