每一个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式作相互转换。这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种均可以将字节流从一种格式转换为另外一种格式java
所以,编码器操做出站数据,而解码器处理入站数据web
在这一节,咱们将研究 Netty 所提供的解码器类,并提供关于什么时候以及如何使用它们的具体示例,这些类覆盖了两个不一样的用例:浏览器
何时会用到解码器呢?很简单,每当须要为 ChannelPipeline 中的下一个 ChannelInboundHandler 转换入站数据时会用到。此外,得益于 ChannelPipeline 的设计,能够将多个解码器连接在一块儿,以实现任意复杂的转换逻辑服务器
将字节解码为消息是一项常见的任务,Netty 它提供了一个 抽象基类 ByteToMessageDecoder,这个类会对入站数据进行缓冲,直到它准备好处理websocket
下面举一个如何使用这个类的示例,假设你接收了一个包含简单 int 的字节流,每一个 int 都须要被单独处理。在这种状况下,你须要从入站 ByteBuf 中读取每一个 int,并将它传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。为了解码这个字节流,你要扩展 ByteToMessageDecoder 类(须要注意的是,原子类型的 int 在被添加到 List 中时,会被自动装箱为 Integer)网络
// 扩展 ByteToMessageDecoder,以将字节解码为特定的格式 public class ToIntegerDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //检查是否至少有 4 字节可读(1 个int的字节长度) if (in.readableBytes() >= 4) { //从入站 ByteBuf 中读取一个 int,并将其添加到解码消息的 List 中 out.add(in.readInt()); } } }
虽然 ByteToMessageDecoder 使得能够很简单地实现这种模式,可是你可能会发现,在调用 readInt()方法前不得不验证所输入的 ByteBuf 是否具备足够的数据有点繁琐。下面说的 ReplayingDecoder,它是一个特殊的解码器,以少许的开销消除了这个步骤框架
ReplayingDecoder 扩展了 ByteToMessageDecoder 类,使得咱们没必要调用 readableBytes() 方法。它经过使用一个自定义的 ByteBuf 实现,ReplayingDecoderByteBuf,包装传入的 ByteBuf 实现了这一点,其将在内部执行该调用异步
这个类的完整声明是:socket
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
类型参数 S 指定了用于状态管理的类型,其中 Void 表明不须要状态管理。下述代码展现了基于 ReplayingDecoder 从新实现的 ToIntegerDecoderide
// 扩展ReplayingDecoder<Void> 以将字节解码为消息 public class ToIntegerDecoder2 extends ReplayingDecoder<Void> { // 传入的 ByteBuf 是 ReplayingDecoderByteBuf @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 从入站 ByteBuf 中读取一个 int,并将其添加到解码消息的 List 中 out.add(in.readInt()); } }
和以前同样,从 ByteBuf 中提取的int将会被添加到List中。若是没有足够的字节可用,这 个 readInt() 方法的实现将会抛出一个 Error,其将在基类中被捕获并处理。当有更多的数据可供读取时,该 decode() 方法将会被再次调用
请注意 ReplayingDecoderByteBuf 的下面这些方面:
下面这些类用于处理更加复杂的用例:
在这一节,咱们将解释如何在两个消息格式之间进行转换,例如,从一种 POJO 类型转换为另外一种
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
参数类型 I 指定了 decode() 方法的输入参数 msg 的类型,它是你必须实现的惟一方法
咱们将编写一个 IntegerToStringDecoder 解码器来扩展 MessageToMessageDecoder,它的 decode() 方法会把 Integer 参数转换为 String 表示。和以前同样,解码的 String 将被添加到传出的 List 中,并转发给下一个 ChannelInboundHandler
public class IntegerToStringDecoder extends MessageToMessageEncoder<Integer> { @Override protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception { //将 Integer 消息转换为它的 String 表示,并将其添加到输出的 List 中 out.add(String.valueOf(msg)); } }
因为 Netty 是一个异步框架,因此须要在字节能够解码以前在内存中缓冲它们。所以,不能让解码器缓冲大量的数据以致于耗尽可用的内存。为了解除这个常见的顾虑,Netty 提供了 TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出
为了不这种状况,你能够设置一个最大字节数的阈值,若是超出该阈值,则会致使抛出一个 TooLongFrameException(随后会被 ChannelHandler.exceptionCaught() 方法捕获)。而后,如何处理该异常则彻底取决于该解码器的用户。某些协议(如 HTTP)可能容许你返回一个特殊的响应。而在其余的状况下,惟一的选择可能就是关闭对应的链接
下面的示例使用 TooLongFrameException 来通知 ChannelPipeline 中的其余 ChannelHandler 发生了帧大小溢出的。须要注意的是,若是你正在使用一个可变帧大小的协议,那么这种保护措施将是尤其重要的
public class SafeByteToMessageDecoder extends ByteToMessageDecoder { public static final int MAX_FRAME_SIZE = 1024; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int readable = in.readableBytes(); // 检查缓冲区是否有超过 MAX_FRAME_SIZE 个字节 if (readable > MAX_FRAME_SIZE) { // 跳过全部的可读字节,抛出 TooLongFrameException 并通知 ChannelHandler in.skipBytes(readable); throw new TooLongFrameException("Frame too big!"); } //do something } }
编码器实现了 ChannelOutboundHandler,并将出站数据从一种格式转换为另外一种格式,和咱们方才学习的解码器的功能正好相反。Netty 提供了一组类,用于帮助你编写具备如下功能的编码器:
前面咱们看到了如何使用 ByteToMessageDecoder 来将字节转换为消息,如今咱们使用 MessageToByteEncoder 来作逆向的事情
这个类只有一个方法,而解码器有两个。缘由是解码器一般须要在 Channel 关闭以后产生最后一个消息(所以也就有了 decodeLast() 方法。显然这不适用于编码器的场景 —— 在链接被关闭以后仍然产生一个消息是毫无心义的
下述代码展现了 ShortToByteEncoder,其接受一个 Short 类型的实例做为消息,将它编码为Short的原子类型值,并将它写入 ByteBuf 中,其将随后被转发给 ChannelPipeline 中的 下一个 ChannelOutboundHandler。每一个传出的 Short 值都将会占用 ByteBuf 中的 2 字节。
public class ShortToByteEncoder extends MessageToByteEncoder<Short> { @Override protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception { // 将 Short 写入 ByteBuf out.writeShort(msg); } }
MessageToMessageEncoder 类的 encode() 方法提供了将入站数据从一个消息格式解码为另外一种
下述代码使用 IntegerToStringEncoder 扩展了 MessageToMessageEncoder,编码器将每一个出站 Integer 的 String 表示添加到了该 List 中
public class IntegerToStringEncoder extends MessageToMessageEncoder { @Override protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception { out.add(String.valueOf(msg)); } }
虽然咱们一直将解码器和编码器做为单独的实体讨论,可是你有时将会发如今同一个类中管理入站和出站数据和消息的转换是颇有用的。Netty 的抽象编解码器类正好用于这个目的,由于它们每一个都将捆绑一个解码器/编码器对,以处理咱们一直在学习的这两种类型的操做。正如同你可能已经猜测到的,这些类同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口
为何咱们并无一直优先于单独的解码器和编码器使用这些复合类呢?由于经过尽量地将这两种功能分开,最大化了代码的可重用性和可扩展性,这是 Netty 设计的一个基本原则
让咱们来研究这样的一个场景:咱们须要将字节解码为某种形式的消息,多是 POJO,随后再次对它进行编码。ByteToMessageCodec 将为咱们处理好这一切,由于它结合了 ByteToMessageDecoder 以及它的逆向 —— MessageToByteEncoder
任何的请求/响应协议均可以做为使用 ByteToMessageCodec 的理想选择。例如,在某个 SMTP 的实现中,编解码器将读取传入字节,并将它们解码为一个自定义的消息类型,如 SmtpRequest。而在接收端,当一个响应被建立时,将会产生一个 SmtpResponse,其将被编码回字节以便进行传输
经过使用 MessageToMessageCodec,咱们能够在一个单个的类中实现该转换的往返过程。MessageToMessageCodec 是一个参数化的类,定义以下:
public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
decode() 方法是将 INBOUND_IN 类型的消息转换为 OUTBOUND_IN 类型的消息,而 encode() 方法则进行它的逆向操做。将 INBOUND_IN 类型的消息看做是经过网络发送的类型, 而将 OUTBOUND_IN 类型的消息看做是应用程序所处理的类型,将可能有所裨益
WebSocket 协议
下面关于 MessageToMessageCodec 的示例引用了一个新出的 WebSocket 协议,这个协议能实现 Web 浏览器和服务器之间的全双向通讯
咱们的 WebSocketConvertHandler 在参数化 MessageToMessageCodec 时将使用 INBOUND_IN 类型的 WebSocketFrame,以及 OUTBOUND_IN 类型的 MyWebSocketFrame,后者是 WebSocketConvertHandler 自己的一个静态嵌套类
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> { @Override protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception { // 实例化一个指定子类型的 WebSocketFrame ByteBuf payload = msg.getData().duplicate().retain(); switch (msg.getType()) { case BINARY: out.add(new BinaryWebSocketFrame(payload)); break; case TEXT: out.add(new TextWebSocketFrame(payload)); break; case CLOSE: out.add(new CloseWebSocketFrame(true, 0, payload)); break; case CONTINUATION: out.add(new ContinuationWebSocketFrame(payload)); break; case PONG: out.add(new PongWebSocketFrame(payload)); break; case PING: out.add(new PingWebSocketFrame(payload)); break; default: throw new IllegalStateException("Unsupported websocket msg " + msg); } } // 将 WebSocketFrame 解码为 MyWebSocketFrame,并设置 FrameType @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception { ByteBuf paload = msg.content().duplicate().retain(); if (msg instanceof BinaryWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, paload)); } else if (msg instanceof CloseWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, paload)); } else if (msg instanceof PingWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, paload)); } else if (msg instanceof PongWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, paload)); } else if (msg instanceof TextWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, paload)); } else if (msg instanceof ContinuationWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, paload)); } else { throw new IllegalStateException("Unsupported websocket msg " + msg); } } public static final class MyWebSocketFrame { public enum FrameType { BINARY, CLOSE, PING, PONG, TEXT, CONTINUATION } private final FrameType type; private final ByteBuf data; public MyWebSocketFrame(FrameType type, ByteBuf data) { this.type = type; this.data = data; } public FrameType getType() { return type; } public ByteBuf getData() { return data; } } }
正如咱们前面所提到的,结合一个解码器和编码器可能会对可重用性形成影响。可是,有一 种方法既可以避免这种惩罚,又不会牺牲将一个解码器和一个编码器做为一个单独的单元部署所 带来的便利性。CombinedChannelDuplexHandler 提供了这个解决方案,其声明为:
public class CombinedChannelDuplexHandler <I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
这个类充当了 ChannelInboundHandler 和 ChannelOutboundHandler(该类的类型参数 I 和 O)的容器。经过提供分别继承了解码器类和编码器类的类型,咱们能够实现一个编解码器,而又没必要直接扩展抽象的编解码器类
首先,让咱们研究下述代码,该实现扩展了 ByteToMessageDecoder,由于它要从 ByteBuf 读取字符
public class ByteToCharDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while (in.readableBytes() >= 2) { out.add(in.readChar()); } } }
这里的 decode() 方法一次将从 ByteBuf 中提取 2 字节,并将它们做为 char 写入到 List 中,其将会被自动装箱为 Character 对象
下述代码将 Character 转换回字节。这个类扩展了 MessageToByteEncoder,由于它须要将 char 消息编码到 ByteBuf 中。这是经过直接写入 ByteBuf 作到的
public class CharToByteEncoder extends MessageToByteEncoder<Character> { @Override protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception { out.writeChar(msg); } }
既然咱们有了解码器和编码器,咱们能够结合它们来构建一个编解码器
// 经过该解码器和编码器实现参数化CombinedByteCharCodec public class CombinedChannelDuplexHandler extends io.netty.channel.CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> { public CombinedChannelDuplexHandler() { // 将委托实例传递给父类 super(new ByteToCharDecoder(), new CharToByteEncoder()); } }