Shuffle Map的过程,即Shuffle Stage的ShuffleTask按照必定的规则将数据写到相应的文件中,并把写的文件"位置信息" 以MapOutput返回给DAGScheduler ,MapOutput将它更新到特定位置就完成了整个Shuffle Map过程. 在Spark中,Shuffle reduce过程抽象化为ShuffledRDD,即这个RDD的compute方法计算每个分片即每个reduce的数据是经过拉取ShuffleMap输出的文件并返回Iterator来实现的html
二者差异不大,度分为map和reduce两个阶段。node
从 high-level 的角度来看,二者并无大的差异。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不一样的 partition 送到不一样的 reducer(Spark 里 reducer 多是下一个 stage 里的 ShuffleMapTask,也多是 ResultTask)。Reducer 之内存做缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好之后进行 reduce() (Spark 里多是后续的一系列操做)。算法
差异较大,Hadoop在Map和reduce阶段都有排序操做,而spark默认使用hash进行聚合,不会提早进行排序操做。编程
从 low-level 的角度来看,二者差异不小。 Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 能够处理大规模的数据,由于其输入数据能够经过外排获得(mapper 对每段数据先作排序,reducer 的 shuffle 对排好序的每段数据作归并)。目前的 Spark 默认选择的是 hash-based,一般使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提早排序。若是用户须要通过排序的数据,那么须要本身调用相似 sortByKey() 的操做缓存
mapreduce将处理流程进行细化出map,shuffle,sort,reduce等几个阶段,而spark只有一个stage和一系列的transformation()网络
Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每一个阶段各司其职,能够按照过程式的编程思想来逐一实现每一个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不一样的 stage 和一系列的 transformation(),因此 spill, merge, aggregate 等操做须要蕴含在 transformation() 中。数据结构
为了分析方便,假定每一个Executor只有1个CPU core,也就是说,不管这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。并发
shuffle write阶段,主要就是在一个stage结束计算以后,为了下一个stage能够执行shuffle类的算子(好比reduceByKey),而将每一个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘以前,会先将数据写入内存缓冲中,当内存缓冲填满以后,才会溢写到磁盘文件中去。参见下面HashShuffleManager图示。app
参见: ShuffleMapStage函数
MapOutputTracker :是为MapOutput提供一个访问入口,提供了注册和获取MapStatus的接口。
MapOutputTracker能够把每一个Map输出的MapStatus注册到Tracker,同时Tracker也提供了访问接口,能够从该Tracker中读取指定每一个ShuffleID所对应的map输出的位置;
同时MapOutputTracker也是主从结构,其中Master提供了将Map输出注册到Tracker的入口, slave运行在每一个Executor上,提供读取入口, 可是这个读取过程须要和Master进行交互,将指定的 ShuffleID所对应的MapStatus信息从Master中fetch过来;
参见: 提交stage
driver端,记录shuffle信息
MapStatus数据记录的格式:{shuffleId,mapId,MapStatus}
每一个Shuffle都对应一个ShuffleID,该ShuffleID下面对应多个MapID,每一个MapID都会输出一个MapStatus,经过该MapStatus,能够定位每一个 MapID所对应的ShuffleMapTask运行过程当中所对应的机器
经过shuffleID进行索引,存储了全部注册到tracker的Shuffle, 经过registerShuffle能够进行注册Shuffle, 经过registerMapOutput能够在每次ShuffleMapTask结束之后,将Map的输出注册到Track中; 同时提供了getSerializedMapOutputStatuses接口 将一个Shuffle全部的MapStatus进行序列化并进行返回;
class MapOutputTrackerMaster{ val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala val mapStatuses = new Array[MapStatus](numPartitions) // 在建立stage时,初始化ShuffleStatus def registerShuffle(shuffleId: Int, numMaps: Int) { shuffleStatuses.put( shuffleId,new ShuffleStatus(numMaps)) } // 将MapTask的输出注册到Track中 def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) { **// {shuffleId,mapId,MapStatus}** shuffleStatuses(shuffleId).addMapOutput(mapId, status) } def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized { mapStatuses(mapId) = status } } // mapStatus中包含了task运行位置,partitions数量等信息 MapStatus{ def location: BlockManagerId def getSizeForBlock(reduceId: Int): Long }
excutor端获取shuffle信息,注意:local模式下是直接从trackerMaster获取信息的(worker和master拥有相同的父类,local模式下直接获取不用再走RPC调用)
MapOutputTrackerWorker的实现很简单,核心功能就是getServerStatuses, 它获取指定Shuffle的每一个reduce所对应的MapStatus信息
class MapOutputWorker{ def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { // 根据shuffleId获取MapStatus集合 val statuses = getStatuses(shuffleId) // 根据shuffleId和起始分区,从mapStatus获取响应的blockManager信息 MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses) } // 发送消息给trackerMaster,获取mapOutPut信息 def askTracker{ var trackerEndpoint: RpcEndpointRef = _ trackerEndpoint.askSync[T](message) }
ShuffleMapTask负责写数据操做,最后会生成.data和.index文件,在执行完毕后返回一个MapStatus对象。
ShuffleMapTask在excutor上获取到具体的writer后进行实际的写操做
class ShuffleMapTask extends Task( def runTask(context: TaskContext): MapStatus = { // 反序列化接收到的数据 val (rdd, dep) = closureSerializer.deserialize( ByteBuffer.wrap(taskBinary.value)) // 调用ShuffleManager的getWriter方法获取一组writer writer = manager.getWriter(dep.shuffleHandle, partitionId, context) // 遍历RDD进行write writer.write() } }
参见: writer
Get a writer for a given partition. Called on executors by map tasks.
由于Shuffle过程当中须要将Map结果数据输出到文件,因此须要经过注册一个ShuffleHandle来获取到一个ShuffleWriter对象,经过它来控制Map阶段记录数据输出的行为。其中,ShuffleHandle包含了以下基本信息:
shuffleId:标识Shuffle过程的惟一ID numMaps:RDD对应的Partitioner指定的Partition的个数,也就是ShuffleMapTask输出的Partition个数 dependency:RDD对应的依赖ShuffleDependency
class SortShuffleManager{ def getWriter(){ handle match { case SerializedShuffleHandle=> new UnsafeShuffleWriter() case BypassMergeSortShuffleHandle=> new BypassMergeSortShuffleWriter() case BaseShuffleHandle=> new SortShuffleWriter() } }
若是ShuffleDependency中的Serializer,容许对将要输出数据对象进行排序后,再执行序列化写入到文件,则会选择建立一个SerializedShuffleHandle,生成一个UnsafeShuffleWriter
除了上面两种ShuffleHandle之后,其余状况都会建立一个BaseShuffleHandle对象,它会以反序列化的格式处理Shuffle输出数据。
数据记录格式:
// shuffle_shuffleId_mapId_reducId shuffle_2901_11825_0.data shuffle_2901_11825_0.index
数据格式有两种,若是不须要合并则使用buffer,若是须要合并使用map
class ExternalSorter{ map = new PartitionedAppendOnlyMap[K, C] buffer = new PartitionedPairBuffer[K, C] def insertAll{ if(shouldCombine){ map.changeValue() }else{ buffer.insert() } } }
2.2.1.3.1.1.1. map
在Map阶段会执行Combine操做,在Map阶段进行Combine操做可以下降Map阶段数据记录的总数,从而下降Shuffle过程当中数据的跨网络拷贝传输。这时,RDD对应的ShuffleDependency须要设置一个Aggregator用来执行Combine操做
map是内存数据结构,最重要的是update函数和map的changeValue方法(这里的map对应的实现类是PartitionedAppendOnlyMap)。update函数所作的工做,其实就是对createCombiner和mergeValue这两个函数的使用,第一次遇到一个Key调用createCombiner函数处理,非首次遇到同一个Key对应新的Value调用mergeValue函数进行合并处理。map的changeValue方法主要是将Key和Value在map中存储或者进行修改(对出现的同一个Key的多个Value进行合并,并将合并后的新Value替换旧Value)。 PartitionedAppendOnlyMap是一个通过优化的哈希表,它支持向map中追加数据,以及修改Key对应的Value,可是不支持删除某个Key及其对应的Value。它可以支持的存储容量是0.7 * 2 ^ 29 = 375809638。当达到指定存储容量或者指定限制,就会将map中记录数据Spill到磁盘文件,这个过程和前面的相似
class ExternalSorter{ map = new PartitionedAppendOnlyMap[K, C] buffer = new PartitionedPairBuffer[K, C] def insertAll{ if(shouldCombine){ // 定义一个aggtregator函数 val mergeValue = aggregator.get.mergeValue val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } // 使用update函数实现对新增元素的合并操做 map.changeValue((getPartition(kv._1), kv._1), update) }else{ buffer.insert() } } }
2.2.1.3.1.1.2. buffer
map端不须要排序时使用的数据存储格式
Map阶段不进行Combine操做,在内存中缓存记录数据会使用PartitionedPairBuffer这种数据结构来缓存、排序记录数据,它是一个Append-only Buffer,仅支持向Buffer中追加数据键值对记录
组装完数据后写磁盘
class ExternalSorter{ map = new PartitionedAppendOnlyMap[K, C] buffer = new PartitionedPairBuffer[K, C] def insertAll{ if(shouldCombine){ maybeSpillCollection(usingMap = true) }else{ maybeSpillCollection(usingMap = false) } } }
shuffle read,一般就是一个stage刚开始时要作的事情。此时该stage的每个task就须要将上一个stage的计算结果中的全部相同key,从各个节点上经过网络都拉取到本身所在的节点上,而后进行key的聚合或链接等操做。因为shuffle write的过程当中,task为下游stage的每一个task都建立了一个磁盘文件,所以shuffle read的过程当中,每一个task只要从上游stage的全部task所在节点上,拉取属于本身的那一个磁盘文件便可。
shuffle read的拉取过程是一边拉取一边进行聚合的。每一个shuffle read task都会有一个本身的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,而后经过内存中的一个Map进行聚合等操做。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操做。以此类推,直到最后将全部数据到拉取完,并获得最终的结果
参见: ResultTask
调用ShuffleManager经过getReader方法获取具体的Reader,去读数据。
class ShuffledRDD { def compute(){ val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] } }
ShuffleManager是shuffle整个过程的管理入口,对外提供读写等接口。
ShuffleManager在driver中建立,driver用ShuffleManager进行注册shuffle,执行读写操做等
对Shuffle作了什么优化来提供Spark的性能,本质上就是对ShuffleManager进行优化和提供新的实现
spark2.2.0中已取消对HashShuffleManager的支持 新增了tungsten-sort。
ShuffleManager有两种实现HashShuffleManager和SorShuffleManager,1.1一会的版本默认是SortShuffleManger,可经过 conf.get("spark.shuffle.manager", "sort") 修改默认的shuffle实现方式
SortShuffleManager和HashShuffleManager有一个本质的差异,即同一个map的多个reduce的数据都写入到同一个文件中;那么SortShuffleManager产生的Shuffle 文件个数为2*Map个数
// shuffleManger提供的功能 private[spark] trait ShuffleManager { // shuffle注册 def registerShuffle(shuffleId: Int, numMaps: Int,dependency: ShuffleDependency): ShuffleHandle // shuffle注销 def unregisterShuffle(shuffleId: Int): Boolean // mapTask返回一组Writer def getWriter(handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter // 提供Start分区编号和end分区编号;固然通常状况若是每一个reduce单独运行,那么start-end区间也只对应一个reduce def getReader(handle: ShuffleHandle,startPartition: Int,endPartition: Int,context: TaskContext): ShuffleReader def shuffleBlockManager: ShuffleBlockManager def stop(): Unit }
spark2.2.0中已取消对HashShuffleManager的支持 (SPARK-14667)。参考:http://lxw1234.com/archives/2016/05/666.htm
HashShuffleManager是Spark最先版本的ShuffleManager,该ShuffleManager的严重缺点是会产生太多小文件,特别是reduce个数不少时候,存在很大的性能瓶颈。
最第一版本:ShuffleMapTask个数×reduce个数 后期版本: 并发的ShuffleMapTask的个数为M xreduce个数
参考:http://shiyanjun.cn/archives/1655.html
根据partition的起止位置,从别的节点获取blockURL,node信息组成reader,
class BlockStoreShuffleReader[K, C]( handle: BaseShuffleHandle[K, _, C], startPartition: Int, endPartition: Int, context: TaskContext, serializerManager: SerializerManager = SparkEnv.get.serializerManager, blockManager: BlockManager = SparkEnv.get.blockManager, mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker) extends ShuffleReader[K, C] with Logging { var blocksByAddress= mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition) // 根据获取到的block信息,给trackerMaster发送消息,获取RDD数据 new ShuffleBlockFetcherIterator( blocksByAddress ) } class MapOutputTracker{ // excutor在计算ShuffleRDD时调用,返回{blocak地址,Seq{blockID,和输出数据大小}}等 def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) : Seq[(BlockManagerId, Seq[(BlockId, Long)])] } class ShuffleBlockFetcherIterator{ val results = new LinkedBlockingQueue[FetchResult] // 负责发送请求和接收数据 def sendRequest(req: FetchRequest){ // 将接收到的数据放入到队列中 results.put(new SuccessFetchResult( blockId: BlockId, address: BlockManagerId, size: Long, **buf: ManagedBuffer,** isNetworkReqDone: Boolean )) } }
参见: writer
Shuffle过程当中须要将Map结果数据输出到文件,因此须要经过注册一个ShuffleHandle来获取到一个ShuffleWriter对象,经过它来控制Map阶段记录数据输出的行为
sparkCore源码解析系列: