上一节,咱们对BlockManager的主要写入方法作了一个整理,知道了BlockMananger的主要写入逻辑,以及对于块信息的管理。可是,因为spark的整个存储模块是在是很庞大,并且不少细节的逻辑错综复杂,若是对于每一个细节都刨根问底,一来精力有限,二来感受也没有太大的必要,固然若是时间容许确定是越详细越好,在这里,个人分析的主要目的是理清存储模块的重点逻辑,但愿可以提纲契领地把各个模块的脉络领出来,创建起对spark-core中各模块的总体认知,这样咱们在遇到一些问题的时候就可以很快地知道应该从何处下手,从哪一个具体的模块去找问题。
好了废话很少说,本节接着上一节。上一篇,咱们分析了BlockManager的几个主要的存储方法,发现BlockManager主要依靠内部的两个组件MemoryStore和DiskStore来进行实际的数据写入和块的管理。
本节,咱们就来看一下MemoryStore这个组件。java
不过,我仍是延续我一向的风格,从外部对一个类的方法调用为切入点分析这个类的做用和逻辑。
因此,咱们先来看一下上一节对于MemoryStore的主要的方法调用的总结:api
memoryStore.putIteratorAsValues memoryStore.putIteratorAsBytes memoryStore.putBytes
这个方法主要是用于存储级别是非序列化的状况,即直接以java对象的形式将数据存放在jvm堆内存上。咱们都知道,在jvm堆内存上存放大量的对象并非什么好事,gc压力大,挤占内存,可能引发频繁的gc,可是也有明显的好处,就是省去了序列化和反序列化耗时,并且直接从堆内存取数据显然比任何其余方式(磁盘和直接内存)都要快不少,因此对于内存充足且要缓存的数据量本省不是很大的状况,这种方式也不失为一种不错的选择。数组
private[storage] def putIteratorAsValues[T]( blockId: BlockId, values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { // 用于存储java对象的容器 val valuesHolder = new DeserializedValuesHolder[T](classTag) putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match { // 存储成功 case Right(storedSize) => Right(storedSize) // 存储失败的状况 case Left(unrollMemoryUsedByThisBlock) => // ValuesHolder内部的数组和vector会相互转换 // 数据写入完成后会将vector中的数据转移到数组中 val unrolledIterator = if (valuesHolder.vector != null) { valuesHolder.vector.iterator } else { valuesHolder.arrayValues.toIterator } // 返回写入一半的迭代器、 // 外部调用者一半会选择关闭这个迭代器以释放被使用的内存 Left(new PartiallyUnrolledIterator( this, MemoryMode.ON_HEAP, unrollMemoryUsedByThisBlock, unrolled = unrolledIterator, rest = values)) } }
这个方法的逻辑很简单,做用也比较单一,主要是对实际存储方法putIterator的返回结果作处理,若是失败的话,就封装一个PartiallyUnrolledIterator返回给外部调用这个,调用这个通常须要将这个写入一半的迭代器关闭。缓存
这个方法看似很长,其实逻辑相对简单,主要作的事就是把数据一条一条往ValuesHolder中写,并周期性地检查内存,若是内存不够就经过内存管理器MemoryManager申请内存,每次申请当前内存量的1.5倍。
最后,将ValuesHolder中的数据转移到一个数组中(其实数据在SizeTrackingVector中也是以数组的形式存储,只不过SizeTrackingVector对象内部处理数组还有一些其余的簿记量,更为关键的是咱们须要将存储的数据以同一的接口进行包装,以利于MemoryStore进行同一管理)。最后还有关键的一步,就是释放展开内存,从新申请存储内存。
此外,这个过程当中有使用到memoryManager,具体的方法调用是:安全
memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
------------------------------分割线------------------------------app
private def putIterator[T]( blockId: BlockId, values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode, valuesHolder: ValuesHolder[T]): Either[Long, Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // Number of elements unrolled so far var elementsUnrolled = 0 // Whether there is still enough memory for us to continue unrolling this block var keepUnrolling = true // Initial per-task memory to request for unrolling blocks (bytes). // 用于数据在内存展开的初始的内存使用量 val initialMemoryThreshold = unrollMemoryThreshold // How often to check whether we need to request more memory // 检查内存的频率,每写这么多条数据就会检查一次是否须要申请额外的内存 val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD) // Memory currently reserved by this task for this particular unrolling operation // 内存阈值,开始时等于初始阈值 var memoryThreshold = initialMemoryThreshold // Memory to request as a multiple of current vector size // 内存增加因子,每次申请的内存是当前内存的这个倍数 val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR) // Keep track of unroll memory used by this particular block / putIterator() operation // 当前的块使用的内存大小 var unrollMemoryUsedByThisBlock = 0L // Request enough memory to begin unrolling // 首先进行初始的内存申请,向MemoryManager申请内存 keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) if (!keepUnrolling) { logWarning(s"Failed to reserve initial memory threshold of " + s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") } else { // 若是成功申请到内存,则累加记录 unrollMemoryUsedByThisBlock += initialMemoryThreshold } // Unroll this block safely, checking whether we have exceeded our threshold periodically // 循环将每条数据写入容器中valuesHolder while (values.hasNext && keepUnrolling) { valuesHolder.storeValue(values.next()) // 若是写入数据的条数达到一个周期,那么就检查一下是否须要申请额外的内存 if (elementsUnrolled % memoryCheckPeriod == 0) { // 经过valuesHolder获取已经写入的数据的评估大小 // 注意,这里的数据大小只是估计值,并非十分准确 // 具体如何进行估算的能够看valuesHolder内部实现 val currentSize = valuesHolder.estimatedSize() // If our vector's size has exceeded the threshold, request more memory // 若是已写入的数据大小超过了当前阈值 if (currentSize >= memoryThreshold) { // 这里每次申请的内存量都是不同的 // 每次申请的内存是当前已使用内存的1.5倍(默认) val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) if (keepUnrolling) { // 记录累积申请的内存量 unrollMemoryUsedByThisBlock += amountToRequest } // New threshold is currentSize * memoryGrowthFactor // 目前已经向内存管理器申请的内存量 memoryThreshold += amountToRequest } } // 记录插入的数据条数 elementsUnrolled += 1 } // Make sure that we have enough memory to store the block. By this point, it is possible that // the block's actual memory usage has exceeded the unroll memory by a small amount, so we // perform one final call to attempt to allocate additional memory if necessary. // 若是keepUnrolling为true,说明顺利地将全部数据插入, // 并未遇到申请内存失败的状况 if (keepUnrolling) { // 将内部的数据转移到一个数组中 val entryBuilder = valuesHolder.getBuilder() // 数据在内存中的精确大小 val size = entryBuilder.preciseSize // 实际的大小可能大于申请的内存量 // 所以根据实际大小还要再申请额外的内存 if (size > unrollMemoryUsedByThisBlock) { val amountToRequest = size - unrollMemoryUsedByThisBlock keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) if (keepUnrolling) { unrollMemoryUsedByThisBlock += amountToRequest } } if (keepUnrolling) { // 获取MemoryEntry对象,该对象是对插入数据的包装 val entry = entryBuilder.build() // Synchronize so that transfer is atomic memoryManager.synchronized { // 这一步主要是释放申请的展开内存 // 而后申请存储内存 // 这里须要弄清楚展开内存的概念 // 展开状态指的是对象在内存中处于一种比较松散的状态,这样的状态方便作一些管理如统计大小等 // 而随后将对象转移到数组中,处于一种比较紧实的状态,数组相对来讲占用的额外内存是比较小的 // 一个数组只是一个对象,只有一个对象头,能够用来管理大量的对象 releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock) // 申请存储内存 val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode) assert(success, "transferring unroll memory to storage memory failed") } // 放入map中管理起来 entries.synchronized { entries.put(blockId, entry) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId, Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) Right(entry.size) } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, entryBuilder.preciseSize) // 若是失败,返回已经申请的展开内存 Left(unrollMemoryUsedByThisBlock) } } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, valuesHolder.estimatedSize()) Left(unrollMemoryUsedByThisBlock) } }
咱们再看另外一个方法。套路基本和putIteratorAsValues是同样同样的。
最大的区别在于ValuesHolder类型不一样。非序列化形式存储使用的是DeserializedMemoryEntry,而序列化形式存储使用的是SerializedMemoryEntry。jvm
private[storage] def putIteratorAsBytes[T]( blockId: BlockId, values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // Initial per-task memory to request for unrolling blocks (bytes). val initialMemoryThreshold = unrollMemoryThreshold // 字节数组的块大小,默认是1m val chunkSize = if (initialMemoryThreshold > Int.MaxValue) { logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " + s"is too large to be set as chunk size. Chunk size has been capped to " + s"${Utils.bytesToString(Int.MaxValue)}") Int.MaxValue } else { initialMemoryThreshold.toInt } // 字节数组的容器 val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag, memoryMode, serializerManager) putIterator(blockId, values, classTag, memoryMode, valuesHolder) match { case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => // 部分展开,部分以序列化形式存储的block Left(new PartiallySerializedBlock( this, serializerManager, blockId, valuesHolder.serializationStream, valuesHolder.redirectableStream, unrollMemoryUsedByThisBlock, memoryMode, valuesHolder.bbos, values, classTag)) } }
咱们再来看另外一个被外部调用用来插入数据的方法。很简单,不说了。ide
def putBytes[T: ClassTag]( blockId: BlockId, size: Long, memoryMode: MemoryMode, _bytes: () => ChunkedByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // 首先向内存管理器申请内存 // 这里申请的是存储内存,由于要插入的字节数组, // 因此不须要再展开,也就不须要申请展开内存 if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) { // We acquired enough memory for the block, so go ahead and put it val bytes = _bytes() assert(bytes.size == size) // 这里直接构建了一个SerializedMemoryEntry // 并放到map中管理起来 val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]]) entries.synchronized { entries.put(blockId, entry) } logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) true } else { false } }
经过对上面的三个方法,其实主要是前两个方法的分析,咱们发现,除了对内存进行簿记管理以外,以及经过内存管理器申请内存以外,插入数据最主要的工做其实都是有ValuesHolder对象来完成的。
ValuesHolder特质有两个实现类:DeserializedValuesHolder和SerializedValuesHolder。ui
DeserializedValuesHolder对象内部有两个成员:vector,是一个SizeTrackingVector;arrayValues,是一个存放值的数组,用于在全部数据插入后,将主句转移到一个数组中,方便包装成一个MemoryEntry对象。大部分工做是有SizeTrackingVector完成的。this
private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] { // Underlying vector for unrolling the block var vector = new SizeTrackingVector[T]()(classTag) var arrayValues: Array[T] = null override def storeValue(value: T): Unit = { vector += value } override def estimatedSize(): Long = { vector.estimateSize() } override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] { // We successfully unrolled the entirety of this block arrayValues = vector.toArray vector = null override val preciseSize: Long = SizeEstimator.estimate(arrayValues) override def build(): MemoryEntry[T] = DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) } }
上面提到的SizeTrackingVector继承了这个特质,除了这个特质,还集成了PrimitiveVector类,可是PrimitiveVector类基本上就是对一个数组的简单包装。
SizeTrackingVector最重要的功能:追踪对象的大小,就是在SizeTracker特之中实现的。
我大体说一下这个特质是如何实现对象大小跟踪和估算的,代码实现也并不复杂,感兴趣的能够看一看,限于篇幅这里就不贴了。
可见这么作并非什么精确,可是因为是抽样,并且抽样周期越日后面越长,因此对于数据插入的效率影响很小,并且这种不精确性其实在后续的内存检查过程当中是有考虑到的。在全部数据插入完的收尾工做中,会对对象大小作一次精确计算。此外,熟悉spark内存管理的同窗应该知道,其实spark通常会配置一个安全因子(通常是0.9),也就是说只是用配置的内存大小的90%,就是为了尽量地减小这种不精确的内存估算形成OOM的可能性。
private class SerializedValuesHolder[T]( blockId: BlockId, chunkSize: Int, classTag: ClassTag[T], memoryMode: MemoryMode, serializerManager: SerializerManager) extends ValuesHolder[T] { val allocator = memoryMode match { case MemoryMode.ON_HEAP => ByteBuffer.allocate _ // 调用unsafe的本地方法申请直接内存 // 这个方法之因此没有调用ByteBuffer.allocateDirect方法 // 是由于这个方法分配的直接内存大小收到参数MaxDirectMemorySize限制 // 因此这里绕过ByteBuffer.allocateDirect方法,经过反射和unsafe类建立直接内存对象 case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ } val redirectableStream = new RedirectableOutputStream val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator) redirectableStream.setOutputStream(bbos) val serializationStream: SerializationStream = { val autoPick = !blockId.isInstanceOf[StreamBlockId] val ser = serializerManager.getSerializer(classTag, autoPick).newInstance() // 包装压缩流和序列化流 ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } // 写入方法,写入的对象通过序列化,压缩, // 而后通过ChunkedByteBufferOutputStream被分割成一个个的字节数组块 override def storeValue(value: T): Unit = { serializationStream.writeObject(value)(classTag) } override def estimatedSize(): Long = { bbos.size } override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] { // We successfully unrolled the entirety of this block serializationStream.close() override def preciseSize(): Long = bbos.size override def build(): MemoryEntry[T] = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) } }
大概看一下,主要的逻辑很简单,这里面有几个注意点:
MemoryStore.scala这个文件中乍看代码有八百多行,可是其实很大部分代码是一些辅助类,比较核心的写入逻辑也就是前面提到的几个方法,再加上核心的两个类DeserializedValuesHolder和SerializedValuesHolder实现了以对象或字节数组的形式存储数据。