什么是Shuffle:缓存
Shuffle中文翻译为“洗牌”,须要Shuffle的关键缘由是某种具备共同特征的数据须要最终汇聚到一个计算节点上进行计算。网络
Shuffle面临的问题:app
1. 数据量很是大;负载均衡
2 数据如何分类,及如何Partition,Hash、Sort、钨丝计划框架
3. 负载均衡(数据倾斜)oop
4. 网络传输效率,须要在压缩和解压缩作出权衡,序列化和反序列化也是须要考虑的问题。spa
Hash Shuffle:翻译
1. Key不能是Array3d
2. Hash Shuffle不须要排序,从理论上就节省了Hadoop MapReduce中进行Shuffle须要排序时候的时间浪费,由于实际生产环境有大量不要排序的Shuffle类型。blog
思考:不要排序的Hash Shuffle是否必定比不须要排序的Sort额度 Shuffle速度更快? 不必定,若是数据规模比较小的状况下,Hash Shuffle会比Sorted Shuffle速度快(不少)!可是若是数据量大,此时Sorted Shuffle通常会比Hash Shuffle快(不少)。
3. 每一个ShuffleMapTask会根据key的哈希值计算出当前的key须要写入的Partition,而后把决定后的结果写入单独的文件,此时会致使每一个Task产生R(指下一个Stage的并行度)个文件,若是当前的Stage中有M个ShuffleMapTask,则会M*R个文件。
注意:Shuffle操做绝大多数都要经过网路,若是Mapper和Reducer在同一台机器上,此时只须要读取本地磁盘便可。
Hash Shuffle的两大死穴:第一:Shuffle前会产生大量的小文件到磁盘之上,此时会产生大量耗时低效的IO操做;第二:因为内存中须要保存海量的文件操做句柄和临时缓存信息,若是数据量比较庞大的话,内存不可承受,出现OOM等问题。
为了改善上述问题(同时打开太多文件致使Write Handler内存使用过大以及过多文件致使大量的随机读写带来的效率低下的磁盘IO操做),后来推出了Consalidate机制,来把小文件合并,此时Shuffle时产生的文件数量为cores*R,对于ShuffleMapTask的数量明显多于同时可用的并行cores的数量的状况下,Shuffle产生的文件大幅减小,会极大减低OOM的可能。
为此Spark推出了Shuffle Pluggable开发框架,方便系统升级的时候定制Shuffle功能模块,业方面第三方系统改造人员根据实际的业务场景来开发具体最佳的Shuffle模块;核心接口ShuffleManager,具体默认的实现由HashShuffleManager、SortShuffleManager等,Spark1.6.0中具体的配置以下:
为何须要Sort-Based Shuffle?
1. Shuffle通常包含两个阶段任务:第一部分,产生Shuffle数据的阶段(Map阶段,额外的补充,须要实现ShuffleManager中getWriter来写数据(数据能够以BlockManager写到Memory、Disk、Tachyon等,例如像很是快的Shuffle,此时能够考虑把数据写在内存中,可是内存不稳定,建议采用MEMOrY_AND_DISK方式)),第二部分,使用Shuffle数据的阶段(Reduce阶段,额外的补充,须要实现ShuffleManager的getReader,Reader会向Driver去获取上一个Stage产生的Shuffle数据)。
2.Spark的Job会被划分红不少Stage:
若是只有一个Stage,则这个Job就至关于只有一个Mapper阶段,固然不会产生Shuffle,适合于简单的ETL;
若是不止一个Stage,则最后一个Stage就是最终的Reducer,最左侧的第一个Stage就仅仅是整个Job的Mapper,中间全部的任意一个Stage是其父Stage的Reducer且是其子Stage的Mapper。
3.Spark Shuffle在最开始的时候只支持Hash-base Shuffle:默认Mappper阶段会为Reducer阶段的每个Task单首创建一个文件来保存该Task中要使用的数据,可是在一些状况下(例如数据量很是大的状况)会形成大量文件(M*R,其中M表明Mapper中的全部的并行任务的数量,R表明)
3.