Original post: http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/
I've been tied up with various things lately and haven't had time to tend to my blog, so the Spark source code analysis series was left half-finished. Having covered the deploy and scheduler modules before, this post looks at another major Spark module: the storage module.
When writing Spark programs we constantly work with RDDs (Resilient Distributed Datasets), building applications on the various transformation and action interfaces they provide. The RDD raises the level of abstraction and cleanly separates interface from implementation, so users don't need to care about what happens underneath. But an RDD only gives us the "form": where does the data we operate on actually live, and how is it read and written? What does its "body" look like? That is implemented and managed by the storage module, which we will now dissect.
The storage module consists of two main layers: the communication layer and the storage layer.
Other modules that want to interact with the storage module go through a single entry point, the BlockManager class; external code accesses the storage module by calling the corresponding BlockManager interfaces.
Let's start with the UML class diagram of the communication layer:
Next, let's look at the different roles these classes play on the master and on the slaves:
The BlockManager is created differently on the master and on the slaves:
Master (client driver)
BlockManagerMaster holds the BlockManagerMasterActor actor and refs to all of the BlockManagerSlaveActors.
Slave (executor)
On a slave, BlockManagerMaster holds a ref to the BlockManagerMasterActor and the actor of its own BlockManagerSlaveActor.
Communication for BlockManagerMasterActor takes place between its refs and its actor, and the same holds for BlockManagerSlaveActor.
actor and ref:
actor and ref are two different kinds of actor references in Akka, created by actorOf and actorFor respectively. An actor is like the server side of a network service: it holds all the state, receives requests from clients, executes them, and returns the results. A ref is like the client side: it obtains results by sending requests to the server.
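To make the actor/ref distinction concrete, here is a minimal Akka sketch (not Spark code) using the Akka 2.0-era API that this version of Spark builds on; the actor class, system name, and remote path below are made up for illustration.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Server side: the real actor, created with actorOf; it owns the state and handles requests.
class EchoActor extends Actor {
  def receive = {
    case msg => sender ! msg  // reply to whoever asked
  }
}

val system = ActorSystem("spark")

// "actor": a local ActorRef returned by actorOf; the actor itself lives in this ActorSystem.
val actor: ActorRef = system.actorOf(Props[EchoActor], name = "echo")

// "ref": an ActorRef obtained with actorFor from a path; it merely forwards messages to the
// real actor at that path, much like a client talking to a server.
val ref: ActorRef = system.actorFor("akka://spark@driver-host:7077/user/echo")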
BlockManager wraps BlockManagerMaster and communicates through it. Spark creates a BlockManager on the client driver and on each executor, and all operations on the storage module go through these BlockManagers.
The BlockManager object is created in SparkEnv, as follows:
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    actorSystem.actorOf(Props(newActor), name = name)
  } else {
    val driverHost: String = System.getProperty("spark.driver.host", "localhost")
    val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
    Utils.checkHost(driverHost, "Expected hostname")
    val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
    logInfo("Connecting to " + name + ": " + url)
    actorSystem.actorFor(url)
  }
}

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
  "BlockManagerMaster",
  new BlockManagerMasterActor(isLocal)))
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
As you can see, the client driver creates the BlockManagerMasterActor actor while each executor creates a ref to it, and both are wrapped into a BlockManager.
BlockManagerMasterActor
executor to client driver
RegisterBlockManager (an executor registers its newly created BlockManager with the client driver)
HeartBeat
UpdateBlockInfo (update block information)
GetPeers (request the ids of other BlockManagers)
GetLocations (get the id of the BlockManager holding a given block)
GetLocationsMultipleBlockIds (get the BlockManager ids for a group of blocks)
client driver to client driver
GetLocations (get the id of the BlockManager holding a given block)
GetLocationsMultipleBlockIds (get the BlockManager ids for a group of blocks)
RemoveExecutor (remove the BlockManager of an executor that has died)
StopBlockManagerMaster (stop the BlockManagerMasterActor on the client driver)
Some messages, such as GetLocations, are sent to the actor from both the executor side and the client driver side; others, such as RegisterBlockManager, are only sent from an executor-side ref to the client-driver-side actor; and messages such as RemoveExecutor are only sent from the client-driver-side ref to the client-driver-side actor. For the details of where each message is sent, received, and handled, please see the code; I won't go over them here.
BlockManagerSlaveActor
client driver to executor
RemoveBlock (remove a block)
RemoveRdd (remove an RDD)
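To give a feel for what these control messages look like, below is a rough sketch of them as Akka case classes. The field names and types are my own assumptions for illustration; the actual definitions live in Spark's BlockManagerMessages and differ in detail (for example, BlockManagerId here is a simplified stand-in).

import akka.actor.ActorRef

// Simplified stand-in for Spark's BlockManagerId
case class BlockManagerId(executorId: String, host: String, port: Int)

// executor -> client driver
case class RegisterBlockManager(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef)
case class HeartBeat(id: BlockManagerId)
case class UpdateBlockInfo(id: BlockManagerId, blockId: String, memSize: Long, diskSize: Long)
case class GetPeers(id: BlockManagerId, numPeers: Int)
case class GetLocations(blockId: String)
case class GetLocationsMultipleBlockIds(blockIds: Array[String])

// client driver -> client driver
case class RemoveExecutor(execId: String)
case object StopBlockManagerMaster

// client driver -> executor (handled by BlockManagerSlaveActor)
case class RemoveBlock(blockId: String)
case class RemoveRdd(rddId: Int)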
The communication layer involves passing and handling a lot of control and status messages; those details can be read directly in the source, so I won't enumerate them one by one. Below I'll just briefly describe how the executor-side BlockManager starts up and registers itself with the client driver.
We have already seen how the BlockManager object gets created. Once created, it has to register itself with the client driver; let's walk through that flow.
First, BlockManager calls initialize() to initialize itself:
private def initialize() {
  master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
  ...
  if (!BlockManager.getDisableHeartBeatsForTesting) {
    heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
      heartBeat()
    }
  }
}
In initialize(), the BlockManager first registers itself with the client driver through BlockManagerMaster and also sets up a heartbeat timer that periodically sends heartbeat messages. Notice that when registering, it passes its own slaveActor to the client driver; the client driver stores the slaveActor together with the corresponding BlockManagerInfo in a hash map, so that it can later send commands to the executor through that slaveActor.
BlockManagerMaster wraps the registration request into a RegisterBlockManager message and sends it to the BlockManagerMasterActor on the client driver, which calls register() to register the BlockManager:
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
  if (id.executorId == "<driver>" && !isLocal) {
    // Got a register message from the master node; don't register it
  } else if (!blockManagerInfo.contains(id)) {
    blockManagerIdByExecutor.get(id.executorId) match {
      case Some(manager) =>
        // A block manager of the same executor already exists.
        // This should never happen. Let's just quit.
        logError("Got two different block manager registrations on " + id.executorId)
        System.exit(1)
      case None =>
        blockManagerIdByExecutor(id.executorId) = id
    }
    blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
      id, System.currentTimeMillis(), maxMemSize, slaveActor)
  }
}
Note that the same procedure also runs on the client driver; the only difference is that at the final registration step, nothing is done if the id is "<driver>". As shown above, the corresponding BlockManagerInfo object is created and stored in the hash map.
At the RDD level we know that an RDD is made up of partitions and that transformations and actions are performed on partitions. Inside the storage module, however, an RDD is viewed as a set of blocks, and RDDs are read and written in units of blocks. A partition and a block are essentially the same thing seen from different angles. In the Spark storage module the smallest unit of data access is the block, and every operation works at block granularity.
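As a concrete illustration of this partition-to-block correspondence, the block id is derived directly from the RDD id and the partition index (we will see the exact "rdd_%d_%d" format again in CacheManager.getOrCompute later in this post); the little helper below is hypothetical, written only to show the naming.

// Hypothetical helper: partition `partitionIndex` of the RDD with id `rddId`
// is handled by the storage module as the block "rdd_<rddId>_<partitionIndex>".
def blockIdFor(rddId: Int, partitionIndex: Int): String =
  "rdd_%d_%d".format(rddId, partitionIndex)

blockIdFor(12, 3)  // "rdd_12_3"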
First, let's look at the UML class diagram of the storage layer:
When the BlockManager object is created, it creates a MemoryStore and a DiskStore for reading and writing blocks; in initialize() it also creates a BlockManagerWorker that listens for remote block read/write requests and handles them.
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: DiskStore =
  new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))

private def initialize() {
  ...
  BlockManagerWorker.startBlockManagerWorker(this)
  ...
}
Let's now look in detail at how blocks are read and written in DiskStore and MemoryStore.
DiskStore can be configured with multiple folders. Spark creates a directory under each of them, named spark-local-yyyyMMddHHmmss-xxxx where xxxx is a random number, and all blocks are stored inside these directories. DiskStore calls createLocalDirs() to create them when the object is constructed:
private def createLocalDirs(): Array[File] = {
  logDebug("Creating local directories at root dirs '" + rootDirs + "'")
  val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
  rootDirs.split(",").map { rootDir =>
    var foundLocalDir = false
    var localDir: File = null
    var localDirId: String = null
    var tries = 0
    val rand = new Random()
    while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
      tries += 1
      try {
        localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
        localDir = new File(rootDir, "spark-local-" + localDirId)
        if (!localDir.exists) {
          foundLocalDir = localDir.mkdirs()
        }
      } catch {
        case e: Exception =>
          logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e)
      }
    }
    if (!foundLocalDir) {
      logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
        " attempts to create local dir in " + rootDir)
      System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
    }
    logInfo("Created local directory at " + localDir)
    localDir
  }
}
Inside DiskStore every block is stored as a file; a block is mapped to a file by hashing its block id. The mapping from block id to file path looks like this:
private def getFile(blockId: String): File = {
  logDebug("Getting file for block " + blockId)
  // Figure out which local directory it hashes to, and which subdirectory in that
  val hash = Utils.nonNegativeHash(blockId)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  // Create the subdirectory if it doesn't already exist
  var subDir = subDirs(dirId)(subDirId)
  if (subDir == null) {
    subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        newDir.mkdir()
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }
  }
  new File(subDir, blockId)
}
The block id is hashed, and the hash is used (via modulo arithmetic) to derive dirId and subDirId; the corresponding subDir is then looked up in subDirs, creating it if it does not exist. Finally, a file handle is created with subDir as the path and the block id as the file name, and DiskStore uses this handle to write the block to the file, as shown below:
override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel) {
  // So that we do not modify the input offsets !
  // duplicate does not copy buffer, so inexpensive
  val bytes = _bytes.duplicate()
  logDebug("Attempting to put block " + blockId)
  val startTime = System.currentTimeMillis
  val file = createFile(blockId)
  val channel = new RandomAccessFile(file, "rw").getChannel()
  while (bytes.remaining > 0) {
    channel.write(bytes)
  }
  channel.close()
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    blockId, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
}
Reading a block is straightforward: just locate the corresponding file and read it back:
override def getBytes(blockId: String): Option[ByteBuffer] = {
  val file = getFile(blockId)
  val bytes = getFileBytes(file)
  Some(bytes)
}
So reading or writing a block in DiskStore boils down to mapping the block id to a file path and then accessing that file.
Compared with DiskStore, which has to derive a file path from the block id's hash and store the block in that file, MemoryStore's block management is much simpler: it maintains an internal hash map of all blocks, keyed by block id.
case class Entry(value: Any, size: Long, deserialized: Boolean)

private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
Putting a block into MemoryStore requires enough free memory to hold it; if there isn't enough, the block is written out to a file instead. The code looks like this:
override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel) {
  // Work on a duplicate - since the original input might be used elsewhere.
  val bytes = _bytes.duplicate()
  bytes.rewind()
  if (level.deserialized) {
    val values = blockManager.dataDeserialize(blockId, bytes)
    val elements = new ArrayBuffer[Any]
    elements ++= values
    val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
    tryToPut(blockId, elements, sizeEstimate, true)
  } else {
    tryToPut(blockId, bytes, bytes.limit, false)
  }
}
In tryToPut(), ensureFreeSpace() is called first to check whether there is enough free memory to hold the block. If there is, the block is put into the hash map and managed there; if not, dropFromMemory() is called to write the block to a file.
private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
  // TODO: Its possible to optimize the locking by locking entries only when selecting blocks
  // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been
  // released, it must be ensured that those to-be-dropped blocks are not double counted for
  // freeing up more space for another block that needs to be put. Only then the actually dropping
  // of blocks (and writing to disk if necessary) can proceed in parallel.
  putLock.synchronized {
    if (ensureFreeSpace(blockId, size)) {
      val entry = new Entry(value, size, deserialized)
      entries.synchronized {
        entries.put(blockId, entry)
        currentMemory += size
      }
      if (deserialized) {
        logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
      } else {
        logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
      }
      true
    } else {
      // Tell the block manager that we couldn't put it in memory so that it can drop it to
      // disk if the block allows disk storage.
      val data = if (deserialized) {
        Left(value.asInstanceOf[ArrayBuffer[Any]])
      } else {
        Right(value.asInstanceOf[ByteBuffer].duplicate())
      }
      blockManager.dropFromMemory(blockId, data)
      false
    }
  }
}
Fetching a block from MemoryStore is trivial: just look up the value for the block id in the hash map.
override def getValues(blockId: String): Option[Iterator[Any]] = {
  val entry = entries.synchronized {
    entries.get(blockId)
  }
  if (entry == null) {
    None
  } else if (entry.deserialized) {
    Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
  } else {
    val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
    Some(blockManager.dataDeserialize(blockId, buffer))
  }
}
Now that we have seen how DiskStore and MemoryStore read and write blocks, do we interact with them directly, or is there a more abstract interface that hides the underlying details?
BlockManager provides put() and get() functions, which let callers store and fetch blocks without worrying about the underlying implementation.
Let's look at the implementation of put() first:
def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
    tellMaster: Boolean = true): Long = {
  ...
  // Remember the block's storage level so that we can correctly drop it to disk if it needs
  // to be dropped right after it got put into memory. Note, however, that other threads will
  // not be able to get() this block until we call markReady on its BlockInfo.
  val myInfo = {
    val tinfo = new BlockInfo(level, tellMaster)
    // Do atomically !
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
        return oldBlockOpt.get.size
      }
      // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
      oldBlockOpt.get
    } else {
      tinfo
    }
  }
  val startTimeMs = System.currentTimeMillis
  // If we need to replicate the data, we'll want access to the values, but because our
  // put will read the whole iterator, there will be no values left. For the case where
  // the put serializes data, we'll remember the bytes, above; but for the case where it
  // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
  var valuesAfterPut: Iterator[Any] = null
  // Ditto for the bytes after the put
  var bytesAfterPut: ByteBuffer = null
  // Size of the block in bytes (to return to caller)
  var size = 0L
  myInfo.synchronized {
    logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
      + " to get into synchronized block")
    var marked = false
    try {
      if (level.useMemory) {
        // Save it just to memory first, even if it also has useDisk set to true; we will later
        // drop it to disk if the memory store can't hold it.
        val res = memoryStore.putValues(blockId, values, level, true)
        size = res.size
        res.data match {
          case Right(newBytes) => bytesAfterPut = newBytes
          case Left(newIterator) => valuesAfterPut = newIterator
        }
      } else {
        // Save directly to disk.
        // Don't get back the bytes unless we replicate them.
        val askForBytes = level.replication > 1
        val res = diskStore.putValues(blockId, values, level, askForBytes)
        size = res.size
        res.data match {
          case Right(newBytes) => bytesAfterPut = newBytes
          case _ =>
        }
      }
      // Now that the block is in either the memory or disk store, let other threads read it,
      // and tell the master about it.
      marked = true
      myInfo.markReady(size)
      if (tellMaster) {
        reportBlockStatus(blockId, myInfo)
      }
    } finally {
      // If we failed at putting the block to memory/disk, notify other possible readers
      // that it has failed, and then remove it from the block info map.
      if (!marked) {
        // Note that the remove must happen before markFailure otherwise another thread
        // could've inserted a new BlockInfo before we remove it.
        blockInfo.remove(blockId)
        myInfo.markFailure()
        logWarning("Putting block " + blockId + " failed")
      }
    }
  }
  logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
  // Replicate block if required
  if (level.replication > 1) {
    val remoteStartTime = System.currentTimeMillis
    // Serialize the block if not already done
    if (bytesAfterPut == null) {
      if (valuesAfterPut == null) {
        throw new SparkException(
          "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
      }
      bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
    }
    replicate(blockId, bytesAfterPut, level)
    logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
  }
  BlockManager.dispose(bytesAfterPut)
  return size
}
A put() operation consists of three main steps:
1. Create a BlockInfo structure to hold the block's metadata and lock it so that the block cannot yet be accessed.
2. Store the block in memory or on disk according to its storage level, then mark it as ready (unlocking it) and, if required, report its status to the master.
3. Replicate the block to remote nodes if its replication factor is greater than one.
Next, let's look at the implementation of get():
def get(blockId: String): Option[Iterator[Any]] = {
  val local = getLocal(blockId)
  if (local.isDefined) {
    logInfo("Found block %s locally".format(blockId))
    return local
  }
  val remote = getRemote(blockId)
  if (remote.isDefined) {
    logInfo("Found block %s remotely".format(blockId))
    return remote
  }
  None
}
get() first looks for the block in the local BlockManager and returns it if found; if not found locally, it sends requests to the BlockManagers on other executors to locate the block. Normally Spark schedules tasks according to where blocks live, so a task usually runs on a node that already holds its block and getLocal() is enough; when resources are tight, however, Spark may schedule a task on a node that does not have the block, and the block then has to be fetched through getRemote().
Let's look at getLocal() first:
def getLocal(blockId: String): Option[Iterator[Any]] = {
  logDebug("Getting local block " + blockId)
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      // In the another thread is writing the block, wait for it to become ready.
      if (!info.waitForReady()) {
        // If we get here, the block write failed.
        logWarning("Block " + blockId + " was marked as failure.")
        return None
      }
      val level = info.level
      logDebug("Level for block " + blockId + " is " + level)
      // Look for the block in memory
      if (level.useMemory) {
        logDebug("Getting block " + blockId + " from memory")
        memoryStore.getValues(blockId) match {
          case Some(iterator) =>
            return Some(iterator)
          case None =>
            logDebug("Block " + blockId + " not found in memory")
        }
      }
      // Look for block on disk, potentially loading it back into memory if required
      if (level.useDisk) {
        logDebug("Getting block " + blockId + " from disk")
        if (level.useMemory && level.deserialized) {
          diskStore.getValues(blockId) match {
            case Some(iterator) =>
              // Put the block back in memory before returning it
              // TODO: Consider creating a putValues that also takes in a iterator ?
              val elements = new ArrayBuffer[Any]
              elements ++= iterator
              memoryStore.putValues(blockId, elements, level, true).data match {
                case Left(iterator2) =>
                  return Some(iterator2)
                case _ =>
                  throw new Exception("Memory store did not return back an iterator")
              }
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
        } else if (level.useMemory && !level.deserialized) {
          // Read it as a byte buffer into memory first, then return it
          diskStore.getBytes(blockId) match {
            case Some(bytes) =>
              // Put a copy of the block back in memory before returning it. Note that we can't
              // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
              // The use of rewind assumes this.
              assert(0 == bytes.position())
              val copyForMemory = ByteBuffer.allocate(bytes.limit)
              copyForMemory.put(bytes)
              memoryStore.putBytes(blockId, copyForMemory, level)
              bytes.rewind()
              return Some(dataDeserialize(blockId, bytes))
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
        } else {
          diskStore.getValues(blockId) match {
            case Some(iterator) =>
              return Some(iterator)
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
        }
      }
    }
  } else {
    logDebug("Block " + blockId + " not registered locally")
  }
  return None
}
getLocal() first obtains the BlockInfo for the block id and reads the block's storage level from it; depending on the storage level, it then takes one of the following branches:
- If the level uses memory, it tries the MemoryStore first and returns the block directly if found.
- If the level uses disk as well as deserialized memory storage, the block is read from the DiskStore, put back into the MemoryStore as values, and returned as an iterator.
- If the level uses disk and serialized memory storage, the bytes are read from disk, a copy is cached back into the MemoryStore, and the bytes are deserialized and returned.
- If the level uses disk only, the block is read from the DiskStore and returned directly.
Next, getRemote():
def getRemote(blockId: String): Option[Iterator[Any]] = {
  if (blockId == null) {
    throw new IllegalArgumentException("Block Id is null")
  }
  logDebug("Getting remote block " + blockId)
  // Get locations of block
  val locations = master.getLocations(blockId)
  // Get block from remote locations
  for (loc <- locations) {
    logDebug("Getting remote block " + blockId + " from " + loc)
    val data = BlockManagerWorker.syncGetBlock(
      GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
    if (data != null) {
      return Some(dataDeserialize(blockId, data))
    }
    logDebug("The value of block " + blockId + " is null")
  }
  logDebug("Block " + blockId + " not found")
  return None
}
getRemote() first obtains all the location information for the block and then requests the block from those remote nodes one by one; as soon as one remote node returns the block, the function returns without sending further requests.
That wraps up our quick look at BlockManager's get() and put() functions; with these two calls, external classes can easily store and fetch block data.
Inside the storage module everything revolves around blocks, yet all computation at the RDD level is based on partitions. So how does a partition get mapped to a block?
The core of RDD computation is the iterator() function:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
If the RDD's storage level is not NONE, the RDD has data stored in the BlockManager, so getOrCompute() in CacheManager is called to compute the RDD; this is where partitions and blocks meet. A block id of the form rdd_xx_xx is built from the RDD id and the partition index, and the corresponding block is then fetched from the BlockManager.
- If the block exists, this RDD partition was computed before and stored in the BlockManager, so it is simply retrieved without recomputation.
- If the block does not exist, computeOrReadCheckpoint() is called to compute a new block, which is then stored into the BlockManager.
Note that computing and storing a block is blocking: if another thread also needs this block, it has to wait until that block's loading has finished.
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
    : Iterator[T] = {
  val key = "rdd_%d_%d".format(rdd.id, split.index)
  logDebug("Looking for partition " + key)
  blockManager.get(key) match {
    case Some(values) =>
      // Partition is already materialized, so just return its values
      return values.asInstanceOf[Iterator[T]]
    case None =>
      // Mark the split as loading (unless someone else marks it first)
      loading.synchronized {
        if (loading.contains(key)) {
          logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
          while (loading.contains(key)) {
            try { loading.wait() } catch { case _: Throwable => }
          }
          logInfo("Finished waiting for %s".format(key))
          // See whether someone else has successfully loaded it. The main way this would fail
          // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
          // partition but we didn't want to make space for it. However, that case is unlikely
          // because it's unlikely that two threads would work on the same RDD partition. One
          // downside of the current code is that threads wait serially if this does happen.
          blockManager.get(key) match {
            case Some(values) =>
              return values.asInstanceOf[Iterator[T]]
            case None =>
              logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
              loading.add(key)
          }
        } else {
          loading.add(key)
        }
      }
      try {
        // If we got here, we have to load the split
        logInfo("Partition %s not found, computing it".format(key))
        val computedValues = rdd.computeOrReadCheckpoint(split, context)
        // Persist the result, so long as the task is not running locally
        if (context.runningLocally) { return computedValues }
        val elements = new ArrayBuffer[Any]
        elements ++= computedValues
        blockManager.put(key, elements, storageLevel, true)
        return elements.iterator.asInstanceOf[Iterator[T]]
      } finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}
In this way an RDD's transformations and actions are tied to block data: although at the abstraction level we operate on partitions, every partition is ultimately mapped to a block, so in practice all of our operations are reads, writes, and processing of blocks.
This post has covered the two layers of the storage module: the communication layer and the storage layer. For the communication layer, I briefly described the class structure, the roles the classes play on the different nodes, and the messages exchanged between those roles, along with the startup and registration details. For the storage layer, I walked through the block read/write code in DiskStore and MemoryStore, analyzed the put() and get() interfaces of BlockManager, and finally described how a Spark RDD partition relates to a block in the BlockManager and how blocks are stored and fetched.
This post analyzed the storage module as a whole rather than digging into every implementation detail; with this overall picture in mind, studying the detailed implementation afterwards should yield twice the result for half the effort.