运算符 | 描述 |
---|---|
& | 与 |
| | 或 |
~ | 非 |
^ | 异或 |
<< | 左移 |
>> | 右移 |
&(与)缓存
十进制 | 二进制 |
---|---|
3 | 0 0 1 1 |
5 | 0 1 0 1 |
& 后结果:1 | 0 0 0 1 |
即:对应位都为 1 时,才为 1,不然全为 0。安全
|(或)ide
十进制 | 二进制 |
---|---|
3 | 0 0 1 1 |
5 | 0 1 0 1 |
| 后结果 :7 | 0 1 1 1 |
即:对应位只要有 1 时,即为 1,不然全为 0。oop
~(非)学习
十进制 | 二进制 |
---|---|
3 | 0 0 1 1 |
~ 后结果:12 | 1 1 0 0 |
即:对应位取反。this
异或 ^atom
十进制 | 二进制 |
---|---|
3 | 0 0 1 1 |
5 | 0 1 0 1 |
^ 后结果:6 | 0 1 1 0 |
即:只要对应为不一样即为 1。spa
咱们在以往学习Netty中见到过相似于如下代码:.net
selectionKey.interestOps(interestOps | readInterestOp);
咱们重点关注位运算:interestOps | readInterestOp线程
该行代码的意思是位运算计算一个数字,该数字包含 | 先后的数字!
//初始化一个值 int interestOps = 0; //给当前这个值增长一个可读事件 interestOps |= OP_READ; //给当前的值增长一个可写的事件 interestOps |= OP_WRITE; //判断当前的事件是否是包含可读事件 true boolean isRead = (interestOps & OP_READ) == OP_READ; //判断当前的事件是否是不包含可读事件 false boolean isRead = (interestOps & OP_READ) == 0; //剔除可读事件 interestOps &= ~OP_READ; //剔除可写事件 interestOps &= ~OP_WRITE;
io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); //建立一个管道上下文 尾部节点 tail = new TailContext(this); //建立一个管道上下文 头部节点 head = new HeadContext(this); //头部节点的下一个节点设置为尾部节点 head.next = tail; //尾部节点的上一个节点设置为头部节点 tail.prev = head; }
能够看到,这里初始化管道的时候,管道内部存在两个Handler tail和head节点,两个节点组成双向链表!
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered"); super.channelRegistered(ctx); } });
上述代码再一个Netty开发中是很常见的一个代码,这里向通道内添加了一个 ChannelInboundHandlerAdapter
,咱们进入到addLast方法:
@Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } //进入到 addLast @Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { ObjectUtil.checkNotNull(handlers, "handlers"); for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; } //进入到 addLast(executor, null, h); @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //验证是否重复添加改handler checkMultiplicity(handler); //将handler封装为上下文对象 newCtx = newContext(group, filterName(name, handler), handler); //将该节点添加到双向链表中 addLast0(newCtx); ........................忽略其余代码.............. } ........................忽略其余代码.............. return this; }
这里总共分为两步:
验证Handler是否被重复添加
checkMultiplicity(handler);
private static void checkMultiplicity(ChannelHandler handler) { //验证是否是 ChannelHandlerAdapter 类型的,若是不是直接忽略 if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; //若是不是可共享的并且是已经添加过的直接报错 if (!h.isSharable() && h.added) { throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); } //若是是可共享的或者未添加的,将该handler内的 added属性设置为true证实该handler已经被添加 h.added = true; } }
他是如何判断是否被添加过的呢?
每个Handler中都存在一个 added属性,当这个属性为true的时候,证实这个Handler已经被添加过了,Netty常规状况下为了考虑线程安全问题,是不容许一个Handler被重复的使用的!
可是咱们有时候会有这样一个需求,Handler的功能比较相似,并且咱们经过代码手段,避免了线程安全问题,因此又想重复添加Handler,Netty提供了一个注解 @Sharable
注解,当存在该注解的时候,证实这个Handler是能够被复用的,能够被重复添加!
因此,checkMultiplicity方法经过判断类是否增长了@Sharable
注解和added
属性是否为空来验证Handle是否违规重复添加了!
当验证经过以后,将added设置为true,证实这个Handler已经被添加过了!
将Handler封装为包装对象
newCtx = newContext(group, filterName(name, handler), handler);
这里比较难理解的就是这个,咱们进入到newContext方法里面:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); }
进入到 DefaultChannelHandlerContext类的源码里面:
DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { //调用父类进行掩码计算 super(pipeline, executor, name, handler.getClass()); //保存一个handler this.handler = handler; }
这里除了会保存一个handler还会调用父类,咱们介入到父类里面:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; //标识 是in仍是out this.executionMask = mask(handlerClass); // 若是由EventLoop或给定的Executor驱动的驱动程序是OrderedEventExecutor的实例,则其顺序为。 ordered = executor == null || executor instanceof OrderedEventExecutor; }
这里会保存一些属性,这些属性都是咱们前面讲过的,你们自行分析下,咱们重点关注掩码的计算:
this.executionMask = mask(handlerClass);
static int mask(Class<? extends ChannelHandler> clazz) { //直接再缓存中取出 Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get(); Integer mask = cache.get(clazz); //缓存中不存在 if (mask == null) { mask = mask0(clazz); cache.put(clazz, mask); } return mask; }
先从缓存中取出,若是不存在就调用 mask0(clazz); 方法计算,而后再放进缓存,咱们进入到mask0(clazz);
方法:
private static int mask0(Class<? extends ChannelHandler> handlerType) { int mask = MASK_EXCEPTION_CAUGHT; try { if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) { // 若是是 ChannelInboundHandler 实例,全部 Inbound 事件置为 1 mask |= MASK_ALL_INBOUND; //判断是否存在Skip注解 若是催你在这个跳过的注解 就移除这个 if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) { mask &= ~MASK_CHANNEL_REGISTERED; } ..................忽略相似的代码..................... } if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) { mask |= MASK_ALL_OUTBOUND; if (isSkippable(handlerType, "bind", ChannelHandlerContext.class, SocketAddress.class, ChannelPromise.class)) { mask &= ~MASK_BIND; } ..................忽略相似的代码..................... } } catch (Exception e) { ..................忽略异常的代码..................... } return mask; }
这会区分两种状况,一种是ChannelInboundHandler
类型的,一种是ChannelOutboundHandler
类型的,两者逻辑相同,咱们以ChannelInboundHandler为例:
首先,再ChannelHandlerMask类里面定义了不少的预设掩码值:
/** * 如下是方法表明的掩码值 */ static final int MASK_EXCEPTION_CAUGHT = 1; /** * channelRegistered方法的掩码 */ static final int MASK_CHANNEL_REGISTERED = 1 << 1; /** * channelUnregistered方法的掩码 */ static final int MASK_CHANNEL_UNREGISTERED = 1 << 2; /** * 后面的以此类推 */ static final int MASK_CHANNEL_ACTIVE = 1 << 3; static final int MASK_CHANNEL_INACTIVE = 1 << 4; static final int MASK_CHANNEL_READ = 1 << 5; static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6; static final int MASK_USER_EVENT_TRIGGERED = 1 << 7; static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8; /** * bind方法的掩码 */ static final int MASK_BIND = 1 << 9; /** * connect方法的掩码 */ static final int MASK_CONNECT = 1 << 10; /** * 后面的以此类推 */ static final int MASK_DISCONNECT = 1 << 11; static final int MASK_CLOSE = 1 << 12; static final int MASK_DEREGISTER = 1 << 13; static final int MASK_READ = 1 << 14; static final int MASK_WRITE = 1 << 15; static final int MASK_FLUSH = 1 << 16; /** * 包含所有 Inbound方法的掩码 */ private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED | MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ | MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED; /** * 包含所有 outbound方法的掩码 */ private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT | MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
咱们回到 mask0方法:
mask |= MASK_ALL_INBOUND;
一开始,咱们会直接将一个handler的掩码计算为拥有所有方法的掩码!
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) { mask &= ~MASK_CHANNEL_REGISTERED; }
判断该方法是否存在 @Skip
注解,若是存在就排除掉这个掩码!
整个逻辑执行完毕后,这个掩码就只会包含handler中没有被@Sikp注解注解的方法掩码!
有同窗可能疑问,我在书写handler的时候并无增长@Sikp
注解呀! 咱们都知道,实现一个Handler就一定须要继承 ChannelInboundHandlerAdapter
或者ChannelOutboundHandlerAdapter
, 咱们随便挑一个类进去看:
能够看到,这些方法其实都是被默认添加了的,只不过咱们重写以后没添加!如今咱们明白,handler是如何区分你实现了那些方法的了!
这里会将handler包装为HandlerContext对象,相似于tailContext和HeadContext同样,此时上下文对象的结构以下:
将HandlerContext添加进pipeline中:
addLast0(newCtx);
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
整个过程如上,无非就是指针指向地址的变换,比较简单,不作深刻分析!
ch.pipeline().remove("xxxxxx")
@Override public final ChannelPipeline remove(ChannelHandler handler) { remove(getContextOrDie(handler)); return this; }
寻找处理器Handler的上下文
getContextOrDie(handler)
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) { //寻找handler AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler); if (ctx == null) { throw new NoSuchElementException(handler.getClass().getName()); } else { return ctx; } } //context(handler); @Override public final ChannelHandlerContext context(ChannelHandler handler) { ObjectUtil.checkNotNull(handler, "handler"); AbstractChannelHandlerContext ctx = head.next; for (;;) { if (ctx == null) { return null; } //循环迭代 判断是否寻找到这个handler if (ctx.handler() == handler) { //返回这个handler的上下文对象 return ctx; } ctx = ctx.next; } }
删除这个处理器
remove(getContextOrDie(handler));
private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) { if (ctx == null) { return null; } return (T) remove((AbstractChannelHandlerContext) ctx).handler(); } //直接进入到 删除Handler的主要逻辑 //(T) remove((AbstractChannelHandlerContext) ctx).handler(); private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) { //首先删除的handler不是tail和尾节点 assert ctx != head && ctx != tail; synchronized (this) { //删除上下文对象 atomicRemoveFromHandlerList(ctx); ................忽略.................... EventExecutor executor = ctx.executor(); if (!executor.inEventLoop()) { executor.execute(new Runnable() { @Override public void run() { //回调handlerRemoved方法 callHandlerRemoved0(ctx); } }); return ctx; } } callHandlerRemoved0(ctx); return ctx; }
首先咱们关注 atomicRemoveFromHandlerList(ctx);
:
private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) { //获取该节点的上级节点 AbstractChannelHandlerContext prev = ctx.prev; //获取该节点的下级节点 AbstractChannelHandlerContext next = ctx.next; //重建指针位置 prev.next = next; next.prev = prev; }
指针位置重建以后,咱们回调handlerRemoved方法
callHandlerRemoved0(ctx);
至此咱们就完成了pipeline的建立、添加、删除的源码解析!
咱们前面见到过不少的事件传播代码,咱们以 channelRegistered
方法的事件回调为例:
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
//通知管道 传播channelRegistered事件 // 触发 channelRegistered 事件 pipeline.fireChannelRegistered();
咱们进入到改行代码的源码:
@Override public final ChannelPipeline fireChannelRegistered() { //执行注册方法 从head方法 AbstractChannelHandlerContext.invokeChannelRegistered(head); return this; }
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } }
咱们能够看到,这里使用了 next.invokeChannelRegistered();
方法 咱们依旧按照同步方法进行分析!
private void invokeChannelRegistered() { if (invokeHandler()) { try { //如今调用的HeadContext的handler ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } }
咱们如今进入到了headContext,因此咱们进入到: io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered
:
@Override public void channelRegistered(ChannelHandlerContext ctx) { invokeHandlerAddedIfNeeded(); //向下传播事件 ctx.fireChannelRegistered(); }
这一段代码除了执行Head的invokeHandlerAddedIfNeeded方法以外,还又一次传播了channelRegistered事件,咱们进入到 ctx.fireChannelRegistered();
:
@Override public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED)); return this; }
咱们若是想要向下传播,咱们首先应该找到下一个节点是谁才能传播,Netty这里调用了findContextInbound(MASK_CHANNEL_REGISTERED)
查找下一个节点,我咱们先关注如下参数 MASK_CHANNEL_REGISTERED
, 他是channelRegistered方法的掩码
, 咱们进入到 findContextInbound
方法源码:
private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx = this; do { //获取下一个inbun事件 ctx = ctx.next; //只要和掩码&运算后不为0的都是 inbunt事件 } while ((ctx.executionMask & mask) == 0); return ctx; }
从当前节点向下寻找,只要 掩码计算包含这个方法,就证实该context包含channelRegistered方法,就直接返回!
寻找到了handler以后,就开始调用了:
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } }
next.invokeChannelRegistered();
具体逻辑就和上面分析的一致了,调用该handler的ChannelRegistered方法!
传播某一个事件,就会使用哪一个事件的掩码,从当前节点向下寻找,知道对应的Handler以后,回调对应的方法!
关于管道的传播,你明白了吗?
提一个问题, 观察如下两种传播方式有何不一样:
ctx.fireChannelRegistered(); ctx.pipeline().fireChannelRegistered();