【源起Netty 正传】升级版卡车——ByteBuf

卡车

卡车指的是java原生类ByteBuffer,这兄弟在NIO界大名鼎鼎,与Channel、Selector的铁三角组合构筑了NIO的核心。之因此称它为卡车,只因《编程思想》中有段比喻:java

咱们能够把它想象成一个煤矿,通道(Channel)是一个包含煤层(数据)的矿藏,而缓冲器(ByteBuffer)则是派送到矿藏中的卡车。卡车满载煤炭而归,咱们再从卡车上得到煤炭。也就是说,咱们并无直接和通道交互;咱们只是和缓冲器交互,并把缓冲器派送到通道。

那么升级版卡车,天然指的就是ByteBuf数据库

结构和功能

Netty之因此再次打造了升级版的缓冲器,显然是不满ByteBuffer中的某些弊端。编程

  • ByteBuffer长度固定
  • 使用者常常须要调用flip()、rewind()方法调整position的位置,不方便
  • API功能有限

ByteBuffer中有三个重要的位置属性:position、limit、capacity,一个写操做以后大概是这样的缓存

clipboard.png

如若想进行读操做,那么flip()的调用是少不了的,从图中不难看出,目前position到limit啥也没有。
调用flip()以后则不同了(咱们不同~):网络

clipboard.png

而ByteBuf的人设则不相同,它的两个位置属性readIndexwriteIndex,分别和读操做、写操做相对应。“写”不操做readIndex,“读”不操做writeIndex,二者不会相互干扰。这里盗几张图说明下好了:框架

  • 初始状态

clipboard.png

  • 写入N个字节

clipboard.png

  • 读取M个(M<N)字节

clipboard.png

  • 释放已读缓存discardReadBytes

clipboard.png

重点在于ByteBuf的read和write相关方法,已经封装好了对readIndex、writeIndex位置索引的操做,不须要使用者繁琐的flip()。且write()方法中,ByteBuf设计了自动扩容,这一点后续章节会进行详细说明。ide

功能方面,主要关注两点:函数

  • Derived buffers,相似于数据库视图。ByteBuf提供了多个接口用于建立某ByteBuf的视图或复制ByteBuf:性能

    • duplicate:返回当前ByteBuf的复制对象,缓冲区内容共享(修改复制的ByteBuf,原来的ByteBuf内容也随之改变),索引独立维护。
    • copy:内容和索引都独立。
    • slice:返回当前ByteBuf的可读子缓冲区,内容共享,索引独立。
  • 转换成ByteBuffer
    nio的SocketChanel进行网络操做,仍是操做的java原生的ByteBuffer,因此ByteBuf转换成ByteBuffer的需求仍是有市场的。this

    • ByteBuffer nioBuffer():当前ByteBuf的可读缓冲区转换成ByteBuffer,缓冲区内容共享,索引独立。须要指出的是,返回后的ByteBuffer没法感知原ByteBuf的动态扩展操做。

ByteBuf星系

称之为“星系”,是由于ByteBuf一脉涉及到的类实在太多了,但多而不乱,归功于类关系结构的设计。

类关系结构

依然盗图:
clipboard.png

从内存分配角度,ByteBuf可分为两类

  • 堆内存HeapByteBuf字节缓冲区
  • 直接内存DirectByteBuf字节缓冲区

从内存回收角度,ByteBuf也可分为两类:

  • 普通缓冲区UnpooledByteBuf
  • 池化缓冲区PooledByteBuf

纵观该关继承节构,给我留下的印象就是每层各司其职:读操做以及其它的一些公共功能由父类实现,差别化功能由子类实现。

下面聊下笔者感兴趣的几个点……

AbstractByteBuf的写操做簇

AbstractByteBuf的写操做有不少,这里以writeBytes(byte[] src, int srcIndex, int length)方法为例

@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    ensureWritable(length);    //1、确保可写,对边界进行验证
    setBytes(writerIndex, src, srcIndex, length);    //2、写入操做,不一样类型的子类实现方式不一样
    writerIndex += length;
    return this;
}

注释部分分别展开看下。

注释1、确保可写,对边界进行验证

跟调用栈ensureWritable -> ensureWritable0,观察ensureWritable0方法

final void ensureWritable0(int minWritableBytes) {
    ensureAccessible();    //确保对象可用
    if (minWritableBytes <= writableBytes()) {
        return;
    }

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the current capacity to the power of 2.
    // 3、计算扩容量
    int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

    // Adjust to the new capacity.
    capacity(newCapacity);    //4、内存分配
}
  • 比较

先对要写入的字节数minWritableBytes进行判断:若是minWritableBytes < capacity - writeIndex,那么很好,不须要扩容;若是minWritableBytes > maxCapacity - writerIndex,也就是要写入字节数超过了容许的最大字节数,直接抛出越界异常IndexOutOfBoundsException。

眼尖的朋友可能发现了,两次用来判断的上界并不相同——capacity / maxCapacity。maxCapacity是AbstractByteBuf的属性,而capacity设定在其子类中。简单看下UnpooledDirectByteBuf的构造函数:

public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);    //为AbstractByteBuf的maxCapacity属性赋值
    
    /**
     *    ……
     *    省略无关部分
     */
     
    setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));    //capacity赋值
}

也就是说,ByteBuf的结构,可当作这样:

clipboard.png

  • 扩容计算
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    if (minNewCapacity < 0) {
        throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
    }
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }

    /** 
     *  设置阀值为4MB
     *  1.若是扩展的容量大于阀值,对扩张后的内存和最大内存进行比较:大于最大长度使用最大长度,不然步进4M
     *  2.若是须要扩展的容量小于阀值,以64进行计数倍增:64->128->256;为防止倍增过猛,最后与最大值再次进行比较
     */
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

具体的扩容策略,已拍入注释中,尽可查看!

注释2、写入操做,不一样类型的子类实现方式不一样

对比下UnpooledDirectByteBufUnpooledHeapByteBuf的实现

  • UnpooledDirectByteBuf
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    ByteBuffer tmpBuf = internalNioBuffer();    //分配
    tmpBuf.clear().position(index).limit(index + length);
    tmpBuf.put(src, srcIndex, length);
    return this;
}
  • UnpooledHeapByteBuf
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    System.arraycopy(src, srcIndex, array, index, length); //分配
    return this;
}

篇幅有限,不展开说了,结论就是:
UnpooledDirectByteBuf的底层实现为ByteBuffer.allocateDirect,分配时复制体经过buffer.duplicate()获取复制体;而UnpooledHeapByteBuf的底层实现为byte[],分配时经过System.arraycopy方法拷贝副本。

AbstractReferenceCountedByteBuf

AbstractReferenceCountedByteBuf的名字就挺有意思——“引用计数”,一副JVM垃圾回收的即视感。而事实上,也差很少一个意思。

看下类属性:

private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
          AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

private volatile int refCnt;

以原子方式更新属性的AtomicIntegerFieldUpdater起了关键做用,将会对volatile修饰的refCnt进行更新,见retain方法(下面展现的是retain的关键部分retain0):

private ByteBuf retain0(final int increment) {
    int oldRef = refCntUpdater.getAndAdd(this, increment);
    if (oldRef <= 0 || oldRef + increment < oldRef) {
        // Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
        refCntUpdater.getAndAdd(this, -increment);
        throw new IllegalReferenceCountException(oldRef, increment);
    }
    return this;
}

源码阅读颇有意思的一点就是能看到些本身不熟悉的类,好比AtomicIntegerFieldUpdater我之前就没接触过!

内存池

内存池可有效的提高效率,道理和线程池、数据库链接池相通,即省去了重复建立销毁的过程

到目前为止,看到的都是ByteBuf中的各Unpooled实现,而池化版的ByteBuf没怎么提过。为什么如此?由于池化的实现较复杂,以我如今的功力尚不能彻底掌握透彻。

先聊下内存池的设计思路,涨涨姿式:
为了集中集中管理内存的分配和释放,同事提升分配和释放内存时候的性能,不少框架和应用都会经过预先申请一大块内存,而后经过提供相应的分配和释放接口来使用内存。这样一来,堆内存的管理就被集中到几个类或函数中,因为再也不频繁使用系统调用来申请和释放内存,应用或系统的性能也会大大提升。 ——节选自《Netty权威指南》

Netty的ByteBuf内存池也是按照这个思路搞的。首先,看下官方注释:

/**
 * Notation: The following terms are important to understand the code
 * > page  - a page is the smallest unit of memory chunk that can be allocated
 * > chunk - a chunk is a collection of pages
 * > in this code chunkSize = 2^{maxOrder} * pageSize
 */

这里面有两个重要的概念page(页)和chunk(块),chunk管理多个page组成二叉树结构,大概就是这个样子:

clipboard.png

选择二叉树是有缘由的:

/**
 * To search for the first offset in chunk that has at least requested size available we construct a
 * complete balanced binary tree and store it in an array (just like heaps) - memoryMap
 */

为了在chunk中找到至少可用的size的偏移量offset。
继线性结构后,人们又发明了树形结构的意义在于“提高查询效率”,也一样是这里选择二叉树的缘由。

小于一个page的内存,直接在PoolSubpage中分配完成。

某块内存是否分配,将经过状态位进行标识。

后记

一如既往的啰嗦几句,最近工做忙,更新文章较慢,但愿本身能坚持,如发现问题望你们指正!thanks..

相关文章
相关标签/搜索