接着上一篇,本篇,咱们分析一下实现磁盘存储的功能类DiskStore,这个类相对简单。在正式展开以前,我以为有必要大概分析一下BlockManager的背景,或者说它的运行环境,运行的做用范围。Blockmanager这个类其实在运行时的每一个节点都会有一个实例(包括driver和executor进程),由于不管是driver端进行广播变量的建立,仍是executor端shuffle过程当中写shuffle块,或者是任务运行时结果太大须要经过BlockManager传输,或者是RDD的缓存,其实在每一个运行节点上都会经过Blockmanager来管理程序内部对于本地的内存和磁盘的读写,因此综上,我想表达的核心意思就是每一个进程(driver和executor)都有一Blockmanager实例,而这些Blockmanager实例是经过BlockManagerId类来进行惟一区分的,BlockManagerId其实是对进程物理位置的封装。java
首先咱们来看一个最经常使用的写入方法缓存
def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = { // 经过DiskBlockManager对象检查这个blockId对应的文件名的文件是否存在 if (contains(blockId)) { throw new IllegalStateException(s"Block $blockId is already present in the disk store") } logDebug(s"Attempting to put block $blockId") val startTime = System.currentTimeMillis // 经过DiskBlockManager获取一个文件用于写入数据 val file = diskManager.getFile(blockId) // 用CountingWritableChannel包装一下,以便于记录写入的字节数 val out = new CountingWritableChannel(openForWrite(file)) var threwException: Boolean = true try { writeFunc(out) // 关键步骤,记录到内部的map结构中 blockSizes.put(blockId, out.getCount) threwException = false } finally { try { out.close() } catch { case ioe: IOException => if (!threwException) { threwException = true throw ioe } } finally { if (threwException) { remove(blockId) } } } val finishTime = System.currentTimeMillis logDebug("Block %s stored as %s file on disk in %d ms".format( file.getName, Utils.bytesToString(file.length()), finishTime - startTime)) }
这个方法很简单,没什么好说的,可是调用了一个比较重要的类DiskBlockManager,这个类的功能就是对磁盘上的目录和文件进行管理,会在磁盘上按照必定规则建立一些目录和子目录,在分配文件名时也会尽可能均匀第分配在这些目录和子目录下。app
这个方法就不说了,简单处理一下直接调用put方法。ide
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { put(blockId) { channel => bytes.writeFully(channel) } }
咱们来看一下这个方法,首先经过DiskBlockManager获取对应的文件名,而后将其包装成一个BlockData对象,分为加密和不加密两种。加密
def getBytes(blockId: BlockId): BlockData = { val file = diskManager.getFile(blockId.name) val blockSize = getSize(blockId) securityManager.getIOEncryptionKey() match { case Some(key) => // Encrypted blocks cannot be memory mapped; return a special object that does decryption // and provides InputStream / FileRegion implementations for reading the data. new EncryptedBlockData(file, blockSize, conf, key) case _ => // 看一下DiskBlockData new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize) } }
这个类做为磁盘文件的包装类,主要功能是提供了几个方便的接口,将磁盘文件中的数据读取出来并生成缓冲对象。
这个类中有两个重要的方法toChunkedByteBuffer和toByteBuffer,toByteBuffer就不说了,调用ReadableByteChannel.read(ByteBuffer dst)方法读取文件数据,咱们看一下toChunkedByteBufferspa
这个方法也很简单,在数据量比较大的时候,因为每次申请的内存块大小有限制maxMemoryMapBytes,因此须要切分红多个块code
override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = { // Utils.tryWithResource调用保证在使用完资源后关闭资源 // 基本等同于java中的try{}finally{} Utils.tryWithResource(open()) { channel => var remaining = blockSize val chunks = new ListBuffer[ByteBuffer]() while (remaining > 0) { // 这里取剩余大小和maxMemoryMapBytes的较小值, // 也就是说每次申请的内存块大小不超过maxMemoryMapBytes val chunkSize = math.min(remaining, maxMemoryMapBytes) val chunk = allocator(chunkSize.toInt) remaining -= chunkSize JavaUtils.readFully(channel, chunk) chunk.flip() chunks += chunk } new ChunkedByteBuffer(chunks.toArray) } }
这个类以前也分析过,主要是用来管理spark运行过程当中写入的一些临时文件,以及目录的管理。orm
首先会根据参数配置建立本地目录(能够是逗号分隔的多个目录),参数的优先顺序是:若是是运行在yarn上,则会使用yarn参数LOCAL_DIRS配置的本地目录;不然获取环境变量SPARK_LOCAL_DIRS的值;不然获取spark.local.dir参数的值;最后若是都没有配置,那么就用java系统参数java.io.tmpdir的值做为临时目录。对象
其次,关于文件在目录之间分配的问题,使用文件名的hash值对目录数量取余的方法来尽可能将文件均匀地分配到不一样的目录下。接口
另一点要说的是文件名的命名规则,是根据不一样做用的Block来区别命名的,例如RDD缓存写入的block的id就是RDDBlockId,它的文件名拼接规则是"rdd_" + rddId + "_" + splitIndex