本博文的主要内容:算法
一、Hash Shuffle完全解密apache
二、Shuffle Pluggable解密缓存
三、Sorted Shuffle解密性能优化
四、Shuffle性能优化微信
一:到底什么是Shuffle?网络
Shuffle中文翻译为“洗牌”,须要Shuffle的关键性缘由是某种具备共同特征的数据须要最终汇聚到一个计算节点上进行计算。app
二:Shuffle可能面临的问题?负载均衡
运行Task的时候才会产生Shuffle(Shuffle已经融化在Spark的算子中了)。框架
一、 数据量很是大;【几千甚至上万台机器进行Shuffle的数据量会很大,从其余各台机器上收集过来数据的时候,网络传输量会很恐怖】分布式
二、 数据如何分类,即如何Partition,Hash、Sort、钨丝计算;【不一样的Partition的不一样实现,他会影响集群规模的大小,会影响内存的使用,会影响性能等等方面,也就有了Shuffle几个不一样的净化阶段】
三、 负载均衡(数据倾斜);【由于采用不一样的Shuffle的方式对数据进行不一样的分类,而分类以后数据又分到不一样的节点上进行计算,若是Shuffle分类不恰当,会致使负载均衡,也就是数据倾斜】
四、 网络传输效率,须要在压缩和解压缩之间作出权衡,序列化和反序列也是要考虑的问题;【若是压缩,则须要解压缩,解压缩须要消耗CPU,因此须要衡量带宽和CPU解压的时间,作出正确的权衡】
说明:具体的Task进行计算的时候尽一切最大可能使得数据具有Process Locality的特性
【由于这是它运行最快的方式,数据在内存中,也就是默认采起的方式,若是无可奈何,数据不能所有放在内存中,从实际生成角度讲(即不具有内存本地性)】;退而求次是增长数据分片,减小每一个Task处理的数据量
【致使任务运行的批次更多,任务更多】。
【1,cache自己具备风险,Memory溢出风险,它被其余计算占用掉内存的风险,致使从新计算,除非计算特别复杂,计算链条特别长,可能有必要为了容错,为了再次数据复用,来进行中间结果的持久化,不然的话,尤为是持久化到disk时,还不如在内存中直接计算,这样的速度有可能比从磁盘中读取曾经计算结果来的更快2,度磁盘I/O是一个高风险的动做,读内存分享会下降不少。
在一个Stage内部,不持久化中间结果,数据丢失从新计算依赖的RDD;可是在产生Shuffle的时候,会产生网络通讯,这是须要持久化。
持久化默认状况下放在磁盘中,也能够调整Spark的框架,将数据放在内存中,如今通常放在Local FileSystem上面,也能够放在Tachyon中,这些均可以经过调整Spark的配置和改造Spark源码来实现。】
三:Hash Shuffle完全解密
一、key不能是Array;
【key若是是Array,则就没法很是友好的计算具体的hashcode值】
二、 Hash Shuffle不须要排序
【使得速度很快,其工做机制根据Shuffle的前面的Stage的最后一个final RDD,依据Partition把数据分红不一样的类,按照Key的hashcode,而后按照必定的业务逻辑规则(例如,假以下一个Stage有3个并行任务,最简单的就是取模3运算,分红3种类型的数据)无需排序,性能很好】,此时从理论上讲就节省了Hadoop MapReduce中进行Shuffle须要排序时候的时间浪费,由于实际生产环境有大量的不须要排序的Shuffle类型;
思考:不须要排序的Hash Shuffle是否必定比须要排序的Sorted Shuffle速度更快?不必定!若是数据规模比较小的情形下,Hash Shuffle会比Sorted Shuffle速度快(不少)!可是若是数据量大,此时Sorted Shuffle通常都会比Hash Shuffle快(不少)
【数据量大的状况下,Sorted Shuffle比Hash Shuffle快的缘由:若是数据规模比较 大,可能Hash Shuffle没法处理,由于hash的方式时会有key和句柄之类,还有许 多小文件,此时,磁盘的性能会成为瓶颈,内存也会变成瓶颈。Sorted Shuffle会极 大地节省磁盘、内存的访问,更有利于更大规模的数据运算】
三、每一个ShuffleMapTask会根据key的哈希值计算出当前的key须要写入的Partition,而后把决定后的结果写入当单独的文件,此时会致使每一个Task产生R(指下一个Stage的并行度)个文件,若是当前的Stage中有M个ShuffleMapTask,则会M*R个文件!!!
注意:Shuffle操做绝大多数状况下都要经过网络,若是Mapper和Reducer在同一台机器上,此时只须要读取本地磁盘便可。
【每一个任务都产生R个小文件,因为其须要将数据分红几种不一样类型,就是下一个Stage的具体的Task会读取的与本身相关的数据,由于已经分好类了,此时会产生M*R个小文件,那么下一个Stage就会经过网络根据Driver的注册信息(因为上一个Stage写过的内容会注册给Driver),而后询问Driver上一个Stage具体的输出在哪里,以及哪些属于该Stage的部分,经过网络读取数据;同时Shuffle的数据不必定都须要经过网络(有可能在同一台机器上)】
Hash Shuffle的两大死穴:第一:Shuffle前会产生海量的小文件于磁盘之上,此时会产生大量耗时低效的IO操做;第二:内存不共用!!!因为内存中须要保存海量的文件操做句柄和临时缓存信息,若是数据处理规模比较庞大的话,内存不可承受,出现OOM等问题!
Hash-based Shuffle另外一说法
一、 Spark Shuffle在最开始的时候只支持Hash-based Shuffle:默认Mapper阶段会为Reducer阶段的每个Task单首创建一个文件来保存该Task中要使用的数据。
优势:就是操做数据简单。
缺点:可是在一些状况下(例如数据量很是大的状况)会形成大量文件(M*R,其中M表明Mapper中的全部的并行任务数量,R表明Reducer中全部的并行任务数据)大数据的随机磁盘I/O操做且会造成大量的Memory(极易形成OOM)。
二、Hash-based Shuffle产生的问题:
第一:不可以处理大规模的数据
第二:Spark不可以运行在大规模的分布式集群上!
三、Consolidate机制:
后来的改善是加入了Consolidate机制来将Shuffle时候产生的文件数量减小到C*R个(C表明在Mapper端,同时可以使用的cores数量,R表明Reducer中全部的并行任务数量)。可是此时若是Reducer端的并行数据分片过多的话则C*R可能已通过大,此时依旧没有逃脱文件打开过多的厄运!!!Consolidate并无下降并行度,只是下降了临时文件的数量,此时Mapper端的内存消耗就会变少,因此OOM也就会下降,另一方面磁盘的性能也会变得更好。
Spark在引入Sort-Based Shuffle以前,适合中小型数据规模的大数据处理!
四: Sorted Shuffle解密
为了改善上述的问题(同时打开过多文件致使Writer Handler内存使用过大以及产生过分文件致使大量的随机读写带来的效率极为低下的磁盘IO操做),Spark后来推出了Consalidate机制,来把小文件合并【根据TaskId进行合并】,此时Shuffle时文件产生的数量为cores*R,对于ShuffleMapTask的数量明显多于同时可用的并行Cores的数量的状况下,Shuffle产生的文件会大幅度减小,会极大下降OOM的可能;【consalidate机制减小了文件,同时也减小了文件句柄的数量;但对于并行度很是高时,及R值特别大时,仍是很麻烦。】
【在接口ShuffleManager中:registerShuffle:由Driver注册源数据中的信息,系统默认状况下其有HashBasedShuffle和SortedBasedShuffle两种状况。getReader和getWriter:获取怎么在Shuffle的时候写本地数据,获取下一个Stage读取上一个Stage的具体数据的阅读器。unregisterShuffle:删除掉本地的Shuffle的源数据。Stop:中止ShuffleManager】
为此Spark推出了Shuffle Pluggable开放框架,方便系统升级的时候定制Shuffle功能模块,也方便第三方系统改造人员根据实际的业务场景来开放具体最佳的Shuffle模块;核心接口ShuffleManager,具体默认实现有HashShuffleManager、SortShuffleManager等,Spark 1.6.0/Spark 1.5.2中具体的配置以下:
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
默认采起SortedBasedShuffle的方式。
Sort是如何解决内存不够、小文件过多的问题?
采用Hash的方式的适用场景是数据规模相对比较小,并且不须要排序。Hadoop的MapReduce进行排序,使得处理数据规模更大,集群规模更大。
Consalidate必定程度上解决了该问题,但仍不完全,SortedBasedShuffle更好的解决了该问题。首先,每一个ShuffleMapTask不会为每一个Reducer生成一个单独的文件,它护肩全部的结果写到一个文件里,同时生成一个Index索引文件,每一个Reducer能够根据这个Index索引文件取得它所须要处理的数据,这样就避免产生大量文件,没有了大量文件,也就没有了大量的文件句柄,节省了内存;同时因为磁盘上文佳变少了,并且有Index索引,不用随机的去读写,而是顺序的disk I/O,带来了低延迟,节省了内存;另外一方面,减小了GC风险和频率,而减小具体的文件数量能够避免同时些多个文件是给系统带来的压力,这就是优点所在。
具体的实现:ShuffleMapTask会按照Key相应的Partition的ID进行Sort,若是属于同一个Partition的Key,自己不进行Sort,所以对不须要sort的操做来讲,若是内存不够用,他就会把那些已经排序的内容写到外部disk,结束的时候再进行归并排序(merge-sort)
为高效读取这些file Seagate,它有一个Index文件,会记录不一样的Partition的位置信息,BlockManager也会对它的寻址算法进行优化性的实现。归并排序最优是打开10-100个文件。
最后生成文件时须要同时生成Index索引文件。
对具体的ShuffleMapTask,它外部有具体的归并排序方式,mergeSort,sort以后会产生两个文件,这两个文件其中一个是Index索引文件,一个是存放具体的Task的输出内容,在Reducer端读取数据的时候,其实首先访问Index,具体在工做的时候,BlockManager首先访问Index,经过Index去定位具体文件内容。避免了大量文件句柄,节省内存。
采用Sort方式集群的规模和数据的计算规模就不受限制了。
Sort-Based Shuffle的另外一说法
一、为了让Spark在更大规模的集群上更高性能处理更大规模的数据,因而就引入了Sort-based Shuffle!今后之后(Spark1.1版本开始),Spark能够胜任任何规模(包括PB级别及PB以上的级别)的大数据的处理,尤为是钨丝计划的引入和优化,Spark更快速的在更大规模的集群处理更海量的数据的能力推向了一个新的巅峰!
二、Spark1.6版本支持最少三种类型Shuffle:
实现ShuffleManager接口能够根据本身的业务实际须要最优化的使用自定义的Shuffle实现;
三、Spark1.6默认采用的就是Sort-based Shuffle的方式:
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
上述源码说明,你能够在Spark配置文件中配置Spark框架运行时要使用的具体的ShuffleManager的实现。能够在conf/spark-default.conf加入以下内容:
spark.shuffle.manager SORT 配置Shuffle方式是SORT
四、 Sort-based Shuffle的工做方式以下:Shuffle的目的就是:数据分类,而后数据汇集
1) 首先每一个ShuffleMapTask不会为每一个Reducer单独生成一个文件,相反,Sort-based Shuffle会把Mapper中每一个ShuffleMapTask全部的输出数据Data只写到一个文件中。由于每一个ShuffleMapTask中的数据会被分类,因此Sort-based Shuffle使用了index文件存储具体ShuffleMapTask输出数据在同一个Data文件中是如何分类的信息!!
2) 基于Sort-base的Shuffle会在Mapper中的每个ShuffleMapTask中产生两个文件:Data文件和Index文件,其中Data文件是存储当前Task的Shuffle输出的。而index文件中则存储了Data文件中的数据经过Partitioner的分类信息,此时下一个阶段的Stage中的Task就是根据这个Index文件获取本身所要抓取的上一个Stage中的ShuffleMapTask产生的数据的,Reducer就是根据index文件来获取属于本身的数据。
涉及问题:Sorted-based Shuffle:会产生 2*M(M表明了Mapper阶段中并行的Partition的总数量,其实就是ShuffleMapTask的总数量)个Shuffle临时文件。
Shuffle产生的临时文件的数量的变化一次为:
Basic Hash Shuffle: M*R;
Consalidate方式的Hash Shuffle: C*R;
Sort-based Shuffle: 2*M;
在集群中动手实战Sort-based Shuffle
在Sorted-based Shuffle中Reducer是如何获取本身须要的数据呢?具体而言,Reducer首先找Driver去获取父Stage中的ShuffleMapTask输出的位置信息,根据位置信息获取index文件,解析index,从解析的index文件中获取Data文件中属于本身的那部份内容;
Sorted-based Shuffle与排序没有关系,Sorted-based Shuffle并无对内容进行排序,Sorted-based Shuffle是对Shuffle进行Sort,对咱们具体要执行的内容没有排序。
Reducer在何时去fetch数据?
当parent Stage的全部ShuffleMapTasks结束后再fetch。等全部的ShuffleMapTask执行完以后,边fetch边计算。
经过动手实践确实证实了Sort-based Shuffle产生了2M个文件。M是并行Task的数量。
Shuffle_0_0_0.data
shuffle_0_3_0.index
从上能够看出index文件和data文件数量是同样的。
Sorted Shuffle Writer源码:
反序列化RDD和Dependency
调用SortShuffleManager的getWriter方法。
Writer方法写入结果。
2. SortShuffleManager复写了ShuffleManager中的getWriter方法,源码以下:
3. SorShuffleWriter的write方法源码以下:
其中ShuffleBlockId记录shuffleId和mapId得到Block。
用于在Block的索引文件中记录每一个block的偏移量,其中getBlockData方法能够根据ShuffleId和mapId读取索引文件,得到前面partition计算以后,,将结果写入文件中的偏移量和结果的大小。
/** * Write an index file with the offsets of each block, plus a final offset at the end for the * end of the output file. This will be used by getBlockData to figure out where each block * begins and ends. * * It will commit the data and index file as an atomic operation, use the existing ones, or * replace them with new ones. * * Note: the `lengths` will be updated to match the existing index file if use the existing ones. * */ def writeIndexFileAndCommit( shuffleId: Int, mapId: Int, lengths: Array[Long], dataTmp: File): Unit = { val indexFile = getIndexFile(shuffleId, mapId) val indexTmp = Utils.tempFileWith(indexFile) val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp))) Utils.tryWithSafeFinally { // We take in lengths of each block, need to convert it to offsets. var offset = 0L out.writeLong(offset) for (length <- lengths) { offset += length out.writeLong(offset) } } { out.close() } val dataFile = getDataFile(shuffleId, mapId) // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure // the following check and rename are atomic. synchronized { val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length) if (existingLengths != null) { // Another attempt for the same task has already written our map outputs successfully, // so just use the existing partition lengths and delete our temporary map outputs. System.arraycopy(existingLengths, 0, lengths, 0, lengths.length) if (dataTmp != null && dataTmp.exists()) { dataTmp.delete() } indexTmp.delete() } else { // This is the first successful attempt in writing the map outputs for this task, // so override any existing index and data files with the ones we wrote. if (indexFile.exists()) { indexFile.delete() } if (dataFile.exists()) { dataFile.delete() } if (!indexTmp.renameTo(indexFile)) { throw new IOException("fail to rename file " + indexTmp + " to " + indexFile) } if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
默认Sort-based Shuffle的几个缺陷:
2.若是须要在分片内也进行排序的话,此时须要进行Mapper端和Reducer端的两次排序!!!
优化:
能够改造Mapper和Reducer端,改框架来实现一次排序。
频繁GC的解决办法是:钨丝计划!!
感谢下面的博主:
王家林 中国Spark第一人
DT大数据梦工厂
新浪微博: http://weibo.com.ilovepains/
微信公共号DT_Spark
博客:http://bolg.sina.com.cn/ilovepains
手机:18610086859
qq:1740415547
邮箱:18610086859@vip.126.com