Netty源码分析第5章(ByteBuf)---->第4节: PooledByteBufAllocator简述

 

Netty源码分析第五章: ByteBufhtml

 

第四节: PooledByteBufAllocator简述数组

 

 

上一小节简单介绍了ByteBufAllocator以及其子类UnPooledByteBufAllocator的缓冲区分类的逻辑, 这一小节开始带你们剖析更为复杂的PooledByteBufAllocator, 咱们知道PooledByteBufAllocator是经过本身取一块连续的内存进行ByteBuf的封装, 因此这里更为复杂, 在这一小节简单讲解有关PooledByteBufAllocator分配逻辑缓存

友情提示:  从这一节开始难度开始加大, 请各位战友作好心理准备ide

PooledByteBufAllocator一样也重写了AbstractByteBuf的newDirectBuffer和newHeapBuffer两个抽象方法, 咱们这一小节以newDirectBuffer为例, 先简述一下其逻辑源码分析

首先看UnPooledByteBufAllocator中newDirectBuffer这个方法:性能

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }

首先 PoolThreadCache cache = threadCache.get() 这一步是拿到一个线程局部缓存对象, 线程局部缓存, 顾明思议, 就是同一个线程共享的一个缓存this

threadCache是PooledByteBufAllocator类的一个成员变量, 类型是PoolThreadLocalCache(这两个很是容易混淆, 切记):spa

private final PoolThreadLocalCache threadCache;

再看其类型PoolThreadLocalCache的定义:.net

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代码省略
} 

这里继承了一个FastThreadLocal类, 这个类至关于jdk的ThreadLocal, 只是性能更快, 有关FastThreadLocal, 咱们在后面的章节会详细剖析, 这里咱们只要知道, 继承FastThreadLocal类而且重写了initialValue方法, 则经过其get方法就能得到initialValue返回的对象, 而且这个对象是线程共享的线程

 

在这里咱们看到, 在重写的initialValue方法中, 初始化了heapArena和directArena两个属性以后, 经过new PoolThreadCache()这种方式建立了PoolThreadCache对象

这里注意, PoolThreadLocalCache是一个FastThreadLocal, 而PoolThreadCache才是线程局部缓存, 这两个类名很是很是像, 千万别搞混了(我当初读这段代码时由于搞混因此懵逼了)

其中heapArena和directArena是分别是用来分配堆和堆外内存用的两个对象, 以directArena为例, 咱们看到是经过leastUsedArena(directArenas)这种方式得到的, directArenas是一个directArena类型的数组, leastUsedArena(directArenas)这个方法是用来获取数组中一个使用最少的directArena对象

 

directArenas是PooledByteBufAllocator的成员变量, 是在其构造方法中初始化的:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { //代码省略
    if (nDirectArena > 0) { directArenas = newArenaArray(nDirectArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length); for (int i = 0; i < directArenas.length; i ++) { PoolArena.DirectArena arena = new PoolArena.DirectArena( this, pageSize, maxOrder, pageShifts, chunkSize); directArenas[i] = arena; metrics.add(arena); } directArenaMetrics = Collections.unmodifiableList(metrics); } else { directArenas = null; directArenaMetrics = Collections.emptyList(); } }

咱们看到这里经过directArenas = newArenaArray(nDirectArena)初始化了directArenas, 其中nDirectArena, 默认是cpu核心数的2倍, 这点咱们能够跟踪构造方法的调用链能够分析到

这样保证了每个线程会有一个独享的arena

咱们看newArenaArray(nDirectArena)这个方法:

private static <T> PoolArena<T>[] newArenaArray(int size) { return new PoolArena[size]; }

这里只是建立了一个数组, 默认长度为nDirectArena

继续跟PooledByteBufAllocator的构造方法, 建立完了数组, 后面在for循环中为数组赋值:

首先经过new PoolArena.DirectArena建立一个DirectArena实例, 而后再为新建立的directArenas数组赋值

再回到PoolThreadLocalCache的构造方法中:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代码省略
} 

方法最后, 建立PoolThreadCache的一个对象, 咱们跟进构造方法中:

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { //代码省略 //保存成两个成员变量
    this.heapArena = heapArena; this.directArena = directArena; //代码省略
}

这里省略了大段代码, 只须要关注这里将两个值保存在PoolThreadCache的成员变量中

咱们回到newDirectBuffer中:

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }

简单分析的线程局部缓存初始化相关逻辑, 咱们再往下跟:

PoolArena<ByteBuffer> directArena = cache.directArena;

经过上面的分析, 这步咱们应该不陌生, 在PoolThreadCache构造方法中将directArena和heapArena中保存在成员变量中, 这样就能够直接经过cache.directArena这种方式拿到其成员变量的内容

 

从以上逻辑, 咱们能够大概的分析一下流程, 一般会建立和线程数量相等的arena, 并以数组的形式存储在PooledByteBufAllocator的成员变量中, 每个PoolThreadCache建立的时候, 都会在当前线程拿到一个arena, 并保存在自身的成员变量中

5-4-1

PoolThreadCache除了维护了一个arena以外, 还维护了一个缓存列表, 咱们在重复分配ByteBuf的时候, 并不须要每次都经过arena进行分配, 能够直接从缓存列表中拿一个ByteBuf

 

有关缓存列表, 咱们按部就班的往下看:

在PooledByteBufAllocator中维护了三个值:

1.  tinyCacheSize

2.  smallCacheSize

3.  normalCacheSize

tinyCacheSize表明tiny类型的ByteBuf能缓存多少个

smallCacheSize表明small类型的ByteBuf能缓存多少个

normalCacheSize表明normal类型的ByteBuf能缓存多少个

具体tiny类型, small类型, normal是什么意思, 咱们会在后面讲解

 

咱们回到PoolThreadLocalCache类中看其构造方法:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代码省略
} 

咱们看到这三个属性是在PoolThreadCache的构造方法中传入的

这三个属性是经过PooledByteBufAllocator的构造方法中初始化的, 跟随构造方法的调用链会走到这个构造方法:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE); }

这里仍然调用了一个重载的构造方法, 这里咱们关注这几个参数:

DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE

这里对应着几个静态的成员变量:

private static final int DEFAULT_TINY_CACHE_SIZE; private static final int DEFAULT_SMALL_CACHE_SIZE; private static final int DEFAULT_NORMAL_CACHE_SIZE;

咱们在static块中看其初始化过程:

static{ //代码省略
    DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256); DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64); //代码省略
}

在这里咱们看到, 这三个属性分别初始化的大小是512, 256, 64, 这三个属性就对应了PooledByteBufAllocator另外的几个成员变量, tinyCacheSize, smallCacheSize, normalCacheSize

也就是说, tiny类型的ByteBuf在每一个缓存中默认缓存的数量是512个, small类型的ByteBuf在每一个缓存中默认缓存的数量是256个, normal类型的ByteBuf在每一个缓存中默认缓存的数量是64个

咱们再到PooledByteBufAllocator中重载的构造方法中:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { super(preferDirect); threadCache = new PoolThreadLocalCache(); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; //代码省略
}

篇幅缘由, 这里也省略了大段代码, 你们能够经过构造方法参数找到源码中相对的位置进行阅读

咱们关注这段代码:

this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize;

在这里将将参数的DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE的三个值保存到了成员变量tinyCacheSize, smallCacheSize, normalCacheSize

PooledByteBufAllocator中将这三个成员变量初始化以后, 在PoolThreadLocalCache的initialValue方法中就可使用这三个成员变量的值了

咱们再次跟到initialValue方法中:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代码省略
} 

这里就能够在建立PoolThreadCache对象的的构造方法中传入tinyCacheSize, smallCacheSize, normalCacheSize这三个成员变量了

咱们再跟到PoolThreadCache的构造方法中:

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { //代码省略
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalDirect = log2(directArena.pageSize);  normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); directArena.numThreadCaches.getAndIncrement(); } else { //代码省略
 } //代码省略
 ThreadDeathWatcher.watch(thread, freeTask); }

其中tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches就表明了三种类型的缓存数组, 数组元素是MemoryRegionCache类型的对象, MemoryRegionCache就表明一个ByeBuf的缓存

以tinySubPageDirectCaches为例, 咱们看到tiny类型的缓存是经过createSubPageCaches这个方法建立的

这里传入了三个参数tinyCacheSize咱们以前分析过是512, PoolArena.numTinySubpagePools这里是32(这里不一样类型的缓存大小不同, small类型是4, normal类型是3) , SizeClass.Tiny表明其类型是tiny类型

咱们跟到createSubPageCaches这个方法中:

private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0) { //建立数组, 长度为32
        @SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { //每个节点是ubPageMemoryRegionCache对象
            cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null; } }

这里首先建立了MemoryRegionCache, 长度是咱们刚才分析过的32

而后经过for循环, 为数组赋值, 赋值的对象是SubPageMemoryRegionCache类型的, SubPageMemoryRegionCache就是MemoryRegionCache类型的子类, 一样也是一个缓存对象, 构造方法中, cacheSize, 就是其中缓存对象的数量, 若是是tiny类型就是512, sizeClass, 表明其类型, 好比tiny, small或者normal

再简单跟到其构造方法:

SubPageMemoryRegionCache(int size, SizeClass sizeClass) { super(size, sizeClass); }

这里调用了父类的构造方法, 咱们继续跟进去:

MemoryRegionCache(int size, SizeClass sizeClass) { //size会进行规格化
     this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); //队列大小
     queue = PlatformDependent.newFixedMpscQueue(this.size); this.sizeClass = sizeClass; }

首先会对其进行规格化, 其实就是查找大于等于当前size的2的幂次方的数, 这里若是是512那么规格化以后仍是512, 而后初始化一个队列, 队列大小就是传入的大小, 若是是tiny, 这里大小就是512

最后并保存其类型

这里咱们不难看出, 其实每一个缓存的对象, 里面是经过一个队列保存的, 有关缓存队列和ByteBuf之间的逻辑, 后面的小节会进行剖析

 

从上面剖析咱们不难看出, PoolThreadCache中维护了三种类型的缓存数组, 每一个缓存数组中的每一个值中, 又经过一个队列进行对象的存储

5-4-2

固然这里只举了Direct类型的对象关系, heap类型其实都是同样的, 这里再也不赘述

这一小节逻辑较为复杂, 同窗们能够本身在源码中跟踪一遍加深印象

 

上一节: 缓冲区分配器

下一节: directArena分配缓冲区概述

相关文章
相关标签/搜索