Netty实战六之ChannelHandler和ChannelPipeline

一、Channel的生命周期java

Interface Channel定义了一组和ChannelInboundHandler API密切相关的简单但功能强大的状态模型,如下列出Channel的4个状态。promise

ChannelUnregistered:Channel已经被建立,但还未注册到EventLoop缓存

ChannelRegistered:Channel已经被注册到了EventLoop安全

ChannelActive:Channel处于活动状态(已经链接到它的远程节点)。它如今能够接收和发送数据了并发

ChannelInactive:Channel没有链接到远程节点异步

Channel的正常生命周期以下图所示,当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline中的ChannelHandler,其能够随后对它们作出响应。
Netty实战六之ChannelHandler和ChannelPipelineide

二、ChannelHandler的生命周期工具

下表列出了interface ChannelHandler定义的生命周期操做,在ChannelHandler被添加到ChannelPipeline中或者被从ChannelPipeline中移除时会调用这些操做,这些方法中的每个都接受一个ChannelHandlerContext参数。oop

handlerAdded:当把ChannelHandler添加到ChannelPipeline中时被调用布局

handlerRemoved:当从ChannelPipeline中移除ChannelHandler时被调用

exceptionCaught:当处理过程当中在ChannelPipeline中有错误产生时被调用

Netty定义了下面两个重要的ChannelHandler子接口:

·ChannelInboundHandler——处理入站数据以及各类状态变化

·ChannelOutboundHandler——处理出站数据而且容许拦截全部的操做

三、ChannelInboundHandler接口

当某个ChannelInboundHandler的实现重写channelRead()方法时,它将负责显式地释放与池化ByteBuf实例相关的内存,Netty为此提供了一个实用方法ReferenceCountUtil.release()。

@ChannelHandler.Sharable//扩展了ChannelInboundHandlerAdapterpublic class DiscardHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //丢弃已接收的消息
        ReferenceCountUtil.release(msg);
    }
}

Netty将使用WARN级别的日志消息记录未释放的资源,使得能够很是简单地在代码中发现违规的实例,可是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用SimpleChannelInboundHandler。

@ChannelHandler.Sharable//扩展了SimpleChannelInboundHandlerpublic class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object>{

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {        //不须要任何显式的资源释放
        //No need to do anything special
    }
}

因为SimpleChannelInboundHandler会自动释放资源,因此你不该该存储指向任何消息的引用供未来使用,由于这些引用都将会失效。

四、ChannelOutboundHandler接口

出站操做和数据将由ChannelOutboundHandler处理,它的方法将被Channel、ChannelPipeline以及ChannelHandlerContext调用。

ChannelOutboundHandler的一个强大的功能是能够按需推迟操做或者事件,这使得能够经过一些复杂的方法来处理请求。例如,若是到远程节点的写入被暂停了,那么你能够推迟冲刷并在稍后继续。

ChannelPromise与ChannelFuture : ChannelOutboundHandler中的大部分方法都须要一个ChannelPromise参数,以便在操做完成时获得通知。ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),从而使ChannelFuture不可变。

五、ChannelHandler适配器

你可使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter类做为本身的ChannelHandler的起始点。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现,经过扩展抽象类ChannelHandlerAdapter,他们得到了他们共同的超接口ChannelHandler的方法。生成的类的层次结构以下图。

Netty实战六之ChannelHandler和ChannelPipeline

ChannelHandlerAdapter还提供了使用方法isSharable(),若是其对应的实现被标注为Sharable,那么这个方法都将返回true,表示它能够被添加到多个ChannelPipeline中。

在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法体调用了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到了ChannelPipeline中的下一个ChannelHandler中。

六、资源管理

每当经过调用ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()方法来处理数据时,你都须要确保没有任何的资源泄露。你可能还记得前面的章节中所提到的,Netty使用引用技术来处理池化的ByteBuf。因此在彻底使用完某个ByteBuf后,调整其引用计数是很重要的。

为了帮助你诊断潜在的(资源泄露)问题,Netty提供了class ResourceLeakDetector,它将对你应用程序的缓冲区分配作大约1%的采样来检测内存泄露。相关的开销是很是小的。

Netty定义了4种泄露检测级别。

DISABLED——禁用泄露检测

SIMPLE——使用1%的默认采样率检测并报告任何发现的泄露

ADVANCED——使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置

PARANOID——相似于ADVANCED,可是其将会对每次访问都进行采样,这对性能将会有很大的影响,应该只在调试阶段使用

泄露检测级别能够经过将下面的Java系统属性设置为表中的一个值来定义:

java -Dio.netty.leakDetectionLevel = ADVANCED

若是带着该JVM选项从新启动你的应用程序,你将看到本身的应用程序最近被泄露的缓冲区被访问的位置。

实现ChannelInboundHandler.channelRead()和ChannelOutboundHandler.write()方法时,应该如何使用这个诊断工具来防止泄露呢?让咱们看看你的channelRead()操做直接消费入站消息的状况,也就是说,他不会经过调用ChannelHandlerContext.fireChannelRead()方法将入站消息转发给下一个ChannelInboundHandler。

消费入站消息的简单方式: 因为消费入站数据是一项常规任务,因此Netty提供了一个特殊的被称为SimpleChannelInboundHandler的ChannelInboundHandler实现,这个实现会在消息被channelRead0()方法消费以后自动释放消息。

在出站方向这边,若是你处理了write()操做并丢弃了一个消息,那么你也应该负责释放它。如下代码展现了一个丢弃全部的写入数据的实现。

@ChannelHandler.Sharablepublic class DiscardoutBoundHandler extends ChannelOutboundHandlerAdapter{

@Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        //释放资源
        ReferenceCountUtil.release(msg);        //通知ChannelPromise数据已经被处理了
        promise.setSuccess();
    }
}

重要的是,不只要释放资源,还要通知ChannelPromise。不然可能会出现ChannelFutureListener收不到某个消息已经被处理了的通知的消息。

总之,若是一个消息被消费或者丢弃了,而且没有传递给ChannelPipeline中的下一个ChannelOutboundHandler,那么用户就有责任调用ReferenceCountUtil.release()。若是消息到达了实际的传输层,那么当它被写入时或者Channel关闭时,都将被自动释放。

七、ChannelPipeline接口

若是你认为ChannelPipeline是一个拦截流经Channel的入站和出站事件的ChannelHandler实例链,那么就很容易看出这些ChannelHandler之间的交互式如何组成一个应用程序数据和时间处理逻辑的核心的。

每个新建立的Channel都将会被分配一个新的ChannelPipeline。这项关联时永久的,Channel即不能附加另一个ChannelPipeline,也不能分离其当前的,在Netty组件的生命周期中,这是一项固定的操做,不须要开发人员的任何干预。

根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理,随后,经过调用ChannelHandlerContext实现,它将被转发给同一个超类型的下一个ChannelHandler。

ChannelHandlerContext:ChannelHandlerContext使得ChannelHandler可以和它的ChannelPipeline以及其余的ChannelHandler交互,ChannelHandler能够通知其所属的ChannelPipeline中的下一个ChannelHandler,甚至能够动态修改它所属的ChannelPipeline。ChannelHandlerContext具备丰富的用于处理事件和执行I/O操做的API。

下图展现了一个典型的同时具备入站和出站ChannelHandler的ChannelPipeline的布局,而且印证了咱们以前的关于ChannelPipeline主要由一系列的ChannelHandler所组成的说法,ChannelPipeline还提供了经过ChannelPipeline自己传播事件的方法。若是一个入站事件被触发,它将被从ChannelPipeline的头部开始一直被传播到ChannelPipeline的尾端。如图所示,一个出站I/O事件将从ChannelPipeline的最右边开始,而后向左传播。
Netty实战六之ChannelHandler和ChannelPipeline

在ChannelPipeline传播事件时,它会测试ChannelPipeline中的下一个ChannelHandler的类型是否和事件的运动方向相匹配。若是不匹配,ChannelPipeline将跳过该ChannelHandler并前进到下一个,直到它找到和该事件所指望的方向相匹配的为止。

八、修改ChannelPipeline

经过调用ChannelPipeline上的相关方法,ChannelHandler能够添加、删除或者替换其余的ChannelHandler,从而实时地修改ChannelPipeline的布局。

ChannelPipeline pipeline = ...;
        FirstHandler firstHandler = new FirstHandler();
        //将该实例做为“handler1”添加到ChannelPipeline中
        pipeline.addLast("handler1",firstHandler);
        //将一个SecondHandler的实例做为“handler2”添加到ChannelPipeline的第一个槽中,这意味着它将被放置在已有的“handler1”以前
        pipeline.addLast("handler2",new SecondHandler());
        //将一个ThirdHandler的实例做为“handler3”添加到ChannelPipeline的最后一个槽中
        pipeline.addLast("handler3",new ThirdHandler());        ...
        //经过名称移除“handler3”
        pipeline.remove("handler3");
        //经过引用移除FirstHandler
        pipeline.remove(firstHandler);
        //将SecondHandler(“handler2”)替换为FourthHandler:"handler4"
        pipeline.replace("handler2","handler4",new ForthHandler());

ChannelHandler的执行和阻塞:一般ChannelPipeline中的每个ChannelHandler都是经过它的EventLoop(I/O线程)来处理传递给它的事件的。因此相当重要的是不要阻塞这个线程,由于这会对总体的I/O处理产生负面的影响。但有时可能须要与那些使用阻塞API的遗留代码进行交互,对于这个状况,ChannelPipeline有一些接受一个EventExecutorGroup的add()方法,若是一个事件被传递给一个自定义的EventExecutorGroup,它将被包含在这个EventExecutorGroup中的某个EventExecutor所处理,从而被从该Channel自己的EventLoop中移除,对于这种用例,Netty提供了一种叫DefaultEventExecutorGroup的默认实现。

——ChannelPipeline保存了与Channel相关联的ChannelHandler

——ChannelPipeline能够根据须要、经过添加或者删除ChannelHandler来动态修改

——ChannelPipeline有着丰富的API用以被调用、以响应入站和出站事件

——ChannelHandlerContext和ChannelHandler之间的关联(绑定)是永远不会改变的,因此缓存对它的引用是安全的

九、使用ChannelHandlerContext

如下代码,将经过ChannelHandlerContext获取到Channel的引用,调用Channel上的write()方法将会致使写入事件从尾端到头部地流经ChannelPipeline。
Netty实战六之ChannelHandler和ChannelPipeline

如下代码展现了一个相似的例子,可是这一次是写入ChannelPipeline。咱们再次看到,(到ChannelPipeline的)引用是经过ChannelHandlerContext获取的。

ChannelHandlerContext ctx = ..; //获取到与ChannelHandlerContext相关联的Channel的引用 Channel channel = ctx.channel(); //经过Channel写入缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
为何会想要从ChannelPipeline中的某个特定点开始传播事件呢?

——为了减小将事件传经对它不感兴趣的ChannelHandler所带来的开销

——为了不将事件传经那些可能会对它感兴趣的ChannelHandler。

十、ChannelHandler和ChannelHandlerContext的高级用法

能够经过将ChannelHandler添加到ChannelPipeline中来实现动态的协议切换,缓存到ChannelHandlerContext的引用以供稍后使用,这可能会发生在任何的ChannelHandler方法以外,甚至来自于不一样的线程。

如下代码,缓存到ChannelHandlerContext的引用

public class WriteHandler extends ChannelHandlerAdapter{

    private ChannelHandlerContext ctx;    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        //存储到ChannelHandlerContext的引用以供稍后使用
        this.ctx = ctx;
    }

    public void send(String msg){        //使用以前存储的到ChannelHandlerContext的引用来发送消息
        ctx.writeAndFlush(msg);
    }
}

由于一个ChannelHandler能够从属于多个ChannelPipeline,因此它也能够绑定到多个ChannelHandlerContext实例,对于这种用法(指在多个ChannelPipeline中共享同一个ChannelHandler),对应的ChannelHandler必需要使用@Sharable注解标注;不然,试图将它添加到多个ChannelPipeline时将会触发异常,显而易见,为了安全地被用于多个并发的Channel(链接),这样的ChannelHandler必须是线程安全的。

如下代码,展现这种模式。

@ChannelHandler.Sharable//使用注解@Sharable标注public class SharableHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("Channel read message: " + msg);        //记录方法调用,并转发给下一个ChannelHandler
        ctx.fireChannelRead(msg);
    }

}

前面的ChannelHandler实现了符合全部的将其加入到多个ChannelPipeline的需求,即它使用了注解@Sharable标注,而且也不持有任何的状态。

如下代码,演示@Sharable的错误用法

@ChannelHandler.Sharablepublic class UnSharableHandler extends ChannelInboundHandlerAdapter{

    private int count;    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //将count字段值加1
        count++;        System.out.println("channelRead(...) called the " + count + " time");        //记录方法调用,并转发给下一个ChannelHandler
        ctx.fireChannelRead(msg);
    }
}

这段代码的问题在于它拥有状态,即用于跟踪方法调用次数的实例变量count。将这个类的一个实例添加到ChannelPipeline将极有可能在它被多个并发Channel访问时致使问题。(能够将ChannelRead()方法变为同步方法)

总之,只应该在肯定了你的ChannelHandler是线程安全的时才使用@Sharable注解。

为什么要共享同一个ChannelHandler:在多个ChannelPipeline中安装同一个ChannelHandler的一个常见的缘由是用于收集跨越多个Channel的统计信息。

十一、处理入站异常

异常处理是任何真实应用程序的重要组成部分,它也能够经过多种方式来实现,所以,Netty提供了几种方式用于处理入站或者出站处理过程当中所抛出的异常。

若是在处理入站事件的过程当中有异常被抛出,那么它将从它在ChannelInboundHandler里被触发的那一点开始流经ChannelPipeline。要想处理这种类型的入站异常,你须要在你的ChannelInboundHandler实现exceptionCaught方法。

如下代码,展现了其关闭Channel并打印了异常的栈跟踪信息

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

由于异常将会继续按照入站方向流动(就像全部入站事件同样),因此实现了前面所示逻辑的ChannelInboundHandler一般位于ChannelPipeline的最后,这确保了全部的入站异常都老是会被处理,不管他们可能会发生在ChannelPipeline中的什么位置。

你应该如何响应异常,可能很大程序上取决于你的应用程序,你可能想要关闭Channel(和链接),也可能会尝试进行恢复。若是你不实现任何处理入站异常的逻辑,那么Netty将会记录该异常没有被处理的事实。

——ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个ChannelHandler

——若是异常到达了ChannelPipeline的尾端,它将会被记录为未处理

——要想定义自定义的处理逻辑,你须要重写exceptionCaught方法,而后你须要决定是否须要将该异常传播出去

十二、处理出站异常

——每一个出站操做都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener将在操做完成时被通知该操做是成功了仍是出错了

——几乎全部的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例,做为ChannelFuture的子类,ChannelPromise也能够被分配用于异步通知的监听器,可是,ChannelPromise还具备提供当即通知的可写方法。

如下代码,添加channelFutureListener,它将打印栈跟踪信息,而且随后关闭Channel

ChannelFuture future = channel.wirte(someMessage);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {                if (!channelFuture.isSuccess()){
                    channelFuture.cause().printStackTrace();
                    channelFuture.channel().close();
                }
            }
        });

第二种方式是将ChannelFutrueListener添加到即将做为参数传递给ChannelOutboundHandler的方法的ChannelPromise。

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter{

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        promise.addListener(new ChannelFutureListener() {            @Override
            public void operationComplete(ChannelFuture f) throws Exception {                if (!f.isSuccess()){
                    f.cause().printStackTrace();
                    f.channel().close();
                }
            }
        });
    }
}

ChannelPromise的可写方法:经过调用ChannelPromise上的setSuccess()和setFailure()方法,可使一个操做的状态在ChannelHandler的方法返回给其调用者时便即刻被感知到。

相关文章
相关标签/搜索