Netty源码解析3-Pipeline

请戳GitHub原文: https://github.com/wangzhiwubigdata/God-Of-BigDatajava

更多文章关注:多线程/集合/分布式/Netty/NIO/RPCgit

Channel实现概览

在Netty里,Channel是通信的载体,而ChannelHandler负责Channel中的逻辑处理。github

那么ChannelPipeline是什么呢?我以为能够理解为ChannelHandler的容器:一个Channel包含一个ChannelPipeline,全部ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。面试

在Netty中,ChannelEvent是数据或者状态的载体,例如传输的数据对应MessageEvent,状态的改变对应ChannelStateEvent。当对Channel进行操做时,会产生一个ChannelEvent,并发送到ChannelPipeline。ChannelPipeline会选择一个ChannelHandler进行处理。这个ChannelHandler处理以后,可能会产生新的ChannelEvent,并流转到下一个ChannelHandler。编程

channel pipeline

例如,一个数据最开始是一个MessageEvent,它附带了一个未解码的原始二进制消息ChannelBuffer,而后某个Handler将其解码成了一个数据对象,并生成了一个新的MessageEvent,并传递给下一步进行处理。markdown

到了这里,能够看到,其实Channel的核心流程位于ChannelPipeline中。因而咱们进入ChannelPipeline的深层梦境里,来看看它具体的实现。网络

ChannelPipeline的主流程

Netty的ChannelPipeline包含两条线路:Upstream和Downstream。Upstream对应上行,接收到的消息、被动的状态改变,都属于Upstream。Downstream则对应下行,发送的消息、主动的状态改变,都属于Downstream。ChannelPipeline接口包含了两个重要的方法:sendUpstream(ChannelEvent e)sendDownstream(ChannelEvent e),就分别对应了Upstream和Downstream。多线程

对应的,ChannelPipeline里包含的ChannelHandler也包含两类:ChannelUpstreamHandlerChannelDownstreamHandler。每条线路的Handler是互相独立的。它们都很简单的只包含一个方法:ChannelUpstreamHandler.handleUpstreamChannelDownstreamHandler.handleDownstream并发

Netty官方的javadoc里有一张图(ChannelPipeline接口里),很是形象的说明了这个机制(我对原图进行了一点修改,加上了ChannelSink,由于我以为这部分对理解代码流程会有些帮助):框架

channel pipeline

什么叫ChannelSink呢?ChannelSink包含一个重要方法ChannelSink.eventSunk,能够接受任意ChannelEvent。“sink"的意思是"下沉”,那么"ChannelSink"好像能够理解为"Channel下沉的地方"?实际上,它的做用确实是这样,也能够换个说法:“处于末尾的万能Handler”。最初读到这里,也有些困惑,这么理解以后,就感受简单许多。只有Downstream包含ChannelSink,这里会作一些创建链接、绑定端口等重要操做。为何UploadStream没有ChannelSink呢?我只能认为,一方面,不符合"sink"的意义,另外一方面,也没有什么处理好作的吧!

这里有个值得注意的地方:在一条“流”里,一个ChannelEvent并不会主动的"流"经全部的Handler,而是由上一个Handler显式的调用ChannelPipeline.sendUp(Down)stream产生,并交给下一个Handler处理。也就是说,每一个Handler接收到一个ChannelEvent,并处理结束后,若是须要继续处理,那么它须要调用sendUp(Down)stream新发起一个事件。若是它再也不发起事件,那么处理就到此结束,即便它后面仍然有Handler没有执行。这个机制能够保证最大的灵活性,固然对Handler的前后顺序也有了更严格的要求。

顺便说一句,在Netty 3.x里,这个机制会致使大量的ChannelEvent对象建立,所以Netty 4.x版本对此进行了改进。twitter的finagle框架实践中,就提到从Netty 3.x升级到Netty 4.x,能够大大下降GC开销。有兴趣的能够看看这篇文章:https://blog.twitter.com/2013/netty-4-at-twitter-reduced-gc-overhead

下面咱们从代码层面来对这里面发生的事情进行深刻分析,这部分涉及到一些细节,须要打开项目源码,对照来看,会比较有收获。

深刻ChannelPipeline内部

DefaultChannelPipeline的内部结构

ChannelPipeline的主要的实现代码在DefaultChannelPipeline类里。列一下DefaultChannelPipeline的主要字段:

public class DefaultChannelPipeline implements ChannelPipeline {
    
        private volatile Channel channel;
        private volatile ChannelSink sink;
        private volatile DefaultChannelHandlerContext head;
        private volatile DefaultChannelHandlerContext tail;
        private final Map<String, DefaultChannelHandlerContext> name2ctx =
            new HashMap<String, DefaultChannelHandlerContext>(4);
    }

这里须要介绍一下ChannelHandlerContext这个接口。顾名思义,ChannelHandlerContext保存了Netty与Handler相关的的上下文信息。而我们这里的DefaultChannelHandlerContext,则是对ChannelHandler的一个包装。一个DefaultChannelHandlerContext内部,除了包含一个ChannelHandler,还保存了"next"和"prev"两个指针,从而造成一个双向链表。

所以,在DefaultChannelPipeline中,咱们看到的是对DefaultChannelHandlerContext的引用,而不是对ChannelHandler的直接引用。这里包含"head"和"tail"两个引用,分别指向链表的头和尾。而name2ctx则是一个按名字索引DefaultChannelHandlerContext用户的一个map,主要在按照名称删除或者添加ChannelHandler时使用。

sendUpstream和sendDownstream

前面提到了,ChannelPipeline接口的两个重要的方法:sendUpstream(ChannelEvent e)sendDownstream(ChannelEvent e)全部事件的发起都是基于这两个方法进行的。Channels类有一系列fireChannelBound之类的fireXXXX方法,其实都是对这两个方法的facade包装。

下面来看一下这两个方法的实现。先看sendUpstream(对代码作了一些简化,保留主逻辑):

public void sendUpstream(ChannelEvent e) {
        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
        head.getHandler().handleUpstream(head, e);
    }
    
    private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
        DefaultChannelHandlerContext realCtx = ctx;
        while (!realCtx.canHandleUpstream()) {
            realCtx = realCtx.next;
            if (realCtx == null) {
                return null;
            }
        }
        return realCtx;
    }

这里最终调用了ChannelUpstreamHandler.handleUpstream来处理这个ChannelEvent。有意思的是,这里咱们看不到任何"将Handler向后移一位"的操做,可是咱们总不能每次都用同一个Handler来进行处理啊?实际上,咱们更为经常使用的是ChannelHandlerContext.handleUpstream方法(实现是DefaultChannelHandlerContext.sendUpstream方法):

public void sendUpstream(ChannelEvent e) {
		DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
		DefaultChannelPipeline.this.sendUpstream(next, e);
	}

能够看到,这里最终仍然调用了ChannelPipeline.sendUpstream方法,可是它会将Handler指针后移

咱们接下来看看DefaultChannelHandlerContext.sendDownstream:

public void sendDownstream(ChannelEvent e) {
		DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
		if (prev == null) {
			try {
				getSink().eventSunk(DefaultChannelPipeline.this, e);
			} catch (Throwable t) {
				notifyHandlerException(e, t);
			}
		} else {
			DefaultChannelPipeline.this.sendDownstream(prev, e);
		}
	}

与sendUpstream好像不大相同哦?这里有两点:一是到达末尾时,就如梦境二所说,会调用ChannelSink进行处理;二是这里指针是往前移的,因此咱们知道了:

**UpstreamHandler是从前日后执行的,DownstreamHandler是从后往前执行的。**在ChannelPipeline里添加时须要注意顺序了!

DefaultChannelPipeline里还有些机制,像添加/删除/替换Handler,以及ChannelPipelineFactory等,比较好理解,就不细说了。

回到现实:Pipeline解决的问题

好了,深刻分析完代码,有点头晕了,咱们回到最开始的地方,来想想,Netty的Pipeline机制解决了什么问题?

我认为至少有两点:

一是提供了ChannelHandler的编程模型,基于ChannelHandler开发业务逻辑,基本不须要关心网络通信方面的事情,专一于编码/解码/逻辑处理就能够了。Handler也是比较方便的开发模式,在不少框架中都有用到。

二是实现了所谓的"Universal Asynchronous API"。这也是Netty官方标榜的一个功能。用过OIO和NIO的都知道,这两套API风格相差极大,要从一个迁移到另外一个成本是很大的。即便是NIO,异步和同步编程差距也很大。而Netty屏蔽了OIO和NIO的API差别,经过Channel提供对外接口,并经过ChannelPipeline将其链接起来,所以替换起来很是简单。

universal API

理清了ChannelPipeline的主流程,咱们对Channel部分的大体结构算是弄清楚了。但是到了这里,咱们依然对一个链接具体怎么处理没有什么概念,下篇文章,咱们会分析一下,在Netty中,捷径如何处理链接的创建、数据的传输这些事情。

参考资料:

请戳GitHub原文: https://github.com/wangzhiwubigdata/God-Of-BigData

                   关注公众号,内推,面试,资源下载,关注更多大数据技术~
                   大数据成神之路~预计更新500+篇文章,已经更新60+篇~
相关文章
相关标签/搜索