剖析Hadoop和Spark的Shuffle过程差别(二)

    上一篇博客《剖析Hadoop和Spark的Shuffle过程差别(一)》剖析了Hadoop MapReduce的Shuffle过程,那么本篇博客,来聊一聊Spark shuffle。算法

    Spark shuffle相对来讲更简单,由于不要求全局有序,因此没有那么多排序合并的操做。Spark shuffle分为write和read两个过程。咱们先来看shuffle write。缓存

    1、shuffle write函数

    shuffle write的处理逻辑会放到该ShuffleMapStage的最后(由于spark以shuffle发生与否来划分stage,也就是宽依赖),final RDD的每一条记录都会写到对应的分区缓存区bucket,以下图所示:oop

      说明:fetch

    一、上图有2个CPU,能够同时运行两个ShuffleMapTaskspa

    二、每一个task将写一个buket缓冲区,缓冲区的数量和reduce任务的数量相等.net

    三、 每一个buket缓冲区会生成一个对应ShuffleBlockFileblog

    四、ShuffleMapTask 如何决定数据被写到哪一个缓冲区呢?这个就是跟partition算法有关系,这个分区算法能够是hash的,也能够是range的 排序

    五、最终产生的ShuffleBlockFile会有多少呢?就是ShuffleMapTask 数量乘以reduce的数量,这个是很是巨大的get

    那么有没有办法解决生成文件过多的问题呢?有,开启FileConsolidation便可,开启FileConsolidation以后的shuffle过程以下:

    在同一核CPU执行前后执行的ShuffleMapTask能够共用一个bucket缓冲区,而后写到同一份ShuffleFile里去,上图所示的ShuffleFile其实是用多个ShuffleBlock构成,那么,那么每一个worker最终生成的文件数量,变成了cpu核数乘以reduce任务的数量,大大缩减了文件量。

    2、Shuffle read

    Shuffle write过程将数据分片写到对应的分片文件,这时候万事具有,只差去拉取对应的数据过来计算了。

    那么Shuffle Read发送的时机是什么?是要等全部ShuffleMapTask执行完,再去fetch数据吗?理论上,只要有一个 ShuffleMapTask执行完,就能够开始fetch数据了,实际上,spark必须等到父stage执行完,才能执行子stage,因此,必须等到全部 ShuffleMapTask执行完毕,才去fetch数据。fetch过来的数据,先存入一个Buffer缓冲区,因此这里一次性fetch的FileSegment不能太大,固然若是fetch过来的数据大于每个阀值,也是会spill到磁盘的。

   fetch的过程过来一个buffer的数据,就能够开始聚合了,这里就遇到一个问题,每次fetch部分数据,怎么能实现全局聚合呢?以word count的reduceByKey(《Spark RDD操做之ReduceByKey 》)为例,假设单词hello有十个,可是一次fetch只拉取了2个,那么怎么全局聚合呢?Spark的作法是用HashMap,聚合操做其实是map.put(key,map.get(key)+1),将map中的聚合过的数据get出来相加,而后put回去,等到全部数据fetch完,也就完成了全局聚合。

    3、总结

    Hadoop的MapReduce Shuffle和Spark Shuffle差异总结以下:

    一、Hadoop的有一个Map完成,Reduce即可以去fetch数据了,没必要等到全部Map任务完成,而Spark的必须等到父stage完成,也就是父stage的map操做所有完成才能去fetch数据。

    二、Hadoop的Shuffle是sort-base的,那么不论是Map的输出,仍是Reduce的输出,都是partion内有序的,而spark不要求这一点。

    三、Hadoop的Reduce要等到fetch彻底部数据,才将数据传入reduce函数进行聚合,而spark是一边fetch一边聚合。    

相关文章
相关标签/搜索