Netty与Reactor模式

1. 引言

1.1 Netty、NIO、多线程

关于Netty与NIO、多线程之间的关系,能够参考@李林锋发布的一个Netty5.0架构剖析和源码解读的文章,在这篇文章中详细的介绍了Java I/O的演进过程和Linux I/O网络模型以及针对Netty实现IO多路复用的源码分析和逻辑架构。相信看完后会收获很多。前面的文章咱们分析了Netty的结构,本章来分析最错综复杂的部分(Netty中多线程以及NIO的应用)。nginx

理清楚NIO与Netty的关系以前,咱们必需要先来看看Reactor模式,Netty是一个典型的多线程与Reactor模式的使用,理解了这部分在宏观上理解Netty的NIO及多线程部分就不会有什么困难了。git

本章依然是针对Netty3.7源码进行分析。github

2. 什么是Reactor?

2.1 Reactor的由来

Reactor是一种普遍应用在服务器端开发的设计模式。Reactor中文大多译为“反应堆”,我当初接触这个概念的时候,就感受很厉害,是否是它的原理就跟“核反应”差很少?后来才知道其实没有什么关系,从Reactor的兄弟“Proactor”(多译为前摄器)就能看得出来,这两个词的中文翻译其实都不是太好,不够形象。实际上,Reactor模式又有别名“Dispatcher”或者“Notifier”,我以为这两个都更加能代表它的本质。编程

那么,Reactor模式到底是个什么东西呢?这要从事件驱动的开发方式提及。咱们知道,对于应用服务器,一个主要规律就是,CPU的处理速度是要远远快于IO速度的,若是CPU为了IO操做(例如从Socket读取一段数据)而阻塞显然是不划算的。好一点的方法是分为多进程或者线程去进行处理,可是这样会带来一些进程切换的开销,试想一个进程一个数据读了500ms,期间进程切换到它3次,可是CPU却什么都不能干,就这么切换走了,是否是也不划算?设计模式

这时先驱们找到了事件驱动,或者叫回调的方式,来完成这件事情。这种方式就是,应用业务向一个中间人注册一个回调(event handler),当IO就绪后,就这个中间人产生一个事件,并通知此handler进行处理。这种回调的方式,也体现了“好莱坞原则”(Hollywood principle)-“Don’t call us, we’ll call you”,在咱们熟悉的IoC中也有用到。看来软件开发真是互通的!服务器

好了,咱们如今来看Reactor模式。在前面事件驱动的例子里有个问题:咱们如何知道IO就绪这个事件,谁来充当这个中间人?Reactor模式的答案是:由一个不断等待和循环的单独进程(线程)来作这件事,它接受全部handler的注册,并负责先操做系统查询IO是否就绪,在就绪后就调用指定handler进行处理,这个角色的名字就叫作Reactor。网络

2.2 Reactor与NIO

Java中的NIO能够很好的和Reactor模式结合。关于NIO中的Reactor模式,我想没有什么资料能比Doug Lea大神(不知道Doug Lea?看看JDK集合包和并发包的做者吧)在《Scalable IO in Java》解释的更简洁和全面了。NIO中Reactor的核心是Selector,我写了一个简单的Reactor示例,这里我贴一个核心的Reactor的循环(这种循环结构又叫作EventLoop),剩余代码在这里。多线程

public void run() {
	try {
		while (!Thread.interrupted()) {
			selector.select();
			Set selected = selector.selectedKeys();
			Iterator it = selected.iterator();
			while (it.hasNext())
				dispatch((SelectionKey) (it.next()));
			selected.clear();
		}
	} catch (IOException ex) { /* ... */
	}
}

2.3 Reactor相关的概念

前面提到了Proactor模式,这又是什么呢?简单来讲,Reactor模式里,操做系统只负责通知IO就绪,具体的IO操做(例如读写)仍然是要在业务进程里阻塞的去作的,而Proactor模式则更进一步,由操做系统将IO操做执行好(例如读取,会将数据直接读到内存buffer中),而handler只负责处理本身的逻辑,真正作到了IO与程序处理异步执行。因此咱们通常又说Reactor是同步IO,Proactor是异步IO。架构

关于阻塞和非阻塞、异步和非异步,以及UNIX底层的机制,你们能够看看这篇文章IO – 同步,异步,阻塞,非阻塞 (亡羊补牢篇),以及陶辉(《深刻理解nginx》的做者)《高性能网络编程》的系列。并发

3. Reactor与Netty之间的关系

3.1 多线程下的Reactor

讲了一堆Reactor,咱们回到Netty。在《Scalable IO in Java》中讲到了一种多线程下的Reactor模式。在这个模式里,mainReactor只有一个,负责响应client的链接请求,并创建链接,它使用一个NIO Selector;subReactor能够有一个或者多个,每一个subReactor都会在一个独立线程中执行,而且维护一个独立的NIO Selector。

这样的好处很明显,由于subReactor也会执行一些比较耗时的IO操做,例如消息的读写,使用多个线程去执行,则更加有利于发挥CPU的运算能力,减小IO等待时间。

输入图片说明

3.2 Netty中的Reactor与NIO

好了,了解了多线程下的Reactor模式,咱们来看看Netty吧(如下部分主要针对NIO,OIO部分更加简单一点,不重复介绍了)。Netty里对应mainReactor的角色叫作“Boss”,而对应subReactor的角色叫作”Worker”。Boss负责分配请求,Worker负责执行,好像也很贴切!以TCP的Server端为例,这两个对应的实现类分别为NioServerBoss和NioWorker(Server和Client的Worker没有区别,由于创建链接以后,双方就是对等的进行传输了)。

Netty 3.7中Reactor的EventLoop在AbstractNioSelector.run()中,它实现了Runnable接口。这个类是Netty NIO部分的核心。它的逻辑很是复杂,其中还包括一些对JDK Bug的处理(例如rebuildSelector),刚开始读的时候不须要深刻那么细节。我精简了大部分代码,保留主干以下:

abstract class AbstractNioSelector implements NioSelector {

    //NIO Selector
    protected volatile Selector selector;

    //内部任务队列
    private final Queue taskQueue = new ConcurrentLinkedQueue();

    //selector循环
    public void run() {
        for (;;) {
            try {
                //处理内部任务队列
                processTaskQueue();
                //处理selector事件对应逻辑
                process(selector);
            } catch (Throwable t) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }

其中process是主要的处理事件的逻辑,例如在AbstractNioWorker中,处理逻辑以下:

protected void process(Selector selector) throws IOException {
    Set selectedKeys = selector.selectedKeys();
    if (selectedKeys.isEmpty()) {
        return;
    }
    for (Iterator i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                if (!read(k)) {
                    // Connection already closed - no need to handle write.
                    continue;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                writeFromSelectorLoop(k);
            }
        } catch (CancelledKeyException e) {
            close(k);
        }

        if (cleanUpCancelledKeys()) {
            break; // break the loop to avoid ConcurrentModificationException
        }
    }
}

这不就是第二部分提到的selector经典用法了么?

在4.0以后,做者以为NioSelector这个叫法,以及区分NioBoss和NioWorker的作法稍微繁琐了点,干脆就将这些合并成了NioEventLoop,今后这两个角色就不作区分了。我却是以为新版本的会更优雅一点。

3.3 Netty中的多线程

下面咱们来看Netty的多线程部分。一旦对应的Boss或者Worker启动,就会分配给它们一个线程去一直执行。对应的概念为BossPool和WorkerPool。对于每一个NioServerSocketChannel,Boss的Reactor有一个线程,而Worker的线程数由Worker线程池大小决定,可是默认最大不会超过CPU核数*2,固然,这个参数能够经过NioServerSocketChannelFactory构造函数的参数来设置。

public NioServerSocketChannelFactory(
        Executor bossExecutor, Executor workerExecutor,
        int workerCount) {
    this(bossExecutor, 1, workerExecutor, workerCount);
}

最后咱们比较关心一个问题,咱们以前ChannlePipeline中的ChannleHandler是在哪一个线程执行的呢?答案是在Worker线程里执行的,而且会阻塞Worker的EventLoop。例如,在NioWorker中,读取消息完毕以后,会触发MessageReceived事件,这会使得Pipeline中的handler都获得执行。

protected boolean read(SelectionKey k) {
    ....

    if (readBytes > 0) {
        // Fire the event.
        fireMessageReceived(channel, buffer);
    }

    return true;
}

能够看到,对于处理事件较长的业务,并不太适合直接放到ChannelHandler中执行。那么怎么处理呢?咱们在Handler部分会进行介绍。

最后附上项目github地址,欢迎交流:https://github.com/code4craft/netty-learning

相关文章
相关标签/搜索