Netty Pooled ByteBuf Allocator

Netty ByteBuf definition: [url]http://donald-draper.iteye.com/blog/2393813[/url]
Netty resource leak detector: [url]http://donald-draper.iteye.com/blog/2393940[/url]
Netty abstract ByteBuf analysis: [url]http://donald-draper.iteye.com/blog/2394078[/url]
Netty abstract ByteBuf reference counting: [url]http://donald-draper.iteye.com/blog/2394109[/url]
Netty composite buffer concepts: [url]http://donald-draper.iteye.com/blog/2394408[/url]
Netty abstract ByteBuf allocator: [url]http://donald-draper.iteye.com/blog/2394419[/url]
Netty Unpooled ByteBuf allocator: [url]http://donald-draper.iteye.com/blog/2394619[/url]
[size=medium][b]Introduction:[/b][/size]
In the previous article we looked at the Unpooled ByteBuf allocator; a quick recap:
An unpooled heap ByteBuf is backed by a byte array allocated on the JVM heap, while an unpooled direct ByteBuf is backed by an NIO ByteBuffer allocated from the operating system's native memory. Unpooled creates ByteBufs by delegating to its internal allocator, UnpooledByteBufAllocator.
Today we take a look at the pooled ByteBuf allocator. This won't be a deep dive, just a first glance, since pooled allocation involves quite a few memory-management concepts; the following links may help:

Netty memory management explained: PoolChunk: [url]http://blog.jobbole.com/106001/[/url]
Memory management in Netty 4: [url]http://www.cnblogs.com/ungshow/p/3541737.html[/url]
Netty 5 source notes, buffer part 1: PooledHeapByteBuf: [url]https://yq.aliyun.com/articles/55623[/url]
Netty series: design points of a million-connection push service: [url]http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points/[/url]

For memory allocation strategies, this article is a good reference:
[url]http://www.360doc.com/content/13/0915/09/8363527_314549128.shtml[/url]
I haven't studied this memory area very much yet. If you are interested, start with domestic write-ups on memory strategies,
then go look for papers and in-depth analyses of specific allocation strategies abroad. I haven't dug into OS-level allocation myself,
but I hope to find time for it, and hopefully that is not just an excuse.
The two articles below compare allocators in concrete applications; they are a bit rough, but the point is to absorb the useful parts.
Anyway, I digress...
On Redis with different memory allocators, tcmalloc vs. jemalloc: [url]http://www.jb51.net/article/100575.htm[/url]
Optimizing MySQL and Nginx memory management with jemalloc: [url]https://blog.linuxeye.cn/356.html[/url]

PooledByteBufAllocator: allocates heap and direct buffers.
PoolArena: a logical memory pool that manages and organizes buffers; its internal data structures are fairly complex.
PoolChunk: manages the actual underlying memory; internally it is composed of memory pages.
By default (in the referenced articles) a page is 4 KB, and requests fall into three classes: small, large and huge. A small request fits within a single page; small is further divided into three sub-classes: Tiny, Quantum-Spaced and Sub-page.

Having read the articles above, a quick mental model: the memory we use actually comes from the machine's memory-mapping region (mmap).
A PoolArena can be thought of as an allocation arena inside that region; an arena is made up of PoolChunk memory chunks, and each chunk manages its memory in pages as described above (note that Netty's own default page size is 8 KB, as the code below shows).
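Before digging into the internals, here is a minimal usage sketch. It only relies on the public ByteBufAllocator API (PooledByteBufAllocator.DEFAULT, buffer(), release()); the demo class name is made up. The point is that pooled buffers must be released so their memory can return to the pool:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public final class PooledAllocatorDemo {
    public static void main(String[] args) {
        // Shared default instance; whether it prefers direct buffers depends on the platform.
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

        ByteBuf buf = alloc.buffer(256); // heap or direct, depending on preferDirect
        try {
            buf.writeInt(42);
            System.out.println(buf.readInt()); // 42
        } finally {
            buf.release(); // hands the memory back to the pool for reuse
        }
    }
}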

Now let's look at the pooled ByteBuf allocator:
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
private static final int DEFAULT_NUM_HEAP_ARENA;
private static final int DEFAULT_NUM_DIRECT_ARENA;

private static final int DEFAULT_PAGE_SIZE;
private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
private static final int DEFAULT_TINY_CACHE_SIZE; // default tiny buf cache size
private static final int DEFAULT_SMALL_CACHE_SIZE; // default small buf cache size
private static final int DEFAULT_NORMAL_CACHE_SIZE; // default normal buf cache size
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS; // whether buf caches are used for all threads
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;

private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);

static {
// get the default memory page size
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
Throwable pageSizeFallbackCause = null;
try {
validateAndCalculatePageShifts(defaultPageSize);
} catch (Throwable t) {
pageSizeFallbackCause = t;
defaultPageSize = 8192;
}
DEFAULT_PAGE_SIZE = defaultPageSize;

int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
Throwable maxOrderFallbackCause = null;
try {
validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
} catch (Throwable t) {
maxOrderFallbackCause = t;
defaultMaxOrder = 11;
}
DEFAULT_MAX_ORDER = defaultMaxOrder;

// Determine reasonable default for nHeapArena and nDirectArena.
// Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
final Runtime runtime = Runtime.getRuntime();

/*
* We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
* number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
* allocation and de-allocation needs to be synchronized on the PoolArena.
*
* See https://github.com/netty/netty/issues/3888.
*/
//default number of arenas: 2 * available processors (further capped by available memory below)
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numHeapArenas",
(int) Math.min(
defaultMinNumArena,
runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numDirectArenas",
(int) Math.min(
defaultMinNumArena,
PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

// cache sizes: tiny defaults to 512, small to 256, normal to 64
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);

// 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
// 'Scalable memory allocation using jemalloc'; the default maximum cached buffer capacity is 32 KB
DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);

// the number of threshold of allocations when cached entries will be freed up if not frequently used
//allocation-count threshold: roughly, after this many allocations, rarely used cache entries are freed
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192);
//thread-local buf caching is enabled for all threads by default
DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
"io.netty.allocator.useCacheForAllThreads", true);

DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
"io.netty.allocator.directMemoryCacheAlignment", 0);

if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
if (pageSizeFallbackCause == null) {
logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
} else {
logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
}
if (maxOrderFallbackCause == null) {
logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
} else {
logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
}
logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
}
}
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

private final PoolArena<byte[]>[] heapArenas; // heap buf arenas
private final PoolArena<ByteBuffer>[] directArenas; // direct buf arenas
private final int tinyCacheSize; // tiny buf cache size
private final int smallCacheSize; // small buf cache size
private final int normalCacheSize; // normal buf cache size
private final List<PoolArenaMetric> heapArenaMetrics; // heap arena metrics
private final List<PoolArenaMetric> directArenaMetrics; // direct arena metrics
private final PoolThreadLocalCache threadCache; // thread-local ByteBuf cache
private final int chunkSize; // chunk size of an arena
private final PooledByteBufAllocatorMetric metric; // allocator metrics
public PooledByteBufAllocator() {
this(false);
}

@SuppressWarnings("deprecation")
public PooledByteBufAllocator(boolean preferDirect) {
this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}
...
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
//the per-thread buf cache is a PoolThreadLocalCache
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

if (nHeapArena < 0) {
throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: >= 0)");
}
if (nDirectArena < 0) {
throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)");
}

if (directMemoryCacheAlignment < 0) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: >= 0)");
}
if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
}

if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: power of two)");
}

int pageShifts = validateAndCalculatePageShifts(pageSize);

if (nHeapArena > 0) {
//create the heap buf arenas
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
//each heap arena is a PoolArena.HeapArena
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
//add the heap arena to the heap arena metrics list
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}

if (nDirectArena > 0) {
//create the direct buf arenas
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
//each direct arena is a PoolArena.DirectArena
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
//add the direct arena to the direct arena metrics list
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
metric = new PooledByteBufAllocatorMetric(this);
}
}

//create the arena array
@SuppressWarnings("unchecked")
private static <T> PoolArena<T>[] newArenaArray(int size) {
return new PoolArena[size];
}

From the code above, the pooled ByteBuf allocator holds an array of heap arenas and an array of direct arenas. Each arena is built from memory chunks of chunkSize bytes, and each chunk manages its memory in pages, 8 KB by default.
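As a quick sanity check on those numbers (assuming the defaults from the static initializer above), the chunk size is simply the page size shifted left by maxOrder:

int pageSize = 8192;                  // -Dio.netty.allocator.pageSize default
int maxOrder = 11;                    // -Dio.netty.allocator.maxOrder default
int chunkSize = pageSize << maxOrder; // 8192 << 11 = 16777216 bytes = 16 MiB per chunk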
Now let's look at heap buffer creation:

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;

final ByteBuf buf;
if (heapArena != null) {
//allocate a heap buf from the heap arena
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
//wrap the buf so that resource leaks can be tracked
return toLeakAwareBuffer(buf);
}
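toLeakAwareBuffer() wraps the returned buf according to the current leak-detection level (see the resource leak detector post linked at the top). As a side note, and only as a sketch of the io.netty.util.ResourceLeakDetector API, the level can be raised while debugging:

import io.netty.util.ResourceLeakDetector;

// PARANOID tracks every allocated buffer instead of sampling; useful while debugging,
// usually too expensive for production. In recent versions the same thing can be done
// via the leak-detection level system property (e.g. -Dio.netty.leakDetection.level=paranoid).
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);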


Now let's see how the heap arena allocates a heap buf. PoolArena also implements PoolArenaMetric; allocation first tries the thread-local PoolThreadCache:
abstract class PoolArena<T> implements PoolArenaMetric {
static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
...
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);// create a pooled buf
allocate(cache, buf, reqCapacity);// allocate memory for it, trying the cache first
return buf;
}
//create a pooled buf; implemented by subclasses
protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
//allocate memory for the buf, trying the thread cache first
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
//capacity smaller than pageSize: tiny or small buf
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;//sub page pool

boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
...
}
//normal buf size
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
}
...
}
//heap buf arena
static final class HeapArena extends PoolArena<byte[]> {

HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
}
//create a pooled heap buf
@Override
protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
: PooledHeapByteBuf.newInstance(maxCapacity);
}
...
}
//direct buf arena
static final class DirectArena extends PoolArena<ByteBuffer> {

DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
}
//create a pooled direct buf
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}

}
...
}
}

//PoolArenaMetric
public interface PoolArenaMetric {

/**
* Returns the number of thread caches backed by this arena.
*/
int numThreadCaches();
/**
* Returns the number of tiny sub-pages for the arena.
*/
int numTinySubpages();
/**
* Returns the number of small sub-pages for the arena.
*/
int numSmallSubpages();
/**
* Returns the number of chunk lists for the arena.
*/
int numChunkLists();
...
}

As shown above, PoolArena decides between tiny, small and normal allocations based on the normalized capacity; a rough sketch of the size classes follows below.
Let's take normal as the example and see how a buf is allocated from the thread-local cache, PoolThreadCache:
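To make that branching concrete, here is a rough sketch of the size classes as the allocate() method above implies them. The 512-byte tiny threshold, pageSize and chunkSize come from the listings; the SizeClassSketch class itself is made up for illustration and is not Netty code:

// Hypothetical illustration; mirrors the branching in PoolArena#allocate above.
final class SizeClassSketch {
    static String sizeClassOf(int normCapacity, int pageSize, int chunkSize) {
        if (normCapacity < 512) {
            return "tiny";   // served from tinySubpagePools / the tiny thread cache
        } else if (normCapacity < pageSize) {
            return "small";  // served from smallSubpagePools / the small thread cache
        } else if (normCapacity <= chunkSize) {
            return "normal"; // served from a PoolChunk run / the normal thread cache
        } else {
            return "huge";   // too big to pool; allocated unpooled straight from the arena
        }
    }

    public static void main(String[] args) {
        int pageSize = 8192;
        int chunkSize = pageSize << 11; // 16 MiB
        System.out.println(sizeClassOf(100, pageSize, chunkSize));               // tiny
        System.out.println(sizeClassOf(4096, pageSize, chunkSize));              // small
        System.out.println(sizeClassOf(64 * 1024, pageSize, chunkSize));         // normal
        System.out.println(sizeClassOf(32 * 1024 * 1024, pageSize, chunkSize));  // huge
    }
}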
//PoolThreadCache
final class PoolThreadCache {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

final PoolArena<byte[]> heapArena; // heap arena
final PoolArena<ByteBuffer> directArena; // direct arena
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches; // tiny sub-page heap caches
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches; // small sub-page heap caches
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; // tiny sub-page direct caches
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; // small sub-page direct caches
private final MemoryRegionCache<byte[]>[] normalHeapCaches; // normal heap caches
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; // normal direct caches

// Used for bitshifting when calculate the index of normal caches later
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
private final int freeSweepAllocationThreshold;

private final Thread deathWatchThread;
private final Runnable freeTask;

private int allocations;
...
/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
//allocate out of the matching memory region cache
return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}
//find the memory region cache for a normal-size allocation
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
if (area.isDirect()) {
int idx = log2(normCapacity >> numShiftsNormalDirect);
return cache(normalDirectCaches, idx);
}
int idx = log2(normCapacity >> numShiftsNormalHeap);
return cache(normalHeapCaches, idx);
}

private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
return null;
}
return cache[idx];
}
//allocate a buf out of a memory region cache
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}

//memory region cache
private abstract static class MemoryRegionCache<T> {
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
private int allocations;
/**
* Allocate something out of the cache if possible and remove the entry from the cache.
*/
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
//poll a cached entry from the queue
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
entry.recycle();

// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}
/**
* Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.

*/
protected abstract void initBuf(PoolChunk<T> chunk, long handle,
PooledByteBuf<T> buf, int reqCapacity);
...
}
}
/**
* Cache used for buffers which are backed by NORMAL size.
*/
private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
NormalMemoryRegionCache(int size) {
super(size, SizeClass.Normal);
}
//initialize the pooled ByteBuf from the chunk
@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBuf(buf, handle, reqCapacity);
}
}
...
}



//PoolChunk: a memory chunk
final class PoolChunk<T> implements PoolChunkMetric {

private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;

final PoolArena<T> arena; // owning arena
final T memory;
final boolean unpooled;
final int offset;

private final byte[] memoryMap;
private final byte[] depthMap;
private final PoolSubpage<T>[] subpages; // subpages of this chunk
/** Used to determine if the requested capacity is equal to or greater than pageSize. */
private final int subpageOverflowMask;
private final int pageSize; // page size
private final int pageShifts;
private final int maxOrder;
private final int chunkSize;
private final int log2ChunkSize;
private final int maxSubpageAllocs;
/** Used to mark memory as unusable */
private final byte unusable;

private int freeBytes;

PoolChunkList<T> parent;
PoolChunk<T> prev;
PoolChunk<T> next;
//initialize a pooled ByteBuf backed by this chunk
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
} else {
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
}
...
}


//PooledByteBuf
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {

private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;

protected PoolChunk<T> chunk; // backing memory chunk
protected long handle;
protected T memory;
protected int offset;
protected int length;
int maxLength;
PoolThreadCache cache;
private ByteBuffer tmpNioBuf;
private ByteBufAllocator allocator;
...
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, handle, offset, length, maxLength, cache);
}

void initUnpooled(PoolChunk<T> chunk, int length) {
init0(chunk, 0, chunk.offset, length, length, null);
}

private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;

this.chunk = chunk;
memory = chunk.memory;
allocator = chunk.arena.parent;
this.cache = cache;
this.handle = handle;
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
tmpNioBuf = null;
}
}

Now the other two size classes, tiny and small:
/**
* Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}
//find the tiny memory region cache
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
//find the small memory region cache
private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.smallIdx(normCapacity);
if (area.isDirect()) {
return cache(smallSubPageDirectCaches, idx);
}
return cache(smallSubPageHeapCaches, idx);
}
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
return null;
}
return cache[idx];
}

/**
* Cache used for buffers which are backed by TINY or SMALL size.
*/
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}

@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, handle, reqCapacity);
}
}



Next, direct buffer creation:
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;

final ByteBuf buf;
if (directArena != null) {
//allocate a direct buf from the direct arena
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
//wrap the buf so that resource leaks can be tracked
return toLeakAwareBuffer(buf);
}

The flow is the same as for heap buffer creation.
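A minimal sketch contrasting the two paths from user code. The preferDirect constructor is the one shown earlier; everything else is the standard ByteBufAllocator API, and the demo class name is made up:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public final class HeapVsDirectDemo {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = new PooledByteBufAllocator(true); // prefer direct buffers

        ByteBuf direct = alloc.directBuffer(512); // goes through newDirectBuffer above
        ByteBuf heap = alloc.heapBuffer(512);     // goes through newHeapBuffer above
        System.out.println(direct.isDirect() + " " + heap.isDirect()); // true false

        direct.release();
        heap.release();
    }
}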

Let's take a quick look at the pooled heap and direct bufs.

First, how the heap and direct arenas create their bufs:
//heap buf arena
static final class HeapArena extends PoolArena<byte[]> {

HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
}
//create a pooled heap buf
@Override
protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
: PooledHeapByteBuf.newInstance(maxCapacity);
}
...
}

//direct buf arena
static final class DirectArena extends PoolArena<ByteBuffer> {

DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
}
//create a pooled direct buf
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}

}
...
}

From the above, the pooled heap bufs are PooledUnsafeHeapByteBuf and PooledHeapByteBuf,
and the pooled direct bufs are PooledUnsafeDirectByteBuf and PooledDirectByteBuf.
Let's take a brief look at each of the four:

//PooledUnsafeHeapByteBuf
final class PooledUnsafeHeapByteBuf extends PooledHeapByteBuf {

private static final Recycler<PooledUnsafeHeapByteBuf> RECYCLER = new Recycler<PooledUnsafeHeapByteBuf>() {
@Override // create a new buf instance
protected PooledUnsafeHeapByteBuf newObject(Handle<PooledUnsafeHeapByteBuf> handle) {
return new PooledUnsafeHeapByteBuf(handle, 0);
}
};

static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) {
PooledUnsafeHeapByteBuf buf = RECYCLER.get();// fetch a buf object from the thread-local object stack
buf.reuse(maxCapacity);// reset the pooled ByteBuf
return buf;
}
....
}


//PooledHeapByteBuf
class PooledHeapByteBuf extends PooledByteBuf<byte[]> {

private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
@Override // create a new buf instance
protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
return new PooledHeapByteBuf(handle, 0);
}
};

static PooledHeapByteBuf newInstance(int maxCapacity) {
PooledHeapByteBuf buf = RECYCLER.get();// fetch a buf object from the thread-local object stack
buf.reuse(maxCapacity);// reset the pooled ByteBuf
return buf;
}
...
}


//PooledUnsafeDirectByteBuf
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
@Override // create a new buf instance
protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
return new PooledUnsafeDirectByteBuf(handle, 0);
}
};

static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
PooledUnsafeDirectByteBuf buf = RECYCLER.get();// fetch a buf object from the thread-local object stack
buf.reuse(maxCapacity);// reset the pooled ByteBuf
return buf;
}
...
}


//PooledDirectByteBuf
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {

private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
@Override // create a new buf instance
protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
return new PooledDirectByteBuf(handle, 0);
}
};

static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();// fetch a buf object from the thread-local object stack
buf.reuse(maxCapacity);// reset the pooled ByteBuf
return buf;
}
...
}



//PooledByteBuf
Before a pooled ByteBuf can be reused, #reuse must be called:
/**
* Method must be called before reuse this {@link PooledByteBufAllocator}
*/
final void reuse(int maxCapacity) {
maxCapacity(maxCapacity);
setRefCnt(1);// reset the reference count
setIndex0(0, 0);// reset the reader/writer indices
discardMarks();// discard the reader/writer index marks
}


Now a quick look at the Recycler that backs pooled ByteBuf reuse:

/**
* Light-weight object pool based on a thread-local stack.
*
* @param <T> the type of the pooled object
*/
public abstract class Recycler<T> {
...
//thread-local object stack
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
ratioMask, maxDelayedQueuesPerThread);
}
};
@SuppressWarnings("unchecked")
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
//get the thread-local object stack
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();// pop an object handle off the stack
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
static final class Stack<T> {
final Recycler<T> parent;
final Thread thread;
final AtomicInteger availableSharedCapacity;
final int maxDelayedQueues;

private final int maxCapacity;
private final int ratioMask;
private DefaultHandle<?>[] elements;
private int size;
private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
private WeakOrderQueue cursor, prev;
private volatile WeakOrderQueue head;
//create an object handle
DefaultHandle<T> newHandle() {
return new DefaultHandle<T>(this);
}
}
//create a new object; implemented by the Recycler's user
protected abstract T newObject(Handle<T> handle);
...
}


From the above, each pooled ByteBuf class has an internal Recycler that manages its buf instances, and the Recycler keeps those objects on a thread-local stack.
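As an aside, the same Recycler can pool arbitrary objects. A minimal sketch, assuming the Netty 4.1-style Recycler API quoted above; the PooledEntry class is made up for illustration:

import io.netty.util.Recycler;

final class PooledEntry {
    private static final Recycler<PooledEntry> RECYCLER = new Recycler<PooledEntry>() {
        @Override
        protected PooledEntry newObject(Handle<PooledEntry> handle) {
            return new PooledEntry(handle); // called only when the thread-local stack is empty
        }
    };

    private final Recycler.Handle<PooledEntry> handle;
    Object value;

    private PooledEntry(Recycler.Handle<PooledEntry> handle) {
        this.handle = handle;
    }

    static PooledEntry newInstance(Object value) {
        PooledEntry entry = RECYCLER.get(); // pops from the thread-local stack, or creates a new one
        entry.value = value;
        return entry;
    }

    void recycle() {
        value = null;         // clear state before returning the object to the pool
        handle.recycle(this); // push back onto the thread-local stack for reuse
    }
}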


Next, a quick look at the thread-local buf cache:
//the per-thread buf cache is a PoolThreadLocalCache
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);


//thread-local buf cache
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final boolean useCacheForAllThreads;

PoolThreadLocalCache(boolean useCacheForAllThreads) {
this.useCacheForAllThreads = useCacheForAllThreads;
}
//initialize the thread-local buf cache
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
// No caching for non FastThreadLocalThreads.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
//pick the arena that is used by the fewest thread caches
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}

return minArena;
}
@Override
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free();
}


}


Finally, a quick look at the allocator's metric object:
metric = new PooledByteBufAllocatorMetric(this);

/**
* Exposed metric for {@link PooledByteBufAllocator}.
*/
@SuppressWarnings("deprecation")
public final class PooledByteBufAllocatorMetric implements ByteBufAllocatorMetric {
private final PooledByteBufAllocator allocator;

PooledByteBufAllocatorMetric(PooledByteBufAllocator allocator) {
this.allocator = allocator;
}
/**
* Return the number of heap arenas.
*/
public int numHeapArenas() {
return allocator.numHeapArenas();
}
/**
* Return the number of direct arenas.
*/
public int numDirectArenas() {
return allocator.numDirectArenas();
}
/**
* Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool.
*/
public List<PoolArenaMetric> heapArenas() {
return allocator.heapArenas();
}
/**
* Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool.
*/
public List<PoolArenaMetric> directArenas() {
return allocator.directArenas();
}
/**
* Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
*/
public int numThreadLocalCaches() {
return allocator.numThreadLocalCaches();
}
/**
* Return the size of the tiny cache.
*/
public int tinyCacheSize() {
return allocator.tinyCacheSize();
}
/**
* Return the size of the small cache.
*/
public int smallCacheSize() {
return allocator.smallCacheSize();
}
/**
* Return the size of the normal cache.
*/
public int normalCacheSize() {
return allocator.normalCacheSize();
}
/**
* Return the chunk size for an arena.
*/
public int chunkSize() {
return allocator.chunkSize();
}
//used heap memory
@Override
public long usedHeapMemory() {
return allocator.usedHeapMemory();
}
//used direct memory
@Override
public long usedDirectMemory() {
return allocator.usedDirectMemory();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(256);
sb.append(StringUtil.simpleClassName(this))
.append("(usedHeapMemory: ").append(usedHeapMemory())
.append("; usedDirectMemory: ").append(usedDirectMemory())
.append("; numHeapArenas: ").append(numHeapArenas())
.append("; numDirectArenas: ").append(numDirectArenas())
.append("; tinyCacheSize: ").append(tinyCacheSize())
.append("; smallCacheSize: ").append(smallCacheSize())
.append("; normalCacheSize: ").append(normalCacheSize())
.append("; numThreadLocalCaches: ").append(numThreadLocalCaches())
.append("; chunkSize: ").append(chunkSize()).append(')');
return sb.toString();
}
}


Finally, the remaining methods of PooledByteBufAllocator:
//PooledByteBufAllocator
/**
 * Return the number of heap arenas.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#numHeapArenas()}.
 */
@Deprecated
public int numHeapArenas() {
    return heapArenaMetrics.size();
}

/**
 * Return the number of direct arenas.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#numDirectArenas()}.
 */
@Deprecated
public int numDirectArenas() {
    return directArenaMetrics.size();
}

/**
 * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#heapArenas()}.
 */
@Deprecated
public List<PoolArenaMetric> heapArenas() {
    return heapArenaMetrics;
}

/**
 * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#directArenas()}.
 */
@Deprecated
public List<PoolArenaMetric> directArenas() {
    return directArenaMetrics;
}

/**
 * Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#numThreadLocalCaches()}.
 */
@Deprecated
public int numThreadLocalCaches() {
    PoolArena<?>[] arenas = heapArenas != null ? heapArenas : directArenas;
    if (arenas == null) {
        return 0;
    }

    int total = 0;
    for (PoolArena<?> arena : arenas) {
        total += arena.numThreadCaches.get();
    }
    return total;
}

/**
 * Return the size of the tiny cache.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#tinyCacheSize()}.
 */
@Deprecated
public int tinyCacheSize() {
    return tinyCacheSize;
}

/**
 * Return the size of the small cache.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#smallCacheSize()}.
 */
@Deprecated
public int smallCacheSize() {
    return smallCacheSize;
}

/**
 * Return the size of the normal cache.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#normalCacheSize()}.
 */
@Deprecated
public int normalCacheSize() {
    return normalCacheSize;
}

/**
 * Return the chunk size for an arena.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#chunkSize()}.
 */
@Deprecated
public final int chunkSize() {
    return chunkSize;
}

final long usedHeapMemory() {
    return usedMemory(heapArenas);
}

final long usedDirectMemory() {
    return usedMemory(directArenas);
}

private static long usedMemory(PoolArena<?>... arenas) {
    if (arenas == null) {
        return -1;
    }
    long used = 0;
    for (PoolArena<?> arena : arenas) {
        used += arena.numActiveBytes();
        if (used < 0) {
            return Long.MAX_VALUE;
        }
    }
    return used;
}

final PoolThreadCache threadCache() {
    return threadCache.get();
}

/**
 * Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive
 * and so should not called too frequently.
 */
public String dumpStats() {
    int heapArenasLen = heapArenas == null ? 0 : heapArenas.length;
    StringBuilder buf = new StringBuilder(512)
            .append(heapArenasLen)
            .append(" heap arena(s):")
            .append(StringUtil.NEWLINE);
    if (heapArenasLen > 0) {
        for (PoolArena<byte[]> a: heapArenas) {
            buf.append(a);
        }
    }

    int directArenasLen = directArenas == null ? 0 : directArenas.length;
    buf.append(directArenasLen)
       .append(" direct arena(s):")
       .append(StringUtil.NEWLINE);
    if (directArenasLen > 0) {
        for (PoolArena<ByteBuffer> a: directArenas) {
            buf.append(a);
        }
    }

    return buf.toString();
}
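As a small usage sketch, the metric() accessor (from ByteBufAllocatorMetricProvider) and dumpStats() above are the supported ways to observe the pool; the demo class name is made up:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public final class PoolStatsDemo {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

        ByteBuf buf = alloc.directBuffer(1024); // force at least one allocation
        System.out.println(alloc.metric());     // the PooledByteBufAllocatorMetric#toString() shown above
        System.out.println(alloc.dumpStats());  // per-arena details; can be expensive
        buf.release();                          // return the memory to the pool
    }
}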
[size=medium][b]Summary:[/b][/size] [color=blue]The pooled ByteBuf allocator holds heap and direct allocation regions (PoolArena). Each region consists of memory chunks (PoolChunk) of chunkSize bytes, and each chunk manages its memory in pages (PoolSubpage), 8 KB by default. A pooled heap buf is backed by a byte array, while a pooled direct buf is backed by an NIO ByteBuffer. When allocating a heap or direct buf, the allocator first obtains the thread-local PoolThreadCache, takes the corresponding heap or direct arena from it, and the arena creates the buf (PooledByteBuf); the buf's memory is then managed inside a chunk and, depending on its capacity, cached in the matching tiny, small or normal memory region cache (MemoryRegionCache). Each pooled buf instance is reused through an internal Recycler, which manages the buf objects by keeping them on a thread-local stack.[/color]