本文内容主要参考<<Netty In Action>> 和Netty
的文档和源码,偏笔记向.java
先简略了解一下ChannelPipeline
和ChannelHandler
.api
想象一个流水线车间.当组件从流水线头部进入,穿越流水线,流水线上的工人按顺序对组件进行加工,到达流水线尾部时商品组装完成.数组
能够将ChannelPipeline
当作流水线,ChannelHandler
当作流水线工人.源头的组件当作event,如read,write等等.promise
Channel
链接了网络套接字或可以进行I/O操做的组件,如read, write, connect, bind.
缓存
咱们能够经过Channel
获取一些信息.安全
Channel
的当前状态(如,是否链接,是否打开)Channel
的配置参数,如buffer的sizeChannelPipeline
和与通道相关的请求Channel
接口定义了一组和ChannelInboundHandler
API密切相关的状态模型.网络
当
Channel
的状态改变,会生成对应的event.这些event会转发给ChannelPipeline
中的ChannelHandler
,handler会对其进行响应.并发
下面列出了 interface ChannelHandler 定义的生命周期操做, 在 ChannelHandler被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用这些操做。这些方法中的每个都接受一个 ChannelHandlerContext 参数异步
ChannelInboundHandler
处理入站数据以及各类状态变化,当Channel
状态发生改变会调用ChannelInboundHandler
中的一些生命周期方法.这些方法与Channel
的生命密切相关.socket
入站数据,就是进入socket
的数据.下面展现一些该接口的生命周期API
当某个
ChannelInboundHandler
的实现重写channelRead()
方法时,它将负责显式地
释放与池化的 ByteBuf 实例相关的内存。 Netty 为此提供了一个实用方法ReferenceCountUtil.release()
.
@Sharable public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } }
这种方式还挺繁琐的,Netty提供了一个SimpleChannelInboundHandler
,重写channelRead0()
方法,就能够在调用过程当中会自动释放资源.
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { // 不用调用ReferenceCountUtil.release(msg)也会释放资源 } }
原理就是这样,channelRead
方法包装了channelRead0
方法.
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
出站操做和数据将由 ChannelOutboundHandler 处理。它的方法将被 Channel、 ChannelPipeline 以及 ChannelHandlerContext 调用。
ChannelOutboundHandler 的一个强大的功能是能够按需推迟操做或者事件,这使得能够经过一些复杂的方法来处理请求。例如, 若是到远程节点的写入被暂停了, 那么你能够推迟冲刷操做并在稍后继续。
ChannelPromise与ChannelFuture: ChannelOutboundHandler中的大部分方法都须要一个ChannelPromise参数, 以便在操做完成时获得通知。 ChannelPromiseChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(), 从而使ChannelFuture不可变.
ChannelHandlerAdapter顾名思义,就是handler的适配器.你须要知道什么是适配器模式,假设有一个A接口,咱们须要A的subclass实现功能,可是B类中正好有咱们须要的功能,不想复制粘贴B中的方法和属性了,那么能够写一个适配器类Adpter继承B实现A,这样一来Adpter是A的子类而且能直接使用B中的方法,这种模式就是适配器模式.
就好比Netty中的SslHandler
类,想使用ByteToMessageDecoder
中的方法进行解码,可是必须是ChannelHandler
子类对象才能加入到ChannelPipeline
中,经过以下签名和其实现细节(SslHandler
实现细节就不贴了)就可以做为一个Handler去处理消息了.
public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler
下图是ChannelHandler和Adpter的UML图示.
ChannelHandlerAdapter提供了一些实用方法
isSharable()
若是其对应的实现被标注为 Sharable, 那么这个方法将返回 true, 表示它能够被添加到多个 ChannelPipeline中 .若是想在本身的ChannelHandler中使用这些适配器类,只须要扩展他们,重写那些想要自定义的方法便可.
在使用ChannelInboundHandler.channelRead()
或ChannelOutboundHandler.write()
方法处理数据时要避免资源泄露,ByteBuf那篇文章提到过引用计数,当使用完某个ByteBuf以后记得调整引用计数.
Netty提供了一个class ResourceLeakDetector
来帮助诊断资源泄露,这可以帮助你判断应用的运行状况,可是若是但愿提升吞吐量(好比搞一些竞赛),关闭内存诊断能够提升吞吐量.
泄露检测级别能够经过将下面的 Java 系统属性设置为表中的一个值来定义:
java -Dio.netty.leakDetectionLevel=ADVANCED
若是带着该 JVM 选项从新启动你的应用程序,你将看到本身的应用程序最近被泄漏的缓冲
区被访问的位置。下面是一个典型的由单元测试产生的泄漏报告:
Running io.netty.handler.codec.xml.XmlFrameDecoderTest 15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. Recent access records: 1 #1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString( AdvancedLeakAwareByteBuf.java:697) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml( XmlFrameDecoderTest.java:157) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages( XmlFrameDecoderTest.java:133) ...
消费入站消息释放资源
@Sharable public class DiscardInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg);// 用于释放资源的工具类 } }
SimpleChannelInboundHandler
中的channelRead0()会消费消息以后自动释放资源.
出站释放资源
@Sharable public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { // 仍是经过util工具类释放资源 ReferenceCountUtil.release(msg); // 通知ChannelPromise,消息已经处理 promise.setSuccess(); } }
重要的是, 不只要释放资源,还要通知 ChannelPromise。不然可能会出现 ChannelFutureListener 收不到某个消息已经被处理了的通知的状况。总之,若是一个消息被消费或者丢弃了, 而且没有传递给 ChannelPipeline 中的下一个ChannelOutboundHandler, 那么用户就有责任调用ReferenceCountUtil.release()。若是消息到达了实际的传输层, 那么当它被写入时或者 Channel 关闭时,都将被自动释放。
每个新建立的 Channel 都将会被分配一个新的 ChannelPipeline。这项关联是永久性的; Channel 既不能附加另一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操做,不须要开发人员的任何干预。
根据事件的起源,事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler 处理。随后, 经过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个ChannelHandler。
ChannelHandlerContext使得ChannelHandler可以和它的ChannelPipeline以及其余的ChannelHandler 交 互 。 ChannelHandler 可 以 通 知 其 所 属 的 ChannelPipeline 中 的 下 一 个ChannelHandler,甚至能够动态修改它所属的ChannelPipeline.
这是一个同时具备入站和出站 ChannelHandler 的 ChannelPipeline 的布局,而且印证了咱们以前的关于 ChannelPipeline 主要由一系列的 ChannelHandler 所组成的说法。 ChannelPipeline 还提供了经过 ChannelPipeline 自己传播事件的方法。若是一个入站事件被触发,它将被从 ChannelPipeline 的头部开始一直被传播到 Channel Pipeline 的尾端。
你可能会说, 从事件途经 ChannelPipeline 的角度来看, ChannelPipeline 的头部和尾端取决于该事件是入站的仍是出站的。然而 Netty 老是将 ChannelPipeline 的入站口(图 的左侧)做为头部,而将出站口(该图的右侧)做为尾端。
当你完成了经过调用 ChannelPipeline.add*()方法将入站处理器( ChannelInboundHandler)和 出 站 处 理 器 ( ChannelOutboundHandler ) 混 合 添 加 到 ChannelPipeline 之 后 , 每 一 个ChannelHandler 从头部到尾端的顺序位置正如同咱们方才所定义它们的同样。所以,若是你将图 6-3 中的处理器( ChannelHandler)从左到右进行编号,那么第一个被入站事件看到的 ChannelHandler 将是1,而第一个被出站事件看到的 ChannelHandler 将是 5。在 ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。若是不匹配, ChannelPipeline 将跳过该ChannelHandler 并前进到下一个,直到它找到和该事件所指望的方向相匹配的为止。 (固然, ChannelHandler 也能够同时实现ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)
修改指的是添加或删除ChannelHandler
代码示例
ChannelPipeline pipeline = ..; FirstHandler firstHandler = new FirstHandler(); // 先添加一个Handler到ChannelPipeline中 pipeline.addLast("handler1", firstHandler); // 这个Handler放在了first,意味着放在了handler1以前 pipeline.addFirst("handler2", new SecondHandler()); // 这个Handler被放到了last,意味着在handler1以后 pipeline.addLast("handler3", new ThirdHandler()); ... // 经过名称删除 pipeline.remove("handler3"); // 经过对象删除 pipeline.remove(firstHandler); // 名称"handler2"替换成名称"handler4",并切handler2的实例替换成了handler4的实例 pipeline.replace("handler2", "handler4", new ForthHandler());
这种方式很是灵活,按照须要更换或插入handler
达到咱们想要的效果.
ChannelHandler的执行和阻塞
一般 ChannelPipeline 中的每个 ChannelHandler 都是经过它的 EventLoop( I/O 线程)来处理传递给它的事件的。因此相当重要的是不要阻塞这个线程,由于这会对总体的 I/O 处理产生负面的影响。
但有时可能须要与那些使用阻塞 API 的遗留代码进行交互。对于这种状况, ChannelPipeline 有一些接受一个 EventExecutorGroup 的 add()方法。若是一个事件被传递给一个自定义的 EventExecutorGroup ,它将被包含在这个 EventExecutorGroup 中的某个 EventExecutor 所处理,从而被从该Channel 自己的 EventLoop 中移除。对于这种用例, Netty 提供了一个叫 DefaultEventExecutorGroup 的默认实现。
pipeline对handler的操做
入站
出站
每当有ChannelHandler
添加到ChannelPipeline
中,都会建立ChannelHandlerContext
.若是调用Channel
或ChannelPipeline
上的方法,会沿着整个ChannelPipeline
传播,若是调用ChannelHandlerContext
上的相同方法,则会从对应的当前ChannelHandler
进行传播.
ChannelHandlerContext
和 ChannelHandler
之间的关联(绑定)是永远不会改变的,因此缓存对它的引用是安全的;ChannelHandlerContext
的方法将产生更短的事件流, 应该尽量地利用这个特性来得到最大的性能。从ChannelHandlerContext访问channel
ChannelHandlerContext ctx = ..; // 获取channel引用 Channel channel = ctx.channel(); // 经过channel写入缓冲区 channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
从ChannelHandlerContext访问ChannelPipeline
ChannelHandlerContext ctx = ..; // 获取ChannelHandlerContext ChannelPipeline pipeline = ctx.pipeline(); // 经过ChannelPipeline写入缓冲区 pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
有时候咱们不想从头传递数据,想跳过几个handler,从某个handler开始传递数据.咱们必须获取目标handler以前的handler关联的ChannelHandlerContext.
ChannelHandlerContext ctx = ..; // 直接经过ChannelHandlerContext写数据,发送到下一个handler ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
好了,ChannelHandlerContext的基本使用应该掌握了,可是你真的理解ChannelHandlerContext,ChannelPipeline和Channelhandler之间的关系了吗.咱们老看一下Netty的源码.
先看一下AbstractChannelHandlerContext
类,这个类像不像双向链表中的一个Node,
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { ... volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; ... }
再来看一看DefaultChannelPipeline
,ChannelPipeline
中拥有ChannelHandlerContext
这个节点的head和tail,
并且DefaultChannelPipeline
类中并无ChannelHandler
成员或handler数组.
public class DefaultChannelPipeline implements ChannelPipeline { ... final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ...
因此addFirst
向pipeline中添加了handler到底添加到哪了呢.看一下pipeline中的addFirst方法
@Override public final ChannelPipeline addFirst(String name, ChannelHandler handler) { return addFirst(null, name, handler); } @Override public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { // 检查handler是否具备复用能力,不重要 checkMultiplicity(handler); // 名称,不重要. name = filterName(name, handler); // 这个方法建立了DefaultChannelHandlerContext,handler是其一个成员属性 // 你如今应该明白了上面说的添加handler会建立handlerContext了吧 newCtx = newContext(group, name, handler); // 这个方法 addFirst0(newCtx);
// 这个方法是调整pipeline中HandlerContext的指针, // 就是更新HandlerContext链表节点之间的位置 private void addFirst0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext nextCtx = head.next; newCtx.prev = head; newCtx.next = nextCtx; head.next = newCtx; nextCtx.prev = newCtx; }
简单总结一下,pipeline拥有context(自己像一个链表的节点)组成的节点的双向链表首尾,能够看作pipeline拥有一个context链表,context拥有成员handler,这即是三者之间的关系.实际上,handler做为消息处理的主要组件,实现了和pipeline的解耦,咱们能够只有一个handler,可是被封装进不一样的context可以被不一样的pipeline使用.
缓存ChannelHandlerContext引用
@Sharable public class WriteHandler extends ChannelHandlerAdapter { private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) { this.ctx = ctx; } public void send(String msg) { ctx.writeAndFlush(msg); } }
由于一个 ChannelHandler 能够从属于多个 ChannelPipeline,因此它也能够绑定到多个 ChannelHandlerContext 实例。 对于这种用法指在多个ChannelPipeline 中共享同一个 ChannelHandler, 对应的 ChannelHandler 必需要使用@Sharable 注解标注; 不然,试图将它添加到多个 ChannelPipeline 时将会触发异常。
@Sharable错误用法
@Sharable public class UnsharableHandler extends ChannelInboundHandlerAdapter { private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { count++; System.out.println("channelRead(...) called the " + count + " time"); ctx.fireChannelRead(msg); } }
这段代码的问题在于它拥有状态 , 即用于跟踪方法调用次数的实例变量count。将这个类的一个实例添加到ChannelPipeline将极有可能在它被多个并发的Channel访问时致使问题。(固然,这个简单的问题能够经过使channelRead()方法变为同步方法来修正。)
总之,只应该在肯定了你的 ChannelHandler 是线程安全的时才使用@Sharable 注解。
处理入站事件的过程当中有异常被抛出,那么它将从它在ChannelInboundHandler里被触发的那一点开始流经 ChannelPipeline。要想处理这种类型的入站异常,你须要在你的 ChannelInboundHandler 实现中重写下面的方法。
public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause) throws Exception // 基本处理方式 public class InboundExceptionHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
由于异常将会继续按照入站方向流动(就像全部的入站事件同样), 因此实现了前面所示逻辑的 ChannelInboundHandler 一般位于 ChannelPipeline 的最后。这确保了全部的入站异常都老是会被处理,不管它们可能会发生在ChannelPipeline 中的什么位置。
ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline 中的下一个 ChannelHandler;
要想定义自定义的处理逻辑,你须要重写 exceptionCaught()方法。而后你须要决定是否须要将该异常传播出去。
ChannelPromise setSuccess(); ChannelPromise setFailure(Throwable cause);
1.添加ChannelFutureListener到ChannelFuture
ChannelFuture future = channel.write(someMessage); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } } });
2.添加ChannelFutureListener到ChannelPromise
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } } }); } }