当咱们进行数据传输的时候,每每须要使用到缓冲区,经常使用的缓冲区就是JDK NIO类库提供的java.nio.Buffer。java
实际上,7种基础类型(Boolean除外)都有本身的缓冲区实现,对于NIO编程而言,咱们主要使用的是ByteBuffer。从功能角度而言,ByteBuffer彻底能够知足NIO编程的须要,可是因为NIO编程的复杂性,ByteBuffer也有其局限性,它的主要缺点以下。编程
(1)ByteBuffer长度固定,一旦分配完成,它的容量不能动态扩展和收缩,当须要编码的POJO对象大于ByteBuffer的容量时,会发生索引越界异常;后端
(2)ByteBuffer只有一个标识位置的指针position,读写的时候须要手工调用flip()和rewind()等,使用者必须当心谨慎地处理这些API,不然很容易致使程序处理失败;api
(3)ByteBuffer的API功能有限,一些高级和实用的特性它不支持,须要使用者本身编程实现。数组
为了弥补这些不足,Netty提供了本身的ByteBuffer实现——ByteBuf。缓存
网络数据的基本单位老是字节。Java NIO 提供了ByteBuffer 做为它的字节容器,可是这个类使用起来过于复杂,并且也有些繁琐。cookie
Netty 的ByteBuffer 替代品是ByteBuf,一个强大的实现,既解决了JDK API 的局限性,又为网络应用程序的开发者提供了更好的API。在本章中咱们将会说明和JDK 的ByteBuffer 相比,ByteBuf 的卓越功能性和灵活性。这
也将有助于更好地理解Netty 数据处理的通常方式。网络
Netty 的数据处理API 经过两个组件暴露——abstract class ByteBuf 和interface ByteBufHolder。
下面是一些ByteBuf API 的优势:数据结构
其余类可用于管理ByteBuf 实例的分配,以及执行各类针对于数据容器自己和它所持有的数据的操做。咱们将在仔细研究ByteBuf 和ByteBufHolder 时探讨这些特性。dom
一般状况下,当咱们对ByteBuffer进行put操做的时候,若是缓冲区剩余可写空间不够,就会发生BufferOverflowException异常。为了不发生这个问题,一般在进行put操做的时候会对剩余可用空间进行校验,若是剩余空间不足,须要从新建立一个新的ByteBuffer,并将以前的ByteBuffer复制到新建立的ByteBuffer中,最后释放老的ByteBuffer,代码示例以下。
public ByteBuffer put(ByteBuffer src) { if (src instanceof HeapByteBuffer) { if (src == this) throw new IllegalArgumentException(); HeapByteBuffer sb = (HeapByteBuffer)src; int n = sb.remaining(); if (n > remaining()) throw new BufferOverflowException(); System.arraycopy(sb.hb, sb.ix(sb.position()), hb, ix(position()), n); sb.position(sb.position() + n); position(position() + n); } else if (src.isDirect()) { int n = src.remaining(); if (n > remaining()) throw new BufferOverflowException(); src.get(hb, ix(position()), n); position(position() + n); } else { super.put(src); } return this; }
由于全部的网络通讯都涉及字节序列的移动,因此高效易用的数据结构明显是必不可少的。Netty 的ByteBuf 实现知足并超越了这些需求。让咱们首先来看看它是如何经过使用不一样的索引来简化对它所包含的数据的访问的吧。
ByteBuf 维护了两个不一样的索引:一个用于读取,一个用于写入。当你从ByteBuf 读取时,它的readerIndex 将会被递增已经被读取的字节数。一样地,当你写入ByteBuf 时,它的writerIndex 也会被递增。图5-1 展现了一个空ByteBuf 的布局结构和状态(一个读索引和写索引都设置为0 的16 字节ByteBuf)。
能够看到,正常状况下,一个ByteBuf被两个索引分红三部分。
readerIndex 达到和writerIndex 位于同一位置,表示咱们到达"能够读取的"数据的末尾。就如同试图读取超出数组末尾的数据同样,试图读取超出该点的数据将会触发一个IndexOutOfBoundsException。
名称以read 或者write 开头的ByteBuf 方法,将会推动其对应的索引,而名称以set 或者get 开头的操做则不会。后面的这些方法将在做为一个参数传入的一个相对索引上执行操做。
和ByteBuffer 同样,ByteBuf也是一个缓存区类,它有三种缓存区类型:
最经常使用的ByteBuf 模式是将数据存储在JVM 的堆空间中,能够被jvm自动回收。这种模式被称为支撑数组(backing array),它能在没有使用池化的状况下提供快速的分配和释放。这种方式,如代码清单5-1 所示,很是适合于有遗留的数据须要处理的状况。
ByteBuf heapBuf = ...; //检查ByteBuf 是否有一个支撑数组 if (heapBuf.hasArray()) { //若是有,则获取对该数组的引用 byte[] array = heapBuf.array(); //计算第一个字节的偏移量。 int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); //得到可读字节数 int length = heapBuf.readableBytes(); //使用数组、偏移量和长度做为参数调用你的方法 handleArray(array, offset, length); }
当hasArray()方法返回false 时,尝试访问支撑数组将触发一个UnsupportedOperationException。这个模式相似于JDK 的ByteBuffer 的用法。
堆缓存区的缺点在于若是进行Socket的I/O读写,须要额外进行一次内存复制,将堆内存对应的缓冲区复制到内核的channel中,性能会有必定的降低。
咱们先来了解一下什么是直接缓存区:
咱们知道java的ByteBuffer类型就有直接和非直接缓存区这两种类型。
他们的区别以下:
咱们直接看代码:
ByteBuf directBuf = ...; //检查ByteBuf 是否由数组支撑。若是不是,则这是一个直接缓冲区 if (!directBuf.hasArray()) { //获取可读字节数 int length = directBuf.readableBytes(); //分配一个新的数组来保存具备该长度的字节数据 byte[] array = new byte[length]; //将字节复制到该数组 directBuf.getBytes(directBuf.readerIndex(), array); //使用数组、偏移量和长度做为参数调用你的方法 handleArray(array, 0, length); }
第三种也是最后一种模式使用的是复合缓冲区,它为多个ByteBuf 提供一个聚合视图。在这里你能够根据须要添加或者删除ByteBuf 实例,这是一个JDK 的ByteBuffer 没有的特性。
Netty 经过一个ByteBuf 子类——CompositeByteBuf——实现了这个模式,它提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。
为了举例说明,让咱们考虑一下一个由两部分——头部和主体——组成的将经过HTTP 协议传输的消息。这两部分由应用程序的不一样模块产生,将会在消息被发送的时候组装。该应用程序能够选择为多个消息重用相同的消息主体。当这种状况发生时,对于每一个消息都将会建立一个新的头部。
由于咱们不想为每一个消息都从新分配这两个缓冲区,因此使用CompositeByteBuf 是一个完美的选择。它在消除了不必的复制的同时,暴露了通用的ByteBuf API。图5-2 展现了生成的消息布局。
代码清单5-3 展现了如何经过使用JDK 的ByteBuffer 来实现这一需求。建立了一个包含两个ByteBuffer 的数组用来保存这些消息组件,同时建立了第三个ByteBuffer 用来保存全部这些数据的副本。
// Use an array to hold the message parts ByteBuffer[] message = new ByteBuffer[] { header, body }; // Create a new ByteBuffer and use copy to merge the header and body ByteBuffer message2 = ByteBuffer.allocate(header.remaining() + body.remaining()); message2.put(header); message2.put(body); message 2.flip();
分配和复制操做,以及伴随着对数组管理的须要,使得这个版本的实现效率低下并且笨拙。
代码清单5-4 展现了一个使用了CompositeByteBuf 的版本。
CompositeByteBuf messageBuf = Unpooled.compositeBuffer(); //将ByteBuf 实例追加到CompositeByteBuf ByteBuf headerBuf = ...; // can be backing or direct ByteBuf bodyBuf = ...; // can be backing or direct messageBuf.addComponents(headerBuf, bodyBuf); ..... //删除位于索引位置为 0(第一个组件)的ByteBuf messageBuf.removeComponent(0); // remove the header //循环遍历全部的ByteBuf 实例 for (ByteBuf buf : messageBuf) { System.out.println(buf.toString()); }
CompositeByteBuf 可能不支持访问其支撑数组,所以访问CompositeByteBuf 中的数据相似于(访问)直接缓冲区的模式,如代码清单5-5 所示。
CompositeByteBuf compBuf = Unpooled.compositeBuffer(); //得到可读字节数 int length = compBuf.readableBytes(); //分配一个具备可读字节数长度的新数组 byte[] array = new byte[length]; //将字节读到该数组中 compBuf.getBytes(compBuf.readerIndex(), array); //使用偏移量和长度做为参数使用该数组 handleArray(array, 0, array.length);
经验代表:ByteBuf的最佳实践是在I/O通讯线程的读写缓冲区使用DirectByteBuf,后端业务消息的编解码模块使用HeapByteBuf
ByteBuf 提供了许多超出基本读、写操做的方法用于修改它的数据。在接下来的章节中,咱们将会讨论这些中最重要的部分。
如同在普通的Java 字节数组中同样,ByteBuf 的索引是从零开始的:第一个字节的索引是0,最后一个字节的索引老是capacity() - 1。代码清单5-6 代表,对存储机制的封装使得遍历ByteBuf 的内容很是简单。
ByteBuf buffer = ...; for (int i = 0; i < buffer.capacity(); i++) { byte b = buffer.getByte(i); System.out.println((char)b); }
须要注意的是,使用那些须要一个索引值参数的方法来访问数据既不会改变readerIndex 也不会改变writerIndex。若是有须要,也能够经过调用readerIndex(index)或者writerIndex(index)来手动移动这二者。
在ByteBuf中有多种能够用来肯定指定值的索引的方法。最简单的是使用indexOf()方法。较复杂的查找能够经过那些须要一个ByteBufProcessor做为参数的方法达成。这个接口只定义了一个方法:
boolean process(byte value)
它将检查输入值是不是正在查找的值。
ByteBufProcessor针对一些常见的值定义了许多便利的枚举。假设你的应用程序须要和所谓的包含有以NULL结尾的内容的Flash套接字,能够调用:
forEach Byte(ByteBufProcessor.FIND_NUL)
如代码清单展现了一个查找回车符(r)的索引的例子。:
ByteBuf buffer = ...; int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);
正如咱们所提到过的,有两种类别的读/写操做:
表5-1 列举了最经常使用的get()方法。完整列表请参考对应的API 文档。
这里面getBytes方法咱们须要强调一下,好比buf.getBytes(buf.readerIndex(), array);表示将从buf实例的readerIndex为起点的数据传入指定的目的地(一个数组中)。
如今,让咱们研究一下read()操做,其做用于当前的readerIndex 或writerIndex。这些方法将用于从ByteBuf 中读取数据,如同它是一个流。表5-3 展现了最经常使用的方法。
几乎每一个read()方法都有对应的write()方法,用于将数据追加到ByteBuf 中。注意,表5-4 中所列出的这些方法的参数是须要写入的值,而不是索引值
正如咱们以前看过的这张图:
在上图中标记为可丢弃字节的分段包含了已经被读过的字节。经过调用discardReadBytes()方法,能够丢弃它们并回收空间。这个分段的初始大小为0,存储在readerIndex 中,会随着read 操做的执行而增长(get*操做不会移动readerIndex)。
上图展现了下图中所展现的缓冲区上调用discardReadBytes()方法后的结果。能够看到,可丢弃字节分段中的空间已经变为可写的了。注意,在调用discardReadBytes()以后,对可写分段的内容并无任何的保证。
虽然你可能会倾向于频繁地调用discardReadBytes()方法以确保可写分段的最大化,可是请注意,这将极有可能会致使内存复制,由于可读字节(图中标记为CONTENT 的部分)必须被移动到缓冲区的开始位置。咱们建议只在有真正须要的时候才这样作,例如,当内存很是宝贵的时候。
ByteBuf 的可读字节分段存储了实际数据。新分配的、包装的或者复制的缓冲区的默认的readerIndex 值为0。任何名称以read 或者skip 开头的操做都将检索或者跳过位于当前readerIndex 的数据,而且将它增长已读字节数。
如下代码清单展现了如何读取全部能够读的字节。
ByteBuf buffer = ...; while (buffer.isReadable()) { System.out.println(buffer.readByte()); }
可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的writerIndex 的默认值为0。任何名称以write 开头的操做都将从当前的writerIndex 处开始写数据,并将它增长已经写入的字节数。若是尝试往目标写入超过目标容量的数据,将会引起一个IndexOutOfBoundException。
如下代码清单是一个用随机整数值填充缓冲区,直到它空间不足为止的例子。writeableBytes()方法在这里被用来肯定该缓冲区中是否还有足够的空间。
// Fills the writable bytes of a buffer with random integers. ByteBuf buffer = ...; //由于一个int为四个字节 while (buffer.writableBytes() >= 4) { buffer.writeInt(random.nextInt()); }
JDK 的InputStream 定义了mark(int readlimit)和reset()方法,这些方法分别被用来将流中的当前位置标记为指定的值,以及将流重置到该位置。
一样,能够经过调用markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex()来标记和重置ByteBuf 的readerIndex 和writerIndex。这些和InputStream 上的调用相似,只是没有readlimit 参数来指定标记何时失效。
也能够经过调用readerIndex(int)或者writerIndex(int)来将索引移动到指定位置。试图将任何一个索引设置到一个无效的位置都将致使一个IndexOutOfBoundsException。能够经过调用clear()方法来将readerIndex 和writerIndex 都设置为0。注意,这并不会清除内存中的内容。
调用clear()比调用discardReadBytes()轻量得多,由于它将只是重置索引而不会复制任何的内存。
派生缓冲区为ByteBuf 提供了以专门的方式来呈现该ByteBuf内容的视图。这类视图能够经过如下方法被建立的:
每一个这些方法都将返回一个新的ByteBuf 实例,它具备本身的读索引、写索引和标记索引。其内部存储和JDK 的ByteBuffer同样也是共享的。这使得派生缓冲区的建立成本是很低廉的,可是这也意味着,若是你修改了它的内容,也同时修改了其对应的源实例,因此要当心。
Charset utf8 = Charset.forName("UTF-8"); //建立一个ByteBuf "Netty in Action" ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); //建立该ByteBuf 从索引0 开始到索引15结束的一个新切片 ByteBuf sliced = buf.slice(0, 15); System.out.println(sliced.toString(utf8)); //更新索引0 处的字节 buf.setByte(0, (byte)'J'); //将会成功,由于数据是共享的,对其中一个所作的更改对另一个也是可见的 assert buf.getByte(0) == sliced.getByte(0);
若是须要一个现有缓冲区的真实副本,请使用copy()或者copy(int, int)方法。不一样于派生缓冲区,由这个调用所返回的ByteBuf 拥有独立的数据副本。
Charset utf8 = Charset.forName("UTF-8"); ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); ByteBuf copy = buf.copy(0, 15); System.out.println(copy.toString(utf8)); buf.setByte(0, (byte) 'J'); //将会成功,由于数据不是共享的 assert buf.getByte(0) != copy.getByte(0);
若是咱们不修改原始ByteBuf 的切片或者副本,这两种场景是相同的。只要有可能,咱们尽可能使用slice()方法来避免复制内存的开销。
咱们常常发现,除了实际的数据负载以外,咱们还须要存储各类属性值。HTTP 响应即是一个很好的例子,除了表示为字节的内容,还包括状态码、cookie 等。
为了处理这种常见的用例,Netty 提供了ByteBufHolder,咱们能够看看他的默认实现:
能够看出,它主要就是封装了一个ByteBuf对象,以及对这个对象的一些操做api。如今假如咱们要构造一个HTTP响应的对象,那么就能够在继承ByteBufHolder的基础上在拓展其余的好比状态码、cookie等字段,达到本身的目的。
它经常使用的api以下:
在这一节中,咱们将描述管理ByteBuf 实例的不一样方式。
为了下降分配和释放内存的开销,Netty 经过interface ByteBufAllocator 实现了(ByteBuf 的)池化,它能够用来分配咱们所描述过的任意类型的ByteBuf 实例。
关于ioBuffer,默认地,当所运行的环境具备sun.misc.Unsafe 支持时,返回基于直接内存存储的ByteBuf,不然返回基于堆内存存储的ByteBuf;当指定使用PreferHeapByteBufAllocator 时,则只会返回基于堆内存存储的ByteBuf。
咱们能够经过Channel(每一个均可以有一个不一样的ByteBufAllocator 实例)或者绑定到ChannelHandler 的ChannelHandlerContext 获取一个到ByteBufAllocator 的引用。代码清单5-14 说明了这两种方法。
获取一个到ByteBufAllocator 的引用
//从Channel 获取一个到ByteBufAllocator 的引用 Channel channel = ...; ByteBufAllocator allocator = channel.alloc(); .... //从ChannelHandlerContext 获取一个到ByteBufAllocator 的引用 ChannelHandlerContext ctx = ...; ByteBufAllocator allocator2 = ctx.alloc(); ...
Netty提供了两种ByteBufAllocator的实现:PooledByteBufAllocator和UnpooledByteBufAllocator。前者池化了ByteBuf的实例以提升性能并最大限度地减小内存碎片。此实现使用了一种称为jemalloc的已被大量现代操做系统所采用的高效方法来分配内存。后者的实现不池化ByteBuf实例,而且在每次它被调用时都会返回一个新的实例。
Netty默认使用了PooledByteBufAllocator
可能某些状况下,你未能获取一个到ByteBufAllocator 的引用。对于这种状况,Netty 提供了一个简单的称为Unpooled 的工具类,它提供了静态的辅助方法来建立未池化的ByteBuf实例。表5-8 列举了这些中最重要的方法。
Unpooled 类还使得ByteBuf 一样可用于那些并不须要Netty 的其余组件的非网络项目,使得其能得益于高性能的可扩展的缓冲区API。
ByteBufUtil 提供了用于操做ByteBuf 的静态的辅助方法。由于这个API 是通用的,而且和池化无关,因此这些方法已然在分配类的外部实现。
这些静态方法中最有价值的可能就是hexdump()方法,它以十六进制的表示形式打印ByteBuf 的内容。这在各类状况下都颇有用,例如,出于调试的目的记录ByteBuf 的内容。十六进制的表示一般会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六进制的版本还能够很容易地转换回实际的字节表示。
另外一个有用的方法是boolean equals(ByteBuf, ByteBuf),它被用来判断两个ByteBuf实例的相等性。若是你实现本身的ByteBuf 子类,你可能会发现ByteBufUtil 的其余有用方法。
引用计数是一种经过在某个对象所持有的资源再也不被其余对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。Netty 在第4 版中为ByteBuf 和ByteBufHolder 引入了引用计数技术,它们都实现了interface ReferenceCounted。
引用计数背后的想法并非特别的复杂;它主要涉及跟踪到某个特定对象的活动引用的数量。一个ReferenceCounted 实现的实例将一般以活动的引用计数为1 做为开始。只要引用计数大于0,就能保证对象不会被释放。当活动引用的数量减小到0 时,该实例就会被释放。注意,虽然释放的确切语义多是特定于实现的,可是至少已经释放的对象应该不可再用了。
引用计数对于池化实现(如PooledByteBufAllocator)来讲是相当重要的,它下降了内存分配的开销。代码清单5-15 展现了相关的示例。
//从Channel 获取ByteBufAllocator Channel channel = ...; ByteBufAllocator allocator = channel.alloc(); .... //从ByteBufAllocator分配一个ByteBuf ByteBuf buffer = allocator.directBuffer(); //检查引用计数是否为预期的1 assert buffer.refCnt() == 1; ... //减小到该对象的活动引用。当减小到0 时,该对象被释放,而且该方法返回true ByteBuf buffer = ...; boolean released = buffer.release();
试图访问一个已经被释放的引用计数的对象,将会致使一个IllegalReferenceCountException。
注意,一个特定的(ReferenceCounted 的实现)类,能够用它本身的独特方式来定义它的引用计数规则。例如,咱们能够设想一个类,其release()方法的实现老是将引用计数设为零,而不用关心它的当前值,从而一次性地使全部的活动引用都失效。
谁负责释放release呢 通常来讲,是由最后访问(引用计数)对象的那一方来负责将它释放。在第6 章中,咱们将会解释这个概念和ChannelHandler 以及ChannelPipeline 的相关性。