咱们知道,Java NIO的ByteBuffer只有一个position指针标识位置,读写切换时须要调用flip()方法,这样容易出错。而Netty为了解决这个问题,使用了两个指针readerIndex、writerIndex。固然,Netty的ByteBuf的功能不只仅如此,让咱们一块儿看看Netty的ButeBuf设计。后端
1、ByteBuf设计原理
1. 读写指针设计
ByteBuf经过两个指针协助缓冲区的读写操做,读操做用readerIndex,写操做用writerIndex。数组
readerIndex和writerIndex初始值都是0,随着写入writerIndex会增长,一样的,随着读取readerIndex会增长,可是readerIndex不能大于writerInder。并发
当读取了必定数据后,0 ~ readerIndex之间就被视为discard的,调用discardReadByte 方法,能够释放这段空间,令readerIndex = 0,writerIndex = writerIndex - discard。app
这样的设计致使读写互不干扰,读写切换再也不须要调整位置指针,极大的简化了缓冲区读写操做。性能
2. 扩容设计
咱们知道,Java NIO的ByteBuffer底层是数组,它自己并不提供扩容操做,若是缓冲区剩余可写空间不足,就会发生BufferOverflowExeption。this
public ByteBuffer put(byte[] src, int offset, int length) { checkBounds(offset, length, src.length); // 全部的put操做都有这个校验,可用空间不足直接抛异常 if (length > remaining()) throw new BufferOverflowException(); int end = offset + length; for (int i = offset; i < end; i++) this.put(src[i]); return this; } public final int remaining() { return limit - position; }
为了不这个问题,须要在put操做时先对剩余可用空间校验,若是剩余空间不足,须要本身建立新的ByteBuffer,并将以前的ByteBuffer copy过来,这样对使用者很不友善。而Netty的ByteBuffer对write操做进行了封装,由Netty作缓冲区剩余空间校验,若是可用缓冲区不足,ByteBuf会自动进行动态扩展。spa
2、ByteBuf主要继承关系
从内存分配角度来看,ByteBuf分为两类:设计
1)堆内存字节缓冲区:在堆内存中分配,能够被JVM自动回收,可是在进行Socket的IO读写时,须要多作一次内存复制(堆内存中的缓冲区复制到内核Channel中),性能会有所降低。指针
2)直接内存字节缓冲区:在堆外进行内存分配,须要本身分配内存及回收,可是它在写入或从Socket Channel中读取时,少一次内存复制,性能比堆内存字节缓冲区要高。code
tips:在IO通信的读写缓冲区使用直接内存字节缓冲区,在后端业务消息编解码模块中使用堆内存字节缓冲区,可使性能达到最优。
从内存回收的角度来看,ByteBuf也分为两类:
1)基于对象池的ByteBuf,它的特色就是能够重用ByteBuf。基于对象池的ByteBuf本身维护了一个内存池,能够循环利用建立的ByteBuf,提高内存使用效率。
2)普通的ByteBuf,不能重用,每次都要新建立一个ByteBuf。
3、AbstractByteBuf
1. 成员变量
// 用于检测对象是否泄漏 static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class); // 读操作 和 写操做 的位置指针 int readerIndex; private int writerIndex; // 读操做 和 写操做 的标记,能够经过reset()回到标记的地方 private int markedReaderIndex; private int markedWriterIndex; // 最大容量 private int maxCapacity; // private SwappedByteBuf swappedBuf;
2. 读操做
首先检查入参,若是要读的数据长度小于0,说明参数传错了,抛IllegalArgumentException异常;若是要读的数据长度大于可读数据长度,抛IndexOutOfBoundsException。
校验经过后,进行读操做,这个由子类实现。
最后,调整readerIndex指针。
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { checkReadableBytes(length); // 由子类实现 getBytes(readerIndex, dst, dstIndex, length); readerIndex += length; return this; } protected final void checkReadableBytes(int minimumReadableBytes) { ensureAccessible(); if (minimumReadableBytes < 0) { throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)"); } if (readerIndex > writerIndex - minimumReadableBytes) { throw new IndexOutOfBoundsException(String.format( "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", readerIndex, minimumReadableBytes, writerIndex, this)); } }
3. 写操做
首先作数据校验及缓冲区可用性校验。
若是可用缓冲区容量不足以放下整个byte数组,则须要扩容。扩容时须要先计算新的ByteBuf容量并建立,而后将老的ByteBuf复制到新的ByteBuf中。
最后才是写操做。
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { // 数据校验及扩容 ensureWritable(length); // 由子类实现 setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this; } public ByteBuf ensureWritable(int minWritableBytes) { // 校验入参 if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); } // 若是可写长度大于待写长度,直接写便可 if (minWritableBytes <= writableBytes()) { return this; } // 若是待写长度大于最多可写长度(扩容也无法知足),直接抛异常 if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // 计算新的ByteBuf的容量(找一个合适的新容量) int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes); // 重建新的缓冲区,并将老的缓冲区的数据copy过去,交给子类实现 capacity(newCapacity); return this; } private int calculateNewCapacity(int minNewCapacity) { final int maxCapacity = this.maxCapacity; final int threshold = 1048576 * 4; // 步长4MB if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold;//让它成为4MB的整数倍 if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // 比4MB小,采用64K步进的方式扩容,避免内存浪费 int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
4. 释放已读缓冲区
前面说到,0 ~ readerIndex之间的数据已读取过,这一段被视为discard的,咱们能够调用discardReadBytes()方法,释放这部分缓冲区,达到缓冲区重用的目的。
可是,discardReadBytes()方法原理是数组拷贝,在执行这个方法时你须要先判断是否值得这样作。
public ByteBuf discardReadBytes() { ensureAccessible(); // 若是等于0,说明没有可释放缓冲区 if (readerIndex == 0) { return this; } if (readerIndex != writerIndex) { // 子类实现 setBytes(0, this, readerIndex, writerIndex - readerIndex); writerIndex -= readerIndex; // 同时调整markReaderIndex和markWriterIndex adjustMarkers(readerIndex); readerIndex = 0; } else { adjustMarkers(readerIndex); writerIndex = readerIndex = 0; } return this; } protected final void adjustMarkers(int decrement) { int markedReaderIndex = this.markedReaderIndex; if (markedReaderIndex <= decrement) { this.markedReaderIndex = 0; int markedWriterIndex = this.markedWriterIndex; if (markedWriterIndex <= decrement) { this.markedWriterIndex = 0; } else { this.markedWriterIndex = markedWriterIndex - decrement; } } else { this.markedReaderIndex = markedReaderIndex - decrement; markedWriterIndex -= decrement; } }
5. skipBytes(..)
在解码的时候,有时须要丢弃非法数据包,获取跳过不须要读取的字节码,此时可使用skipByte(..)方法,忽略指定长度的字节数组。
public ByteBuf skipBytes(int length) { // 校验入参 checkReadableBytes(length); int newReaderIndex = readerIndex + length; if (newReaderIndex > writerIndex) { throw new IndexOutOfBoundsException(String.format( "length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))", length, readerIndex, writerIndex)); } readerIndex = newReaderIndex; return this; }
4、AbstractReferenceCountedByteBuf
从名字能够看出,这个类的做用主要是对ButeBuf引用进行计数,用于跟踪对象的分配及销毁。
1. 成员变量
// 并发包中的类,对 AbstractReferenceCountedByteBuf 中的 refCnt,进行原子化操做 private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); // refCnt的偏移量,也就是 refCnt 在AbstractReferenceCountedByteBuf中的内存地址 private static final long REFCNT_FIELD_OFFSET; static { long refCntFieldOffset = -1; try { if (PlatformDependent.hasUnsafe()) { refCntFieldOffset = PlatformDependent.objectFieldOffset( AbstractReferenceCountedByteBuf.class.getDeclaredField("refCnt")); } } catch (Throwable t) { // Ignored } REFCNT_FIELD_OFFSET = refCntFieldOffset; } //对象引用次数 @SuppressWarnings("FieldMayBeFinal") private volatile int refCnt = 1;
2. 对象引用计数器
每调用一次 retain() 方法,引用计数器就加1。
因为 refCnt 初始值为1,每次申请加1,释放减1,当申请数等于释放数时,对象被回收,故 refCnt 不可能为0。若是为0,说明对象被错误、意外的引用了,抛出异常。
若是引用计数器达到整形最大值,则直接抛异常,除非是恶意破坏,不然不会出现这种状况吧。
最后就是对 refCnt 作原子性的 cas 操做。
public ByteBuf retain() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, 1); } if (refCnt == Integer.MAX_VALUE) { throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) { break; } } return this; }
有增必有减,咱们看一下 refCnt 减小的代码。
当对象被释放时,refCnt 减1,当减到 1 时,说明申请数等于释放数,须要将该对象回收掉。
public final boolean release() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, -1); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) { if (refCnt == 1) { // 垃圾回收 deallocate(); return true; } return false; } } }
5、UnpooledHeapByteBuf
前面的ByteBuf分类描述,咱们能够判断,UnpooledHeapByteBuf 就是普通的堆内存ByteBuf,没有内存池,没有堆外内存,使用起来更不容易出现内存管理方面的问题。
1. 成员变量
// 用于UnpooledHeapByteBuf内存分配 private final ByteBufAllocator alloc; // 缓冲区 private byte[] array; // Java NIO的ByteBuffer,用于Netty的ByteBuf到NIO的ByteBuffer转换 private ByteBuffer tmpNioBuf;
2. 动态扩展缓冲区
在 AbstractByteBuf 中咱们提到,动态扩展缓冲区的操做是交给子类完成的,这里咱们看一下 UnpooledHeapByteBuf 是怎么作的。
首先对新的byte数组作校验,而后进行ByteBuf重建。
这里分为三种状况,
1)newCapacity > oldCapacity,直接建立一个新数组,拷贝过去就好了
2)newCapacity == oldCapacity,不作处理
3)newCapacity < oldCapacity,先判断readerIndex,若是readerIndex大于等于newCapacity,说明没有数据须要复制到缓冲区,直接设置readerIndex和writerIndex的值为newCapacity便可;当readerIndex小于newCapacity时,readerIndex到writerIndex之间的数据须要复制到新的byte数组,这个时候,若是writerIndex - readerIndex > newCapacity,就会发生数组下标越界,为了防止越界,当writerIndex > newCapacity时,令writerIndex = newCapacity,而后作 byte 数组赋值操做。最后,替换掉ByteBuf中持有的 byte数组引用,并令NIO 的 ByteBuffer为 null。
public ByteBuf capacity(int newCapacity) { ensureAccessible(); // 1. 对入参作合法性校验 if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; if (newCapacity > oldCapacity) { // 2. byte数组copy,而后替换掉原来的byte数组 byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { // 若是新容量小于老容量,则不须要动态扩展,可是须要截取当前缓冲区建立一个新的子缓冲区 byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { // 若是writerIndex大于newCapacity,则有可能发生越界,这里直接截断 writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { // 若是readerIndex大于等于新的capacity,说明没有数据须要复制到新缓冲区,直接将readerIndex和writerIndex设置为newCapacity便可 setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; } private void setArray(byte[] initialArray) { array = initialArray; tmpNioBuf = null; }
3. 字节数组复制
在AbstractByteBuf中的读写操做中,具体的读写操做由子类实现,咱们来看一下 UnpooledHeapByteBuf 是怎么作的。
在写操做中,首先检查入参,而后将数据 copy 至 ByteBuf 的 byte 数组中。
在读操做中,也是先检查入参,而后将 ByteBuf 的 byte 数组 copy到指定的byte数组里面。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { // 根据AbstractByteBuf的写操做可知,index为writerIndex checkSrcIndex(index, length, srcIndex, src.length); System.arraycopy(src, srcIndex, array, index, length); return this; } protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) { checkIndex(index, length); if (srcIndex < 0 || srcIndex > srcCapacity - length) { throw new IndexOutOfBoundsException(String.format( "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity)); } } protected final void checkIndex(int index, int fieldLength) { ensureAccessible(); if (fieldLength < 0) { throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)"); } // writerIndex + length > capacity,数组下表越界 if (index < 0 || index > capacity() - fieldLength) { throw new IndexOutOfBoundsException(String.format( "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity())); } }
// 读操做时,将字节数组copy出去
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { checkDstIndex(index, length, dstIndex, dst.length); System.arraycopy(array, index, dst, dstIndex, length); return this; }
4. Netty 的 ByteBuf 转换为 NIO 的 ByteNuffer
利用byte数组建立一个新的ByteBuffer,并调用slice方法,清除 discard 区域。
public ByteBuffer nioBuffer(int index, int length) { ensureAccessible(); // slice():copy一个原来的position到limit之间的有效数据,建立一个新的ByteBuffer return ByteBuffer.wrap(array, index, length).slice(); } public static ByteBuffer wrap(byte[] array, int offset, int length) { try { return new HeapByteBuffer(array, offset, length); } catch (IllegalArgumentException x) { throw new IndexOutOfBoundsException(); } }
6、UnpooledDirectByteBuf
与UnpooledHeapByteBuf不一样,UnpooledDIrectByteBuf是基于堆外内存建立的。
1. 成员变量
这里跟 UnpooledHeapByteBuf 最大的不一样就是,这里使用 ByteBuffer 存储数据,而 UnpooledHeapByteBuf 使用字节数组。另外一个不一样就是,这里的ByteBuffer使用的是NIO的DirectByteBuffer,须要本身手动释放内存。
// ByteBuf内存分配 private final ByteBufAllocator alloc; // 这里跟UnpooledHeapByteBuf不一样,这里使用的是NIO的ByteBuffer存储字节数组 private ByteBuffer buffer; private ByteBuffer tmpNioBuf; private int capacity;
//用于标记ByteBuffer是否释放了(这里使用堆外内存建立ByteBuffer,须要本身作垃圾回收) private boolean doNotFree;
2. 动态扩展缓冲区
这里的设计跟UnpooledHeapByteBuf是同样的,不一样的是这里使用的是ByteBuffer而不是byte数组。
public ByteBuf capacity(int newCapacity) { ensureAccessible(); // 1. 校验粗人惨 if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int readerIndex = readerIndex(); int writerIndex = writerIndex(); int oldCapacity = capacity; if (newCapacity > oldCapacity) { // 这里直接建立一个新的ByteBuffer,将老的ByteBuffer数据copy过去 ByteBuffer oldBuffer = buffer;
// 建立一个DirectByteBuffer ByteBuffer newBuffer = allocateDirect(newCapacity);
// 设置position和limit的值 oldBuffer.position(0).limit(oldBuffer.capacity()); newBuffer.position(0).limit(oldBuffer.capacity()); newBuffer.put(oldBuffer); newBuffer.clear(); // 替换老的ByteBuffer并释放掉老的ByteBuffer setByteBuffer(newBuffer); } else if (newCapacity < oldCapacity) { // 这里跟UnpooledHeapByteBuf处理是同样的,详细看UnpooledHeapByteBuf ByteBuffer oldBuffer = buffer; ByteBuffer newBuffer = allocateDirect(newCapacity); if (readerIndex < newCapacity) { if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } oldBuffer.position(readerIndex).limit(writerIndex); newBuffer.position(readerIndex).limit(writerIndex); newBuffer.put(oldBuffer); newBuffer.clear(); } else { setIndex(newCapacity, newCapacity); } setByteBuffer(newBuffer); } return this; } // 建立DirectByteBuffer protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } private void setByteBuffer(ByteBuffer buffer) { ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { // 释放oldByteBuffer freeDirect(oldBuffer); } } this.buffer = buffer; tmpNioBuf = null; capacity = buffer.remaining(); }
3. 字节数组复制
咱们先看写操做的setBytes()方法,一样的,先进行参数校验,而后建立一个临时的ByteBuffer,这个ByteBuffer与 buffer 的 content 共用,往 tmpBuf 中写数据至关于往 buffer 中写数据。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { // 参数校验 checkSrcIndex(index, length, srcIndex, src.length); // 建立一个临时的tmpBuf ByteBuffer tmpBuf = internalNioBuffer(); tmpBuf.clear().position(index).limit(index + length); tmpBuf.put(src, srcIndex, length); return this; } private ByteBuffer internalNioBuffer() { ByteBuffer tmpNioBuf = this.tmpNioBuf; if (tmpNioBuf == null) { // 令tempNioBuf和buffer共用同一个ByteBuffer内容,修改了tmpNioByteBuf,也等同于修改了buffer // 可是它们的position、limit都是独立的 this.tmpNioBuf = tmpNioBuf = buffer.duplicate(); } return tmpNioBuf; }
而后再来看读操做的getBytes()方法,一样的,先检查入参,而后建立出一个临时的 ByteBuffer,由这个临时的 ByteBuffer 作读操做。
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { checkReadableBytes(length); getBytes(readerIndex, dst, dstIndex, length, true); readerIndex += length; return this; } private void getBytes(int index, byte[] dst, int dstIndex, int length, boolean internal) { checkDstIndex(index, length, dstIndex, dst.length); if (dstIndex < 0 || dstIndex > dst.length - length) { throw new IndexOutOfBoundsException(String.format( "dstIndex: %d, length: %d (expected: range(0, %d))", dstIndex, length, dst.length)); } ByteBuffer tmpBuf; if (internal) { tmpBuf = internalNioBuffer(); } else { tmpBuf = buffer.duplicate(); } tmpBuf.clear().position(index).limit(index + length); tmpBuf.get(dst, dstIndex, length); }
4. Netty 的 ByteBuf 转换为 NIO 的 ByteNuffer
这里直接拿buufer的content建立一个新的ByteBuffer。
public ByteBuffer nioBuffer(int index, int length) { return ((ByteBuffer) buffer.duplicate().position(index).limit(index + length)).slice(); } public ByteBuffer duplicate() { return new DirectByteBuffer(this, this.markValue(), this.position(), this.limit(), this.capacity(), 0); }
7、PooledDirectByteBuf
PooledDirectByteBuf基于内存池实现,与UnpooledDirectByteBuf惟一的不一样就是缓冲区的分配和销毁策略。
1. 建立字节缓冲区
因为采用内存池实现,因此新建实例的时候不能使用 new 建立,而是从内存池中获取,而后设置引用计数器的值。
static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); buf.maxCapacity(maxCapacity); return buf; }
2. 复制新的字节缓冲区
一样的,复制新字节缓冲区时,也须要经过内存池建立一个字节缓冲区,而后执行复制操做。
public ByteBuf copy(int index, int length) { // 参数校验 checkIndex(index, length); // 从内存池中建立一个ByteBuf ByteBuf copy = alloc().directBuffer(length, maxCapacity()); // 复制操做 copy.writeBytes(this, index, length); return copy; }
8、PooledHeapByteBuf
PooledHeapByteBuf 与 PooledDirectByteBuf 不一样的地方在于建立对象时使用的是byte数组而不是ByteBuffer,这里咱们就不在讨论了。