网络数据的基本单位老是字节。Java NIO 提供了 ByteBuffer 做为它 的字节容器,可是这个类使用起来过于复杂,并且也有些繁琐。java
Netty 的 ByteBuffer 替代品是 ByteBuf,一个强大的实现,既解决了 JDK API 的局限性, 又为网络应用程序的开发者提供了更好的 API。算法
Netty 的数据处理 API 经过两个组件暴露——abstract class ByteBuf 和 interface ByteBufHolder。后端
下面是一些 ByteBuf API 的优势:数组
其余类可用于管理 ByteBuf 实例的分配,以及执行各类针对于数据容器自己和它所持有的 数据的操做。咱们将在仔细研究 ByteBuf 和 ByteBufHolder 时探讨这些特性。安全
由于全部的网络通讯都涉及字节序列的移动,因此高效易用的数据结构明显是必不可少的。 Netty 的 ByteBuf 实现知足并超越了这些需求。让咱们首先来看看它是如何经过使用不一样的索引 来简化对它所包含的数据的访问的吧。网络
ByteBuf 维护了两个不一样的索引:一个用于读取,一个用于写入。当你从 ByteBuf 读取时, 它的 readerIndex 将会被递增已经被读取的字节数。一样地,当你写入 ByteBuf 时,它的 writerIndex 也会被递增。图 5-1 展现了一个空 ByteBuf 的布局结构和状态。数据结构
要了解这些索引两两之间的关系,请考虑一下,若是打算读取字节直到 readerIndex 达到 和 writerIndex 一样的值时会发生什么。在那时,你将会到达“能够读取的”数据的末尾。就 如同试图读取超出数组末尾的数据同样,试图读取超出该点的数据将会触发一个 IndexOutOfBoundsException。多线程
ByteBuf是一个抽象类,内部所有是抽象的函数接口,AbstractByteBuf这个抽象类基本实现了ByteBuf,下面咱们经过分析AbstractByteBuf里面的实现来分析ByteBuf的工做原理。并发
ByteBuf都是基于字节序列的,相似于一个字节数组。在AbstractByteBuf里面定义了下面5个变量:ide
//源码 int readerIndex; //读索引 int writerIndex; //写索引 private int markedReaderIndex;//标记读索引 private int markedWriterIndex;//标记写索引 private int maxCapacity;//缓冲区的最大容量
ByteBuf 与JDK中的 ByteBuffer 的最大区别之一就是:
(1)netty的ByteBuf采用了读/写索引分离,一个初始化的ByteBuf的readerIndex和writerIndex都处于0位置。
(2)当读索引和写索引处于同一位置时,若是咱们继续读取,就会抛出异常IndexOutOfBoundsException。
(3)对于ByteBuf的任何读写操做都会分别单独的维护读索引和写索引。maxCapacity最大容量默认的限制就是Integer.MAX_VALUE。
JDK中的Buffer的类型 有heapBuffer和directBuffer两种类型,可是在netty中除了heap和direct类型外,还有composite Buffer(复合缓冲区类型)。
这是最经常使用的类型,ByteBuf将数据存储在JVM的堆空间,经过将数据存储在数组中实现的。
1)堆缓冲的优势是:因为数据存储在JVM的堆中能够快速建立和快速释放,而且提供了数组的直接快速访问的方法。
2)堆缓冲缺点是:每次读写数据都要先将数据拷贝到直接缓冲区再进行传递。
这种模式被称为支撑数组 (backing array),它能在没有使用池化的状况下提供快速的分配和释放。这种方式,如代码清单 5-1 所示,很是适合于有遗留的数据须要处理的状况。
ByteBuf heapBuf = ...; if (heapBuf.hasArray()) { byte[] array = heapBuf.array(); int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); int length = heapBuf.readableBytes(); handleArray(array, offset, length); }
NIO 在 JDK 1.4 中引入的 ByteBuffer 类容许 JVM 实现经过本地调用来分配内存。这主要是为了不在每次调用本地 I/O 操做以前(或者以后)将缓冲区的内容复 制到一个中间缓冲区(或者从中间缓冲区把内容复制到缓冲区)。
Direct Buffer在堆以外直接分配内存,直接缓冲区不会占用堆的容量。事实上,在经过套接字发送它以前,JVM将会在内部把你的缓冲 区复制到一个直接缓冲区中。因此若是使用直接缓冲区能够节约一次拷贝。
(1)Direct Buffer的优势是:在使用Socket传递数据时性能很好,因为数据直接在内存中,不存在从JVM拷贝数据到直接缓冲区的过程,性能好。
(2)缺点是:相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。若是你 正在处理遗留代码,你也可能会遇到另一个缺点:由于数据不是在堆上,因此你不得不进行一 次复制。
虽然netty的Direct Buffer有这个缺点,可是netty经过内存池来解决这个问题。直接缓冲池不支持数组访问数据,但能够经过间接的方式访问数据数组:
ByteBuf directBuf = ...; if (!directBuf.hasArray()) { int length = directBuf.readableBytes(); byte[] array = new byte[length]; directBuf.getBytes(directBuf.readerIndex(), array); handleArray(array, 0, length); }
不过对于一些IO通讯线程中读写缓冲时建议使用DirectByteBuffer,由于这涉及到大量的IO数据读写。对于后端的业务消息的编解码模块使用HeapByteBuffer。
第三种也是最后一种模式使用的是复合缓冲区,它为多个 ByteBuf 提供一个聚合视图。在 这里你能够根据须要添加或者删除 ByteBuf 实例,这是一个 JDK 的 ByteBuffer 实现彻底缺 失的特性。
Netty 经过一个 ByteBuf 子类——CompositeByteBuf——实现了这个模式,它提供了一 个将多个缓冲区表示为单个合并缓冲区的虚拟表示
Netty提供了Composite ByteBuf来处理复合缓冲区。例如:一条消息由Header和Body组成,将header和body组装成一条消息发送出去。下图显示了Composite ByteBuf组成header和body:
若是使用的是JDK的ByteBuffer就不能简单的实现,只能经过建立数组或则新的ByteBuffer,再将里面的内容复制到新的ByteBuffer中,下面给出了一个CompositeByteBuf的使用示例:
//组合缓冲区 CompositeByteBuf compBuf = Unpooled.compositeBuffer(); //堆缓冲区 ByteBuf heapBuf = Unpooled.buffer(8); //直接缓冲区 ByteBuf directBuf = Unpooled.directBuffer(16); //添加ByteBuf到CompositeByteBuf compBuf.addComponents(heapBuf, directBuf); //删除第一个ByteBuf compBuf.removeComponent(0); Iterator<ByteBuf> iter = compBuf.iterator(); while(iter.hasNext()){ System.out.println(iter.next().toString()); } //使用数组访问数据 if(!compBuf.hasArray()){ int len = compBuf.readableBytes(); byte[] arr = new byte[len]; compBuf.getBytes(0, arr); }
Netty使用了CompositeByteBuf来优化套接字的I/O操做,尽量地消除了 由JDK的缓冲区实现所致使的性能以及内存使用率的惩罚。( 这尤为适用于 JDK 所使用的一种称为分散/收集 I/O(Scatter/Gather I/O)的技术,定义为“一种输入和 输出的方法,其中,单个系统调用从单个数据流写到一组缓冲区中,或者,从单个数据源读到一组缓冲 区中”。《Linux System Programming》,做者 Robert Love(O’Reilly, 2007)) 这种优化发生在Netty的核心代码中, 所以不会被暴露出来,可是你应该知道它所带来的影响。
ByteBuf提供读/写索引,从0开始的索引,第一个字节索引是0,最后一个字节的索引是capacity-1,下面给出一个示例遍历ByteBuf的字节:
public static void main(String[] args) { //建立一个16字节的buffer,这里默认是建立heap buffer ByteBuf buf = Unpooled.buffer(16); //写数据到buffer for(int i=0; i<16; i++){ buf.writeByte(i+1); } //读数据 for(int i=0; i<buf.capacity(); i++){ System.out.print(buf.getByte(i)+", "); } } /***output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, */
这里有一点须要注意的是:经过那些须要一个索引值参数的方法(getByte(i))之一索引访问byte时不会改变真实的读索引和写索引,咱们能够经过ByteBuf的readerIndex()或则writerIndex()函数来分别推动读索引和写索引。
@Override public ByteBuf writeByte(int value) { ensureAccessible();//检验是否能够写入 ensureWritable0(1); _setByte(writerIndex++, value);//这里写索引自增了 return this; } @Override public byte readByte() { checkReadableBytes0(1); int i = readerIndex; byte b = _getByte(i); readerIndex = i + 1;//这里读索引自增了 return b; }
虽然 ByteBuf 同时具备读索引和写索引,可是 JDK 的 ByteBuffer 却只有一个索引,这 也就是为何必须调用 flip()方法来在读模式和写模式之间进行切换的缘由。
首先图 5-3 展现了 ByteBuf 是如何被它的两个索引划分红 3 个区域的
对于已经读过的字节,咱们须要回收,经过调用ByteBuf.discardReadBytes()来回收已经读取过的字节,discardReadBytes()将回收从索引0到readerIndex之间的字节。调用discardReadBytes()方法以后会变成以下图所示;
虽然你可能会倾向于频繁地调用 discardReadBytes()方法以确保可写分段的最大化,可是 请注意,很明显discardReadBytes()函数极可能会致使内存的复制,它须要移动ByteBuf中可读字节到开始位置,因此该操做会致使时间开销。说白了也就是时间换空间。
ByteBuf 的可读字节分段存储了实际数据。新分配的、包装的或者复制的缓冲区的默认的 readerIndex 值为 0。任何名称以 read 或者 skip 开头的操做都将检索或者跳过位于当前 readerIndex 的数据,而且将它增长已读字节数。
当咱们读取字节的时候,通常要先判断buffer中是否有字节可读,这时候能够调用isReadable()函数来判断:源码以下:
@Override public boolean isReadable() { return writerIndex > readerIndex; }
可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的 writerIndex 的默认值为 0。任何名称以 write 开头的操做都将从当前的 writerIndex 处 开始写数据,并将它增长已经写入的字节数。若是写操做的目标也是 ByteBuf,而且没有指定 源索引的值,则源缓冲区的 readerIndex 也一样会被增长相同的大小。
其实也就是判断 读索引是否小于写索引 来判断是否还能够读取字节。在判断是否可写时也是判断写索引是否小于最大容量来判断。
@Override public boolean isWritable() { return capacity() > writerIndex; }
清除ByteBuf来讲,有两种形式,第一种是clear()函数:源码以下:
@Override public ByteBuf clear() { readerIndex = writerIndex = 0; return this; }
很明显这种方式并无真实的清除缓冲区中的数据,而只是把读/写索引值从新都置为0了,这与discardReadBytes()方法有很大的区别。
从源码可知,每一个ByteBuf有两个标注索引,
private int markedReaderIndex;//标记读索引 private int markedWriterIndex;//标记写索引
能够经过重置方法返回上次标记的索引的位置。
调用duplicate()、slice()、slice(int index, int length)等方法能够建立一个现有缓冲区的视图(现有缓冲区与原有缓冲区是指向相同内存)。衍生的缓冲区有独立的readerIndex和writerIndex和标记索引。若是须要现有的缓冲区的全新副本,可使用copy()得到。
前面咱们也讲过了,ByteBuf主要有三种类型,heap、direct和composite类型,下面介绍建立这三种Buffer的方法:
(1)经过ByteBufAllocator这个接口来建立ByteBuf,这个接口能够建立上面的三种Buffer,通常都是经过channel的alloc()接口获取。
(2)经过Unpooled类里面的静态方法,建立Buffer
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(8); ByteBuf directBuf = Unpooled.directBuffer(16);
还有一点就是,ByteBuf里面的数据都是保存在字节数组里面的:
byte[] array;
先来讲说ByteBuffer的缺点:
(1)下面是NIO中ByteBuffer存储字节的字节数组的定义,咱们能够知道ByteBuffer的字节数组是被定义成final的,也就是长度固定。一旦分配完成就不能扩容和收缩,灵活性低,并且当待存储的对象字节很大可能出现数组越界,用户使用起来稍不当心就可能出现异常。若是要避免越界,在存储以前就要只要需求字节大小,若是buffer的空间不够就建立一个更大的新的ByteBuffer,再将以前的Buffer中数据复制过去,这样的效率是奇低的。
final byte[] hb;// Non-null only for heap buffers
(2)ByteBuffer只用了一个position指针来标识位置,读写模式切换时须要调用flip()函数和rewind()函数,使用起来须要很是当心,否则很容易出错误。
下面说说对应的ByteBuf的优势:
(1)ByteBuf是吸收ByteBuffer的缺点以后从新设计,存储字节的数组是动态的,最大是Integer.MAX_VALUE。这里的动态性存在write操做中,write时得知buffer不够时,会自动扩容。
(2) ByteBuf的读写索引分离,使用起来十分方便。此外ByteBuf还新增了不少方便实用的功能。
看类名咱们就能够知道,该类主要是对引用进行计数,有点相似于JVM中判断对象是否可回收的引用计数算法。这个类主要是根据ByteBuf的引用次数判断ByteBuf是否可被自动回收。下面来看看源码:
成员变量
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater; //静态代码段初始化refCntUpdater static { AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater = PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); if (updater == null) { updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); } refCntUpdater = updater; } private volatile int refCnt = 1;
首先咱们能看到refCntUpdater这个变量,这是一个原子变量类AtomicIntegerFieldUpdater,她是一个静态变量,并且是在static代码段里面实例化的,这说明这个类是单例的。这个类的主要做用是以原子的方式对成员变量进行更新操做以实现线程安全(这里线程安全的保证也就是CAS+volatile)。
而后是定义了refCnt变量,用于跟踪对象的引用次数,使用volatile修饰解决原子变量可视性问题。
对象引用计数器
那么,对对象的引用计数与释放是怎么实现的呢?核心就是两个函数:
//计数加1 retain(); //计数减一 release();
下面分析这两个函数源码:
每调用一次retain()函数一次,引用计数器就会加一,因为可能存在多线程并发使用的情景,因此必须保证累加操做是线程安全的,那么是怎么保证的呢?咱们来看一下源码:
public ByteBuf retain() { return retain0(1); } public ByteBuf retain(int increment) { return retain0(checkPositive(increment, "increment")); } /** 最后都是调用这个函数。 */ private ByteBuf retain0(int increment) { for (;;) { int refCnt = this.refCnt; final int nextCnt = refCnt + increment; // Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow. if (nextCnt <= increment) { throw new IllegalReferenceCountException(refCnt, increment); } if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) { break; } } return this; }
在retain0()函数中, 经过for(;;)来实现了自旋锁。经过自旋来对引用计数器refCnt执行加1操做。这里的加一操做是经过原子变量refCntUpdater的compareAndSet(this, refCnt, nextCnt)方法实现的,这个经过硬件级别的CAS保证了原子性,若是修改失败了就会不停的自旋,直到修改为功为止。
下面再看看释放的过程:release()函数:
private boolean release0(int decrement) { for (;;) { int refCnt = this.refCnt; if (refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) { if (refCnt == decrement) { deallocate(); return true; } return false; } } }
这里基本和retain()函数同样,也是经过自旋和CAS保证执行的正确的将计数器减一。这里须要注意的是当refCnt == decrement
也就是引用对象不可达时,就须要调用deallocate();方法来释放ByteBuf对象。
从类名就能够知道UnpooledHeapByteBuf 是基于堆内存的字节缓冲区,没有基于对象池实现,这意味着每次的IO读写都会建立一个UnpooledHeapByteBuf对象,会形成必定的性能影响,可是也不容易出现内存管理的问题。
成员变量
有三个成员变量,各自的含义见注释。
//缓冲区分配器,用于UnpooledHeapByteBuf的内存分配。在UnpooledHeapByteBuf构造器中实例化 private final ByteBufAllocator alloc; //字节数组做为缓冲区 byte[] array; //实现ByteBuf与NIO中ByteBuffer的转换 private ByteBuffer tmpNioBuf;
动态扩展缓冲区
在说道AbstractByteBuf的时候,ByteBuf是能够自动扩展缓冲区大小的,这里咱们分析一下在UnpooledHeapByteBuf中是怎么实现的。
public ByteBuf capacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; }
里面的实现并不复杂:
(1)首先获取本来的容量oldCapacity;
(2)若是新需求容量大于oldCapacity,以新的容量newCapacity建立字节数组,将原来的字节数组内容经过调用System.arraycopy(array, 0, newArray, 0, array.length);复制过去,并将新的字节数组设为ByteBuf的字节数组。
(3)若是新需求容量小于oldCapacity就不须要动态扩展,可是须要截取出一段新缓冲区。
PooledDirectByteBuf基于内存池实现的,具体的内存池的实现原理,比较复杂,我没分析清楚,具体的只知道,内存池就是一片提早申请的内存,当须要ByteBuf的时候,就从内存池中申请一片内存,这样效率比较高。
PooledDirectByteBuf和UnPooledDirectByteBuf基本同样,惟一不一样的就是内存分配策略。
建立字节缓冲区实例
因为PooledDirectByteBuf基于内存池实现的,因此不能经过new关键字直接实例化一个对象,而是直接从内存池中获取,而后设置引用计数器的值。看下源码:
static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.reuse(maxCapacity); return buf; }
经过RECYCLER对象的get()函数从内存池获取PooledDirectByteBuf对象。而后在buf.reuse(maxCapacity);
函数里面设置引用计数器为1。