Netty中另外两个重要组件—— ChannelHandle,ChannelHandleContext,Pipeline。Netty中I/O事件的传播机制以及数据的过滤和写出均由它们负责。git
// AbstractChannel(..)
protected AbstractChannel(Channel parent) {
...
// 建立pipeline
pipeline = newChannelPipeline();
}
// newChannelPipeline()
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
// DefaultChannelPipeline(..)
protected DefaultChannelPipeline(Channel channel) {
...
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
复制代码
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
...
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
...
}
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
...
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
...
}
复制代码
head与tail它们都会调用父类AbstractChannelHandlerContext构造器去完成初始化,由此咱们能够预见ChanelPipeline里面存放的是一个个ChannelHandlerContext,根据DefaultChannelPipeline构造方法咱们能够知道它们数据结构为双向链表,根据AbstractChannelHandlerContext构造方法,咱们能够发现head指定的为出栈处理,而tail指定的为入栈处理器。github
pipeline里面的事件传播机制咱们接下来验证,可是咱们能够推测出入栈从head开始传播,由于它是出栈处理器,因此它只管往下传播不作任何处理,一直到tail会结束。出栈从tail开始传播,由于他是入栈处理器,因此它只管往下传播事件便可,也不作任何处理。这么看来对于入栈,从head开始到tail结束;对于出栈偏偏相反,从tail开始到head结束。promise
// filterName(..)
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
// 判断重名
private void checkDuplicateName(String name) {
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
// 找有没有同名的context
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
复制代码
// 插入到链表中tail节点的前面。
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
复制代码
final void callHandlerAdded() throws Exception {
...
if (setAddComplete()) {
// 调用具体handler的handlerAdded方法
handler().handlerAdded(this);
}
}
复制代码
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
// 相同堆内地址即为找到
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}
复制代码
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
复制代码
final void callHandlerRemoved() throws Exception {
try {
// Only call handlerRemoved(...) if we called handlerAdded(...) before.
if (handlerState == ADD_COMPLETE) {
handler().handlerRemoved(this);
}
} finally {
// Mark the handler as removed in any case.
setRemoved();
}
}
复制代码
// 省略代码
...
serverBootstrap
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new Inbound1())
.addLast(new InBound2())
.addLast(new Inbound3());
}
});
...
public class Inbound1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("at InBound1: " + msg);
ctx.fireChannelRead(msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().pipeline().fireChannelRead("hello cj");
}
}
public class Inbound2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("at InBound2: " + msg);
ctx.fireChannelRead(msg);
}
}
public class Inbound3 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("at InBound3: " + msg);
ctx.fireChannelRead(msg);
}
}
复制代码
从head开始一直向下一个inboud传播直到tail结束,也能够看到ChannelHandlerContext起到的正是中间纽带的做用, 它能拿到handle也能够向上获取到channel与pipeline,一个channel只会有一个pipeline,一个pipeline能够有多个入栈handler和出栈handler,并且每一个handler都会被ChannelHandlerContext包裹着。事件传播依赖的ChannelHandlerContext的fire*方法。bash
按照咱们上边说的那样 InBoud1 -> InBound2 -> InBoud3数据结构
public class Outbound1 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println("oubound1 write:" + msg);
ctx.write(msg, promise);
}
}
public class Outbound2 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println("oubound2 write:" + msg);
ctx.write(msg, promise);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(()-> {
ctx.channel().pipeline().write("hello cj...");
}, 5, TimeUnit.SECONDS);
}
}
public class Outbound3 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println("oubound3 write:" + msg);
ctx.write(msg, promise);
}
}
复制代码
与入栈事件传递顺序是彻底相反的,也就是从链表尾部开始。ide
public class Inbound1 extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Inbound1...");
super.exceptionCaught(ctx, cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
throw new RuntimeException("cj test throw caught...");
}
}
public class Inbound3 extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Inbound2...");
super.exceptionCaught(ctx, cause);
}
}
public class Outbound1 extends ChannelOutboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Outbound1...");
super.exceptionCaught(ctx, cause);
}
}
public class Outbound2 extends ChannelOutboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Outbound2...");
super.exceptionCaught(ctx, cause);
}
}
public class Outbound3 extends ChannelOutboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Outbound3...");
super.exceptionCaught(ctx, cause);
}
}
复制代码
异常的传播过程是从head一直遍历到tail结束,并在tail中将其打印出来。ui
ctx.write("hello cj...");
ctx.pipeline().write("hello cj...");
复制代码
ctx.write(..) 咱们按照上面的内容是能够想到的,ctx.write实际上是直接激活当前节点的下一个节点write,因此它不会从尾部开始向前遍历全部的outbound,而ctx.pipeline().write(..)咱们看源码能够知道,它先调用pipeline的write方法,跟踪源码(下图)能够发现,他是从tail开始遍历的,全部的outboud会依次被执行。同理inbound也是如此this