Netty源码分析第五章: ByteBufhtml
第六节: page级别的内存分配api
前面小节咱们剖析过命中缓存的内存分配逻辑, 前提是若是缓存中有数据, 那么缓存中没有数据, netty是如何开辟一块内存进行内存分配的呢?这一小节带你们进行剖析:数组
剖析以前首先简单介绍netty内存分配的大概数据结构:缓存
以前咱们介绍过, netty内存分配的单位是chunk, 一个chunk的大小是16MB, 实际上每一个chunk, 都以双向链表的形式保存在一个chunkList中, 而多个chunkList, 一样也是双向链表进行关联的, 大概结构以下所示:数据结构
5-7-1函数
在chunkList中, 是根据chunk的内存使用率归到一个chunkList中, 这样, 在内存分配时, 会根据百分比找到相应的chunkList, 在chunkList中选择一个chunk进行内存分配 源码分析
咱们看PoolArena中有关chunkList的成员变量:this
private final PoolChunkList<T> q050; private final PoolChunkList<T> q025; private final PoolChunkList<T> q000; private final PoolChunkList<T> qInit; private final PoolChunkList<T> q075; private final PoolChunkList<T> q100;
这里总共定义了6个chunkList, 并在构造方法将其进行初始化spa
跟到其构造方法中:debug
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) { //代码省略
q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE, chunkSize); q075 = new PoolChunkList<T>(q100, 75, 100, chunkSize); q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize); q025 = new PoolChunkList<T>(q050, 25, 75, chunkSize); q000 = new PoolChunkList<T>(q025, 1, 50, chunkSize); qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25, chunkSize); //用双向链表的方式进行链接
q100.prevList(q075); q075.prevList(q050); q050.prevList(q025); q025.prevList(q000); q000.prevList(null); qInit.prevList(qInit); //代码省略
}
首先经过new PoolChunkList()这种方式将每一个chunkList进行建立, 咱们以 q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize) 为例进行简单的介绍
q075表示当前q50的下一个节点是q075, 刚才咱们讲过ChunkList是经过双向链表进行关联的, 因此这里不难理解
参数50和100表示当前chunkList中存储的chunk的内存使用率都在50%到100%之间, 最后chunkSize为其设置大小
建立完ChunkList以后, 再设置其上一个节点, q050.prevList(q025)为例, 这里表明当前chunkList的上一个节点是q025
以这种方式建立完成以后, chunkList的节点关系变成了以下图所示:
5-7-2
netty中, chunk又包含了多个page, 每一个page的大小为8k, 若是要分配16k的内存, 则在在chunk中找到连续的两个page就能够分配, 对应关系以下:
5-7-3
不少场景下, 为缓冲区分配8k的内存也是一种浪费, 好比只须要分配2k的缓冲区, 若是使用8k会形成6k的浪费, 这种状况, netty又会将page切分红多个subpage, 每一个subpage大小要根据分配的缓冲区大小而指定, 好比要分配2k的内存, 就会将一个page切分红4个subpage, 每一个subpage的大小为2k, 如图:
5-7-4
咱们看PoolSubpage的属性:
final PoolChunk<T> chunk; private final int memoryMapIdx; private final int runOffset; private final int pageSize; private final long[] bitmap; PoolSubpage<T> prev; PoolSubpage<T> next; boolean doNotDestroy; int elemSize;
chunk表明其子页属于哪一个chunk
bitmap用于记录子页的内存分配状况
prev和next, 表明子页是按照双向链表进行关联的, 这里分别指向上一个和下一个节点
elemSize属性, 表明的就是这个子页是按照多大内存进行划分的, 若是按照1k划分, 则能够划分出8个子页
简单介绍了内存分配的数据结构, 咱们开始剖析netty在page级别上分配内存的流程:
咱们回到PoolArena的allocate方法:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { //规格化
final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { int tableIdx; PoolSubpage<T>[] table; //判断是否是tinty
boolean tiny = isTiny(normCapacity); if (tiny) { // < 512 //缓存分配
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { return; } //经过tinyIdx拿到tableIdx
tableIdx = tinyIdx(normCapacity); //subpage的数组
table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } //拿到对应的节点
final PoolSubpage<T> head = table[tableIdx]; synchronized (head) { final PoolSubpage<T> s = head.next; //默认状况下, head的next也是自身
if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); if (tiny) { allocationsTiny.increment(); } else { allocationsSmall.increment(); } return; } } allocateNormal(buf, reqCapacity, normCapacity); return; } if (normCapacity <= chunkSize) { //首先在缓存上进行内存分配
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { //分配成功, 返回
return; } //分配不成功, 作实际的内存分配
allocateNormal(buf, reqCapacity, normCapacity); } else { //大于这个值, 就不在缓存上分配
allocateHuge(buf, reqCapacity); } }
咱们以前讲过, 若是在缓存中分配不成功, 则会开辟一块连续的内存进行缓冲区分配, 这里咱们先跳过isTinyOrSmall(normCapacity)日后的代码, 下一小节进行分析
首先 if (normCapacity <= chunkSize) 说明其小于16MB, 而后首先在缓存中分配, 由于最初缓存中没有值, 因此会走到allocateNormal(buf, reqCapacity, normCapacity), 这里实际上就是在page级别上进行分配, 分配一个或者多个page的空间
咱们跟进allocateNormal:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原来的chunk上进行内存分配(1)
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //建立chunk进行内存分配(2)
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3)
c.initBuf(buf, handle, reqCapacity); qInit.add(c); }
这里主要拆解了以下步骤
1. 在原有的chunk中进行分配
2. 建立chunk进行分配
3. 初始化ByteBuf
首先咱们看第一步, 在原有的chunk中进行分配:
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; }
咱们以前讲过, chunkList是存储不一样内存使用量的chunk集合, 每一个chunkList经过双向链表的形式进行关联, 这里的q050.allocate(buf, reqCapacity, normCapacity)就表明首先在q050这个chunkList上进行内存分配
咱们以q050为例进行分析, 跟到q050.allocate(buf, reqCapacity, normCapacity)方法中:
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { if (head == null || normCapacity > maxCapacity) { return false; } //从head节点往下遍历
for (PoolChunk<T> cur = head;;) { long handle = cur.allocate(normCapacity); if (handle < 0) { cur = cur.next; if (cur == null) { return false; } } else { cur.initBuf(buf, handle, reqCapacity); if (cur.usage() >= maxUsage) { remove(cur); nextList.add(cur); } return true; } } }
首先会从head节点往下遍历
long handle = cur.allocate(normCapacity) 表示对于每一个chunk, 都尝试去分配
if (handle < 0) 说明没有分配到, 则经过cur = cur.next找到下一个节点继续进行分配, 咱们讲过chunk也是经过双向链表进行关联的, 因此对这块逻辑应该不会陌生
若是handle大于0说明已经分配到了内存, 则经过cur.initBuf(buf, handle, reqCapacity)对byteBuf进行初始化
if (cur.usage() >= maxUsage) 表明当前chunk的内存使用率大于其最大使用率, 则经过remove(cur)从当前的chunkList中移除, 再经过nextList.add(cur)添加到下一个chunkList中
咱们再回到PoolArena的allocateNormal方法中:
咱们看第二步PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize)
这里的参数pageSize是8192, 也就是8k
maxOrder为11
pageShifts为13, 2的13次方正好是8192, 也就是8k
chunkSize为16777216, 也就是16MB
这里的参数值能够经过debug的方式跟踪到
由于咱们的示例是堆外内存, newChunk(pageSize, maxOrder, pageShifts, chunkSize)因此会走到DirectArena的newChunk方法中:
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { return new PoolChunk<ByteBuffer>( this, allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize); }
这里直接经过构造函数建立了一个chunk
allocateDirect(chunkSize)这里是经过jdk的api的申请了一块直接内存, 咱们跟到PoolChunk的构造函数中:
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) { unpooled = false; this.arena = arena; //memeory为一个ByteBuf
this.memory = memory; //8k
this.pageSize = pageSize; //13
this.pageShifts = pageShifts; //11
this.maxOrder = maxOrder; this.chunkSize = chunkSize; unusable = (byte) (maxOrder + 1); log2ChunkSize = log2(chunkSize); subpageOverflowMask = ~(pageSize - 1); freeBytes = chunkSize; assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder; maxSubpageAllocs = 1 << maxOrder; //节点数量为4096
memoryMap = new byte[maxSubpageAllocs << 1]; //也是4096个节点
depthMap = new byte[memoryMap.length]; int memoryMapIndex = 1; //d至关于一个深度, 赋值的内容表明当前节点的深度
for (int d = 0; d <= maxOrder; ++ d) { int depth = 1 << d; for (int p = 0; p < depth; ++ p) { memoryMap[memoryMapIndex] = (byte) d; depthMap[memoryMapIndex] = (byte) d; memoryMapIndex ++; } } subpages = newSubpageArray(maxSubpageAllocs); }
首先将参数传入的值进行赋值
this.memory = memory 就是将参数中建立的堆外内存进行保存, 就是chunk所指向的那块连续的内存, 在这个chunk中所分配的ByteBuf, 都会在这块内存中进行读写
咱们重点关注 memoryMap = new byte[maxSubpageAllocs << 1] 和 depthMap = new byte[memoryMap.length] 这两步
首先看 memoryMap = new byte[maxSubpageAllocs << 1]
这里初始化了一个字节数组memoryMap, 大小为maxSubpageAllocs << 1, 也就是4096
depthMap = new byte[memoryMap.length] 一样也是初始化了一个字节数组, 大小为memoryMap的大小, 也就是4096
继续往下分析以前, 咱们看chunk的一个层级关系
5-7-5
这是一个二叉树的结构, 左侧的数字表明层级, 右侧表明一块连续的内存, 每一个父节点下又拆分红多个子节点, 最顶层表示的内存范围为0-16MB, 其又下分为两层, 范围为0-8MB, 8-16MB, 以此类推, 最后到11层, 以8k的大小划分, 也就是一个page的大小
若是咱们分配一个8mb的缓冲区, 则会将第二层的第一个节点, 也就是0-8这个连续的内存进行分配, 分配完成以后, 会将这个节点设置为不可用, 具体逻辑后面会讲解
结合上面的图, 咱们再看构造方法中的for循环:
for (int d = 0; d <= maxOrder; ++ d) { int depth = 1 << d; for (int p = 0; p < depth; ++ p) { memoryMap[memoryMapIndex] = (byte) d; depthMap[memoryMapIndex] = (byte) d; memoryMapIndex ++; } }
实际上这个for循环就是将上面的结构包装成一个字节数组memoryMap, 外层循环用于控制层数, 内层循环用于控制里面每层的节点, 这里通过循环以后, memoryMap和depthMap内容为如下表现形式:
[0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4...........]
这里注意一下, 由于程序中数组的下标是从1开始设置的, 因此第零个节点元素为默认值0
这里数字表明层级, 同时也表明了当前层级的节点, 相同的数字个数就是这一层级的节点数
其中0为2个(由于这里分配时下标是从1开始的, 因此第0个位置是默认值0, 实际上第零层元素只有一个, 就是头结点), 1为2个, 2为4个, 3为8个, 4为16个, n为2的n次方个, 直到11, 也就是11有2的11次方个
咱们再回到PoolArena的allocateNormal方法中:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原来的chunk上进行内存分配(1)
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //建立chunk进行内存分配(2)
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3)
c.initBuf(buf, handle, reqCapacity); qInit.add(c); }
咱们继续剖析 long handle = c.allocate(normCapacity) 这步
跟到allocate(normCapacity)中:
long allocate(int normCapacity) { if ((normCapacity & subpageOverflowMask) != 0) { return allocateRun(normCapacity); } else { return allocateSubpage(normCapacity); } }
若是分配是以page为单位, 则走到allocateRun(normCapacity)方法中, 跟进去:
private long allocateRun(int normCapacity) { int d = maxOrder - (log2(normCapacity) - pageShifts); int id = allocateNode(d); if (id < 0) { return id; } freeBytes -= runLength(id); return id; }
int d = maxOrder - (log2(normCapacity) - pageShifts) 表示根据normCapacity计算出图5-8-5中的第几层
int id = allocateNode(d) 表示根据层级关系, 去分配一个节点, 其中id表明memoryMap中的下标
咱们跟到allocateNode方法中:
private int allocateNode(int d) { //下标初始值为1
int id = 1; //表明当前层级第一个节点的初始下标
int initial = - (1 << d); //获取第一个节点的值
byte val = value(id); //若是值大于层级, 说明chunk不可用
if (val > d) { return -1; } //当前下标对应的节点值若是小于层级, 或者当前下标小于层级的初始下标
while (val < d || (id & initial) == 0) { //当前下标乘以2, 表明下当前节点的子节点的起始位置
id <<= 1; //得到id位置的值
val = value(id); //若是当前节点值大于层数(节点不可用)
if (val > d) { //id为偶数则+1, id为奇数则-1(拿的是其兄弟节点)
id ^= 1; //获取id的值
val = value(id); } } byte value = value(id); assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d", value, id & initial, d); //将找到的节点设置为不可用
setValue(id, unusable); //逐层往上标记被使用
updateParentsAlloc(id); return id; }
这里是其实是从第一个节点往下找, 找到层级为d未被使用的节点, 咱们能够经过注释体会其逻辑
找到相关节点后经过setValue将当前节点设置为不可用, 其中id是当前节点的下标, unusable表明一个不可用的值, 这里是12, 由于咱们的层级只有12层, 因此设置为12以后就至关于标记不可用
设置成不可用以后, 经过updateParentsAlloc(id)逐层设置为被使用
咱们跟进updateParentsAlloc方法:
private void updateParentsAlloc(int id) { while (id > 1) { //取到当前节点的父节点的id
int parentId = id >>> 1; //获取当前节点的值
byte val1 = value(id); //找到当前节点的兄弟节点
byte val2 = value(id ^ 1); //若是当前节点值小于兄弟节点, 则保存当前节点值到val, 不然, 保存兄弟节点值到val //若是当前节点是不可用, 则当前节点值是12, 大于兄弟节点的值, 因此这里将兄弟节点的值进行保存
byte val = val1 < val2 ? val1 : val2; //将val的值设置为父节点下标所对应的值
setValue(parentId, val); //id设置为父节点id, 继续循环
id = parentId; } }
这里实际上是将循环将兄弟节点的值替换成父节点的值, 咱们能够经过注释仔细的进行逻辑分析
若是实在理解有困难, 我经过画图帮助你们理解:
简单起见, 咱们这里只设置三层:
5-7-6
这里咱们模拟其分配场景, 假设只有三层, 其中index表明数组memoryMap的下标, value表明其值, memoryMap中的值就为[0, 0, 1, 1, 2, 2, 2, 2]
咱们要分配一个4MB的byteBuf, 在咱们调用allocateNode(int d)中传入的d是2, 也就是第二层
根据咱们上面分分析的逻辑这里会找到第二层的第一个节点, 也就是0-4mb这个节点, 找到以后将其设置为不可用, 这样memoryMap中的值就为[0, 0, 1, 1, 12, 2, 2, 2]
二叉树的结构就会变为:
5-7-7
注意标红部分, 将index为4的节点设置为了避免可用
将这个节点设置为不可用以后, 则会将进行向上设置不可用, 循环将兄弟节点数值较小的节点替换到父节点, 也就是将index为2的节点的值替换成了index的为5的节点的值, 这样数组的值就会变为[0, 1, 2, 1, 12, 2, 2, 2]
二叉树的结构变为:
5-7-8
注意, 这里节点标红仅仅表明节点变化, 并非当前节点为不可用状态, 真正不可用状态的判断依据是value值为12
这样, 若是再次分配一个4MB内存的ByteBuf, 根据其逻辑, 则会找到第二层的第二个节点, 也就是4-8MB
再根据咱们的逻辑, 经过向上设置不可用, index为2就会设置成不可用状态, 将value的值设置为12, 数组数值变为[0, 1, 12, 1, 12, 12, 2, 2]二叉树以下图所示:
5-7-9
这样咱们看到, 经过分配两个4mb的byteBuf以后, 当前节点和其父节点都会设置成不可用状态, 当index=2的节点设置为不可用以后, 将不会再找这个节点下的子节点
以此类推, 直到全部的内存分配完毕的时候, index为1的节点, 也会变成不可用状态, 这样全部的page就分配完毕, chunk中再无可用节点
咱们再回到PoolArena的allocateNormal方法中:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原来的chunk上进行内存分配(1)
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //建立chunk进行内存分配(2)
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3)
c.initBuf(buf, handle, reqCapacity); qInit.add(c); }
经过以上逻辑咱们知道, long handle = c.allocate(normCapacity)这一步, 其实返回的就是memoryMap的一个下标, 经过这个下标, 咱们能惟一的定位一块内存
继续往下跟, 经过c.initBuf(buf, handle, reqCapacity)初始化ByteBuf以后, 经过qInit.add(c)将新建立的chunk添加到chunkList中
咱们跟到initBuf方法中去:
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) { int memoryMapIdx = memoryMapIdx(handle); int bitmapIdx = bitmapIdx(handle); if (bitmapIdx == 0) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else { initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); } }
这里经过memoryMapIdx(handle)找到memoryMap的下标, 其实就是handle的值
bitmapIdx(handle)是有关subPage中使用到的逻辑, 若是是page级别的分配, 这里只返回0, 因此进入到if块中
if中首先断言当前节点是否是不可用状态, 而后经过init方法进行初始化
其中runOffset(memoryMapIdx)表示偏移量, 偏移量至关于分配给缓冲区的这块内存相对于chunk中申请的内存的首地址偏移了多少
参数runLength(memoryMapIdx), 表示根据下标获取可分配的最大长度
咱们跟到init中, 这里会走到PooledByteBuf的init方法中:
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { //初始化
assert handle >= 0; assert chunk != null; //在哪一块内存上进行分配的
this.chunk = chunk; //这一块内存上的哪一块连续内存
this.handle = handle; memory = chunk.memory; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; this.cache = cache; }
这里又是咱们熟悉的部分, 将属性进行了初始化
以上就是完整的DirectUnsafePooledByteBuf在page级别的完整分配的流程, 逻辑也是很是的复杂, 想真正的掌握熟练, 也须要多下功夫进行调试和剖析