Apache Spark中,对Block的查询、存储管理,是经过惟一的Block ID来进行区分的。html
同一个Spark Application,以及多个运行的Application之间,对应的Block都具备惟一的ID后端
须要在worker和driver间共享数据时,就须要对这个数据进行惟一的标识,经常使用的须要传输的block信息有如下几类 RDDBlockId、ShuffleBlockId、ShuffleDataBlockId、ShuffleIndexBlockId、BroadcastBlockId、TaskResultBlockId、TempLocalBlockId、TempShuffleBlockId缓存
RDDBlockId : "rdd_" + rddId + "_" + splitIndex ShuffleBlockId : "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId ShuffleDataBlockId:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" ShuffleIndexBlockId:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index" TaskResultBlockId:"taskresult_" + taskId StreamBlockId:"input-" + streamId + "-" + uniqueId ...
DiskStore是经过DiskBlockManager进行管理存储到磁盘上的Block数据文件的,在同一个节点上的多个Executor共享相同的磁盘文件路径,相同的Block数据文件也就会被同一个节点上的多个Executor所共享。而对应MemoryStore,由于每一个Executor对应独立的JVM实例,从而具备独立的Storage/Execution内存管理,因此使用MemoryStore不能共享同一个Block数据,可是同一个节点上的多个Executor之间的MemoryStore之间拷贝数据,比跨网络传输要高效的多安全
数据在内存中存储的形式网络
MEMORY_ONLY MEMORY_ONLY_2 MEMORY_ONLY_SER MEMORY_ONLY_SER_2 MEMORY_AND_DISK MEMORY_AND_DISK_2 MEMORY_AND_DISK_SER MEMORY_AND_DISK_SER_2 OFF_HEAP
数据罗盘的几种形式:分布式
DISK_ONLY DISK_ONLY_2 MEMORY_AND_DISK MEMORY_AND_DISK_2 MEMORY_AND_DISK_SER MEMORY_AND_DISK_SER_2 OFF_HEAP
DiskStore即基于文件来存储Block. 基于Disk来存储,首先必需要解决一个问题就是磁盘文件的管理:磁盘目录结构的组成,目录的清理等,在Spark对磁盘文件的管理是经过 DiskBlockManager来进行管理的ide
DiskBlockManager管理了每一个Block数据存储位置的信息,包括从Block ID到磁盘上文件的映射关系。DiskBlockManager主要有以下几个功能:ui
堆外存储不支持序列化和副本this
Spark中实现的OffHeap是基于Tachyon:分布式内存文件系统来实现的spa
在Spark Application提交之后,最终会在Worker上启动独立的Executor JVM,Task就运行在Executor里面。在一个Executor JVM内部,内存管理模型就是管理excutor运行所须要的内存
http://shiyanjun.cn/archives/1585.html
1.5以前版本使用 缺点:
统一内存分配管理模型:
abstract class MemoryManager( conf: SparkConf, numCores: Int, onHeapStorageMemory: Long, onHeapExecutionMemory: Long){ // storage堆内内存 @GuardedBy("this") protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP) // storage堆外内存 @GuardedBy("this") protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP) // execution堆内内存 @GuardedBy("this") protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP) // excution堆外内存 @GuardedBy("this") protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP) } // 默认最大堆内存 val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0) // 默认storage和excution的内存大小各占50% offHeapStorageMemory = (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
在统一内存管理模型中,storage和excution内存大小能够动态调整,在必定程度上减小了OOM发生几率
默认内存划分:
预留内存reservedMemory=300M 管理内存maxHeapMemory = (systemMemory - reservedMemory) * 0.6 storageMemory=excutionMemory=maxHeapMemory*0.5
非堆内存默认值0,可经过spark.memory.offHeap.size参数调整,其中storage和excution的内存占比也均为50%
Storage内存,用来缓存Task数据、在Spark集群中传输(Propagation)内部数据
Execution内存,用于知足Shuffle、Join、Sort、Aggregation计算过程当中对内存的需求
// 为blockId申请numBytes字节大小的内存 override def acquireStorageMemory ()synchronized { val (executionPool, storagePool, maxMemory) = memoryMode match { // 根据memoryMode值,返回对应的StorageMemoryPool与ExecutionMemoryPool case MemoryMode.ON_HEAP => case MemoryMode.OFF_HEAP => } if (numBytes > maxMemory) { // 申请的内存大于剩余内存总理则申请失败 s"memory limit ($maxMemory bytes)") return false } if (numBytes > storagePool.memoryFree) { // 若是Storage内存块中没有足够可用内存给blockId使用,则计算当前Storage内存区缺乏多少内存,而后从Execution内存区中借用 val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes) // Execution内存区减掉借用内存量 executionPool.decrementPoolSize(memoryBorrowedFromExecution) // Storage内存区增长借用内存量 storagePool.incrementPoolSize(memoryBorrowedFromExecution) } // 若是Storage内存区能够为blockId分配内存,直接成功分配;不然,若是从Execution内存区中借用的内存可以知足blockId,则分配成功,不能知足则分配失败。 storagePool.acquireMemory(blockId, numBytes) }
释放Storage内存比较简单,只须要更新Storage内存计量变量便可
def releaseMemory(size: Long): Unit = lock.synchronized { if (size > _memoryUsed) { // 须要释放内存大于已使用内存,则直接清零 _memoryUsed = 0 } else { // 从已使用内存中减去释放内存大小 _memoryUsed -= size } }
excution内存的获取和释放都是线程安全的,并且分配给每一个task的内存大小是均等的,每当有task运行完毕后,都会触发内存的回收操做。
若是从storage申请内存大小比storage剩余内存大,则申请线程会阻塞,并对storage内存发起缩小操做。直到storage释放足够内存。
Execution内存区内存分配的基本原则: 若是有N个活跃(Active)的Task在运行,ExecutionMemoryPool须要保证每一个Task在将中间结果数据Spill到磁盘以前,至少可以申请到当前Execution内存区对应的Pool中1/2N大小的内存量,至可能是1/N大小的内存。
这里N是动态变化的,由于可能有新的Task被启动,也有可能Task运行完成释放资源,因此ExecutionMemoryPool会持续跟踪ExecutionMemoryPool内部Task集合memoryForTask的变化,并不断地从新计算分配给每一个Task的这两个内存量的值:1/2N和1/N。
// 同步的释放内存 def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized { val curMem = memoryForTask.getOrElse(taskAttemptId, 0L) // 计算释放内存大小 var memoryToFree = if (curMem < numBytes) { // 没有足够内存须要释放,则释放掉当前task全部使用内存 curMem } else { numBytes } if (memoryForTask.contains(taskAttemptId)) { // Task执行完成,从内部维护的memoryForTask中移除 memoryForTask(taskAttemptId) -= memoryToFree if (memoryForTask(taskAttemptId) <= 0) { memoryForTask.remove(taskAttemptId) } } // 通知调用acquireMemory()方法申请内存的Task内存已经释放 lock.notifyAll() }
BlockManagerMaster管理BlockManager. BlockManager在每一个Dirver和Executor上都有,用来管理Block数据,包括数据的获取和保存等
谈到Spark中的Block数据存储,咱们很容易可以想到BlockManager,他负责管理在每一个Dirver和Executor上的Block数据,多是本地或者远程的。具体操做包括查询Block、将Block保存在指定的存储中,如内存、磁盘、堆外(Off-heap)。而BlockManager依赖的后端,对Block数据进行内存、磁盘存储访问,都是基于前面讲到的MemoryStore、DiskStore。 在Spark集群中,当提交一个Application执行时,该Application对应的Driver以及全部的Executor上,都存在一个BlockManager、BlockManagerMaster,而BlockManagerMaster是负责管理各个BlockManager之间通讯,这个BlockManager管理集群
每一个Executor上都有一个BlockManager实例,负责管理用户提交的该Application计算过程当中产生的Block。
颇有可能当前Executor上存储在RDD对应Partition的通过处理后获得的Block数据,也有可能当前Executor上没有,可是其余Executor上已经处理过并缓存了Block数据,因此对应着本地获取、远程获取两种可能
关于一个Application运行过程当中Block的管理,主要是基于该Application所关联的一个Driver和多个Executor构建了一个Block管理集群:Driver上的(BlockManagerMaster, BlockManagerMasterEndpoint)是集群的Master角色,全部Executor上的(BlockManagerMaster, RpcEndpointRef)做为集群的Slave角色。当Executor上的Task运行时,会查询对应的RDD的某个Partition对应的Block数据是否处理过,这个过程当中会触发多个BlockManager之间的通讯交互
BlockManager在进行put操做后,经过blockInfoManager来控制当前put等操做是否完成以及是否成功。
对于BlockManager中的存储的每一个Block,不必定是对应的数据都PUT成功了,不必定能够当即提供对外的读取,由于PUT是一个过程,有成功仍是有失败的状态. ,拿ShuffleBlock来讲,在shuffleMapTask须要Put一个Block到BlockManager中,在Put完成以前,该Block将处于Pending状态,等待Put完成了不表明Block就能够被读取, 由于Block还可能Put"fail"了.
所以BlockManager经过BlockInfo来维护每一个Block状态,在BlockManager的代码中就是经过一个TimeStampedHashMap来维护BlockID和BlockInfo之间的map.
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] 注: 2.2中此处是经过线程安全的hashMap和一个计数器实现的
BlockInfoManager经过同步机制防止多个task处理同一个block数据块
用户提交一个Spark Application程序,若是程序对应的DAG图相对复杂,其中不少Task计算的结果Block数据都有可能被重复使用,这种状况下如何去控制某个Executor上的Task线程去读写Block数据呢?其实,BlockInfoManager就是用来控制Block数据读写操做,而且跟踪Task读写了哪些Block数据的映射关系,这样若是两个Task都想去处理同一个RDD的同一个Partition数据,若是没有锁来控制,极可能两个Task都会计算并写同一个Block数据,从而形成混乱
class BlockInfoManager{ val infos = new mutable.HashMap[BlockId, BlockInfo] // 存放被锁定任务列表 val writeLocksByTask = new mutable.HashMap[ TaskAttemptId, mutable.Set[BlockId]] val readLocksByTask = new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]] def lockForReading(){ infos.get(blockId) match { case Some(info) => // 没有写任务 if (info.writerTask == BlockInfo.NO_WRITER) { // 读task数量加一 info.readerCount += 1 // 放入读多锁定队列 readLocksByTask( currentTaskAttemptId). add(blockId) } } def lockForWriting(){ case Some(info) => if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) { info.writerTask = currentTaskAttemptId writeLocksByTask.addBinding( currentTaskAttemptId, blockId) }
sparkCore源码解析系列: