Spark源码分析之-Storage模块

原文连接:http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/java

Background

前段时间杂事颇多,一直没有时间整理本身的博客,Spark源码分析写到一半也搁置了。以前介绍了deployscheduler两大模块,此次介绍Spark中的另外一大模块 - storage模块。node

在写Spark程序的时候咱们经常和RDD ( Resilient Distributed Dataset ) 打交道,经过RDD为咱们提供的各类transformation和action接口实现咱们的应用,RDD的引入提升了抽象层次,在接口和实现上进行有效地隔离,使用户无需关心底层的实现。可是RDD提供给咱们的仅仅是一个“”, 咱们所操做的数据究竟放在哪里,如何存取?它的“”是怎么样的?这是由storage模块来实现和管理的,接下来咱们就要剖析一下storage模块。网络

Storage模块总体架构

Storage模块主要分为两层:架构

  1. 通讯层:storage模块采用的是master-slave结构来实现通讯层,master和slave之间传输控制信息、状态信息,这些都是经过通讯层来实现的。
  2. 存储层:storage模块须要把数据存储到disk或是memory上面,有可能还需replicate到远端,这都是由存储层来实现和提供相应接口。

而其余模块若要和storage模块进行交互,storage模块提供了统一的操做类BlockManager,外部类与storage模块打交道都须要经过调用BlockManager相应接口来实现。app

Storage模块通讯层

首先来看一下通讯层的UML类图:less

communication layer class chart

其次咱们来看看各个类在master和slave上所扮演的不一样角色:dom

communication character class chart

对于master和slave,BlockManager的建立有所不一样:ide

  • Master (client driver)函数

    BlockManagerMaster拥有BlockManagerMasterActoractor和全部BlockManagerSlaveActorref源码分析

  • Slave (executor)

    对于slave,BlockManagerMaster则拥有BlockManagerMasterActorref和自身BlockManagerSlaveActoractor

BlockManagerMasterActorrefactor之间进行通讯;BlockManagerSlaveActorrefactor之间通讯。

actorref:

actorrefAkka中的两个不一样的actor reference,分别由actorOfactorFor所建立。actor相似于网络服务中的server端,它保存全部的状态信息,接收client端的请求执行并返回给客户端;ref相似于网络服务中的client端,经过向server端发起请求获取结果。

BlockManager wrap了BlockManagerMaster,经过BlockManagerMaster进行通讯。Spark会在client driver和executor端建立各自的BlockManager,经过BlockManager对storage模块进行操做。

BlockManager对象在SparkEnv中被建立,建立的过程以下所示:

  1. def registerOrLookup(name:String, newActor:=>Actor):ActorRef={
  2. if(isDriver){
  3. logInfo("Registering "+ name)
  4. actorSystem.actorOf(Props(newActor), name = name)
  5. }else{
  6. val driverHost:String=System.getProperty("spark.driver.host","localhost")
  7. val driverPort:Int=System.getProperty("spark.driver.port","7077").toInt
  8. Utils.checkHost(driverHost,"Expected hostname")
  9. val url ="akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
  10. logInfo("Connecting to "+ name +": "+ url)
  11. actorSystem.actorFor(url)
  12. }
  13. }
  14. val blockManagerMaster =newBlockManagerMaster(registerOrLookup(
  15. "BlockManagerMaster",
  16. newBlockManagerMasterActor(isLocal)))
  17. val blockManager =newBlockManager(executorId, actorSystem, blockManagerMaster, serializer)

能够看到对于client driver和executor,Spark分别建立了BlockManagerMasterActor actorref,并被wrap到BlockManager中。

通讯层传递的消息

  • BlockManagerMasterActor

    • executor to client driver

      RegisterBlockManager (executor建立BlockManager之后向client driver发送请求注册自身) HeartBeat UpdateBlockInfo (更新block信息) GetPeers (请求得到其余BlockManager的id) GetLocations (获取block所在的BlockManager的id) GetLocationsMultipleBlockIds (获取一组block所在的BlockManager id)

    • client driver to client driver

      GetLocations (获取block所在的BlockManager的id) GetLocationsMultipleBlockIds (获取一组block所在的BlockManager id) RemoveExecutor (删除所保存的已经死亡的executor上的BlockManager) StopBlockManagerMaster (中止client driver上的BlockManagerMasterActor)

有些消息例如GetLocations在executor端和client driver端都会向actor请求,而其余的消息好比RegisterBlockManager只会由executor端的ref向client driver端的actor发送,于此同时例如RemoveExecutor则只会由client driver端的ref向client driver端的actor发送。

具体消息是从哪里发送,哪里接收和处理请看代码细节,在这里就再也不赘述了。

  • BlockManagerSlaveActor

    • client driver to executor

      RemoveBlock (删除block) RemoveRdd (删除RDD)

通讯层中涉及许多控制消息和状态消息的传递以及处理,这些细节能够直接查看源码,这里就不在一一罗列。下面就只简单介绍一下exeuctor端的BlockManager是如何启动以及向client driver发送注册请求完成注册。

Register BlockManager

前面已经介绍了BlockManager对象是如何被建立出来的,当BlockManager被建立出来之后须要向client driver注册本身,下面咱们来看一下这个流程:

首先BlockManager会调用initialize()初始化本身

  1. privatedef initialize(){
  2. master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
  3. ...
  4. if(!BlockManager.getDisableHeartBeatsForTesting){
  5. heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds){
  6. heartBeat()
  7. }
  8. }
  9. }

initialized()函数中首先调用BlockManagerMaster向client driver注册本身,同时设置heartbeat定时器,定时发送heartbeat报文。能够看到在注册自身的时候向client driver传递了自身的slaveActor,client driver收到slaveActor之后会将其与之对应的BlockManagerInfo存储到hash map中,以便后续经过slaveActor向executor发送命令。

BlockManagerMaster会将注册请求包装成RegisterBlockManager报文发送给client driver的BlockManagerMasterActorBlockManagerMasterActor调用register()函数注册BlockManager

  1. privatedefregister(id:BlockManagerId, maxMemSize:Long, slaveActor:ActorRef){
  2. if(id.executorId =="<driver>"&&!isLocal){
  3. // Got a register message from the master node; don't register it
  4. }elseif(!blockManagerInfo.contains(id)){
  5. blockManagerIdByExecutor.get(id.executorId) match {
  6. caseSome(manager)=>
  7. // A block manager of the same executor already exists.
  8. // This should never happen. Let's just quit.
  9. logError("Got two different block manager registrations on "+ id.executorId)
  10. System.exit(1)
  11. caseNone=>
  12. blockManagerIdByExecutor(id.executorId)= id
  13. }
  14. blockManagerInfo(id)=newBlockManagerMasterActor.BlockManagerInfo(
  15. id,System.currentTimeMillis(), maxMemSize, slaveActor)
  16. }
  17. }

须要注意的是在client driver端也会执行上述过程,只是在最后注册的时候若是判断是"<driver>"就不进行任何操做。能够看到对应的BlockManagerInfo对象被建立并保存在hash map中。

Storage模块存储层

在RDD层面上咱们了解到RDD是由不一样的partition组成的,咱们所进行的transformation和action是在partition上面进行的;而在storage模块内部,RDD又被视为由不一样的block组成,对于RDD的存取是以block为单位进行的,本质上partition和block是等价的,只是看待的角度不一样。在Spark storage模块中中存取数据的最小单位是block,全部的操做都是以block为单位进行的。

首先咱们来看一下存储层的UML类图:

storage layer class chart

BlockManager对象被建立的时候会建立出MemoryStoreDiskStore对象用以存取block,同时在initialize()函数中建立BlockManagerWorker对象用以监听远程的block存取请求来进行相应处理。

  1. private[storage] val memoryStore:BlockStore=newMemoryStore(this, maxMemory)
  2. private[storage] val diskStore:DiskStore=
  3. newDiskStore(this,System.getProperty("spark.local.dir",System.getProperty("java.io.tmpdir")))
  4. privatedef initialize(){
  5. ...
  6. BlockManagerWorker.startBlockManagerWorker(this)
  7. ...
  8. }

下面就具体介绍一下对于DiskStoreMemoryStore,block的存取操做是怎样进行的。

DiskStore如何存取block

DiskStore能够配置多个folder,Spark会在不一样的folder下面建立Spark文件夹,文件夹的命名方式为(spark-local-yyyyMMddHHmmss-xxxx, xxxx是一个随机数),全部的block都会存储在所建立的folder里面。DiskStore会在对象被建立时调用createLocalDirs()来建立文件夹:

  1. privatedef createLocalDirs():Array[File]={
  2. logDebug("Creating local directories at root dirs '"+ rootDirs +"'")
  3. val dateFormat =newSimpleDateFormat("yyyyMMddHHmmss")
  4. rootDirs.split(",").map { rootDir =>
  5. var foundLocalDir =false
  6. var localDir:File=null
  7. var localDirId:String=null
  8. var tries =0
  9. val rand =newRandom()
  10. while(!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS){
  11. tries +=1
  12. try{
  13. localDirId ="%s-%04x".format(dateFormat.format(newDate), rand.nextInt(65536))
  14. localDir =newFile(rootDir,"spark-local-"+ localDirId)
  15. if(!localDir.exists){
  16. foundLocalDir = localDir.mkdirs()
  17. }
  18. }catch{
  19. case e:Exception=>
  20. logWarning("Attempt "+ tries +" to create local dir "+ localDir +" failed", e)
  21. }
  22. }
  23. if(!foundLocalDir){
  24. logError("Failed "+ MAX_DIR_CREATION_ATTEMPTS +
  25. " attempts to create local dir in "+ rootDir)
  26. System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
  27. }
  28. logInfo("Created local directory at "+ localDir)
  29. localDir
  30. }
  31. }

DiskStore里面,每个block都被存储为一个file,经过计算block id的hash值将block映射到文件中,block id与文件路径的映射关系以下所示:

  1. privatedef getFile(blockId:String):File={
  2. logDebug("Getting file for block "+ blockId)
  3. // Figure out which local directory it hashes to, and which subdirectory in that
  4. val hash =Utils.nonNegativeHash(blockId)
  5. val dirId = hash % localDirs.length
  6. val subDirId =(hash / localDirs.length)% subDirsPerLocalDir
  7. // Create the subdirectory if it doesn't already exist
  8. var subDir = subDirs(dirId)(subDirId)
  9. if(subDir ==null){
  10. subDir = subDirs(dirId).synchronized{
  11. val old = subDirs(dirId)(subDirId)
  12. if(old !=null){
  13. old
  14. }else{
  15. val newDir =newFile(localDirs(dirId),"%02x".format(subDirId))
  16. newDir.mkdir()
  17. subDirs(dirId)(subDirId)= newDir
  18. newDir
  19. }
  20. }
  21. }
  22. newFile(subDir, blockId)
  23. }

根据block id计算出hash值,将hash取模得到dirIdsubDirId,在subDirs中找出相应的subDir,若没有则新建一个subDir,最后以subDir为路径、block id为文件名建立file handler,DiskStore使用此file handler将block写入文件内,代码以下所示:

  1. overridedef putBytes(blockId:String, _bytes:ByteBuffer, level:StorageLevel){
  2. // So that we do not modify the input offsets !
  3. // duplicate does not copy buffer, so inexpensive
  4. val bytes = _bytes.duplicate()
  5. logDebug("Attempting to put block "+ blockId)
  6. val startTime =System.currentTimeMillis
  7. val file = createFile(blockId)
  8. val channel =newRandomAccessFile(file,"rw").getChannel()
  9. while(bytes.remaining >0){
  10. channel.write(bytes)
  11. }
  12. channel.close()
  13. val finishTime =System.currentTimeMillis
  14. logDebug("Block %s stored as %s file on disk in %d ms".format(
  15. blockId,Utils.bytesToString(bytes.limit),(finishTime - startTime)))
  16. }

而获取block则很是简单,找到相应的文件并读取出来便可:

  1. overridedef getBytes(blockId:String):Option[ByteBuffer]={
  2. val file = getFile(blockId)
  3. val bytes = getFileBytes(file)
  4. Some(bytes)
  5. }

所以在DiskStore中存取block首先是要将block id映射成相应的文件路径,接着存取文件就能够了。

MemoryStore如何存取block

相对于DiskStore须要根据block id hash计算出文件路径并将block存放到对应的文件里面,MemoryStore管理block就显得很是简单:MemoryStore内部维护了一个hash map来管理全部的block,以block id为key将block存放到hash map中。

  1. caseclassEntry(value:Any, size:Long, deserialized:Boolean)
  2. private val entries =newLinkedHashMap[String,Entry](32,0.75f,true)

MemoryStore中存放block必须确保内存足够容纳下该block,若内存不足则会将block写到文件中,具体的代码以下所示:

  1. overridedef putBytes(blockId:String, _bytes:ByteBuffer, level:StorageLevel){
  2. // Work on a duplicate - since the original input might be used elsewhere.
  3. val bytes = _bytes.duplicate()
  4. bytes.rewind()
  5. if(level.deserialized){
  6. val values = blockManager.dataDeserialize(blockId, bytes)
  7. val elements =newArrayBuffer[Any]
  8. elements ++= values
  9. val sizeEstimate =SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
  10. tryToPut(blockId, elements, sizeEstimate,true)
  11. }else{
  12. tryToPut(blockId, bytes, bytes.limit,false)
  13. }
  14. }

tryToPut()中,首先调用ensureFreeSpace()确保空闲内存是否足以容纳block,若能够则将该block放入hash map中进行管理;若不足以容纳则经过调用dropFromMemory()将block写入文件。

  1. privatedef tryToPut(blockId:String, value:Any, size:Long, deserialized:Boolean):Boolean={
  2. // TODO: Its possible to optimize the locking by locking entries only when selecting blocks
  3. // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been
  4. // released, it must be ensured that those to-be-dropped blocks are not double counted for
  5. // freeing up more space for another block that needs to be put. Only then the actually dropping
  6. // of blocks (and writing to disk if necessary) can proceed in parallel.
  7. putLock.synchronized{
  8. if(ensureFreeSpace(blockId, size)){
  9. val entry =newEntry(value, size, deserialized)
  10. entries.synchronized{
  11. entries.put(blockId, entry)
  12. currentMemory += size
  13. }
  14. if(deserialized){
  15. logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
  16. blockId,Utils.bytesToString(size),Utils.bytesToString(freeMemory)))
  17. }else{
  18. logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
  19. blockId,Utils.bytesToString(size),Utils.bytesToString(freeMemory)))
  20. }
  21. true
  22. }else{
  23. // Tell the block manager that we couldn't put it in memory so that it can drop it to
  24. // disk if the block allows disk storage.
  25. val data =if(deserialized){
  26. Left(value.asInstanceOf[ArrayBuffer[Any]])
  27. }else{
  28. Right(value.asInstanceOf[ByteBuffer].duplicate())
  29. }
  30. blockManager.dropFromMemory(blockId, data)
  31. false
  32. }
  33. }
  34. }

而从MemoryStore中取得block则很是简单,只需从hash map中取出block id对应的value便可。

  1. overridedef getValues(blockId:String):Option[Iterator[Any]]={
  2. val entry = entries.synchronized{
  3. entries.get(blockId)
  4. }
  5. if(entry ==null){
  6. None
  7. }elseif(entry.deserialized){
  8. Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
  9. }else{
  10. val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate()// Doesn't actually copy data
  11. Some(blockManager.dataDeserialize(blockId, buffer))
  12. }
  13. }

Put or Get block through BlockManager

上面介绍了DiskStoreMemoryStore对于block的存取操做,那么咱们是要直接与它们交互存取数据吗,仍是封装了更抽象的接口使咱们无需关心底层?

BlockManager为咱们提供了put()get()函数,用户可使用这两个函数对block进行存取而无需关心底层实现。

首先咱们来看一下put()函数的实现:

  1. def put(blockId:String, values:ArrayBuffer[Any], level:StorageLevel,
  2. tellMaster:Boolean=true):Long={
  3. ...
  4. // Remember the block's storage level so that we can correctly drop it to disk if it needs
  5. // to be dropped right after it got put into memory. Note, however, that other threads will
  6. // not be able to get() this block until we call markReady on its BlockInfo.
  7. val myInfo ={
  8. val tinfo =newBlockInfo(level, tellMaster)
  9. // Do atomically !
  10. val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
  11. if(oldBlockOpt.isDefined){
  12. if(oldBlockOpt.get.waitForReady()){
  13. logWarning("Block "+ blockId +" already exists on this machine; not re-adding it")
  14. return oldBlockOpt.get.size
  15. }
  16. // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
  17. oldBlockOpt.get
  18. }else{
  19. tinfo
  20. }
  21. }
  22. val startTimeMs =System.currentTimeMillis
  23. // If we need to replicate the data, we'll want access to the values, but because our
  24. // put will read the whole iterator, there will be no values left. For the case where
  25. // the put serializes data, we'll remember the bytes, above; but for the case where it
  26. // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
  27. var valuesAfterPut:Iterator[Any]=null
  28. // Ditto for the bytes after the put
  29. var bytesAfterPut:ByteBuffer=null
  30. // Size of the block in bytes (to return to caller)
  31. var size =0L
  32. myInfo.synchronized{
  33. logTrace("Put for block "+ blockId +" took "+Utils.getUsedTimeMs(startTimeMs)
  34. +" to get into synchronized block")
  35. var marked =false
  36. try{
  37. if(level.useMemory){
  38. // Save it just to memory first, even if it also has useDisk set to true; we will later
  39. // drop it to disk if the memory store can't hold it.
  40. val res = memoryStore.putValues(blockId, values, level,true)
  41. size = res.size
  42. res.data match {
  43. caseRight(newBytes)=> bytesAfterPut = newBytes
  44. caseLeft(newIterator)=> valuesAfterPut = newIterator
  45. }
  46. }else{
  47. // Save directly to disk.
  48. // Don't get back the bytes unless we replicate them.
  49. val askForBytes = level.replication >1
  50. val res = diskStore.putValues(blockId, values, level, askForBytes)
  51. size = res.size
  52. res.data match {
  53. caseRight(newBytes)=> bytesAfterPut = newBytes
  54. case _ =>
  55. }
  56. }
  57. // Now that the block is in either the memory or disk store, let other threads read it,
  58. // and tell the master about it.
  59. marked =true
  60. myInfo.markReady(size)
  61. if(tellMaster){
  62. reportBlockStatus(blockId, myInfo)
  63. }
  64. }finally{
  65. // If we failed at putting the block to memory/disk, notify other possible readers
  66. // that it has failed, and then remove it from the block info map.
  67. if(! marked){
  68. // Note that the remove must happen before markFailure otherwise another thread
  69. // could've inserted a new BlockInfo before we remove it.
  70. blockInfo.remove(blockId)
  71. myInfo.markFailure()
  72. logWarning("Putting block "+ blockId +" failed")
  73. }
  74. }
  75. }
  76. logDebug("Put block "+ blockId +" locally took "+Utils.getUsedTimeMs(startTimeMs))
  77. // Replicate block if required
  78. if(level.replication >1){
  79. val remoteStartTime =System.currentTimeMillis
  80. // Serialize the block if not already done
  81. if(bytesAfterPut ==null){
  82. if(valuesAfterPut ==null){
  83. thrownewSparkException(
  84. "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
  85. }
  86. bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
  87. }
  88. replicate(blockId, bytesAfterPut, level)
  89. logDebug("Put block "+ blockId +" remotely took "+Utils.getUsedTimeMs(remoteStartTime))
  90. }
  91. BlockManager.dispose(bytesAfterPut)
  92. return size
  93. }

对于put()操做,主要分为如下3个步骤:

  1. 为block建立BlockInfo结构体存储block相关信息,同时将其加锁使其不能被访问。
  2. 根据block的storage level将block存储到memory或是disk上,同时解锁标识该block已经ready,可被访问。
  3. 根据block的replication数决定是否将该block replicate到远端。

接着咱们来看一下get()函数的实现:

  1. defget(blockId:String):Option[Iterator[Any]]={
  2. val local= getLocal(blockId)
  3. if(local.isDefined){
  4. logInfo("Found block %s locally".format(blockId))
  5. returnlocal
  6. }
  7. val remote = getRemote(blockId)
  8. if(remote.isDefined){
  9. logInfo("Found block %s remotely".format(blockId))
  10. return remote
  11. }
  12. None
  13. }

get()首先会从local的BlockManager中查找block,若是找到则返回相应的block,若local没有找到该block,则发起请求从其余的executor上的BlockManager中查找block。在一般状况下Spark任务的分配是根据block的分布决定的,任务每每会被分配到拥有block的节点上,所以getLocal()就能找到所需的block;可是在资源有限的状况下,Spark会将任务调度到与block不一样的节点上,这样就必须经过getRemote()来得到block。

咱们先来看一下getLocal():

  1. def getLocal(blockId:String):Option[Iterator[Any]]={
  2. logDebug("Getting local block "+ blockId)
  3. val info = blockInfo.get(blockId).orNull
  4. if(info !=null){
  5. info.synchronized{
  6. // In the another thread is writing the block, wait for it to become ready.
  7. if(!info.waitForReady()){
  8. // If we get here, the block write failed.
  9. logWarning("Block "+ blockId +" was marked as failure.")
  10. returnNone
  11. }
  12. val level = info.level
  13. logDebug("Level for block "+ blockId +" is "+ level)
  14. // Look for the block in memory
  15. if(level.useMemory){
  16. logDebug("Getting block "+ blockId +" from memory")
  17. memoryStore.getValues(blockId) match {
  18. caseSome(iterator)=>
  19. returnSome(iterator)
  20. caseNone=>
  21. logDebug("Block "+ blockId +" not found in memory")
  22. }
  23. }
  24. // Look for block on disk, potentially loading it back into memory if required
  25. if(level.useDisk){
  26. logDebug("Getting block "+ blockId +" from disk")
  27. if(level.useMemory && level.deserialized){
  28. diskStore.getValues(blockId) match {
  29. caseSome(iterator)=>
  30. // Put the block back in memory before returning it
  31. // TODO: Consider creating a putValues that also takes in a iterator ?
  32. val elements =newArrayBuffer[Any]
  33. elements ++= iterator
  34. memoryStore.putValues(blockId, elements, level,true).data match {
  35. caseLeft(iterator2)=>
  36. returnSome(iterator2)
  37. case _ =>
  38. thrownewException("Memory store did not return back an iterator")
  39. }
  40. caseNone=>
  41. thrownewException("Block "+ blockId +" not found on disk, though it should be")
  42. }
  43. }elseif(level.useMemory &&!level.deserialized){
  44. // Read it as a byte buffer into memory first, then return it
  45. diskStore.getBytes(blockId) match {
  46. caseSome(bytes)=>
  47. // Put a copy of the block back in memory before returning it. Note that we can't
  48. // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
  49. // The use of rewind assumes this.
  50. assert(0== bytes.position())
  51. val copyForMemory =ByteBuffer.allocate(bytes.limit)
  52. copyForMemory.put(bytes)
  53. memoryStore.putBytes(blockId, copyForMemory, level)
  54. bytes.rewind()
  55. returnSome(dataDeserialize(blockId, bytes))
  56. caseNone=>
  57. thrownewException("Block "+ blockId +" not found on disk, though it should be")
  58. }
  59. }else{
  60. diskStore.getValues(blockId) match {
  61. caseSome(iterator)=>
  62. returnSome(iterator)
  63. caseNone=>
  64. thrownewException("Block "+ blockId +" not found on disk, though it should be")
  65. }
  66. }
  67. }
  68. }
  69. }else{
  70. logDebug("Block "+ blockId +" not registered locally")
  71. }
  72. returnNone
  73. }

getLocal()首先会根据block id得到相应的BlockInfo并从中取出该block的storage level,根据storage level的不一样getLocal()又进入如下不一样分支:

  1. level.useMemory == true:从memory中取出block并返回,若没有取到则进入分支2。
  2. level.useDisk == true:
    • level.useMemory == true: 将block从disk中读出并写入内存以便下次使用时直接从内存中得到,同时返回该block。
    • level.useMemory == false: 将block从disk中读出并返回
  3. level.useDisk == false: 没有在本地找到block,返回None。

接下来咱们来看一下getRemote()

  1. def getRemote(blockId:String):Option[Iterator[Any]]={
  2. if(blockId ==null){
  3. thrownewIllegalArgumentException("Block Id is null")
  4. }
  5. logDebug("Getting remote block "+ blockId)
  6. // Get locations of block
  7. val locations = master.getLocations(blockId)
  8. // Get block from remote locations
  9. for(loc <- locations){
  10. logDebug("Getting remote block "+ blockId +" from "+ loc)
  11. val data =BlockManagerWorker.syncGetBlock(
  12. GetBlock(blockId),ConnectionManagerId(loc.host, loc.port))
  13. if(data !=null){
  14. returnSome(dataDeserialize(blockId, data))
  15. }
  16. logDebug("The value of block "+ blockId +" is null")
  17. }
  18. logDebug("Block "+ blockId +" not found")
  19. returnNone
  20. }

getRemote()首先取得该block的全部location信息,而后根据location向远端发送请求获取block,只要有一个远端返回block该函数就返回而不继续发送请求。

至此咱们简单介绍了BlockManager类中的get()put()函数,使用这两个函数外部类能够轻易地存取block数据。

Partition如何转化为Block

在storage模块里面全部的操做都是和block相关的,可是在RDD里面全部的运算都是基于partition的,那么partition是如何与block对应上的呢?

RDD计算的核心函数是iterator()函数:

  1. finaldef iterator(split:Partition, context:TaskContext):Iterator[T]={
  2. if(storageLevel !=StorageLevel.NONE){
  3. SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  4. }else{
  5. computeOrReadCheckpoint(split, context)
  6. }
  7. }

若是当前RDD的storage level不是NONE的话,表示该RDD在BlockManager中有存储,那么调用CacheManager中的getOrCompute()函数计算RDD,在这个函数中partition和block发生了关系:

首先根据RDD id和partition index构造出block id (rdd_xx_xx),接着从BlockManager中取出相应的block。

  • 若是该block存在,表示此RDD在以前已经被计算过和存储在BlockManager中,所以取出便可,无需再从新计算。
  • 若是该block不存在则须要调用RDD的computeOrReadCheckpoint()函数计算出新的block,并将其存储到BlockManager中。

须要注意的是block的计算和存储是阻塞的,若另外一线程也须要用到此block则需等到该线程block的loading结束。

  1. def getOrCompute[T](rdd: RDD[T], split:Partition, context:TaskContext, storageLevel:StorageLevel)
  2. :Iterator[T]={
  3. val key ="rdd_%d_%d".format(rdd.id, split.index)
  4. logDebug("Looking for partition "+ key)
  5. blockManager.get(key) match {
  6. caseSome(values)=>
  7. // Partition is already materialized, so just return its values
  8. return values.asInstanceOf[Iterator[T]]
  9. caseNone=>
  10. // Mark the split as loading (unless someone else marks it first)
  11. loading.synchronized{
  12. if(loading.contains(key)){
  13. logInfo("Another thread is loading %s, waiting for it to finish...".format (key))
  14. while(loading.contains(key)){
  15. try{loading.wait()}catch{case _ :Throwable=>}
  16. }
  17. logInfo("Finished waiting for %s".format(key))
  18. // See whether someone else has successfully loaded it. The main way this would fail
  19. // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
  20. // partition but we didn't want to make space for it. However, that case is unlikely
  21. // because it's unlikely that two threads would work on the same RDD partition. One
  22. // downside of the current code is that threads wait serially if this does happen.
  23. blockManager.get(key) match {
  24. caseSome(values)=>
  25. return values.asInstanceOf[Iterator[T]]
  26. caseNone=>
  27. logInfo("Whoever was loading %s failed; we'll try it ourselves".format (key))
  28. loading.add(key)
  29. }
  30. }else{
  31. loading.add(key)
  32. }
  33. }
  34. try{
  35. // If we got here, we have to load the split
  36. logInfo("Partition %s not found, computing it".format(key))
  37. val computedValues = rdd.computeOrReadCheckpoint(split, context)
  38. // Persist the result, so long as the task is not running locally
  39. if(context.runningLocally){return computedValues }
  40. val elements =newArrayBuffer[Any]
  41. elements ++= computedValues
  42. blockManager.put(key, elements, storageLevel,true)
  43. return elements.iterator.asInstanceOf[Iterator[T]]
  44. }finally{
  45. loading.synchronized{
  46. loading.remove(key)
  47. loading.notifyAll()
  48. }
  49. }
  50. }
  51. }

这样RDD的transformation、action就和block数据创建了联系,虽然抽象上咱们的操做是在partition层面上进行的,可是partition最终仍是被映射成为block,所以实际上咱们的全部操做都是对block的处理和存取。

End

本文就storage模块的两个层面进行了介绍-通讯层和存储层。通讯层中简单介绍了类结构和组成以及类在通讯层中所扮演的不一样角色,还有不一样角色之间通讯的报文,同时简单介绍了通讯层的启动和注册细节。存储层中分别介绍了DiskStoreMemoryStore中对于block的存和取的实现代码,同时分析了BlockManagerput()get()接口,最后简单介绍了Spark RDD中的partition与BlockManager中的block之间的关系,以及如何交互存取block的。

本文从总体上分析了storage模块的实现,并未就具体实现作很是细节的分析,相信在看完本文对storage模块有一个总体的印象之后再去分析细节的实现会有事半功倍的效果。

相关文章
相关标签/搜索