<!-- TOC -->html
<!-- /TOC -->java
为了平衡数据传输时CPU与各类IO设备速度的差别性,计算机设计者引入了缓冲区这一重要抽象。jdkNIO库提供了java.nio.Buffer接口,而且提供了7种默认实现,常见的实现类为ByteBuffer。不过netty并无直接使用nio的ByteBuffer,这主要是因为jdk的Buffer有如下几个缺点:数组
+-------------------+------------------+------------------+ | discardable bytes | readable bytes | writable bytes | | | (CONTENT) | | +-------------------+------------------+------------------+ | | | | 0 <= readerIndex <= writerIndex <= capacity
初始化时,readerIndex和writerIndex都是0,随着数据的写入writerIndex会增长,此时readable byte部分增长,writable bytes减小。当读取时,discardable bytes增长,readable bytes减小。因为读操做只修改readerIndex,写操做只修改writerIndex,让ByteBuf的使用更加容易理解,避免了因为遗漏flip致使的功能异常。 此外,当调用discardReadBytes方法时,能够把discardable bytes这部分的内存释放。整体想法是经过将readerIndex移动到0,writerIndex移动到writerIndex-readerIndex下标,具体移动下标的方式依据ByteBuf实现类有所不一样。这个方法能够显著提升缓冲区的空间复用率,避免无限度的扩容,但会发生字节数组的内存复制,属于以时间换空间的作法。多线程
前3个系列的方法及最后一个skipBytes都属于改变指针的方法。举例来讲,readByte会移动readerIndex1个下标位,而int是4个byte的大小,因此readInt会移动readerIndex4个下标位,相应的,writeByte会移动writerIndex1个下标位,writeInt会移动writerIndex4个下标位。set系列方法比较特殊,它的参数为index和value,意即将value写入指定的index位置,但这个操做不会改变readerIndex和writerIndex。skipBytes比较简单粗暴,直接将readerIndex移动指定长度。并发
markReaderIndex和markWriterIndex能够将对应的指针作一个标记,当须要从新操做这部分数据时,再使用resetReaderIndex或resetWriterIndex,将对应指针复位到mark的位置。高并发
这3种方法均可以复制一份字节数组,不一样之处在于duplicate和slice两个方法返回的新ByteBuf和原有的老ByteBuf之间的内容会互相影响,而copy则不会。duplicate和slice的区别在于前者复制整个ByteBuf的字节数组,然后者默认仅复制可读部分,但能够经过slice(index, length)分割指定的区间。性能
这是ByteBuf接口继承自ReferenceCounted接口的方法,用于引用计数,以便在不使用对象时及时释放。实现思路是当须要使用一个对象时,计数加1;再也不使用时,计数减1。考虑到多线程场景,通常也多采用AtomicInteger实现。netty却另辟蹊径,选择了volatile + AtomicIntegerFieldUpdater这样一种更节省内存的方式。this
在ByteBuf写入数据时会检查可写入的容量,若容量不足会进行扩容。spa
final void ensureWritable0(int minWritableBytes) { if (minWritableBytes <= writableBytes()) { return; } int minNewCapacity = writerIndex + minWritableBytes; int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity); int fastCapacity = writerIndex + maxFastWritableBytes(); if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) { newCapacity = fastCapacity; } capacity(newCapacity); }
忽略一些检验性质的代码后,能够看到扩容时先尝试将现有写索引加上须要写入的容量大小做为最小新容量,并调用ByteBufAllocate的calculateNewCapacity方法进行计算。跟入这个方法:线程
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { final int threshold = CALCULATE_THRESHOLD; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
能够看到这个方法的目的则是计算比可写容量稍大的2的幂次方。minNewCapacity由上一个方法传入,而maxCapacity则为Integer.MAX_VALUE。具体步骤是首先判断新容量minNewCapacity是否超过了计算限制CALCULATE_THRESHOLD,默认为4M,若是没有超过4MB,那么从64B开始不断以2的幂次方形式扩容,直到newCapacity超过minNewCapacity。而若一开始新容量就超过了4M,则调整新容量到4M的倍数+1。好比newCapacity为6M,由于6/4 = 1,因此调整为(1+1)*4M=8M。
在计算完容量以后会调用capacity方法。这是一个抽象方法,这里以UnpooledHeapByteBuf为例。
public ByteBuf capacity(int newCapacity) { checkNewCapacity(newCapacity); byte[] oldArray = array; int oldCapacity = oldArray.length; if (newCapacity == oldCapacity) { return this; } int bytesToCopy; if (newCapacity > oldCapacity) { bytesToCopy = oldCapacity; } else { trimIndicesToCapacity(newCapacity); bytesToCopy = newCapacity; } byte[] newArray = allocateArray(newCapacity); System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy); setArray(newArray); freeArray(oldArray); return this; }
首先检查newCapacity是否大于0且小于最大容量。以后准备好老数组要复制的长度。trimIndicesToCapacity(newCapacity)是缩容时调用的,它将readerIndex和newCapacity的较小值设置为新的readerIndex,将newCapacity设置为新的writerIndex。 以后便分配一个新数组,并开始复制旧数组的元素。复制成功后,将新数组保存为成员变量,将老数组释放掉。
出于性能和空间的多方考虑,netty从3个维度定义了各类不一样的ByteBuf实现类,主要是池化、堆内堆外、能否使用Unsafe类这3个维度,从而演化出8种不一样的ByteBuf,它们分别是PooledUnsafeHeapBytebuf、PooledHeapByteBuf、PooledUnsafeDirectByteBuf、PooledDirectBytebuf、UnpooledUnsafeHeapByteBuf、UnpooledHeapByteBuf、UnpooledUnsafeDirectByteBuf、UnpooledDirectByteBuf。 ByteBuf接口之下有一个抽象类AbstractByteBuf,实现了接口定义的read、write、set相关的方法,但在实现时只作了检查,而具体逻辑则定义一系列以_开头的proteced方法,留待子类实现。
不一样于通常形式的建立对象,ByteBuf须要经过内存分配器ByteBufAllocate分配,对应于不一样的ByteBuf也会有不一样的BtteBufferAllocate。netty将之抽象为ByteBufAllocate接口。咱们看一下有哪些方法:
ByteBufAllocate有一个直接实现类AbstractByteBufAllocate,它实现了大部分方法,只留下2个抽象方法newHeapBuffer和newDirectBuffer交由子类实现。AbstractByteBufAllocate有2个子类PooledByteBufAllocate和UnpooledByteBufAllocate,在这里定义了pooled池化维度的分配方式。 看看UnpooledByteBufAllocate如何实现2个抽象方法:
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); }
能够看到实现类根据PlatformDependent.hasUnsafe()方法自动断定是否使用unsafe维度,这个方法经过在静态代码块中尝试初始化sun.misc.Unsafe来判断Unsafe类是否在当前平台可用,在juc中,这个类使用颇多,做为与高并发打交道的netty,出现这个类不使人意外。UnpooledUnsafeHeapByteBuf与UnpooledHeapByteBuf并非平级关系,事实上前者继承了后者,在构造方法上也直接调用UnpooledHeapByteBuf的构造方法。构造方法比较简单,初始化byte数组、初始容量、最大容量,将读写指针的设置为0,并将子类传入的this指针保存到alloc变量中。 两种Bytebuf的区别在于unsafe会尝试经过反射的方式建立byte数组,并将数组的地址保存起来,以后再获取数据时也会调用Unsafe的getByte方法,经过数组在内存中的地址+偏移量的形式直接获取,而普通的SafeByteBuf则是保存byte数组,经过数组索引即array[index]访问。
// UnsafeHeapByteBuf初始化数组 protected byte[] allocateArray(int initialCapacity) { return PlatformDependent.allocateUninitializedArray(initialCapacity); } // HeapByteBuf初始化数组 protected byte[] allocateArray(int initialCapacity) { return new byte[initialCapacity]; } // UnsafeHeapByteBuf经过UnsafeByteBufUtil获取字节 static byte getByte(byte[] data, int index) { return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index); } // HeapByteBuf获取字节 static byte getByte(byte[] memory, int index) { return memory[index]; }
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); }
DirectByteBuf构造方法大体与heap的相似,只是保存数据的容器由字节数组变为了jdk的ByteBuffer。相应的,分配与释放内存的方法也变成调用jdk的ByteBuffer方法。而UnsafeByteBuf更是直接用long类型记录内存地址。
// DirectByteBuf获取字节 protected byte _getByte(int index) { return buffer.get(index); } // UnsafeDirectByteBuf获取字节 protected byte _getByte(int index) { return UnsafeByteBufUtil.getByte(addr(index)); } // 获取内存地址 final long addr(int index) { return memoryAddress + index; } // UnsafeByteBufUtil获取字节 static byte getByte(long address) { return UNSAFE.getByte(address); }
因为PooledByteBufAllocate内容较为庞大,放入下一节讲述。 未完待续···
原文出处:https://www.cnblogs.com/spiritsx/p/12158853.html