在进行数据传输的过程当中,咱们常常会用到缓冲区。
在Java NIO 为咱们提供了原生的七种缓冲区实现,对应着Java 的七种基本类型。通常使用ByteBuffer较多。原生的Buffer虽然能知足咱们的平常使用,可是要进行复杂的应用的时候,确有点力不从心了,原生Buffer存在着如下缺点。所以Netty对其进行了封装,提供了更为友好的接口供咱们使用。java
Buffer只有一个位置标志位属性Position,咱们只能flip或者rewind方法来对position进行修改来处理数据的存取位置,一不当心就可能会致使错误。
Buffer只提供了存取、翻转、释放、标志、比较、批量移动等缓冲区的基本操做,咱们想使用高级的功能,就得本身手动进行封装及维护,使用很是不方便。
ByteBuf也是经过字节数组做为缓冲区来存取数据,经过外观模式聚合了JDK NIO元素的ByteBuffer,进行封装。
ByteBuf是经过readerIndex跟writerIndex两个位置指针来协助缓冲区的读写操做的。
在对象初始化的时候,readerIndex和writerIndex的值为0,随着读操做和写操做的进行,writerIndex和readerIndex都会增长,不过readerIndex不能超过writerIndex,在进行读取操做以后,0到readerIndex之间的空间会被视为discard,调用ByteBuf的discardReadBytes方法,能够对这部分空间进行释放重用,相似于ByteBuffer的compact操做,对缓冲区进行压缩。readerIndex到writerIndex的空间,至关于ByteBuffer的position到limit的空间,能够对其进行读取,WriterIndex到capacity的空间,则至关于ByteBuffer的limit到capacity的空间,是能够继续写入的。
readerIndex跟writerIndex让读写操做的位置指针分离,不须要对同一个位置指针进行调整,简化了缓冲区的读写操做。
一样,ByteBuf对读写操做进行了封装,提供了动态扩展的能力,当咱们对缓冲区进行写操做的时候,须要对剩余的可用空间进行校验,若是可用空间不足,同时要写入的字节数小于可写的最大字节数,会对缓冲区进行动态扩展,它会从新建立一个缓冲区,而后将之前的数据复制到新建立的缓冲区中,数组
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { checkReadableBytes(length); getBytes(readerIndex, dst, dstIndex, length); readerIndex += length; return this; } protected final void checkReadableBytes(int minimumReadableBytes) { ensureAccessible(); if (minimumReadableBytes < 0) { throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)"); } if (readerIndex > writerIndex - minimumReadableBytes) { throw new IndexOutOfBoundsException(String.format( "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", readerIndex, minimumReadableBytes, writerIndex, this)); } }
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { ensureWritable(length); setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this; } public ByteBuf ensureWritable(int minWritableBytes) { if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); } if (minWritableBytes <= writableBytes()) { return this; } if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // Normalize the current capacity to the power of 2. int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes); // Adjust to the new capacity. capacity(newCapacity); return this; } private int calculateNewCapacity(int minNewCapacity) { final int maxCapacity = this.maxCapacity; final int threshold = 1048576 * 4; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // Not over threshold. Double up to 4 MiB, starting from 64. int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); } //UnpooledHeapByteBuf的capacity实现 public ByteBuf capacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; }
public ByteBuf clear() { readerIndex = writerIndex = 0; return this; }
索引操做安全
缓冲区重用
能够经过discardReadByte方法去重用已经读取过的缓冲区。
首先对readerIndex进行判断:app
public ByteBuf discardReadBytes() { ensureAccessible(); if (readerIndex == 0) { return this; } if (readerIndex != writerIndex) { setBytes(0, this, readerIndex, writerIndex - readerIndex); writerIndex -= readerIndex; adjustMarkers(readerIndex); readerIndex = 0; } else { adjustMarkers(readerIndex); writerIndex = readerIndex = 0; } return this; } protected final void adjustMarkers(int decrement) { int markedReaderIndex = this.markedReaderIndex; if (markedReaderIndex <= decrement) { this.markedReaderIndex = 0; int markedWriterIndex = this.markedWriterIndex; if (markedWriterIndex <= decrement) { this.markedWriterIndex = 0; } else { this.markedWriterIndex = markedWriterIndex - decrement; } } else { this.markedReaderIndex = markedReaderIndex - decrement; markedWriterIndex -= decrement; } }
当咱们须要跳过某些不须要的字节的时候,能够调用skipBytes方法来跳过指定长度的字节来读取后面的数据。
首先对跳跃长度进行判断,若是跳跃长度小于0的话,会抛出IllegalArgumentException异常,或者跳跃长度大于当前缓冲区可读长度的话,会抛出IndexOutOfBoundsException异常。若是校验经过,新的readerindex为原readerIndex+length,若是新的readerIndex大于writerIndex的话,会抛出IndexOutOfBoundsException异常,不然就更新readerIndex。函数
public ByteBuf skipBytes(int length) { checkReadableBytes(length); int newReaderIndex = readerIndex + length; if (newReaderIndex > writerIndex) { throw new IndexOutOfBoundsException(String.format( "length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))", length, readerIndex, writerIndex)); } readerIndex = newReaderIndex; return this; }
AbstractReferenceCountedByteBuf是ByteBuf实现对引用进行计数的基类,用来跟踪对象的分配和销毁,实现自动内存回收。工具
public ByteBuf retain() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, 1); } if (refCnt == Integer.MAX_VALUE) { throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) { break; } } return this; } public final boolean release() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, -1); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) { if (refCnt == 1) { deallocate(); return true; } return false; } } }
UnpooledHeapByteBuf是一个非线程池实现的在堆内存进行内存分配的字节缓冲区,在每次IO操做的都会去建立一个UnpooledHeapByteBuf对象,若是频繁地对内存进行分配或者释放会对性能形成影响。源码分析
public ByteBuf capacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; }
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.length); System.arraycopy(src, srcIndex, array, index, length); return this; } protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) { checkIndex(index, length); if (srcIndex < 0 || srcIndex > srcCapacity - length) { throw new IndexOutOfBoundsException(String.format( "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity)); } }
public ByteBuffer nioBuffer(int index, int length) { ensureAccessible(); return ByteBuffer.wrap(array, index, length).slice(); }
在Netty4以后加入内存池管理,经过内存池管理比以前ByteBuf的建立性能获得了极大提升。性能
PoolChunk主要负责内存块的分配及释放,chunk中的page会构建成一颗二叉树,默认状况下page的大小是8K,chunk的大小是2^11 page,即16M,构成了11层的二叉树,最下面一层的叶子节点有8192个,与page的数目同样,每一次内存的分配必须保证连续性,方便内存操做。每一个节点会记录本身在Memory Area的偏移地址,当一个节点表示的内存区域被分配以后,那么该节点会被标志为已分配,该节点的全部子节点的内存请求都会忽略。每次内存分配的都是8k(2^n)大小的内存块,当须要分配大小为chunkSize/(2^k)的内存端时,为了找到可用的内存段,会从第K层左边开始寻找可用节点。this
在内存分配中,为了可以集中管理内存的分配及释放,同时提供分配和释放内存的性能,通常都是会先预先分配一大块连续的内存,不须要重复频繁地进行内存操做,那一大块连续的内存就叫作memory Arena,而PoolArena是Netty的内存池实现类。
在Netty中,PoolArena是由多个Chunk组成的,而每一个Chunk则由多个Page组成。PoolArena是由Chunk和Page共同组织和管理的。线程
当对于小于一个Page的内存分配的时候,每一个Page会被划分为大小相等的内存块,它的大小是根据第一次申请内存分配的内存块大小来决定的。一个Page只能分配与第一次内存内存的内存块的大小相等的内存块,若是想要想要申请大小不想等的内存块,只能在新的Page上申请内存分配了。
Page中的存储区域的使用状况是经过一个long数组bitmap来维护的,每一位表示一个区域的占用状况。
static PooledHeapByteBuf newInstance(int maxCapacity) { PooledHeapByteBuf buf = RECYCLER.get(); buf.reuse(maxCapacity); return buf; } final void reuse(int maxCapacity) { maxCapacity(maxCapacity); setRefCnt(1); setIndex0(0, 0); discardMarks(); }
public ByteBuf copy(int index, int length) { checkIndex(index, length); ByteBuf copy = alloc().directBuffer(length, maxCapacity()); copy.writeBytes(this, index, length); return copy; } public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); } // PooledByteBufAllocator protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); } //UnpooledByteBufAllocator protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { ByteBuf buf; if (PlatformDependent.hasUnsafe()) { buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
ByteBufHolder是ByteBuf的一个容器,它能够更方便地访问ByteBuf中的数据,在使用不一样的协议进行数据传输的时候,不一样的协议消息体包含的数据格式和字段不同,因此抽象一个ByteBufHolder对ByteBuf进行包装,不一样的子类有不一样的实现,使用者能够根据本身的须要进行实现。Netty提供了一个默认实现DefaultByteBufHolder。
ByteBufAllocator是字节缓冲区分配器,根据Netty字节缓冲区的实现不一样,分为两种不一样的分配器PooledByteBufAllocator和UnpooledByteBufAllocator。他们提供了不一样ByteBuf的分配方法。
CompositeByteBuf是一个虚拟的Buffer,它能够将多个ByteBuf组装为一个ByteBuf视图。
在Java NIO中,咱们有两种实现的方法
在Netty中,CompositeByByteBuf中维护了一个Component类型的集合。Component是ByteBuf的包装类,它聚合了ByteBuf.维护在集合中的位置偏移量等信息。通常状况下,咱们应该使用ByteBufAllocator.compositeBuffer()和Unpooled.wrappedBuffer(ByteBuf...)方法来建立CompositeByteBuf,而不是直接经过构造函数去实例化一个CompositeByteBuf对象。
private int addComponent0(int cIndex, ByteBuf buffer) { checkComponentIndex(cIndex); if (buffer == null) { throw new NullPointerException("buffer"); } int readableBytes = buffer.readableBytes(); // No need to consolidate - just add a component to the list. Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice()); if (cIndex == components.size()) { components.add(c); if (cIndex == 0) { c.endOffset = readableBytes; } else { Component prev = components.get(cIndex - 1); c.offset = prev.endOffset; c.endOffset = c.offset + readableBytes; } } else { components.add(cIndex, c); if (readableBytes != 0) { updateComponentOffsets(cIndex); } } return cIndex; } private void consolidateIfNeeded() { final int numComponents = components.size(); if (numComponents > maxNumComponents) { final int capacity = components.get(numComponents - 1).endOffset; ByteBuf consolidated = allocBuffer(capacity); for (int i = 0; i < numComponents; i ++) { Component c = components.get(i); ByteBuf b = c.buf; consolidated.writeBytes(b); c.freeIfNecessary(); } Component c = new Component(consolidated); c.endOffset = c.length; components.clear(); components.add(c); } } public CompositeByteBuf removeComponent(int cIndex) { checkComponentIndex(cIndex); Component comp = components.remove(cIndex); comp.freeIfNecessary(); if (comp.length > 0) { updateComponentOffsets(cIndex); } return this; } private static final class Component { final ByteBuf buf; final int length; int offset; int endOffset; Component(ByteBuf buf) { this.buf = buf; length = buf.readableBytes(); } void freeIfNecessary() { buf.release(); // We should not get a NPE here. If so, it must be a bug. } }
ByteBufUtil是ByteBuf的工具类,它提供了一系列的静态方法来操做ByteBuf。