1、前言面试
对于基于MapReduce编程范式的分布式计算来讲,本质上而言,就是在计算数据的交、并、差、聚合、排序等过程。而分布式计算分而治之的思想,让每一个节点只计算部分数据,也就是只处理一个分片,那么要想求得某个key对应的全量数据,那就必须把相同key的数据聚集到同一个Reduce任务节点来处理,那么Mapreduce范式定义了一个叫作Shuffle的过程来实现这个效果。算法
2、编写本文的目的编程
本文旨在剖析Hadoop和Spark的Shuffle过程,并对比二者Shuffle的差别。缓存
3、Hadoop的Shuffle过程网络
Shuffle描述的是数据从Map端到Reduce端的过程,大体分为排序(sort)、溢写(spill)、合并(merge)、拉取拷贝(Copy)、合并排序(merge sort)这几个过程,大致流程以下:架构
- 
上图的Map的输出的文件被分片为红绿蓝三个分片,这个分片的就是根据Key为条件来分片的,分片算法能够本身实现,例如Hash、Range等,最终Reduce任务只拉取对应颜色的数据来进行处理,就实现把相同的Key拉取到相同的Reduce节点处理的功能。下面分开来讲Shuffle的的各个过程。分布式
Map端作了下图所示的操做:函数
- 一、Map端sort
Map端的输出数据,先写环形缓存区kvbuffer,当环形缓冲区到达一个阀值(能够经过配置文件设置,默认80),便要开始溢写,但溢写以前会有一个sort操做,这个sort操做先把Kvbuffer中的数据按照partition值和key两个关键字来排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位汇集在一块儿,同一partition内的按照key有序。oop
二、spill(溢写) 学习
当排序完成,便开始把数据刷到磁盘,刷磁盘的过程以分区为单位,一个分区写完,写下一个分区,分区内数据有序,最终实际上会屡次溢写,而后生成多个文件
三、merge(合并)
spill会生成多个小文件,对于Reduce端拉取数据是至关低效的,那么这时候就有了merge的过程,合并的过程也是同分片的合并成一个片断(segment),最终全部的segment组装成一个最终文件,那么合并过程就完成了,以下图所示
至此,Map的操做就已经完成,Reduce端操做即将登场
Reduce操做
整体过程以下图的红框处:
- 
- 一、拉取拷贝(fetch copy)
Reduce任务经过向各个Map任务拉取对应分片。这个过程都是以Http协议完成,每一个Map节点都会启动一个常驻的HTTP server服务,Reduce节点会请求这个Http Server拉取数据,这个过程彻底经过网络传输,因此是一个很是重量级的操做。
- 二、合并排序
Reduce端,拉取到各个Map节点对应分片的数据以后,会进行再次排序,排序完成,结果丢给Reduce函数进行计算。
4、总结
至此整个shuffle过程完成,最后总结几点:
Spark shuffle相对来讲更简单,由于不要求全局有序,因此没有那么多排序合并的操做。Spark shuffle分为write和read两个过程。咱们先来看shuffle write。
shuffle write的处理逻辑会放到该ShuffleMapStage的最后(由于spark以shuffle发生与否来划分stage,也就是宽依赖),final RDD的每一条记录都会写到对应的分区缓存区bucket,以下图所示:
说明:
那么有没有办法解决生成文件过多的问题呢?有,开启FileConsolidation便可,开启FileConsolidation以后的shuffle过程以下:
在同一核CPU执行前后执行的ShuffleMapTask能够共用一个bucket缓冲区,而后写到同一份ShuffleFile里去,上图所示的ShuffleFile其实是用多个ShuffleBlock构成,那么,那么每一个worker最终生成的文件数量,变成了cpu核数乘以reduce任务的数量,大大缩减了文件量。
Shuffle write过程将数据分片写到对应的分片文件,这时候万事具有,只差去拉取对应的数据过来计算了。
那么Shuffle Read发送的时机是什么?是要等全部ShuffleMapTask执行完,再去fetch数据吗?理论上,只要有一个 ShuffleMapTask执行完,就能够开始fetch数据了,实际上,spark必须等到父stage执行完,才能执行子stage,因此,必须等到全部 ShuffleMapTask执行完毕,才去fetch数据。fetch过来的数据,先存入一个Buffer缓冲区,因此这里一次性fetch的FileSegment不能太大,固然若是fetch过来的数据大于每个阀值,也是会spill到磁盘的。
fetch的过程过来一个buffer的数据,就能够开始聚合了,这里就遇到一个问题,每次fetch部分数据,怎么能实现全局聚合呢?以word count的reduceByKey(《Spark RDD操做之ReduceByKey 》)为例,假设单词hello有十个,可是一次fetch只拉取了2个,那么怎么全局聚合呢?Spark的作法是用HashMap,聚合操做其实是map.put(key,map.get(key)+1),将map中的聚合过的数据get出来相加,而后put回去,等到全部数据fetch完,也就完成了全局聚合。
Hadoop的MapReduce Shuffle和Spark Shuffle差异总结以下:
以为不错请点赞支持,欢迎留言或进个人我的群855801563领取【架构资料专题目合集90期】、【BATJTMD大厂JAVA面试真题1000+】,本群专用于学习交流技术、分享面试机会,拒绝广告,我也会在群内不按期答题、探讨。