第六章:ChannelHandler

本章介绍java

    ChannelPipelinepromise

    ChannelHandlerContext安全

    ChannelHandler框架

    Inbound vs outbound(入站和出站)ide

Netty提供了一个强大的处理这些事情的功能,容许用户自定义ChannelHandler的实现来处理数据。使得ChannelHandler更强大的是能够链接每一个ChannelHandler来实现任务,这有助于代码的整洁和重用。oop

6.1 ChannelPipelinethis

ChannelPipeline是ChannelHandler实例的列表,用于处理或截获通道的接收和发送数据。spa

ChannelPipeline提供了一种高级的截取过滤器模式,让用户能够在ChannelPipeline中彻底控制一个事件及如何处理ChannelHandler与ChannelPipeline的交互。线程

对于每一个新的通道,会建立一个新的ChannelPipeline并附加至通道。一旦链接,Channel和ChannelPipeline之间的耦合是永久性的。Channel不能附加其余的ChannelPipeline或从ChannelPipeline分离。netty

下图描述了ChannelHandler在ChannelPipeline中的I/O处理,一个I/O操做能够由一个ChannelInboundHandler或ChannelOutboundHandler进行处理,并经过调用ChannelInboundHandler处理入站IO或经过ChannelOutboundHandler处理出站IO。

ChannelPipeline是ChannelHandler的一个列表;若是一个入站I/O事件被触发,这个事件会从第一个开始依次经过ChannelPipeline中的ChannelHandler;

如果一个入站I/O事件,则会从最后一个开始依次经过ChannelPipeline中的ChannelHandler。

ChannelHandler能够处理事件并检查类型,若是某个ChannelHandler不能处理则会跳过,并将事件传递到下一个ChannelHandler。

ChannelPipeline能够动态添加、删除、替换其中的ChannelHandler,这样的机制能够提升灵活性。

修改ChannelPipeline的方法:

    addFirst(...),添加ChannelHandler在ChannelPipeline的第一个位置

    addBefore(...),在ChannelPipeline中指定的ChannelHandler名称以前添加ChannelHandler

    addAfter(...),在ChannelPipeline中指定的ChannelHandler名称以后添加ChannelHandler

    addLast(ChannelHandler...),在ChannelPipeline的末尾添加ChannelHandler

    remove(...),删除ChannelPipeline中指定的ChannelHandler

    replace(...),替换ChannelPipeline中指定的ChannelHandler

ChannelPipeline pipeline = ch.pipeline(); 
FirstHandler firstHandler = new FirstHandler(); 
pipeline.addLast("handler1", firstHandler); 
pipeline.addFirst("handler2", new SecondHandler()); 
pipeline.addLast("handler3", new ThirdHandler()); 
pipeline.remove("“handler3“"); 
pipeline.remove(firstHandler); 
pipeline.replace("handler2", "handler4", new FourthHandler());

被添加到ChannelPipeline的ChannelHandler将经过IO-Thread处理事件,这意味了必须不能有其余的IO-Thread阻塞来影响IO的总体处理;有时候可能须要阻塞,例如JDBC。所以,Netty容许经过一个EventExecutorGroup到每个ChannelPipeline.add*方法,自定义的事件会被包含在EventExecutorGroup中的EventExecutor来处理,默认的实现是DefaultEventExecutorGroup。

6.2 ChannelHandlerContext

每一个ChannelHandler被添加到ChannelPipeline后,都会建立一个ChannelHandlerContext并与之建立的ChannelHandler关联绑定。ChannelHandlerContext容许ChannelHandler与其余的ChannelHandler实现进行交互,这是相同ChannelPipeline的一部分。

ChannelHandlerContext不会改变添加到其中的ChannelHandler,所以它是安全的。

6.2.1 通知下一个ChannelHandler

在相同的ChannelPipeline中经过调用ChannelInboundHandler和ChannelOutboundHandler中各个方法中的一个方法来通知最近的handler,通知开始的地方取决你如何设置。下图显示了ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系:

若是你想有一些事件流所有经过ChannelPipeline,有两个不一样的方法能够作到:

    调用Channel的方法

    调用ChannelPipeline的方法

这两个方法均可以让事件流所有经过ChannelPipeline。不管从头部仍是尾部开始,由于它主要依赖于事件的性质。若是是一个“入站”事件,它开始于头部;如果一个“出站”事件,则开始于尾部。

下面的代码显示了一个写事件如何经过ChannelPipeline从尾部开始:

@Override  
protected void initChannel(SocketChannel ch) throws Exception {  
    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {  
        @Override  
        public void channelActive(ChannelHandlerContext ctx) throws Exception {  
            //Event via Channel  
            Channel channel = ctx.channel();  
            channel.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));  
            //Event via ChannelPipeline  
            ChannelPipeline pipeline = ctx.pipeline();  
            pipeline.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));  
        }  
    });  
}

下图表示经过Channel或ChannelPipeline的通知:

可能你想从ChannelPipeline的指定位置开始,不想流经整个ChannelPipeline,以下状况:

    为了节省开销,不感兴趣的ChannelHandler不让经过

    排除一些ChannelHandler

在这种状况下,你可使用ChannelHandlerContext的ChannelHandler通知起点。它使用ChannelHandlerContext执行下一个ChannelHandler。下面代码显示了直接使用ChannelHandlerContext操做:

// Get reference of ChannelHandlerContext  
ChannelHandlerContext ctx = ..;  
// Write buffer via ChannelHandlerContext  
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

该消息流经ChannelPipeline到下一个ChannelHandler,在这种状况下使用ChannelHandlerContext开始下一个ChannelHandler。下图显示了事件流:

如上图显示的,从指定的ChannelHandlerContext开始,跳过前面全部的ChannelHandler,使用ChannelHandlerContext操做是常见的模式,最经常使用的是从ChannelHanlder调用操做,也能够在外部使用ChannelHandlerContext,由于这是线程安全的。

6.2.2 修改ChannelPipeline

调用ChannelHandlerContext的pipeline()方法能访问ChannelPipeline,能在运行时动态的增长、删除、替换ChannelPipeline中的ChannelHandler。能够保持ChannelHandlerContext供之后使用,如外部Handler方法触发一个事件,甚至从一个不一样的线程。

下面代码显示了保存ChannelHandlerContext供以后使用或其余线程使用:

public class WriteHandler extends ChannelHandlerAdapter {  
    private ChannelHandlerContext ctx;  
    @Override  
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  
        this.ctx = ctx;  
    }  
    public void send(String msg){  
        ctx.write(msg);  
    }  
}

 请注意,ChannelHandler实例若是带有@Sharable注解则能够被添加到多个ChannelPipeline。

也就是说单个ChannelHandler实例能够有多个ChannelHandlerContext,所以能够调用不一样ChannelHandlerContext获取同一个ChannelHandler。

若是添加不带@Sharable注解的ChannelHandler实例到多个ChannelPipeline则会抛出异常;使用@Sharable注解后的ChannelHandler必须在不一样的线程和不一样的通道上安全使用。怎么是不安全的使用?看下面代码:

@Sharable  
public class NotSharableHandler extends ChannelInboundHandlerAdapter {  
    private int count;  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        count++;  
        System.out.println("channelRead(...) called the " + count + " time“");  
        ctx.fireChannelRead(msg);  
    }  
      
}

上面是一个带@Sharable注解的Handler,它被多个线程使用时,里面count是不安全的,会致使count值错误。

为何要共享ChannelHandler?使用@Sharable注解共享一个ChannelHandler在一些需求中仍是有很好的做用的,如使用一个ChannelHandler来统计链接数或来处理一些全局数据等等。

6.3 状态模型

Netty有一个简单但强大的状态模型,并完美映射到ChannelInboundHandler的各个方法。下面是Channel生命周期四个不一样的状态:

    channelUnregistered

    channelRegistered

    channelActive

    channelInactive

Channel的状态在其生命周期中变化,由于状态变化须要触发,下图显示了Channel状态变化:

还能够看到额外的状态变化,由于用户容许从EventLoop中注销Channel暂停事件执行,而后再从新注册。在这种状况下,你会看到多个channelRegistered和channelUnregistered状态的变化,而永远只有一个channelActive和channelInactive的状态,由于一个通道在其生命周期内只能链接一次,以后就会被回收;从新链接,则是建立一个新的通道。

下图显示了从EventLoop中注销Channel后再从新注册的状态变化:

6.4 ChannelHandler和其子类

Netty中有3个实现了ChannelHandler接口的类,其中2个是接口,一个是抽象类。以下图

6.4.1 ChannelHandler中的方法

Netty定义了良好的类型层次结构来表示不一样的处理程序类型,全部的类型的父类是ChannelHandler。ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法。

    handlerAdded,ChannelHandler添加到实际上下文中准备处理事件

    handlerRemoved,将ChannelHandler从实际上下文中删除,再也不处理事件

    exceptionCaught,处理抛出的异常

上面三个方法都须要传递ChannelHandlerContext参数,每一个ChannelHandler被添加到ChannelPipeline时会自动建立ChannelHandlerContext。ChannelHandlerContext容许在本地通道安全的存储和检索值。Netty还提供了一个实现了ChannelHandler的抽象类:ChannelHandlerAdapter。ChannelHandlerAdapter实现了父类的全部方法,基本上就是传递事件到ChannelPipeline中的下一个ChannelHandler直到结束。

6.4.2 ChannelInboundHandler

ChannelInboundHandler提供了一些方法再接收数据或Channel状态改变时被调用。下面是ChannelInboundHandler的一些方法:

channelRegistered,ChannelHandlerContext的Channel被注册到EventLoop;

channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销

channelActive,ChannelHandlerContext的Channel已激活

channelInactive,ChannelHanderContxt的Channel结束生命周期

channelRead,从当前Channel的对端读取消息

channelReadComplete,消息读取完成后执行

userEventTriggered,一个用户事件被处罚

channelWritabilityChanged,改变通道的可写状态,可使用Channel.isWritable()检查

exceptionCaught,重写父类ChannelHandler的方法,处理异常

Netty提供了一个实现了ChannelInboundHandler接口并继承ChannelHandlerAdapter的类:ChannelInboundHandlerAdapter。ChannelInboundHandlerAdapter实现了ChannelInboundHandler的全部方法,做用就是处理消息并将消息转发到ChannelPipeline中的下一个ChannelHandler。

ChannelInboundHandlerAdapter的channelRead方法处理完消息后不会自动释放消息,若想自动释放收到的消息,可使用SimpleChannelInboundHandler<I>。

看下面代码:

ChannelInitializer用来初始化ChannelHandler,将自定义的各类ChannelHandler添加到ChannelPipeline中。/** 
 * 实现ChannelInboundHandlerAdapter的Handler,不会自动释放接收的消息对象 
 * @author c.k 
 * 
 */  
public class DiscardHandler extends ChannelInboundHandlerAdapter {  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        //手动释放消息  
        ReferenceCountUtil.release(msg);  
    }  
}
/** 
 * 继承SimpleChannelInboundHandler,会自动释放消息对象 
 * @author c.k 
 * 
 */  
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {  
        //不须要手动释放  
    }  
}

若是须要其余状态改变的通知,能够重写Handler的其余方法。一般自定义消息类型来解码字节,能够实现ChannelInboundHandler或ChannelInboundHandlerAdapter。有一个更好的解决方法,使用编解码器的框架能够很容的实现。使用ChannelInboundHandler、ChannelInboundHandlerAdapter、SimpleChannelInboundhandler这三个中的一个来处理接收消息,使用哪个取决于需求;大多数时候使用SimpleChannelInboundHandler处理消息,使用ChannelInboundHandlerAdapter处理其余的“入站”事件或状态改变。

ChannelInitializer用来初始化ChannelHandler,将自定义的各类ChannelHandler添加到ChannelPipeline中。

6.4.3 ChannelOutboundHandler

ChannelOutboundHandler用来处理“出站”的数据消息。ChannelOutboundHandler提供了下面一些方法:

bind,Channel绑定本地地址

connect,Channel链接操做

disconnect,Channel断开链接

close,关闭Channel

deregister,注销Channel

read,读取消息,实际是截获ChannelHandlerContext.read()

write,写操做,实际是经过ChannelPipeline写消息,Channel.flush()属性到实际通道

flush,刷新消息到通道

ChannelOutboundHandler是ChannelHandler的子类,实现了ChannelHandler的全部方法。全部最重要的方法采起ChannelPromise,所以一旦请求中止从ChannelPipeline转发参数则必须获得通知。

Netty提供了ChannelOutboundHandler的实现:ChannelOutboundHandlerAdapter。ChannelOutboundHandlerAdapter实现了父类的全部方法,而且能够根据须要重写感兴趣的方法。全部这些方法的实现,在默认状况下,都是经过调用ChannelHandlerContext的方法将事件转发到ChannelPipeline中下一个ChannelHandler。

看下面的代码:

public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {  
    @Override  
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  
        ReferenceCountUtil.release(msg);  
        promise.setSuccess();  
    }  
}

重要的是要记得释放致远并直通ChannelPromise,若ChannelPromise没有被通知可能会致使其中一个ChannelFutureListener不被通知去处理一个消息。

若是消息被消费而且没有被传递到ChannelPipeline中的下一个ChannelOutboundHandler,那么就须要调用ReferenceCountUtil.release(message)来释放消息资源。一旦消息被传递到实际的通道,它会自动写入消息或在通道关闭是释放。

相关文章
相关标签/搜索