Netty为许多通用协议提供了编解码器和处理器,几乎能够开箱即用,这减小了你在那些至关繁琐的事务上原本会花费的时间与精力。咱们将探讨这些工具以及它们所带来的好处,其中包括Netty对于SSL/TLS和WebSocket的支持,以及如何简单地经过数据压缩来压榨HTTP,以获取更好的性能。java
一、经过SSL/TLS保护Netty应用程序web
SSL和TLS这样的安全协议,它们层叠在其余协议之上,用以实现数据安全。咱们在访问安全网站时遇到过这些协议,可是它们也可用于其余不是基于HTTP的应用程序,如安全SMTP(SMTPS)邮件服务器甚至是关系型数据库系统。数据库
为了支持SSL/TLS,Java提供了javax.net.ssl包,它的SSLContext和SSLEngine类使得实现解密和加密至关简单直接。Netty经过一个名为SslHandler的ChannelHandler实现利用了这个API,其中SslHandler在内部使用了SSLEngine来完成实际的工做。编程
下图展现了使用SslHandler的数据流 安全
如下代码展现了如何使用ChannelInitializer来将SslHandler添加到ChannelPipeline中。服务器
public class SSLChannelInitializer extends ChannelInitializer<Channel>{ private final SslContext context; private final boolean startTls; //若是设置为true,第一个写入的消息将不会被加密(客户端应该设置为true) public SSLChannelInitializer(SslContext context, boolean startTls) { this.context = context; this.startTls = startTls; } @Override protected void initChannel(Channel ch) throws Exception { //对于每一个SslHandler实例,都使用Channel的ByteBufAllocator从SslCOntext获取一个新的SSLEngine SSLEngine engine = context.newEngine(ch.alloc()); //将SslHandler做为第一个ChannelHandler添加到ChannelPipeline中 ch.pipeline().addLast("ssl",new SslHandler(engine,startTls)); } }
在大多数状况下,SslHandler将是ChannelPipeline中的第一个ChannelHandler。这确保了只有在全部其余的ChannelHandler将它们的逻辑应用到数据以后,才会进行加密。websocket
例如,在握手阶段,两个节点将相互验证而且商定一种加密方式。你能够经过配置SslHandler来修改它的行为,或者在SSL/TLS握手一旦完成以后提供通知,握手阶段完成以后,全部的数据都将会被加密。SSL/TLS握手将会被自动执行。网络
二、构建基于Netty的HTTP/HTTPS应用程序架构
HTTP/HTTPS是最多见的协议套件之一,而且随着智能手机的成功,它的应用也日益普遍,由于对于任何公司来讲,拥有一个能够被移动设备访问的网站几乎是必须的。这些协议也被用于其余方面。框架
HTTP是基于请求/响应模式的:客户端向服务器发送一个HTTP请求,而后服务器将会返回一个HTTP响应。Netty提供了多种编码器和解码器以简化对这个协议的使用。
下图分别展现了生产和消费HTTP请求和HTTP响应的方法。
正如上图所示,一个HTTP请求/响应可能由多个数据部分组成,而且它老是以一个LastHttpContent部分做为结束。FullHttpRequest和FullHttpResponse消息是特殊的子类型,分别表明了完整的请求和响应。全部类型的HTTP消息都实现HttpObject接口。
如下代码中的HttpPipelineInitializer类展现了将HTTP支持添加到你的应用程序时多么简单——几乎只须要将正确的ChannelHandler添加到ChannelPipeline中。
public class HttpPipelineInitializer extends ChannelInitializer<Channel>{ private final boolean client; public HttpPipelineInitializer(boolean client) { this.client = client; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (client){ //若是是客户端,则添加HttpResponseDecoder以处理来自服务器的响应 pipeline.addLast("decoder",new HttpResponseDecoder()); //添加HttpResponseEncoder以向服务器发送请求 pipeline.addLast("encoder",new HttpResponseEncoder()); } else { //若是是服务器,则添加HttpRequestDecoder以接收来自客户端的请求 pipeline.addLast("decoder",new HttpRequestDecoder()); //添加HttpRequestEncoder以向客户端发送响应 pipeline.addLast("encoder",new HttpRequestEncoder()); } } }
三、聚合HTTP消息
在ChannelInitializer将ChannelHandler安装到ChannelPipeline中以后,你即可以处理不一样类型的HttpObject消息了。可是因为HTTP的请求和响应可能由许多部分组成,所以你须要聚合它们以造成完整的消息。为了消除这项繁琐的任务,Netty提供了一个聚合器,它能够将多个消息部分合并为FullHttpRequest或者FullHttpResponse消息。经过这样的方式,你将老是看到完整的消息内容。
因为消息分段须要被缓冲,直到能够转发一个完整的消息给下一个ChannelInboundHandler,因此这个操做有轻微的开销。其所带来的好处即是你没必要关心消息碎片了。
引入这种自动聚合机制只不过是向ChannelPipeline中添加另一个ChannelHandler罢了。如如下代码所示。
public class HttpAggregatorInitializer extends ChannelInitializer<Channel>{ private final boolean isClient; public HttpAggregatorInitializer(boolean isClient) { this.isClient = isClient; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (isClient) { //若是是客户端,则添加HttpClineCodec pipeline.addLast("codec",new HttpClientCodec()); } else { //若是是服务端,则添加HttpServerCodec pipeline.addLast("codec",new HttpServerCodec()); } //将最大的消息大小为512KB的HttpObjectAggregator添加到ChannelPipeline pipeline.addLast("aggregator",new HttpObjectAggregator(512 * 1024)); } }
四、HTTP压缩
当使用HTTP时,建议开启压缩功能以尽量多地减小传输数据的大小。虽然压缩会带来一些CPU时钟周期上的开销,可是一般来讲它都是一个好主意,特别是对于文本数据来讲。
Netty为压缩和解压缩提供了ChannelHandler实现,它们同时支持gzip和deflate编码。
HTTP请求的头部信息,客户端能够经过提供如下头部信息来指示服务器它所支持的压缩格式:
Get /encrypted-area HTTP/1.1
Host: www.example.com
Accept-Encoding:gzip,deflate
然而,须要注意的是,服务器没有义务压缩它所发送的数据。
如下代码展现了一个例子。
public class HttpCompressionInitializer extends ChannelInitializer<Channel>{ private final boolean isClient; public HttpCompressionInitializer(boolean isClient) { this.isClient = isClient; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (isClient) { //若是是客户端,则添加HttpClientCodec pipeline.addLast("codec",new HttpClientCodec()); //若是是客户端,则添加HttpContentDecompressor以处理来自服务器的压缩内容 pipeline.addLast("decompressor",new HttpContentDecompressor()); } else { //若是是服务器,则添加HttpServerCodec pipeline.addLast("codec",new HttpServerCodec()); //若是是服务器,则添加HttpContentCompressor来压缩数据 pipeline.addLast("decompressor",new HttpContentCompressor()); } } }
五、使用HTTPS
如下代码显示,启用HTTPS只须要将SslHandler添加到ChannelPipeline的ChannelHandler组合中。
public class HttpsCodecInitializer extends ChannelInitializer<Channel>{ private final SslContext context; private final boolean isClient; public HttpsCodecInitializer(SslContext context, boolean isClient) { this.context = context; this.isClient = isClient; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); SSLEngine engine = context.newEngine(channel.alloc()); //将SslHandler添加到ChannelPipeline中以使用HTTPS pipeline.addLast("ssl",new SslHandler(engine)); if (isClient) { //若是是客户端,则添加HttpClientCodec pipeline.addLast("codec",new HttpClientCodec()); } else { //若是是服务器,则添加HttpServerCodec pipeline.addLast("codec",new HttpServerCodec()); } } }
以上例子,说明了Netty的架构方式是如何将代码重用变为杠杆做用的。只须要简单地将一个ChannelHandler添加到ChannelPipeline中,即可以提供一项新的功能,甚至像加密这样重要的功能都能提供。
六、WebSocket
WebSocket解决了一个长期存在的问题:既然底层的协议(HTTP)是一个请求/响应模式的交互序列,那么如何实时地发布信息?AJAX提供了必定程度上的改善,可是数据流仍然是由客户端所发送的请求驱动。
WebSocket规范以及它的实现表明了对一种更加有效的解决方案的尝试。简单地说,WebSocket提供了“在一个单个的TCP链接上提供双向的通讯·······结合WebSocketAPI·····它为网页和远程服务器之间的双向通讯提供了一种替代HTTP轮询的方案。”
WebSocket在客户端和服务器之间提供了真正的双向数据交换。WebSocket如今能够用于传输任意类型的数据,很像普通的套接字。
下图给出了WebSocket协议的通常概念。在这个场景下,通讯将做为普通的HTTP协议开始,随后升级到双向的WebSocket协议。
要想向你的应用程序中添加对于WebSocket的支持,你须要将适当的客户端或者服务器WebSocket ChannelHandler添加到ChannelPipeline中。这个类将处理由WebSocket定义的称为帧的特殊消息类型。
由于Netty主要是一种服务器端的技术,因此在这里咱们重点建立WebSocket服务器。代码以下所示,这个类处理协议升级握手,以及3种控制帧——Close、Ping和Pong。Text和Binary数据帧将会被传递给下一个ChannelHandler进行处理。
public class WebSocketServerInitializer extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast( new HttpServerCodec(), //为握手提供聚合的HttpRequest new HttpObjectAggregator(65535), //若是被请求的端点是“/websocket”则处理该升级握手 new WebSocketServerProtocolHandler("/websocket"), new TextFrameHandler(), new BinaryFrameHandler(), new ContinuationFrameHandler() ); } public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { //Handle text frame } } public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame) throws Exception { //Handle binary frame } } public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ContinuationWebSocketFrame continuationWebSocketFrame) throws Exception { //Handle continuation frame } } }
保护WenSocket:要想为WebSocket添加安全性,只须要将SslHandler做为第一个ChannelHandler添加到ChannelPipeline中。
七、空闲的链接和超时
只要你有效地管理你的网络资源,这些技术就可使得你的应用程序更加高效、易用和安全。
检测空闲链接以及超时对于及时释放资源来讲是相当重要的。
让咱们仔细看看在实践中使用得最多的IdleStateHandler。如下代码展现了当使用一般的发送心跳信息到远程节点的方法时,若是在60秒以内没有接收或发送任何的数据,咱们将如何获得通知;若是没有响应,则链接会被关闭。
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //IdleStateHandler将在被触发时发送一个IdleStateEvent事件 pipeline.addLast(new IdleStateHandler(0,0,60, TimeUnit.SECONDS)); pipeline.addLast(new HeartbeatHandler()); } //实现userEventTriggered方法以发送心跳消息 public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter{ private static final ByteBuf HEARTBEAT_SEQUENCE = //
发送到远程节点的心跳消息
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1)); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //发送心跳消息,并在发送失败时关闭该链接 if (evt instanceof IdleStateEvent){ ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, evt); } } } }
以上示例演示了如何使用IdleStateHandler来测试远程节点是否仍然还活着,而且在它失活时经过关闭链接来释放资源
若是链接超过60秒没有接收或者发送任何的数据,那么IdleStateHandler将会使用一个IdleStateEvent事件来调用fireUserEventTriggered()方法。HeartbeatHandler实现了userEventTriggered()方法,若是这个方法检测到IdleStateEvent事件,它将会发送心跳消息,而且添加一个将在发送操做失败时关闭该链接的ChannelFutureListener。
八、解码基于分隔符的协议和基于长度的协议
基于分隔符的(delimited)消息协议使用定义的字符来标记的消息或消息段(一般被称为帧)的开头或者结尾。由RFC文档正式定义的许多协议(如SMTP、POP三、IMAP以及Telnet)都是这样。
下图展现了当帧由行尾序列\r\n分割时是如何被处理的。
如下代码展现了如何使用LineBasedFrameDecoder来处理上图的场景。
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //该LineBasedFrameDecoder将提取的帧转发给下一个ChannelInboundHandler pipeline.addLast(new LineBasedFrameDecoder(64*1024)); //添加FrameHandler以接收帧 pipeline.addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ @Override //传入单个帧的内容 protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // do something with the data extracted from the frame } } }
若是你正在使用除了行尾符以外的分隔符的帧,那么你能够以相似的方法使用DelimiterBasedFrameDecoder,只须要将特定的分隔符序列指定到其构造函数便可。
这些解码器是实现你本身的基于分隔符的协议的工具。做为示例,咱们将使用下面的协议规范:
——传入数据流是一系列的帧,每一个帧都由换行符(\n)分割
——每一个帧都由一系列的元素组成,每一个元素都由单个空格字符分割
——一个帧的内容表明了一个命令、定义为一个命令名称后跟着数目可变的参数
咱们用于这个协议的自定义解码器将定义如下类:
——Cmd——将帧(命令)的内容存储在ByteBuf中,一个ByteBuf用于名称,另外一个用于参数
——CmdDecoder——从被重写了的decode()方法中获取一行字符串,并从它的内容构建一个Cmd的实例
——CmdHandler——从CmdDecoder获取解码的Cmd对象,并对它进行一些处理;
——CmdHandlerinitializer——为了简便起见,咱们将会把前面的这些类定义为专门的ChannelInitializer的嵌套类,其将会把这些ChannelInboundHandler安装到ChannelPipeline中。
如下代码,这个解码器的关键是扩展LineBasedFrameDecoder。
public class CmdHandlerInitializer extends ChannelInitializer<Channel>{ public static final byte SPACE = (byte)' '; @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //添加CmdDecoder以提取Cmd对象,并将它转发给下一个ChannelInboundHandler pipeline.addLast(new CmdDecoder(64*1024)); //添加CmdHandler以接收和处理Cmd对象 pipeline.addLast(new CmdHandler()); } //Cmd POJO public static final class Cmd{ private final ByteBuf name; private final ByteBuf args; public Cmd(ByteBuf name, ByteBuf args) { this.name = name; this.args = args; } public ByteBuf getName() { return name; } public ByteBuf getArgs() { return args; } } public static final class CmdDecoder extends LineBasedFrameDecoder{ public CmdDecoder(int maxLength) { super(maxLength); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { //从ByteBuf中提取由行尾符序列分隔的帧 ByteBuf frame = (ByteBuf) super.decode(ctx, buffer); if (frame == null){ //若是输入中没有帧,则返回null return null; } //查找第一个空格字符的索引 int index = frame.indexOf(frame.readerIndex(),frame.writerIndex(),SPACE); //使用包含有命令名称和参数的切片建立新的Cmd对象 return new Cmd(frame.slice(frame.readerIndex(),index),frame.slice(index + 1,frame.writerIndex())); } } public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Cmd cmd) throws Exception { //处理传经ChannelPipeline的Cmd对象 // do something with the command } } }
九、基于长度的协议
基于长度的协议经过将它的长度编码到帧的头部来定义帧,而不是使用特殊的分隔符来标记它的结束。
下图展现了FixedLengthFrameDecoder的功能,其在构造时已经指定了帧长度为8字节。
你将常常会遇到被编码到消息头部的帧大小不是固定值的协议。为了处理这种变长帧,你可使用LengthFieldBasedFrameDecoder,它将从头部字段肯定帧长,而后从数据流中提取指定的字节数。
下图展现了示例,其中长度字段在帧中的偏移量为0,而且长度为2字节。
LengthFieldBasedFrameDecoder提供了几个构造函数来支持各类各样的头部配置状况。如下代码展现了如何使用其3个构造参数分别为maxFrameLength、lengthFieldOffset和lengthFieldLength的构造函数。在这个场景中,帧的长度被编码到了帧起始的前8个字节中。
public class LengthFieldBasedFrameDecoder extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new io.netty.handler.codec.LengthFieldBasedFrameDecoder(64*1024,0,8)); //添加FrameHandler以处理每一个帧 pipeline.addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { //处理帧的数据 // Do something with the frame } } }
十、写大型数据
由于网络饱和的可能性,如何在异步框架中高效地写大块的数据是一个特殊的问题。因为写操做是非阻塞的,因此即便没有写出全部的数据,写操做也会在完成时返回并通知ChannelFuture。当这种状况发生时,若是仍然不停地写入,就有内存耗尽的风险,因此在写大型数据时,须要准备好处理到远程节点的链接是慢速链接的状况,这种状况会致使内存释放的延迟。
NIO的零拷贝特性,这种特性消除了将文件的内容从文件系统移动到网络栈的复制过程。全部的一切都发生在Netty的核心中,因此应用程序全部须要作的就是使用一个FileRegion接口的实现,其在Netty的API文档中的定义是:“经过支持零拷贝的文件传输的Channel来发送的文件区域。”
如下代码展现了如何经过从FileInputStream建立一个DefaultFileRegion,并将其写入Channel,从而利用零拷贝特性来传输一个文件的内容。
//建立一个FileInputStream FileInputStream in = new FileInputStream(file); FileRegion region = new DefaultFileRegion( in.getChannel(),0,file.length()); //发送该DefaultFileRegion,并注册一个ChannelFutureListener channel.writeAndFlush(region).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()) { //处理失败 Throwable cause = channelFuture.cause(); // Do something } } } );
这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在须要将数据从文件系统复制到用户内存中时,可使用ChunkedWriteHandler,它支持异步写大型数据流,而又不会致使大量的内存消耗。
关键是interface ChunkedInput ,其中类型参数B是readChunk()方法返回的类型。Netty预置了该接口的4个实现。
如下代码说明了ChunkedStream的用法,它是实践中最经常使用的实现。所示的类使用了一个File以及一个SslContext进行实例化。当initChannel()方法被调用时,它将使用所示的ChannelHandler链初始化该Channel。当Channel的状态变为活动时,WriteStreamHandler将会逐块地把来自文件中的数据做为ChunkedStream写入。数据在传输以前将会由SslHandler加密。
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel>{ private final File file; private final SslContext sslCtx; public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) { this.file = file; this.sslCtx = sslCtx; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //将SslHandler添加到ChannelPipeline中 pipeline.addLast(new SslHandler(sslCtx.newEngine(channel.alloc()))); //添加ChunkedWriteHandler以处理做为ChunkedInput传入的数据 pipeline.addLast(new ChunkedWriteHandler()); //一旦链接创建,WriteStreamHandler就开始写文件数据 pipeline.addLast(new WriteStreamHandler()); } public final class WriteStreamHandler extends ChannelInboundHandlerAdapter{ //当链接创建时,channelActive方法将使用ChunkedInput写文件数据 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); ctx.writeAndFlush( new ChunkedStream(new FileInputStream(file))); } } }
逐块输入:要使用你本身的ChunkedInput实现,请在ChannelPipeline中安装一个ChunkedWriteHandler
十一、JDK序列化数据
JDK提供了ObjectOutputStream和ObjectInputStream,用于经过网络对POJO的基本数据类型和图进行序列化和反序列化。该API并不复杂,并且能够被应用于任何实现了java.io.Serializable接口的对象。可是它的性能也不是很是高效。
若是你的应用程序必需要和使用了ObjectOutputStream和ObjectInputStream的远程节点交互,而且兼容性也是你最关心的,那么JDK序列化将是正确的选择。
十二、使用了JBoss Marshalling进行序列化
若是你能够自由地使用外部依赖,那么JBoss Marshalling将是一个理想的选择:他比JDK序列化最多快3倍,并且也更加紧凑。
如下代码展现了如何使用MarshallingDecoder和MarshallingEncoder。一样,几乎只是适当地配置ChannelPipeline罢了。
public class MarshallingInitializer extends ChannelInitializer<Channel>{ private final MarshallerProvider marshallerProvider; private final UnmarshallerProvider unmarshallerProvider; public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) { this.marshallerProvider = marshallerProvider; this.unmarshallerProvider = unmarshallerProvider; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new MarshallingDecoder(unmarshallerProvider)); pipeline.addLast(new MarshallingEncoder(marshallerProvider)); pipeline.addLast(new ObjectHandler()); } public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) throws Exception { //do something } } }
1三、经过Protocol Buffers序列化
Netty序列化的最后一个解决方案是利用Protocol Buffers的编解码器,它是一个由Google公司开发的、如今已经开源的数据交换格式。
Protocol Buffers以一种紧凑而高效的方式对结构化的数据进行编码以及解码。它具备许多编程语言绑定,使得它很适合跨语言的项目。
在这里咱们又看到了,使用protobuf只不过是将正确的ChannelHandler添加到ChannelPipeline中,以下代码。
public class ProtoBufInitializer extends ChannelInitializer<Channel>{ private final MessageLite lite; public ProtoBufInitializer(MessageLite lite) { this.lite = lite; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new ProtobufDecoder(lite)); pipeline.addLast(new ObjectHandler()); } public static final class ObjectHandler extends SimpleChannelInboundHandler<Object>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { // do something with the object } }