This blog series distills and shares cases drawn from real production environments, together with Spark source code analysis and practical guidance. Please keep following the series. Copyright notice: this Spark source code analysis and business practice series belongs to the author (秦凯新). Reproduction is prohibited; learning from it is welcome.
BlockInfoManager mainly provides read/write lock control and sits directly below BlockManager in the hierarchy. A typical Spark read or write first goes through BlockManager, which consults BlockInfoManager to check for lock contention on the block; only then does it call DiskStore or MemoryStore, which in turn rely on DiskBlockManager to map blocks to on-disk locations, or on MemoryManager to determine the soft boundary of the memory pools and to grant memory requests.
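To make that call order concrete, here is a minimal toy sketch of the path described above. The names and signatures are simplified stand-ins, not the real Spark APIs; only the ordering of the calls matters.

// Toy sketch: BlockManager first consults BlockInfoManager for the lock,
// then falls back from memory to disk.
object ReadPathSketch {
  // Hypothetical stand-ins for the real components.
  class BlockInfoManager { def lockForReading(blockId: String): Boolean = true }
  class MemoryStore      { def contains(blockId: String): Boolean = false }
  class DiskStore        { def contains(blockId: String): Boolean = true }

  class BlockManager(infoManager: BlockInfoManager, memoryStore: MemoryStore, diskStore: DiskStore) {
    def getLocal(blockId: String): Option[String] = {
      // 1. Ask BlockInfoManager whether the read lock can be taken before touching any store.
      if (!infoManager.lockForReading(blockId)) return None
      // 2. Prefer MemoryStore, then fall back to DiskStore (which locates the file via DiskBlockManager).
      if (memoryStore.contains(blockId)) Some(s"$blockId served from memory")
      else if (diskStore.contains(blockId)) Some(s"$blockId served from disk")
      else None
    }
  }

  def main(args: Array[String]): Unit = {
    val bm = new BlockManager(new BlockInfoManager, new MemoryStore, new DiskStore)
    println(bm.getLocal("rdd_0_0")) // Some(rdd_0_0 served from disk)
  }
}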
The Driver and each Executor own their own SparkEnv for task execution, and every SparkEnv contains a BlockManager responsible for the storage service. As a high-level abstraction, BlockManagers communicate with one another through RpcEnv, ShuffleClient, and BlockTransferService.
BlockInfo中具备读写锁的标志,经过标志能够判断是否进行写控制框架
val NO_WRITER: Long = -1
val NON_TASK_WRITER: Long = -1024

/**
 * The task attempt id of the task which currently holds the write lock for this block, or
 * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
 * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
 */
def writerTask: Long = _writerTask
def writerTask_=(t: Long): Unit = {
  _writerTask = t
  checkInvariants()
}
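For context, the checkInvariants() call in the setter enforces the locking rule behind these two markers. Paraphrased from memory of the Spark source (so treat the exact wording as approximate), it looks roughly like this:

// Paraphrase of BlockInfo.checkInvariants(), not a verbatim quote:
private def checkInvariants(): Unit = {
  // A block's reader count must be non-negative:
  assert(_readerCount >= 0)
  // A block is either locked for reading or for writing, but never both at the same time:
  assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
}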
BlockInfoManager具备BlockId与BlockInfo的映射关系以及任务id与BlockId的锁映射:dom
private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]

/** Tracks the set of blocks that each task has locked for writing. */
private[this] val writeLocksByTask =
  new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
    with mutable.MultiMap[TaskAttemptId, BlockId]

/**
 * Tracks the set of blocks that each task has locked for reading, along with the number of times
 * that a block has been locked (since our read locks are re-entrant).
 */
private[this] val readLocksByTask =
  new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
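To make the semantics of these maps concrete, here is a small self-contained toy model. The names and behavior are hypothetical simplifications, not the real BlockInfoManager (which additionally blocks waiting callers and records every lock under its task attempt id), but they show the core rule: a block may be held by many readers, or by exactly one writer, never both.

import scala.collection.mutable

// Toy model of the read/write lock semantics encoded by the maps above.
object LockModelSketch {
  val NO_WRITER: Long = -1L

  class ToyBlockInfo {
    var readerCount: Int = 0          // re-entrant read locks are counted
    var writerTask: Long = NO_WRITER
  }

  val infos = mutable.HashMap[String, ToyBlockInfo]("rdd_0_0" -> new ToyBlockInfo)

  // Grant a read lock only while no writer holds the block.
  def lockForReading(blockId: String): Option[ToyBlockInfo] =
    infos.get(blockId).filter(_.writerTask == NO_WRITER).map { info =>
      info.readerCount += 1
      info
    }

  // Grant the write lock only while there are neither readers nor another writer.
  def lockForWriting(blockId: String, taskAttemptId: Long): Option[ToyBlockInfo] =
    infos.get(blockId)
      .filter(info => info.writerTask == NO_WRITER && info.readerCount == 0)
      .map { info =>
        info.writerTask = taskAttemptId
        info
      }

  def main(args: Array[String]): Unit = {
    println(lockForReading("rdd_0_0").isDefined)      // true: no writer yet, reader count becomes 1
    println(lockForWriting("rdd_0_0", 7L).isDefined)  // false: a reader still holds the block
  }
}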
As we can see, DiskStore internally calls DiskBlockManager to determine where a block is read from or written to on disk.
Below is DiskStore's abstract write operation, which takes a higher-order function of type FileOutputStream => Unit:
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
  if (contains(blockId)) {
    throw new IllegalStateException(s"Block $blockId is already present in the disk store")
  }
  logDebug(s"Attempting to put block $blockId")
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)
  val fileOutputStream = new FileOutputStream(file)
  var threwException: Boolean = true
  try {
    writeFunc(fileOutputStream)
    threwException = false
  } finally {
    try {
      Closeables.close(fileOutputStream, threwException)
    } finally {
      if (threwException) {
        remove(blockId)
      }
    }
  }
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName,
    Utils.bytesToString(file.length()),
    finishTime - startTime))
}
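As a usage illustration (a hypothetical caller, not code from the Spark tree): the caller simply supplies a function that writes the serialized bytes to the FileOutputStream handed in by put; the try/finally logic above then guarantees the stream is closed and the half-written file removed if that function throws. Here diskStore and blockId are assumed to already be in scope.

// Hypothetical caller sketch: persist a byte array as the block's on-disk content.
val payload: Array[Byte] = Array[Byte](1, 2, 3)
diskStore.put(blockId) { fileOutputStream =>
  fileOutputStream.write(payload)  // any exception here leads to remove(blockId) in the finally block
}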
如下是DiskStore的读操做,调用DiskBlockManager来获取数据位置:post
def getBytes(blockId: BlockId): ChunkedByteBuffer = {
  val file = diskManager.getFile(blockId.name)
  val channel = new RandomAccessFile(file, "r").getChannel
  Utils.tryWithSafeFinally {
    // For small files, directly read rather than memory map
    if (file.length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(file.length.toInt)
      channel.position(0)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      new ChunkedByteBuffer(buf)
    } else {
      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    }
  } {
    channel.close()
  }
}
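The small-file branch exists because memory-mapping carries a fixed per-mapping cost that is not worth paying for tiny blocks. If I recall correctly, minMemoryMapBytes is read from the spark.storage.memoryMapThreshold setting (a couple of megabytes by default), so the crossover point can be tuned. A configuration sketch follows; the value shown is illustrative, not a recommendation.

import org.apache.spark.SparkConf

// Blocks smaller than this threshold are read into a plain buffer; larger ones are memory-mapped.
val conf = new SparkConf()
  .setAppName("disk-store-demo")
  .setMaster("local[*]")
  .set("spark.storage.memoryMapThreshold", "2m")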
One point worth emphasizing here: Hadoop, the first generation of big data frameworks, treats memory purely as a compute resource, whereas Spark not only uses memory for computation but also brings part of it into the storage system.
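Concretely, Spark's unified memory management splits a single region between execution and storage: spark.memory.fraction sizes the region, and spark.memory.storageFraction marks the soft boundary mentioned earlier, below which cached blocks are protected from eviction while execution may still borrow unused storage memory. A minimal sketch, with the commonly cited defaults rather than tuning advice:

import org.apache.spark.SparkConf

// Illustrative unified-memory knobs; adjust only with measurement.
val memConf = new SparkConf()
  .set("spark.memory.fraction", "0.6")        // share of usable heap given to execution + storage
  .set("spark.memory.storageFraction", "0.5") // portion of that region protected for storage (soft boundary)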
To be continued.
The storage system is the bedrock of Spark. I will try to dissect every fine-grained detail, and unlike most blogs I will stick to the plainest language I can; after all, technology is just a thin sheet of window paper waiting to be poked through.
秦凯新, in the early hours of 2018-10-31