netty io.netty.channel 简介1

Interface AddressedEnvelope<M,A extends SocketAddress>

    此接口将一个消息、发送地址和接收地址封装到了一块儿
html

Interface Channel

    此接口表示到网络socket或者组件(component)的一个链接,其提供了IO操做的一些功能,好比read, write, connect, and bind.一个channel能够给用户提供以下功能:1.当前channel的状态(open、connected等)。2.channel的配置参数(如receive buffer size)。3.该channel支持的全部IO操做(read, write, connect, and bind)。4.还能够提供与此channel关联的ChannelPipeline,此pipeline主要负责处理该channel的全部IO事件和请求。java

  全部的IO操做都是异步的。在Netty中全部的IO操做都是异步的。这就意味着任何IO操做都调用以后都会当即返回,不能保证IO操做在调用结束的时候完成。调用IO操做以后会返回一个ChannelFuture对象,该对象会在IO操做成功失败取消的时候进行notify.api

  通道是有层次关系的。根据channel的建立方式,channel能够有一个parent,例如,一个由ServerSocketChannel接受请求建立的SocketChannel,调用其parent()方法,会返回ServerSocketChannel。层次结构的语义取决于channel依赖的传输具体实现,例如,你能够建立一个channel实现类,其能够建立一个和其共享一个socket连接的子channel,就像BEEP和SSH安全

  向下转型访问特殊操做。有些传输实现会暴露一些该实现特有的操做,经过向下转型能够调用这些操做,例如,对于旧的数据报传输,咱们能够讲channel转换成DatagramChannel,而后就能够调用其特有的multicast join / leave等操做。网络

  释放资源。对一个channel操做完毕以后,必定要调用close()close(ChannelPromise) 方法来释放资源。并发

Interface ChannelConfig

    此类封装了channel配置属性信息异步

    若是须要特殊的配置信息,须要作向下转换,具体代码以下:socket

Channel ch = ...; 
SocketChannelConfig cfg = (SocketChannelConfig) ch.getConfig();
cfg.setTcpNoDelay(false);


    选项map(Option map)。是一个动态只写的属性,其提供了另一种方式来设置属性,而不须要向下进行转换。经过setOptions(Map).方法能够更新option map。好比上面的代码,咱们能够不用将ch转换为具体的SocketChannelConfig 具体代码以下:ide

Channel ch = ...; 
cfg.setsetOption(ChannelOption.TCP_NODELAY,false);

Interface ChannelFactory<T extends Channel>

    建立channel的工厂post

Interface ChannelFuture

    其封装了异步IO操做的结果

    Netty中全部的IO操做都是异步的。这就意味着任何IO调用都会当即返回,并且不保证IO操做在调用结束的时候完成,调用IO操做会返回一个ChannelFurniture对象,经过这个对象你能够获得IO操做的状态信息和结果。channelfuture对象要么是未完成状态(uncompleted),要么是完成状态(completed)。当一个IO操做开始的时候会建立一个channelfuture对象,初始的channelfuture对象是未完成状态,它既不是成功(succeeded),也不是失败(failed),更没有取消(cancelled),由于IO操做尚未完成(finished)。若是IO操做完成了,有可能成功(succeeded),失败(failed),或者是取消(cancelled),channelfuture对象会被标记为完成状态(completed,并会附有相信的信息,好比失败的缘由,须要注意的是失败(failed)和取消(cancelled)都属于完成状态。未完成和完成 与成功、失败、取消是两个不一样的维度。下面图表示channelfuture的状态,左边是初始未完成状态,右边是完成状态,可能有三种成功失败和取消:

                                      +---------------------------+
                                      | Completed successfully    |
                                      +---------------------------+
                                 +---->      isDone() = true      |
 +--------------------------+    |    |   isSuccess() = true      |
 |        Uncompleted       |    |    +===========================+
 +--------------------------+    |    | Completed with failure    |
 |      isDone() = false    |    |    +---------------------------+
 |   isSuccess() = false    |----+---->   isDone() = true         |
 | isCancelled() = false    |    |    |    cause() = non-null     |
 |       cause() = null     |    |    +===========================+
 +--------------------------+    |    | Completed by cancellation |
                                 |    +---------------------------+
                                 +---->      isDone() = true      |
                                      | isCancelled() = true      |
                                      +---------------------------+

    此接口提供了不少方法来帮助你检查IO操做状态好比是否已经完成或者获取IO操做的结果,你能够添加ChannelFutureListener来监听channelfuture对象,这样当IO操做完成的时候,你会被通知到。

  推荐使用addListener,不建议使用await。addListener方法实在channelfuture上监听事件,是非阻塞的方法,当IO调用结束的时候,你会收到通知,在这以前你能够作别的事情,能够提高效率。而await方法是阻塞的。一旦调用以后,当前线程会阻塞,直到IO操做完成,并且会增长死锁的风险。

  不要在ChannelHandler中调用channelfuture的await方法。ChannelHandler中的时间处理方法是由IO线程调用的,一旦await方法被IO线程调用,IO操做将会等待永远不会完成,由于await方法阻塞了他等待的IO操做,就形成了死锁,代码以下:

// BAD - NEVER DO THIS @Override//永远不要这样用
 public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) {     ChannelFuture future = ctx.channel().close();
     future.awaitUninterruptibly();
     // Perform post-closure operation
     // ...
 }

 // GOOD @Override//正确的作法
 public void channelRead(ChannelHandlerContext ctx,  GoodByeMessage msg) {     ChannelFuture future = ctx.channel().close();
     future.addListener(new ChannelFutureListener() {
         public void operationComplete(ChannelFuture future) {
             // Perform post-closure operation
             // ...
         }
     });
 }

    尽管await方法有上述缺点,可是调用await显然更简便,若是必定要调用await方法,请记住不要再IO线程里调用channelfuture的await方法,不然系统为了防止死锁,会抛出BlockingOperationException

不要混淆IO超(IO timeout)和await超时(await timeout)。调用方法Future.await(long),Future.await(long, TimeUnit), Future.awaitUninterruptibly(long), 或者Future.awaitUninterruptibly(long, TimeUnit)的超时与IO超时没有任何关系。若是IO超时channelfuture对象会被标记为带失败的完成状态(completed with failure),IO超时的参数能够经过option设置,代码以下:

// BAD - NEVER DO THIS 永远不要这样作
 Bootstrap b = ...; 
 ChannelFuture f = b.connect(...);
 f.awaitUninterruptibly(10, TimeUnit.SECONDS);//IO超时应该设置到channelconfig,而不是channelfuture
 if (f.isCancelled()) {
     // Connection attempt cancelled by user
 } else if (!f.isSuccess()) {
     // You might get a NullPointerException here because the future
     // might not be completed yet.
     f.cause().printStackTrace();
 } else {
     // Connection established successfully
 }

 // GOOD 正确的作法
 Bootstrap b = ...;
 // Configure the connect timeout option. 
 b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);//这里最终会经过channelconfig来配置
 ChannelFuture f = b.connect(...);
 f.awaitUninterruptibly();

 // Now we are sure the future is completed.
 assert f.isDone();

 if (f.isCancelled()) {
     // Connection attempt cancelled by user
 } else if (!f.isSuccess()) {
     f.cause().printStackTrace();
 } else {
     // Connection established successfully
 }

Interface ChannelFutureListener

    用来监听channelfuture的结果,调用ChannelFuture.addListener(GenericFutureListener)方法以后,异步IO的操做完成以后会通知channelfuturelistener

    GenericFutureListener.operationComplete(Future)是直接被IO线程调用的,所以若是在该方法中调用耗时任务或者是阻塞的操做会致使意外停顿。若是你确实须要执行一个耗时操做或耗时操做,请用线程池另起一个线程来执行耗时操做。

Interface ChannelHandler

    此接口负责处理一个IO事件,或者拦截一个IO操做。并将事件或操做转发给ChannelPipeline中的下一个channelhandler对象。

  建议继承ChannelHandlerAdapter代替实现channelHandler接口。由于channelhandler接口有不少方法须要实现,而ChannelhandlerAdaptor默认实现了一些方法,大部分状况下你只须要实现一些必要的方法就能够了。

  上下文对象(The context object)。channelhandler须要ChannelHandlerContext对象。channelhandler对象经过channelhandlercontext对象与channelhandler的所属的channelpipeline交互。经过context对象,channelhandler能够将事件转发给他的上游和下游,或者动态修改pipeline,对于特殊的handler能够存储信息(经过AttributeKeys)。

  状态管理。channelhandler常常须要存储一些状态信息,最简单的推荐的方法是使用成员变量,代码以下:

 public interface Message {
     // your methods here
 }

 public class DataServerHandler extends SimpleChannelInboundHandler<Message> {     
     private boolean loggedIn;

     @Override
     protected void messageReceived(ChannelHandlerContext ctx, Message message) {         
         Channel ch = e.getChannel();
         if (message instanceof LoginMessage) {
             authenticate((LoginMessage) message);             
             loggedIn = true;
         } else (message instanceof GetDataMessage) {
             if (loggedIn) {
                 ch.write(fetchSecret((GetDataMessage) message));
             } else {
                 fail();
             }
         }
     }
     ...
 }

 上面代码中由于channelhandler实例中有一个变量来专门表示一个连接的状态,即一个链接有一个状态,因此你必须为每个新channel建立一个新的channelhandler实例,避免竞争条件下一个未经受权的客户端获取重要信息。正确代码以下:

 // Create a new handler instance per channel.
 // See ChannelInitializer.initChannel(Channel).
 public class DataServerInitializer extends ChannelInitializer<Channel> {    
     @Override
     public void initChannel(Channel channel) {
         channel.pipeline().addLast("handler", new DataServerHandler());
     }
 }

  Using AttributeKey虽然建议使用成员变量来存储channelhandler的状态,可是为了考虑安全问题须要为每一个channel建立一个channelhandler实例,有些状况下你可能不想建立那么多实例,在这种状况下,你须要用到AttributeKeys,他能够附着到(attached)channelhandlercontext上,代码以下:

 public interface Message {
     // your methods here
 } 
 @Sharable //这个注解很重要后面会介绍
 public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
     private final AttributeKey<Boolean> auth =           
         AttributeKey.valueOf("auth");     
     @Override
     protected void messageReceived(ChannelHandlerContext ctx, Message message) {         
         Attribute<Boolean> attr = ctx.attr(auth);         
         Channel ch = ctx.channel();

         if (message instanceof LoginMessage) {
             authenticate((LoginMessage) o);             
             attr.set(true);
         } else (message instanceof GetDataMessage) {
             if (Boolean.TRUE.equals(attr.get())) {
                 ch.write(fetchSecret((GetDataMessage) o));
             } else {
                 fail();
             }
         }
     }
     ...
 }

    经过上面的代码能够将channelhandler的状态attach到channelhandlercontext上,你能够将这个channelhandler实例添加到不一样的pipeline,代码以下:

 public class DataServerInitializer extends ChannelInitializer<Channel> {

     private static final DataServerHandler SHARED = new DataServerHandler();     
     @Override
     public void initChannel(Channel channel) {
         channel.pipeline().addLast("handler", SHARED);
     }
 }


  @Sharable注解 上面的用attributekey实例代码中用到了@Sharable注解,若是channelhandler加上了@sharable注解,意味着你能够只建立一个实例,而后你能够将该实例放到任意不一样的pipeline中,而没必要考虑竞争条件。若是不加这个注解,你每次向pipeline中添加channelhandler,都须要建立一个新的实例,不然会有并发问题。

相关文章
相关标签/搜索