pipeline 有管道,流水线的意思,最先使用在 Unix 操做系统中,可让不一样功能的程序相互通信,使软件更加”高内聚,低耦合”,它以一种”链式模型”来串起不一样的程序或组件,使它们组成一条直线的工做流。promise
ChannelPipeline 是处理或拦截channel的进站事件和出站事件的双向链表,事件在ChannelPipeline中流动和传递,能够增长或删除ChannelHandler来实现对不一样业务逻辑的处理。通俗的说,ChannelPipeline是工厂里的流水线,ChannelHandler是流水线上的工人。
ChannelPipeline在建立Channel时会自动建立,每一个Channel都拥有本身的ChannelPipeline。缓存
ChannelHandlerContext是将ChannelHandler和ChannelPipeline关联起来的上下文环境,每添加一个handler都会建立ChannelHandlerContext实例,管理ChannelHandler在ChannelPipeline中的传播流向。安全
ChannelPipeline依赖于Channel的建立而自动建立,保存了channel,将全部handler组织起来,至关于工厂的流水线。
ChannelHandler拥有独立功能逻辑,能够注册到多个ChannelPipeline,是不保存channel的,至关于工厂的工人。
ChannelHandlerContext是关联ChannelHandler和ChannelPipeline的上下文环境,保存了ChannelPipeline,控制ChannelHandler在ChannelPipeline中的传播流向,至关于流水线上的小组长。bash
从前面《Netty 源码解析-服务端启动流程解析》和《Netty 源码解析-客户端链接接入及读I/O解析》咱们知道,当有新链接接入时,咱们执行注册流程,注册成功后,会调用channelRegistered,咱们从这个方法开始并发
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.fireChannelRegistered();
}
复制代码
initChannel是在服务启动时配置的参数childHandler重写了父类方法ide
private class IOChannelInitialize extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel");
ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
ch.pipeline().addLast(new IOHandler());
}
}
复制代码
咱们回忆一下,pipeline是在哪里建立的oop
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
复制代码
当建立channel时会自动建立pipelinepost
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
复制代码
在这里会建立两个默认的handler,一个InboundHandler --> TailContext,一个OutboundHandler --> HeadContext
再看addLast方法ui
@Override
public ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
复制代码
在这里生成一个handler名字,生成规则由handler类名加 ”#0”this
@Override
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
…
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, generateName(h), h);
}
return this;
}
复制代码
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
}
return this;
}
复制代码
因为pipeline是线程非安全的,经过加锁来保证并发访问的安全,进行handler名称重复性校验,将handler包装成DefaultChannelHandlerContext,最后再添加到pipeline
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
checkMultiplicity(newCtx);
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
name2ctx.put(name, newCtx);
callHandlerAdded(newCtx);
}
复制代码
这里分三步
(1) 对DefaultChannelHandlerContext进行重复性校验,若是DefaultChannelHandlerContext不是能够在多个pipeline中共享的,且已经被添加到pipeline中,则抛出异常
(2) 修改pipeline中的指针
添加IdleStateHandler以前
HeadContext --> IOChannelInitialize --> TailContext
添加IdleStateHandler以后
HeadContext --> IOChannelInitialize --> IdleStateHandler --> TailContext
(3) 将handler名和DefaultChannelHandlerContext创建映射关系
(4) 回调handler添加完成监听事件
最后删除IOChannelInitialize
在这里咱们选一个比较典型的读事件解析,其余事件流程基本相似
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
…
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
…
}
复制代码
当boss线程监听到读事件,会调用**unsafe.read()**方法
@Override
public final void read() {
…
pipeline.fireChannelRead(byteBuf);
…
}
复制代码
入站事件从head开始,tail结束
@Override
public ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg);
return this;
}
复制代码
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
if (msg == null) {
throw new NullPointerException("msg");
}
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(msg);
}
});
}
return this;
}
复制代码
查找pipeline中下一个Inbound事件
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
复制代码
private void invokeChannelRead(Object msg) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
复制代码
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
}
复制代码
将这个channel读事件标识为true,并传到下一个handler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
System.out.println(msg.toString());
}
复制代码
这里执行IOHandler重写的channelRead()方法,并调用父类channelRead方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
复制代码
继续调用事件链上的下一个handler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
复制代码
这里会调用TailContext的Read方法,释放msg缓存
总结:传播Inbound事件是从HeadContext节点往上传播,一直到TailContext节点结束
ByteBuf resp = Unpooled.copiedBuffer("hello".getBytes());
ctx.channel().write(resp);
复制代码
咱们在项目中像上面这样直接调用write写数据,并不能直接写进channel,而是写到缓冲区,还要调用flush方法才能将数据刷进channel,或者直接调用writeAndFlush。
在这里咱们选择比较典型的write事件来解析Outbound流程,其余事件流程相似
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
复制代码
经过上下文绑定的channel直接调用write方法,调用channel相对应的事件链上的handler
@Override
public ChannelFuture write(Object msg) {
return tail.write(msg);
}
复制代码
写事件是从tail向head调用,和读事件恰好相反
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
复制代码
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
...
write(msg, false, promise);
...
}
复制代码
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
...
}
...
}
复制代码
通过屡次跳转,获取上一个Ounbound事件链的handler
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
复制代码
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
复制代码
@Override
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
...
outboundBuffer.addMessage(msg, size, promise);
...
}
复制代码
从这里咱们看到,最终是把数据丢到了缓冲区,自此netty 的pipeline模型咱们解析完毕
有关inbound事件和outbound事件的传输, 可经过下图进行概括:
以为对您有帮助请点 "赞"