netty源码解解析(4.0)-13 Channel NIO实现: 关闭和清理

 Channel提供了3个方法用来实现关闭清理功能:disconnect,close,deregister。本章重点分析这个3个方法的功能的NIO实现。java

 

  disconnect实现: 断开链接promise

  disconnect方法的调用栈以下:多线程

复制代码
1 io.netty.channel.AbstractChannel#disconnect()
2 io.netty.channel.DefaultChannelPipeline#disconnect()
3 io.netty.channel.AbstractChannelHandlerContext#disconnect()
4 io.netty.channel.AbstractChannelHandlerContext#disconnect(io.netty.channel.ChannelPromise)
5 io.netty.channel.AbstractChannelHandlerContext#invokeDisconnect
6 io.netty.channel.DefaultChannelPipeline.HeadContext#disconnect
7 io.netty.channel.AbstractChannel.AbstractUnsafe#disconnect
8 io.netty.channel.socket.nio.NioSocketChannel#doDisconnect
9 io.netty.channel.socket.nio.NioSocketChannel#doClose
复制代码
  disconnect稍微复杂一些, 在io.netty.channel.AbstractChannelHandlerContext#disconnect(io.netty.channel.ChannelPromise)实现中,会根据channel是否支持disconnect操做来决定下一步动做:socket

if (!channel().metadata().hasDisconnect()) {
    next.invokeClose(promise);
} else {
    next.invokeDisconnect(promise);
}   
  之因此这样设计,是由于TCP和UDP的disconnect含义是不同的,对TCP来讲disconnect就是关闭socket;对UDP来讲,它没有链接的概念,默认状况下经过udp socket发送数据须要指定远程地址,但若是调用connect以后,就不需指定这个地址,数据报会被发送到connect指定的地址上,disconnect含义是删除connect指定的地址,发送数据时必须指定地址。因此在NIO的Channel实现中,TCP的disconnect是调用socket的close方法,UDP的disconnect是调用socket的disconnect方法,下面是两种不一样的disconnect实现。ide

复制代码
//TCP io.netty.channel.socket.nio.NioSocketChannel#doDisconnect
@Override
protected void doDisconnect() throws Exception {
    doClose();
}
@Override
protected void doClose() throws Exception {
    super.doClose();
    javaChannel().close();
}
//UDP io.netty.channel.socket.nio.NioDatagramChannel#doDisconnect
@Override
protected void doDisconnect() throws Exception {
    javaChannel().disconnect();
}
复制代码
  io.netty.channel.AbstractChannel.AbstractUnsafe#disconnect实现了disconnect的逻辑,先调用doDisconnect方法,这个方法是io.netty.channel.AbstractChannel定义的的抽象方法。若是channel的状态从active变成inactive,就调用pipeline的fireChannelInactive方法触发channelInactive事件。oop

  

  close实现: 关闭channelthis

  close方法的调用栈:.net

复制代码
io.netty.channel.AbstractChannel#close()
io.netty.channel.DefaultChannelPipeline#close()
io.netty.channel.AbstractChannelHandlerContext#close()
io.netty.channel.AbstractChannelHandlerContext#close(io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#invokeClose
io.netty.channel.DefaultChannelPipeline.HeadContext#close
io.netty.channel.AbstractChannel.AbstractUnsafe#close(io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannel.AbstractUnsafe#close(final ChannelPromise promise, final Throwable cause ,final ClosedChannelException closeCause, final boolean notify)
io.netty.channel.AbstractChannel.AbstractUnsafe#doClose0
io.netty.channel.socket.nio.NioSocketChannel#doClose  
复制代码
  close的逻辑实如今io.netty.channel.AbstractChannel.AbstractUnsafe#close(final ChannelPromise promise, final Throwable cause ,final ClosedChannelException closeCause, final boolean notify)中,这个close方法主要实现了一下几个功能:线程

确保在多线程环境下,屡次调用close和一次调用的影响一致,而且能够经过promis获得一样的结果。
保证在执行close的过程当中,不能向channel写数据。
调用doClose0执行执真正的close操做。
调用deregister对channel作最后的清理工做,并触发channelInactive, channelUnregistered事件。
  如下是这个方法的代码:设计

复制代码
 1 private void close(final ChannelPromise promise, final Throwable cause,
 2                    final ClosedChannelException closeCause, final boolean notify) {
 3     if (!promise.setUncancellable()) {
 4         return;
 5     }
 6 
 7     if (closeInitiated) {
 8         if (closeFuture.isDone()) {
 9             // Closed already.
10             safeSetSuccess(promise);
11         } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
12             // This means close() was called before so we just register a listener and return
13             closeFuture.addListener(new ChannelFutureListener() {
14                 @Override
15                 public void operationComplete(ChannelFuture future) throws Exception {
16                     promise.setSuccess();
17                 }
18             });
19         }
20         return;
21     }
22 
23     closeInitiated = true;
24 
25     final boolean wasActive = isActive();
26     final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
27     this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
28     Executor closeExecutor = prepareToClose();
29     if (closeExecutor != null) {
30         closeExecutor.execute(new Runnable() {
31             @Override
32             public void run() {
33                 try {
34                     // Execute the close.
35                     doClose0(promise);
36                 } finally {
37                     // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
38                     invokeLater(new Runnable() {
39                         @Override
40                         public void run() {
41                             if (outboundBuffer != null) {
42                                 // Fail all the queued messages
43                                 outboundBuffer.failFlushed(cause, notify);
44                                 outboundBuffer.close(www.michenggw.com closeCause);
45                             }
46                             fireChannelInactiveAndDeregister(wasActive);
47                         }
48                     });
49                 }
50             }
51         });
52     } else {
53         try {
54             // Close the channel and fail the queued messages in all cases.
55             doClose0(promise);
56         } finally {
57             if (outboundBuffer != null) {
58                 // Fail all the queued messages.
59                 outboundBuffer.failFlushed(cause, notify);
60                 outboundBuffer.close(closeCause);
61             }
62         }
63         if (inFlush0) {
64             invokeLater(new Runnable() {
65                 @Override
66                 public void run() {
67                     fireChannelInactiveAndDeregister(wasActive);
68                 }
69             });
70         } else {
71             fireChannelInactiveAndDeregister(wasActive);
72         }
73     }
74 }
复制代码
   7-23行,在这个方法被屡次调用的时候,只有一次能够执行的21行之后的代码。从代码看,这一点是用closeInitiated属性来保证的,但它是一个普通boolean类型的属性,在多线程状况下存在可见性问题。事实上一个channel unsafe实例的close方法,只会在一个线程中执行,closeInitiated只在这个方法中使用,所以不存在多线程间的可见性问题。虽然可能在多个不一样的线程中屡次调用Channel的close方法,可是这个close方法,只会在channel的eventLoop线程中执行。凡是经过io.netty.channel.DefaultChannelPipeline.HeadContext调用的channel unsafe方法,都必定在channel的eventLoop线程中执行。

  26,27行,把channel unsafe的outboundBuffer设置为null,  这样,在close的过程当中,全部channel的write方法都会经过promise返回错误。

  28行,prepareToClose默认实现是返回null, 它是一个protected方法,能够根据须要覆盖它,用来在关闭以前作一些准备工做,同时指定一个executor,让接下来的关闭动做都在这个executor中执行。

  33-49行,53-72行,这两段代码实现的都是功能都是同样的,不一样的是33-49行在prepareToClose提供的executor中执行。调用doClose0执行关闭操做,清理outboundBuffer(43,44),  调用fireChannelInactiveAndDeregister(46)触发channelInactive和channelDeregister事件。63-72行,经过inFlush0属性检查当前是否正在进程flush操做,若是是,使用invokerLater确保在当前方法和flush操做完成以后再触发事件。

  doClose0中是真正的关闭操做,它先调用doClose,而后设置promise的返回值:

复制代码
 1 //io.netty.channel.AbstractChannel.AbstractUnsafe#doClose0
 2 private void doClose0(ChannelPromise promise) {
 3     try {
 4         doClose();
 5         closeFuture.setClosed();
 6         safeSetSuccess(www.fengshen157.com/ promise);
 7     } catch (Throwable t) {
 8         closeFuture.setClosed();
 9         safeSetFailure(promise, t);
10     }
11 }
12 //io.netty.channel.socket.nio.NioSocketChannel#doClose
13 @Override
14 protected void doClose() throws Exception {
15     super.doClose();
16     javaChannel().close();
17 }
复制代码
   fireChannelInactiveAndDeregister是调用deregister实现,也就是说,正常状况下,调用Channel的close方法以后就会自动完成一个channel最后的清理工做,不须要再调用deregister方法。

1 private void fireChannelInactiveAndDeregister(final boolean wasActive) {
2     deregister(voidPromise(), wasActive && !isActive());
3 }
 

  deregister实现:从eventLoop中注销channel

  deregister的调用栈:

复制代码
1 io.netty.channel.AbstractChannel#deregister()
2 io.netty.channel.DefaultChannelPipeline#deregister()
3 io.netty.channel.AbstractChannelHandlerContext#deregister()
4 io.netty.channel.AbstractChannelHandlerContext#deregister(io.netty.channel.ChannelPromise)
5 io.netty.channel.AbstractChannelHandlerContext#invokeDeregister
6 io.netty.channel.DefaultChannelPipeline.HeadContext#deregister
7 io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(io.netty.channel.ChannelPromise)
8 io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(io.netty.channel.ChannelPromise, boolean)
9 io.netty.channel.nio.AbstractNioChannel#doDeregister
复制代码
   deregister的逻辑在中实现io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(final ChannelPromise promise, final boolean fireChannelInactive),这个方法的实现比较简单,主要就是调用doDeregister方法执行deregister操做,而后触发channelInactive事件(若是fireChannelInactive参数是true)和channelUnregistered事件。

复制代码
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
    if (!promise.setUncancellable(www.gcyL157.com)) {
        return;
    }
    if (!registered) {
        safeSetSuccess(promise);
        return;
    }
    invokeLater(new Runnable() {
        @Override
        public void run() {
            try {
                doDeregister();
            } catch (Throwable t) {
                logger.warn("Unexpected exception occurred while deregistering a channel.", t);
            } finally {
                if (fireChannelInactive) {
                    pipeline.fireChannelInactive();
                }
                if (registered) {
                    registered www.dasheng178.com= false;
                    pipeline.fireChannelUnregistered();
                }
                safeSetSuccess(promise);
            }
        }
    });
}
复制代码
  这里使用invokeLater执行主要逻辑的目的是为了保证把当前正在eventLoop队列中全部任何都执行完以后再执行真正的deregister操做。

  doDeregister默认实现是空,什么都没作,它是个protected方法。真正的实如今io.netty.channel.nio.AbstractNioChannel中,它只是简单地调用eventLoop的cancel方法把SocketChannel对应的SelectionKey从Selector中删除,这样selector就不会监听到这个socket上的任何事件了。

1 @Override 2 protected void doDeregister() throws Exception { 3     eventLoop().cancel(selectionKey());

相关文章
相关标签/搜索