咱们知道,Netty使用直接内存实现Netty零拷贝以提高性能,
但直接内存的建立和释放可能须要涉及系统调用,是比较昂贵的操做,若是每一个请求都建立和释放一个直接内存,那性能确定是不能知足要求的。
这时就须要使用内存池。
即从系统中申请一大块内存,再在上面分配每一个请求所需的内存。算法
Netty中的内存池主要涉及PoolArena,PoolChunk与PoolSubpage。
本文主要分析PoolArena的做用与实现。
源码分析基于Netty 4.1.52数组
接口关系
ByteBufAllocator,内存分配器,负责为ByteBuf分配内存, 线程安全。
PooledByteBufAllocator,池化内存分配器,默认的ByteBufAllocator,预先从操做系统中申请一大块内存,在该内存上分配内存给ByteBuf,能够提升性能和减少内存碎片。
UnPooledByteBufAllocator,非池化内存分配器,每次都从操做系统中申请内存。缓存
RecvByteBufAllocator,接收内存分配器,为Channel读入的IO数据分配一块大小合理的buffer空间。具体功能交由内部接口Handle定义。
它主要是针对Channel读入场景添加一些操做,如guess,incMessagesRead,lastBytesRead等等。
ByteBuf,分配好的内存块,能够直接使用。安全
下面只关注PooledByteBufAllocator,它是Netty中默认的内存分配器,也是理解Netty内存机制的难点。微信
前面文章《ChannelPipeline机制与读写过程》中分析了数据读取过程,
NioByteUnsafe#read多线程
public final void read() { ... final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; ... byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); ... }
recvBufAllocHandle方法返回AdaptiveRecvByteBufAllocator.HandleImpl。(AdaptiveRecvByteBufAllocator,PooledByteBufAllocator都在DefaultChannelConfig中初始化)并发
AdaptiveRecvByteBufAllocator.HandleImpl#allocate -> AbstractByteBufAllocator#ioBuffer -> PooledByteBufAllocator#directBuffer -> PooledByteBufAllocator#newDirectBufferjvm
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { // #1 PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; final ByteBuf buf; if (directArena != null) { // #2 buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { // #3 buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
AbstractByteBufAllocator#ioBuffer方法会判断当前系统是否支持unsafe。支持时使用直接内存,不支持则使用堆内存。这里只关注直接内存的实现。
#1
从当前线程缓存中获取对应内存池PoolArena
#2
在当前线程内存池上分配内存
#3
内存池不存在,只能使用非池化内存分配内存了高并发
PooledByteBufAllocator#threadCache是一个PoolThreadLocalCache实例,PoolThreadLocalCache继承于FastThreadLocal,FastThreadLocal这里简单理解为对ThreadLocal的优化,它为每一个线程维护了一个PoolThreadCache,PoolThreadCache上关联了内存池。
当PoolThreadLocalCache上某个线程的PoolThreadCache不存在时,经过initialValue方法构造。源码分析
PoolThreadLocalCache#initialValue
protected synchronized PoolThreadCache initialValue() { // #1 final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); // #2 final Thread current = Thread.currentThread(); if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { final PoolThreadCache cache = new PoolThreadCache( heapArena, directArena, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); ... } // No caching so just use 0 as sizes. return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0); }
#1
从PooledByteBufAllocator的heapArenas,directArenas中获取使用率最小的PoolArena。
PooledByteBufAllocator构造时默认会为PooledByteBufAllocator#directArenas初始化8个PoolArena。
#2
构造PoolThreadCache。
PoolArena,能够理解为一个内存池,负责管理从操做系统中申请到的内存块。
PoolThreadCache为每个线程关联一个PoolArena(PoolThreadCache#directArena),该线程的内存都在该PoolArena上分配。
Netty支持高并发系统,可能有不少线程进行同时内存分配。为了缓解线程竞争,经过建立多个PoolArena细化锁的粒度,从而提升并发执行的效率。
注意,一个PoolArena能够会分给多个的线程,能够看到PoolArena上会有一些同步操做。
前面分析SizeClasses的文章说过,Netty将内存池中的内存块按大小划分为3个级别。
不一样级别的内存块管理算法不一样。默认划分规则以下:
small <= 28672(28K)
normal <= 16777216(16M)
huge > 16777216(16M)
smallSubpagePools是一个PoolSubpage数组,负责维护small级别的内存块信息。
PoolChunk负责维护normal级别的内存,PoolChunkList管理一组PoolChunk。
PoolArena按内存使用率将PoolChunk分别维护到6个PoolChunkList中,
PoolArena按内存使用率将PoolChunk分别维护到6个PoolChunkList中,
qInit->内存使用率为0~25,
q000->内存使用率为1~50,
q025->内存使用率为25~75,
q050->内存使用率为50~75,
q075->内存使用率为75~100,
q100->内存使用率为100。
注意:PoolChunk是Netty每次向操做系统申请的内存块。
PoolSubpage须要从PoolChunk中分配,而Tiny,Small级別的内存则是从PoolSubpage中分配。
下面来看一下分配过程
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { // #1 final int sizeIdx = size2SizeIdx(reqCapacity); // #2 if (sizeIdx <= smallMaxSizeIdx) { tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx); } else if (sizeIdx < nSizes) { // #3 tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx); } else { // #4 int normCapacity = directMemoryCacheAlignment > 0 ? normalizeSize(reqCapacity) : reqCapacity; // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, normCapacity); } }
#1
size2SizeIdx是父类SizeClasses提供的方法,它使用特定算法,将申请的内存大小调整为规范大小,划分到对应位置,返回对应索引,可参考《内存对齐类SizeClasses》
#2
分配small级别的内存块
#3
分配normal级别的内存块
#4
分配huge级别的内存块
private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity, final int sizeIdx) { // #1 if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) { return; } // #2 final PoolSubpage<T> head = smallSubpagePools[sizeIdx]; final boolean needsNormalAllocation; synchronized (head) { // #3 final PoolSubpage<T> s = head.next; needsNormalAllocation = s == head; if (!needsNormalAllocation) { assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx); long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache); } } // #4 if (needsNormalAllocation) { synchronized (this) { allocateNormal(buf, reqCapacity, sizeIdx, cache); } } incSmallAllocation(); }
#1
首先尝试在线程缓存上分配。
除了PoolArena,PoolThreadCache#smallSubPageHeapCaches还为每一个线程维护了Small级别的内存缓存
#2
使用前面SizeClasses#size2SizeIdx方法计算的索引,获取对应PoolSubpage
#3
注意,head是一个占位节点,并不存储数据,s==head表示当前存在能够用的PoolSubpage,由于已经耗尽的PoolSubpage是会从链表中移除。
接着从PoolSubpage中分配内存,后面有文章解析详细过程
注意,这里必要运行在同步机制中。
#4
没有可用的PoolSubpage,须要申请一个Normal级别的内存块,再在上面分配所需内存
normal级别的内存也是先尝试在线程缓存中分配,分配失败后再调用allocateNormal方法申请
PoolArena#allocate -> allocateNormal
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) { if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) || q025.allocate(buf, reqCapacity, sizeIdx, threadCache) || q000.allocate(buf, reqCapacity, sizeIdx, threadCache) || qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) || q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) { return; } // Add a new chunk. PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize); boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache); assert success; qInit.add(c); }
#1
依次从q050,q025,q000,qInit,q075上申请内存
为何要是这个顺序呢?
PoolArena中的PoolChunkList之间也组成一个“双向”链表
qInit ---> q000 <---> q025 <---> q050 <---> q075 <---> q100
PoolChunkList中还维护了minUsage,maxUsage,即当一个PoolChunk使用率大于maxUsage,它将被移动到下一个PoolChunkList,使用率小于minUsage,则被移动到前一个PoolChunkList。
注意:q000没有前置节点,它的minUsage为1,即上面的PoolChunk内存彻底释放后,将被销毁。
qInit的前置节点是它本身,但它的minUsage为Integer.MIN_VALUE,即便上面的PoolChunk内存彻底释放后,也不会被销毁,而是继续保留在内存。
不优先从q000分配,正是由于q000上的PoolChunk内存彻底释放后要被销毁,若是在上面分配,则会延迟内存的回收进度。
而q075上因为内存利用率过高,致使内存分配的成功率大大下降,所以放到最后。
因此从q050是一个不错的选择,这样大部分状况下,Chunk的利用率都会保持在一个较高水平,提升整个应用的内存利用率;
在PoolChunkList上申请内存,PoolChunkList会遍历链表上PoolChunk节点,直到分配成功或到达链表末尾。
PoolChunk分配后,若是内存使用率高于maxUsage,它将被移动到下一个PoolChunkList。
newChunk方法负责构造一个PoolChunk,这里是内存池向操做系统申请内存。
DirectArena#newChunk
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { if (directMemoryCacheAlignment == 0) { return new PoolChunk<ByteBuffer>(this, allocateDirect(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0); } final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment); return new PoolChunk<ByteBuffer>(this, memory, pageSize, pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory)); }
allocateDirect方法向操做系统申请内存,得到一个(jvm)ByteBuffer,
PoolChunk#memory维护了该ByteBuffer,PoolChunk的内存实际上都是在该ByteBuffer上分配。
最后是huge级别的内存申请
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) { PoolChunk<T> chunk = newUnpooledChunk(reqCapacity); activeBytesHuge.add(chunk.chunkSize()); buf.initUnpooled(chunk, reqCapacity); allocationsHuge.increment(); }
比较简单,没有使用内存池,直接向操做系统申请内存。
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) { if (chunk.unpooled) { // #1 int size = chunk.chunkSize(); destroyChunk(chunk); activeBytesHuge.add(-size); deallocationsHuge.increment(); } else { // #2 SizeClass sizeClass = sizeClass(handle); if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) { // cached so not free it. return; } freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false); } }
#1
非池化内存,直接销毁内存
#2
池化内存,首先尝试加到线程缓存中,成功则不须要其余操做。失败则调用freeChunk
void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) { final boolean destroyChunk; synchronized (this) { ... destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer); } if (destroyChunk) { // destroyChunk not need to be called while holding the synchronized lock. destroyChunk(chunk); } }
chunk.parent即PoolChunkList,PoolChunkList#free会调用PoolChunk释放内存,释放内存后,若是内存使用率低于minUsage,则移动前一个PoolChunkList,若是前一个PoolChunkList不存在(q000),则返回false,由后面的步骤销毁该PoolChunk。
可回顾前面解析ByteBuf文章中关于内存销毁的内容。
若是您以为本文不错,欢迎关注个人微信公众号,系列文章持续更新中。您的关注是我坚持的动力!