事件触发、传递、处理是DefaultChannelPipleline实现的另外一个核心能力。在前面在章节中粗略地讲过了事件的处理流程,本章将会详细地分析其中的全部关键细节。这些关键点包括:html
事件的触发方法和处理方法promise
netty提供了三种触发事件的方式:经过Channel触发,经过ChannelPipleline触发,经过ChannelHandlerContext触发。ide
Channel触发oop
在netty源码解解析(4.0)-2 Chanel的接口设计这一章中,列出了Channel触发事件的全部方法。Channel定义的全部事件触发方法中,都是用来触发outbound事件的,只有read方法比较特殊,它直接触发outbound方法,若是能读到数据则会触发inbound方法。下面是Channel的事件触发方法,和ChannelHandler事件处理方法的对应关系。post
outbound事件this
Channel方法 | ChannelOutboundHandler方法 |
bind | bind(ChannelHandlerContext, SocketAddress, ChannelPromise) |
connect | connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise) |
disconnect | disconnect(ChannelHandlerContext, ChannelPromise) |
close | close(ChannelHandlerContext, ChannelPromise) |
deregister | deregister(ChannelHandlerContext, ChannelPromise) |
write | write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) |
flush | flush(ChannelHandlerContext ctx) |
writeAndFlush | 先调用write而后调用flush |
read | read(ChannelHandlerContext) |
inbound事件url
Channel方法 | ChannelInboundHandler方法 |
read | channelRead(ChannelHandlerContext, Object) |
channelReadComplete(ChannelHandlerContext) |
Channel经过调用ChannelPipleline的同名方方法触发事件,如下是AbstractChannel实现的bind的方法spa
1 @Override 2 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { 3 return pipeline.bind(localAddress, promise); 4 }
其余方法的实现和bind相似。设计
ChannelPipleline触发netty
在netty源码解解析(4.0)-8 ChannelPipeline的设计这一章中,列出了全部触发事件的方法。 ChannelPipleline的outbound事件的触发方法和处理方法的对应关系和Channel同样,这里就再也不重复罗列。下面是inbound事件的触发方法和ChannelHandler事件处理方法的对应关系:
inbound事件
ChannelPipleline方法 | ChannelInboundHandler方法 |
fireChannelRegistered | channelRegistered(ChannelHandlerContext) |
fireChannelUnregistered | channelUnregistered(ChannelHandlerContext) |
fireChannelActive | channelActive(ChannelHandlerContext) |
fireChannelInactive | channelInactive(ChannelHandlerContext) |
fireChannelRead | channelRead(ChannelHandlerContext, Object) |
fireChannelReadComplete | channelReadComplete(ChannelHandlerContext) |
fireExceptionCaught | exceptionCaught(ChannelHandlerContext, Throwable) |
fireUserEventTriggered | userEventTriggered(ChannelHandlerContext, Object) |
fireChannelWritabilityChanged | channelWritabilityChanged(ChannelHandlerContext) |
在DefaultChannelPipleline实现中,事件触发是经过调用AbstractChannelHandlerContext的方法实现的。inbound事件的触发方式是调用对应的invokeXXX静态方法。例如: fireChannelRegistered方法会调用invokeChannelRegistered静态方法:
1 @Override 2 public final ChannelPipeline fireChannelRegistered() { 3 AbstractChannelHandlerContext.invokeChannelRegistered(head); 4 return this; 5 }
这里会把链表的头做为输入参数,代表inbound事件是从链表头开始处理。其余inbound事件触发方法的实现和这个相似。
outbound事件的触发方式是调用AbstractChannelHandlerContext的同名方法,例如bind的方法的实现:
1 @Override 2 public final ChannelFuture bind(SocketAddress localAddress) { 3 return tail.bind(localAddress); 4 }
这调用链表尾的方法,代表outbind事件从链表尾开始处理。其余outbound事件的触发方法和这个相似。
ChannelHandlerContext触发
Channel的事件触发方法会调用DefaultChannelPipleline的触发方法,而DefaultChannelPipleline的触发方法调用AbstractChannelHandlerContext的触发方法。因此,不管是Channel仍是ChannelPipleline,他们的事件触发能力都是AbstractChannelHandlerContext提供的,所以ChannelHandlerContext事件触发方法和ChannelHandler事件处理方法的对应关系和Channel,ChannelPipleline是同样的。
三种触发方法的差别
Channel只能触发outbound事件,事件从链表的tail开始,传递到head。ChannelPipleline既能够触发outbound事件,又能触发inbound事件,outbound事件的处理和Channel触发同样,inbound事件的从链表的head开始,传递到tail。ChannelHandlerContext触发方式最为灵活,若是调用ChannelHandlerContext的实例触发事件,outbound事件从这个实例的节点开始向head方向传递,inbound事件从这个实例的节点开始向tail传递,此外还能够调用AbstractChannelHandlerContext提供的静态方法从链表中的任意一个节点开始触发可处理事件。总结起来就是,Channel和ChannelPipleline触发的事件只能从链表的head或tail节点开始触发和传递事件,而ChannelHanderContext能够从链表中任何一个节点触发和传递事件。
事件的传递
事件传递的功能在AbstractChannelHandlerContext,这个类的定义以下:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext
inbound事件的触发和传递
每一个inbound事件的触发传递实现包含3个方法,一个普通方法fireXXX,一个静态方法invokeXXX, 和一个普通方法invokeXXX。每一次inbound事件传递就是一轮fire-invoke-invoe的调用。下面是channelRegisterd事件的相关的代码。
1 @Override 2 public ChannelHandlerContext fireChannelRegistered() { 3 invokeChannelRegistered(findContextInbound()); 4 return this; 5 } 6 7 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { 8 EventExecutor executor = next.executor(); 9 if (executor.inEventLoop()) { 10 next.invokeChannelRegistered(); 11 } else { 12 executor.execute(new Runnable() { 13 @Override 14 public void run() { 15 next.invokeChannelRegistered(); 16 } 17 }); 18 } 19 } 20 21 private void invokeChannelRegistered() { 22 if (invokeHandler()) { 23 try { 24 ((ChannelInboundHandler) handler()).channelRegistered(this); 25 } catch (Throwable t) { 26 notifyHandlerException(t); 27 } 28 } else { 29 fireChannelRegistered(); 30 } 31 }
这三个方法各有不一样的职责:
在广泛invoveChannelRegistered中,正常状况下会调用handler的事件处理方法,这里是handler的channelRegistered方法。若是事件处理方法没有调用对应的fire方法,那么这个事件的传递就算终止了。因此事件传递还须要handler的配合。
inbound事件传递的关键实如今findContextInbound中,这个方法是实现以下:
1 private AbstractChannelHandlerContext findContextInbound() { 2 AbstractChannelHandlerContext ctx = this; 3 do { 4 ctx = ctx.next; 5 } while (!ctx.inbound); 6 return ctx; 7 }
这里使用next向后遍历节点,使用inbound属性判断节点持有的handler是否ChannelInboundHandler类型,直到找到一个合适的节点为止。若是没找到,则返回最后一个节点。这样就对链表中最后一个节点提出了一些特殊的要求:必须是持有ChannelInboundHandler的handler而且;而且要负责终止事件传递。DefaultPipleline.TailContext类的实现就知足了这两点要求。
outbound事件的触发和传递
每一个outbound事件的触发和传递包含两点方法: XXX, invokeXXX。 下面以bind事件为例看看outbound事件的触发和传递:
1 @Override 2 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { 3 if (localAddress == null) { 4 throw new NullPointerException("localAddress"); 5 } 6 if (isNotValidPromise(promise, false)) { 7 // cancelled 8 return promise; 9 } 10 11 final AbstractChannelHandlerContext next = findContextOutbound(); 12 EventExecutor executor = next.executor(); 13 if (executor.inEventLoop()) { 14 next.invokeBind(localAddress, promise); 15 } else { 16 safeExecute(executor, new Runnable() { 17 @Override 18 public void run() { 19 next.invokeBind(localAddress, promise); 20 } 21 }, promise, null); 22 } 23 return promise; 24 } 25 26 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { 27 if (invokeHandler()) { 28 try { 29 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); 30 } catch (Throwable t) { 31 notifyOutboundHandlerException(t, promise); 32 } 33 } else { 34 bind(localAddress, promise); 35 } 36 }
bind方法调用findContextOutbound找到链表上下一个持有ChannelOutboundHandler类型handler的节点,而且确保invokeBind方法在eventExecutor中执行。invokeBind方法负责调用handler对应的事件处理方法,这里是调用handler的bind方法。handler的bind方法中须要调用节点bind方法,这个事件才能继续传递下去。
outbound事件传递的关键实如今findContextOutbound中,这个方法的实现以下:
1 private AbstractChannelHandlerContext findContextOutbound() { 2 AbstractChannelHandlerContext ctx = this; 3 do { 4 ctx = ctx.prev; 5 } while (!ctx.outbound); 6 return ctx; 7 }
这里使用链表的prev向前遍历,使用outbound属性判断节点持有的handler是否ChannelOutboundHandler类型,直到找到一个为止。若是没找到,将会返回链表头的节点。这样对链表头的节点也提出了特殊的要求:它持有的handler必须是ChannelOutboundHandler类型。
链表节点持有的handler类型
在事件的传递和处理过程当中,必须把inbound事件交给ChannelInboundChandler类型的handler处理,把outbound事件交给ChannelOutboundChandler类型的handler处理。为了判断handler类型,定义了两个boolean类型的属性: inbound, outbound。inbound==true表示handler是ChannelInboundHandler类型, outbound==true表示handler是ChannelOutboundHandler类型。这两个值在AbstractChannelHandlerContext构造方法中初始化,初始化值来自输入的参数。DefaultChannelHandlerContext在构造方法中把这两个参数的值传入。
1 DefaultChannelHandlerContext( 2 DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { 3 super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); 4 if (handler == null) { 5 throw new NullPointerException("handler"); 6 } 7 this.handler = handler; 8 }
使用isInbound的的值设置inbound,isOutbound的值设置outbound。这两方法只是简单的使用了instanceof运算符。
1 private static boolean isInbound(ChannelHandler handler) { 2 return handler instanceof ChannelInboundHandler; 3 } 4 5 private static boolean isOutbound(ChannelHandler handler) { 6 return handler instanceof ChannelOutboundHandler; 7 }
为ChannelHandler分配eventExecutor
把一个channleHandler添加到ChannelPipleline中时,ChannelPipleline会给它分配一个eventExecutor, 它的全部的事件处理方法都会在这个executor中执行。若是使用带group参数的add方法,executor会从group中取,不然会把channel的eventLoop当成这个handler的executor使用。 从group中分配execuor的操做在建立持有handler的链表节点时完成:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); }
childExecutor方法负责从group中取出一个executor分配给handler:
1 private EventExecutor childExecutor(EventExecutorGroup group) { 2 if (group == null) { 3 return null; 4 } 5 Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP); 6 if (pinEventExecutor != null && !pinEventExecutor) { 7 return group.next(); 8 } 9 Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors; 10 if (childExecutors == null) { 11 // Use size of 4 as most people only use one extra EventExecutor. 12 childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4); 13 } 14 // Pin one of the child executors once and remember it so that the same child executor 15 // is used to fire events for the same channel. 16 EventExecutor childExecutor = childExecutors.get(group); 17 if (childExecutor == null) { 18 childExecutor = group.next(); 19 childExecutors.put(group, childExecutor); 20 } 21 return childExecutor; 22 }
实际的分配操做要稍微复杂一些。取决于channel的ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP设置,若是没有设置这个选项或设置成true, 表示每一个channelPipleline只能从一个group中分配一个executor, 这是默认行为,实现代码是地9行-19行,这种状况下每个使用了同一个group的handler,都会被分配到同一个executor中。若是把这个选择设置成false,这是简单地从group中取出一个executor,实现代码是地7行,这种状况下,每个使用了同一个group的handler被均匀地分配到group中的每个executor中。
若是没有指定group,会在地3行退出,这里没有分配executor。这种状况会在AbstractChannelHandlerContext的executor方法中获得妥善处理:
1 @Override 2 public EventExecutor executor() { 3 if (executor == null) { 4 return channel().eventLoop(); 5 } else { 6 return executor; 7 } 8 }
第4行,处理了没分配executor的状况,调用channel的eventLoop方法获得channel的eventLoop。