Netty内存池之PoolThreadCache详解

       PoolThreadCahche是Netty内存管理中可以实现高效内存申请和释放的一个重要缘由,Netty会为每个线程都维护一个PoolThreadCache对象,当进行内存申请时,首先会尝试从PoolThreadCache中申请,若是没法从中申请到,则会尝试从Netty的公共内存池中申请。本文首先会对PoolThreadCache的数据结构进行讲解,而后会介绍Netty是如何初始化PoolThreadCache的,最后会介绍如何在PoolThreadCache中申请内存和如何将内存释放到PoolThreadCache中。java

1. PoolThreadCache数据结构

       PoolThreadCache的数据结构与PoolArena的主要属性结构很是类似,但细微位置有很大的不一样。在PoolThreadCache中,其维护了三个数组(咱们以直接内存的缓存方式为例进行讲解),以下所示:数组

// 存储tiny类型的内存缓存,该数组长度为32,其中只有下标为1~31的元素缓存了有效数据,第0号位空置。
// 这里内存大小的存储方式也与PoolSubpage相似,数组的每一号元素都存储了不一样等级的内存块,每一个等级的
// 内存块的内存大小差值为16byte,好比第1号位维护了大小为16byte的内存块,第二号为维护了大小为32byte的
// 内存块,依次类推,第31号位维护了大小为496byte的内存块。
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
// 存储small类型的内存缓存,该数组长度为4,数组中每一个元素中维护的内存块大小也是成等级递增的,而且这里
// 的递增方式是按照2的指数次幂进行的,好比第0号为维护的是大小为512byte的内存块,第1号位维护的是大小为
// 1024byte的内存块,第2号位维护的是大小为2048byte的内存块,第3号位维护的是大小为4096byte的内存块
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
// 存储normal类型的内存缓存。须要注意的是,这里虽然说是维护的normal类型的缓存,可是其只维护2<<13,2<<14
// 和2<<15三个大小的内存块,而该数组的大小也正好为3,于是这三个大小的内存块将被依次放置在该数组中。
// 若是申请的目标内存大于2<<15,那么Netty会将申请动做交由PoolArena进行。
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

       这三个数组分别保存了tiny,small和normal类型的缓存数据,不一样于PoolArena的使用PoolSubpage和PoolChunk进行内存的维护,这里都是使用MemoryRegionCache进行的。另外,在MemoryRegionCache中保存了一个有界队列,对于tiny类型的缓存,该队列的长度为512,对于small类型的缓存,该队列的长度为256,对于normal类型的缓存,该队列的长度为64。在进行内存释放的时候,若是队列已经满了,那么就会将该内存块释放回PoolArena中。这里须要说明的是,这里的队列中的元素统一使用的是Entry这种数据结构,该结构的主要属性以下:缓存

static final class Entry<T> {
  // 用于循环利用当前Entry对象的处理器,该处理器的实现原理,咱们后续将进行讲解
  final Handle<Entry<?>> recyclerHandle;
  // 记录了当前内存块是从哪个PoolChunk中申请得来的
  PoolChunk<T> chunk;
  // 若是是直接内存,该属性记录了当前内存块所在的ByteBuffer对象
  ByteBuffer nioBuffer;
  // 因为当前申请的内存块在PoolChunk以及PoolSubpage中的位置是能够经过一个长整型参数来表示的,
  // 这个长整型参数就是这里的handle,于是这里直接将其记录下来,以便后续须要将当前内存块释放到
  // PoolArena中时,可以快速获取其所在的位置
  long handle = -1;
}

       PoolThreadCache中维护每个内存块最终都是使用的一个Entry对象来进行的,从上面的属性能够看出,记录该内存块最重要的属性是chunk和handle,chunk记录了当前内存块所在的PoolChunk对象,而handle则记录了当前内存块是在PoolChunk和PoolSubpage中的哪一个位置(关于PoolChunk,PoolSubpage和PoolArena的实现原理,建议读者阅读一下前面的文章,这样有助于读者快速理解相关原理)。如此,对于Netty使用的PoolThreadCache的存储结构咱们就有了一个比较清晰的认识。下面咱们经过一幅图来对PoolThreadCache的数据结构进行一个总体的演示:数据结构

image.png

       如上图所示展现的就是PoolThreadCache的结构示意图。从图中能够看出在一个PoolThreadCache中,主要有三个MemoryRegionCache数组用于存储tiny,small和normal类型的内存块。每一个MemoryRegionCache中有一个队列,队列中的元素类型为Entry。Entry的做用就是存储缓存的内存块的,其存储的方式主要是经过记录当前内存块所在的PoolChunk和标志其在PoolChunk中位置的handle参数。对于不一样类型的数组,队列的长度是不同的,tiny类型的是512,small类型的是256,normal类型的则是64。ide

2. PoolThreadCache初始化

       对于PoolThreadCache的初始化,这里单独拿出来说解的缘由是,其初始化过程是与PoolThreadLocalCache所绑定的。PoolThreadLocalCache的做用与Java中的ThreadLocal的做用很是相似,其有一个initialValue()方法,用于在没法从PoolThreadLocalCache中获取数据时,经过调用该方法初始化一个。另外其提供了一个get()方法和和remove()方法,分别用于从PoolThreadLocalCache中将当前绑定的数据给清除。这里咱们首先看看获取PoolThreadCache的入口代码:this

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  // 从PoolThreadLocalCache中尝试获取一个PoolThreadCache对象,
  // 若是不存在,则自行初始化一个返回
  PoolThreadCache cache = threadCache.get();
  // 因为当前方法是须要返回一个direct buffer,于是这里直接使用cache中的directArena
  PoolArena<ByteBuffer> directArena = cache.directArena;

  final ByteBuf buf;
  if (directArena != null) {
    // 若是directArena不为空,则直接调用其allocate()方法申请内存
    buf = directArena.allocate(cache, initialCapacity, maxCapacity);
  } else {
    // 若是当前缓存中因为某种缘由没法获取到directArena,则直接建立一个存有直接内存的ByteBuf,
    // 通常状况下不会走到这一步
    buf = PlatformDependent.hasUnsafe() ?
      UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  }

  // 为ByteBuf设置内存泄露检测功能
  return toLeakAwareBuffer(buf);
}

       从上面的代码中能够看出,在最开始的时候,就会经过PoolThreadLocalCache尝试获取一个PoolThreadCache对象,若是不存在,其会自行初始化一个。这里咱们直接看其是如何初始化的,以下是PoolThreadLocalCache.initialValue()方法的源码:线程

@Override
protected synchronized PoolThreadCache initialValue() {
  // 这里leastUsedArena()就是获取对应的PoolArena数组中最少被使用的那个Arena,将其返回。
  // 这里的判断方式是经过比较PoolArena.numThreadCaches属性来进行的,该属性记录了当前PoolArena被
  // 多少个线程所占用了。这里采用的思想就是,找到最少被使用的那个PoolArena,将其存入新的线程缓存中
  final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
  final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

  Thread current = Thread.currentThread();
  // 只有在指定了为每一个线程使用缓存,或者当前线程是FastThreadLocalThread的子类型时,才会使用线程缓存
  if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
    return new PoolThreadCache(
      heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
      DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
  }
  // 若是指定了不使用缓存,或者线程换粗对象不是FastThreadLocalThread类型的,则建立一个PoolThreadCache
  // 对象,该对象中是不作任何缓存的,由于初始化数据都是0
  return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}

private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
  if (arenas == null || arenas.length == 0) {
    return null;
  }

  // 在PoolArena数组中找到被最少线程占用的对象,将其返回。这样作的目的是,因为内存池是多个线程均可以
  // 访问的公共区域,于是当这里就须要对内存池进行划分,以减小线程之间的竞争。
  PoolArena<T> minArena = arenas[0];
  for (int i = 1; i < arenas.length; i++) {
    PoolArena<T> arena = arenas[i];
    if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
      minArena = arena;
    }
  }

  return minArena;
}

       从上述代码能够看出,对于PoolThreadCache的初始化,其首先会查找PoolArena数组中被最少线程占用的那个arena,而后将其封装到一个新建的PoolThreadCache中。code

3. 内存申请

       须要注意的是,PoolThreadCache申请内存并非说其会建立一块内存,或者说其会到PoolArena中申请内存,而是指,其自己已经缓存有内存块,而当前申请的内存块大小正好与其一致,就会将该内存块返回;PoolThreadCache中的内存块都是在当前线程使用完建立的ByteBuf对象后,经过调用其release()方法释放内存时直接缓存到当前PoolThreadCache中的,其并不会直接将内存块返回给PoolArena。这里咱们直接看一下其allocate()方法是如何实现的:orm

// 申请tiny类型的内存块
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
  return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

// 申请small类型的内存块
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, 
     int reqCapacity, int normCapacity) {
  return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}

// 申请normal类型的内存块
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, 
     int reqCapacity, int normCapacity) {
  return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}

// 从MemoryRegionCache中申请内存
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
  if (cache == null) {
    return false;
  }

  // 从MemoryRegionCache中申请内存,本质上就是从其队列中申请,若是存在,则初始化申请到的内存块
  boolean allocated = cache.allocate(buf, reqCapacity);
  // 这里是若是当前PoolThreadCache中申请内存的次数达到了8192次,则对内存块进行一次trim()操做,
  // 对使用较少的内存块,将其返还给PoolArena,以供给其余线程使用
  if (++allocations >= freeSweepAllocationThreshold) {
    allocations = 0;
    trim();
  }
  return allocated;
}

       这里对于内存块的申请,咱们能够看到,PoolThreadCache是将其分为tiny,small和normal三种不一样的方法来调用的,而具体大小的区分实际上是在PoolArena中进行区分的(读者能够阅读本人前面的关于PoolArena介绍的文章)。在对应的内存数组中找到MemoryRegionCache对象以后,经过调用allocate()方法来申请内存,申请完以后还会检查当前缓存申请次数是否达到了8192次,达到了则对缓存中使用的内存块进行检测,将较少使用的内存块返还给PoolArena。这里咱们首先看一下获取MemoryRegionCache的代码是如何实现的,也即cacheForTiny(),cacheForSmall()和cacheForNormal()的代码:对象

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
  // 计算当前数组下标索引,因为tiny类型的内存块每一层级相差16byte,于是这里的计算方式就是
  // 将目标内存大小除以16
  int idx = PoolArena.tinyIdx(normCapacity);
  // 返回tiny类型的数组中对应位置的MemoryRegionCache
  if (area.isDirect()) {
    return cache(tinySubPageDirectCaches, idx);
  }
  return cache(tinySubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
  // 计算当前数组下标的索引,因为small类型的内存块大小都是2的指数次幂,于是这里就是将目标内存大小
  // 除以1024以后计算其偏移量
  int idx = PoolArena.smallIdx(normCapacity);
  // 返回small类型的数组中对应位置的MemoryRegionCache
  if (area.isDirect()) {
    return cache(smallSubPageDirectCaches, idx);
  }
  return cache(smallSubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
  // 对于normal类型的缓存,这里也是首先将其向右位移13位,也就是8192,而后取2的对数,这样就
  // 能够获得其在数组中的位置,而后返回normal类型的数组中对应位置的MemoryRegionCache
  if (area.isDirect()) {
    int idx = log2(normCapacity >> numShiftsNormalDirect);
    return cache(normalDirectCaches, idx);
  }
  int idx = log2(normCapacity >> numShiftsNormalHeap);
  return cache(normalHeapCaches, idx);
}

       这里对于数组位置的计算,主要是根据各个数组数据存储方式的不一样而进行的,而它们最终都是经过一个MemoryRegionCache存储的,于是只须要返回该缓存对象便可。下面咱们继续看一下MemoryRegionCache.allocate()方法是如何申请内存的:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
  // 尝试从队列中获取,若是队列中不存在,说明没有对应的内存块,则返回false,表示申请失败
  Entry<T> entry = queue.poll();
  if (entry == null) {
    return false;
  }
  
  // 走到这里说明队列中存在对应的内存块,那么经过其存储的Entry对象来初始化ByteBuf对象,
  // 如此即表示申请内存成功
  initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
  // 对entry对象进行循环利用
  entry.recycle();

  // 更新当前已经申请的内存数量
  ++allocations;
  return true;
}

       能够看到,MemoryRegionCache申请内存的方式主要是从队列中取,若是取到了,则使用该内存块初始化一个ByteBuf对象。

       前面咱们讲到,PoolThreadCache会对其内存块使用次数进行计数,这么作的目的在于,若是一个ThreadPoolCache所缓存的内存块使用较少,那么就能够将其释放到PoolArena中,以便于其余线程能够申请使用。PoolThreadCache会在其内存总的申请次数达到8192时遍历其全部的MemoryRegionCache,而后调用其trim()方法进行内存释放,以下是该方法的源码:

public final void trim() {
  // size表示当前MemoryRegionCache中队列的最大可存储容量,allocations表示当前MemoryRegionCache
  // 的内存申请次数,size-allocations的含义就是判断当前申请的次数是否连队列的容量都没达到
  int free = size - allocations;
  allocations = 0;

  // 若是申请的次数连队列的容量都没达到,则释放该内存块
  if (free > 0) {
    free(free);
  }
}

private int free(int max) {
  int numFreed = 0;
  // 依次从队列中取出Entry数据,调用freeEntry()方法释放该Entry
  for (; numFreed < max; numFreed++) {
    Entry<T> entry = queue.poll();
    if (entry != null) {
      freeEntry(entry);
    } else {
      return numFreed;
    }
  }
  return numFreed;
}

private void freeEntry(Entry entry) {
  // 经过当前Entry中保存的PoolChunk和handle等数据释放当前内存块
  PoolChunk chunk = entry.chunk;
  long handle = entry.handle;
  ByteBuffer nioBuffer = entry.nioBuffer;
  entry.recycle();
  chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer);
}

4. 内存释放

       对于内存的释放,其原理比较简单,通常的释放内存的入口在ByteBuf对象中。当调用ByteBuf.release()方法的时候,其首先会将释放动做委托给PoolChunk的free()方法,PoolChunk则会判断当前是不是池化的ByteBuf,若是是池化的ByteBuf,则调用PoolThreadCache.add()方法将其添加到PoolThreadCache中,也就是说在释放内存时,其其实是释放到当前线程的PoolThreadCache中的。以下是add()方法的源码:

boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
    long handle, int normCapacity, SizeClass sizeClass) {
  // 经过当前释放的内存块的大小计算其应该放到哪一个等级的MemoryRegionCache中
  MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
  if (cache == null) {
    return false;
  }
  
  // 将内存块释放到目标MemoryRegionCache中
  return cache.add(chunk, nioBuffer, handle);
}

public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
  // 这里会尝试从缓存中获取一个Entry对象,若是没获取到则建立一个
  Entry<T> entry = newEntry(chunk, nioBuffer, handle);
  // 将实例化的Entry对象放到队列里
  boolean queued = queue.offer(entry);
  if (!queued) {
    entry.recycle();
  }

  return queued;
}

5. 小结

       本文首先详细讲解了PoolThreadCache的数据结构,而且说明了其中须要注意的点,而后介绍了PoolThreadCache的实例化方式,接着从申请和释放内存两个角度介绍了PoolThreadCache源码的实现方式。

相关文章
相关标签/搜索