转载自:http://www.javashuo.com/article/p-mxuuborw-es.htmlhtml
这里只取其前面的部分,跟以前看的同样,不过本身就懒得去画图总结了。而后取前半部分是由于,目前对于Spark的学习仍是在1.6,我也但愿后面有时间接触一下2.x的,由于以前跟朋友讨论过,他告诉我2.x跟1.6仍是有很多的差别的,至少在使用姿式上,然而我也是十分无奈,由于现阶段确实没有太多的机会去接触2.x的东西,然而我相信很快就有了。编程
另外,若是有MapReduce的编程经验,其实spark的shuffle过程也是很好理解的,以前有写过MapReduce shuffle过程的文章,其实就是那个很经典的图。app
最近把《Hadoop权威指南》关于Spark的部分翻出来看了下,发现真的都是精华,发现它把以前本身学习Spark的不少基础理论知识很好的总结到了一块,尤为是Spark On Yarn-Client&Cluster那一小节,真的总结得太好了!ide
假如咱们有个 spark job 依赖关系以下oop
咱们抽象出来其中的rdd和依赖关系:性能
E <-------n------, C <--n---D---n-----F--s---, A <-------s------ B <--n----`-- G学习
对应的划分后的RDD结构为:优化
最终咱们获得了整个执行过程:spa
中间就涉及到shuffle 过程,前一个stage 的 ShuffleMapTask 进行 shuffle write, 把数据存储在 blockManager 上面, 而且把数据位置元信息上报到 driver 的 mapOutTrack 组件中, 下一个 stage 根据数据位置元信息, 进行 shuffle read, 拉取上个stage 的输出数据。3d
这篇文章讲述的就是其中的 shuffle write 过程。
Spark 0.8及之前 Hash Based Shuffle
Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
Spark 1.2 默认的Shuffle方式改成Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出历史舞台
总结一下, 就是最开始的时候使用的是 Hash Based Shuffle, 这时候每个Mapper会根据Reducer的数量建立出相应的bucket,bucket的数量是M R ,其中M是Map的个数,R是Reduce的个数。这样会产生大量的小文件,对文件系统压力很大,并且也不利于IO吞吐量。后面忍不了了就作了优化,把在同一core上运行的多个Mapper 输出的合并到同一个文件,这样文件数目就变成了 cores R 个了,
举个例子:
原本是这样的,3个 map task, 3个 reducer, 会产生 9个小文件,
是否是很恐怖, 后面改造以后
4个map task, 4个reducer, 若是不使用 Consolidation机制, 会产生 16个小文件。
可是可是如今这 4个 map task 分两批运行在 2个core上, 这样只会产生 8个小文件
在同一个 core 上前后运行的两个 map task的输出, 对应同一个文件的不一样的 segment上, 称为一个 FileSegment, 造成一个 ShuffleBlockFile,
后面就引入了 Sort Based Shuffle, map端的任务会按照Partition id以及key对记录进行排序。同时将所有结果写到一个数据文件中,同时生成一个索引文件, 再后面就就引入了 Tungsten-Sort Based Shuffle, 这个是直接使用堆外内存和新的内存管理模型,节省了内存空间和大量的gc, 是为了提高性能。
如今2.1 分为三种writer, 分为 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter,顾名思义,你们应该能够对应上,咱们本着过期不讲的原则, 本文中只描述这三种 writer 的实现细节, Hash Based Shuffle 已经退出历史舞台了,我就不讲了。