netty中的EventLoop和EventLoopGroup

Netty框架的主要线程就是I/O线程,线程模型设计的好坏,决定了系统的吞吐量、并发性和安全性等架构质量属性。html

1、Netty的线程模型

  在讨论Netty线程模型时候,通常首先会想到的是经典的Reactor线程模型,尽管不一样的NIO框架对应Reactor模式的实现存在差别,但本质上仍是遵循了Reactor的基础线程模型。java

Reactor的3中线程模型:git

  • Reactor单线程模型
  • Reactor多线程模型
  • 主从Reactor多线程模型

见《Netty中的三种Reactor(反应堆)github

NioEventLoopGroup 与 Reactor 线程模型的对应关系      见《Netty中的三种Reactor(反应堆)

Netty的线程模型并非一成不变的,实际经过用户的启动配置参数来配置。算法

 在建立ServerBootstrap类实例前,先建立两个EventLoopGroup,它们其实是两个独立的Reactor线程池,bossGroup负责接收客户端的链接,workerGroup负责处理IO相关的读写操做,或者执行系统task、定时task等。数据库

用于接收客户端请求的线程池职责以下:编程

  1. 接收客户端TCP链接,初始化Channel参数;
  2. 将链路状态变动事件通知给ChannelPipeline;

处理IO操做的线程池职责以下:后端

  1. 异步读取远端数据,发送读事件到ChannelPipeline;
  2. 异步发送数据到远端,调用ChannelPipeline的发送消息接口;
  3. 执行系统调用Task;
  4. 执行定时任务Task,如空闲链路检测等;

经过调整两个EventLoopGroup的线程数、是否共享线程池等方式,Netty的Reactor线程模型能够在单线程、多线程和主从多线程间切换,用户能够根据实际状况灵活配置。   数组

为了提升性能,Netty在不少地方采用了无锁化设计。例如在IO线程的内部进行串行操做,避免多线程竞争致使的性能降低。尽管串行化设计看上去CPU利用率不高,并发程度不够,可是经过调整NIO线程池的线程参数,能够同时启动多个串行化的线程并行运行,这种局部无锁化的设计相比一个队列——多个工做线程的模型性能更优。promise

它的设计原理以下

Netty的NioEventLoop读取到消息以后,调用ChannelPipeline的fireChannelRead方法,只要用户不主动切换线程,就一直由NioEventLoop调用用户的Handler,期间不进行线程切换。这种串行化的处理方式避免了多线程操做致使的锁竞争,从性能角度看是最优的。

 Netty多线程编程的最佳实践以下: 

  1. 服务端建立两个EventLoopGroup,用于逻辑隔离NIO acceptor和NIO IO线程;
  2. 尽可能避免在用户Handler里面启动用户线程(解码后将POJO消息发送到后端业务线程除外);
  3. 解码要在NIO线程调用的解码Handler中进行,不要切换到用户线程中完成消息的解码;
  4. 若是业务逻辑比较简单,没有复杂的业务逻辑计算,没有可能阻塞线程的操做如磁盘操做、数据库操做、网络操做等,能够直接在NIO线程中进行业务逻辑操做,不用切换到用户线程;
  5. 若是业务逻辑比较复杂,不要在NIO线程上操做,应将解码后的POJO封装成Task提交到业务线程池中执行,以保证NIO线程被尽快释放,处理其余IO操做;

2、NioEventLoopGroup

NioEventLoopGroup 类层次结构

  • 从类结构可知,NioEventLoopGroup是一个Schedule类型的线程池,线程池中的线程用数组存放,

    EventLoopGroup(实际上是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children 数组, 其大小是 nThreads, 这样就构成了一个线程池,线程池大小经过

    在实例化 NioEventLoopGroup 时, 若是指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2;
  • MultithreadEventExecutorGroup 中会调用 newChild 抽象方法(抽象方法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例)来初始化 children 数组,也是在NioEventLoopGroup中调用NioEventLoop的构造函数来建立NioEventLoop

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
    }
  • NioEventLoop相关后面再讲;

 

3、NioEventLoop

  NioEventLoop 继承于 SingleThreadEventLoop, 而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中对本地线程的抽象, 它内部有一个 Thread thread 属性, 存储了一个本地 Java 线程. 所以咱们能够认为, 一个 NioEventLoop 其实和一个特定的线程绑定, 而且在其生命周期内, 绑定的线程都不会再改变

3.一、NioEventLoop 类层次结构

NioEventLoop 的类层次结构图仍是比较复杂的, 不过咱们只须要关注几个重要的点便可. 首先 NioEventLoop 的继承链以下:

NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor

3.二、NioEventLoop实例化过程

从上图能够看到, SingleThreadEventExecutor 有一个名为 thread 的 Thread 类型字段, 这个字段就表明了与 SingleThreadEventExecutor 关联的本地线程。在 SingleThreadEventExecutor 构造器中, 经过 threadFactory.newThread 建立了一个新的 Java 线程. 在这个线程中所作的事情主要就是调用 SingleThreadEventExecutor.this.run() 方法, 而由于 NioEventLoop 实现了这个方法, 所以根据多态性, 其实调用的是 NioEventLoop.run() 方法。

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;

    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // 省略清理代码
                ...
            }
        }
    });
    threadProperties = new DefaultThreadProperties(thread);
    taskQueue = newTaskQueue();
}

3.三、NioEventLoop的设计两大功能:

一、是做为 IO 线程, 执行与 Channel 相关的 IO 操做, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理等;

二、第二个任务是做为任务队列执行任务, 任务能够分为2类:

  2.一、普通task:经过调用NioEventLoop的execute(Runnable task)方法往任务队列里增长任务,Netty有不少系统task,建立他们的主要缘由是:当io线程和用户线程同时操做网络资源的时候,为了防止并发操做致使的锁竞争,将用户线程的操做封装成Task放入消息队列中,由i/o线程负责执行,这样就实现了局部无锁化
  2.二、定时任务:执行schedule()方法

依次看这些功能的源码吧:

对比2.1:上面2.1中的普通任务,NioEventLoop经过队列来保存任务。在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 经过它, 咱们能够调用一个 NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行。

    protected Queue<Runnable> newTaskQueue() {
        return new LinkedBlockingQueue<Runnable>();
    }

添加任务的SingleThreadEventExecutor中的execute()方法:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp) {
            wakeup(inEventLoop);
        }
    }

上面2.1中的普通task的执行:

    protected boolean runAllTasks() {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                return true;
            }
        }
    }

对比2.2:schedule(定时) 任务处理

除了经过 execute 添加普通的 Runnable 任务外, 咱们还能够经过调用 eventLoop.scheduleXXX 之类的方法来添加一个定时任务。
EventLoop 中实现任务队列的功能在超类 SingleThreadEventExecutor 实现的, 而 schedule 功能的实现是在 SingleThreadEventExecutor 的父类, 即 AbstractScheduledEventExecutor 中实现的。
在 AbstractScheduledEventExecutor 中, 有以 scheduledTaskQueue 字段:

Queue<ScheduledFutureTask<?>> scheduledTaskQueue;

scheduledTaskQueue 是一个队列(Queue), 其中存放的元素是 ScheduledFutureTask. 而 ScheduledFutureTask 咱们很容易猜到, 它是对 Schedule 任务的一个抽象。咱们来看一下 AbstractScheduledEventExecutor 所实现的 schedule 方法吧:

@Override
public  ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");
    if (delay < 0) {
        throw new IllegalArgumentException(
                String.format("delay: %d (expected: >= 0)", delay));
    }
    return schedule(new ScheduledFutureTask<Void>(
            this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

这是其中一个重载的 schedule, 当一个 Runnable 传递进来后, 会被封装为一个 ScheduledFutureTask 对象, 这个对象会记录下这个 Runnable 在什么时候运行、已何种频率运行等信息。
当构建了 ScheduledFutureTask 后, 会继续调用 另外一个重载的 schedule 方法:

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else {
        execute(new OneTimeTask() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task);
            }
        });
    }

    return task;
}

在这个方法中, ScheduledFutureTask 对象就会被添加到 scheduledTaskQueue 中了。

任务的执行

当一个任务被添加到 taskQueue 后, 它是怎么被 EventLoop 执行的呢?
让咱们回到 NioEventLoop.run() 方法中, 在这个方法里, 会分别调用 processSelectedKeys() 和 runAllTasks() 方法, 来进行 IO 事件的处理和 task 的处理. processSelectedKeys() 方法咱们已经分析过了, 下面咱们来看一下 runAllTasks() 中到底有什么名堂吧。
runAllTasks 方法有两个重载的方法, 一个是无参数的, 另外一个有一个参数的. 首先来看一下无参数的 runAllTasks:

protected boolean runAllTasks() { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } }

 

咱们前面已经提到过, EventLoop 能够经过调用 EventLoop.execute 来将一个 Runnable 提交到 taskQueue 中, 也能够经过调用 EventLoop.schedule 来提交一个 schedule 任务到 scheduledTaskQueue 中。在此方法的一开始调用的 fetchFromScheduledTaskQueue() 其实就是将 scheduledTaskQueue 中已经能够执行的(即定时时间已到的 schedule 任务) 拿出来并添加到 taskQueue 中, 做为可执行的 task 等待被调度执行。
它的源码以下:

private void fetchFromScheduledTaskQueue() { if (hasScheduledTasks()) { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { break; } taskQueue.add(scheduledTask); } } }

接下来 runAllTasks() 方法就会不断调用 task = pollTask() 从 taskQueue 中获取一个可执行的 task, 而后调用它的 run() 方法来运行此 task。

注意, 由于 EventLoop 既须要执行 IO 操做, 又须要执行 task, 所以咱们在调用 EventLoop.execute 方法提交任务时, 不要提交耗时任务, 更不能提交一些会形成阻塞的任务, 否则会致使咱们的 IO 线程得不到调度, 影响整个程序的并发量.

=======================

 

3.四、Netty中的IO处理循环 (上面1中的做为io线程,用于IO操做的源码解析以下:

3.4.一、回顾下Nio的Selector 的基本使用流程见《Java NIO系列教程(六) 多路复用器Selector

  1. 经过 Selector.open() 打开一个 Selector.

  2. 将 Channel 注册到 Selector 中, 并设置须要监听的事件(interest set)

  3. 不断重复:

    • 调用 select() 方法

    • 调用 selector.selectedKeys() 获取 selected keys

    • 迭代每一个 selected key:

      • 1) 从 selected key 中获取 对应的 Channel 和附加信息(若是有的话)

      • 2) 判断是哪些 IO 事件已经就绪了, 而后处理它们. 若是是 OP_ACCEPT 事件, 则调用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 获取 SocketChannel, 并将它设置为 非阻塞的, 而后将这个 Channel 注册到 Selector 中.

      • 3) 根据须要更改 selected key 的监听事件.

      • 4) 将已经处理过的 key 从 selected keys 集合中删除.

3.4.二、依次看NIO中的关键几步分别在Netty中如何实现的:

对比1:经过 Selector.open() 打开一个 Selector 《?》这一小节中已经提到了, Netty 中是经过调用 SelectorProvider.openSocketChannel() 来打开一个新的 Java NIO SocketChannel:
private static SocketChannel newSocket(SelectorProvider provider) {
    ...
    return provider.openSocketChannel();
}

对比2:将 Channel 注册到 Selector 中, 并设置须要监听的事件(interest set) 的操做咱们在《?》, 咱们在来回顾一下, 在客户端的 Channel 注册过程当中, 会有以下调用链:

Bootstrap.initAndRegister -> 
    AbstractBootstrap.initAndRegister -> 
        MultithreadEventLoopGroup.register -> 
            SingleThreadEventLoop.register -> 
                AbstractUnsafe.register ->
                    AbstractUnsafe.register0 ->
                        AbstractNioChannel.doRegister

在 AbstractUnsafe.register 方法中调用了 register0 方法:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 省略条件判断和错误处理
    AbstractChannel.this.eventLoop = eventLoop;
    register0(promise);
}

register0 方法代码以下:

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (firstRegistration && isActive()) {
        pipeline.fireChannelActive();
    }
}

register0 又调用了 AbstractNioChannel.doRegister:

@Override
protected void doRegister() throws Exception {
    // 省略错误处理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

在这里 javaChannel() 返回的是一个 Java NIO SocketChannel 对象, 咱们将此 SocketChannel 注册到前面第一步获取的 Selector 中。

对比3:thread 的 run 循环

当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用(3.二、NioEventLoop实例化过程), 进而致使了 EventLoop 所对应的 Java 线程的启动。

run()中的selector相关的用于IO操做的,网络的读取操做是在run方法中去执行的,首先看有没有未执行的任务,有的话直接执行,不然就去轮训看是否有就绪的Channel,以下:

@Override
    protected void run() {
//死循环,NioEventLoop的事件循环就在这里
for (;;) { oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) {//父类SingleThreadEventExecutor中定义的taskQueue是否有任务,若是有马上执行(父类SingleThreadEventExecutor中还定义了delayedTaskQueue) selectNow(); } else {//若是没有select则进行轮询 select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } } //当轮训到有就绪的Channel时,就进行网络的读写操做 cancelledKeys = 0; final long ioStartTime = System.nanoTime(); needsToSelectAgain = false; if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }

对比3-IO 事件的轮询

分支1:selectNow() 和 select(oldWakenUp) 之间有什么区别:

void selectNow() throws IOException {
    try {
        selector.selectNow();
    } finally {
        // restore wakup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

selector 字段正是 Java NIO 中的多路复用器 Selector. 那么这里 selector.selectNow() 就很好理解了, selectNow() 方法会检查当前是否有就绪的 IO 事件, 若是有, 则返回就绪 IO 事件的个数; 若是没有, 则返回0. 注意, selectNow() 是当即返回的, 不会阻塞当前线程. 当 selectNow() 调用后, finally 语句块中会检查 wakenUp 变量是否为 true, 当为 true 时, 调用 selector.wakeup() 唤醒 select() 的阻塞调用。

再来看一下 else 分支的 select(oldWakenUp) 方法:
select(oldWakenUp)中轮询时,可能为空,也没有wakeup操做或是最新的消息处理,则说明本次轮训是一个空轮训,此时会触发jdk的epoll bug,它会致使Selector进行空轮训,使i/o线程处于100%。为了不这个bug。须要对selector进行统计:

1)对selector操做周期进行统计
2)每完成一次轮训进行一次计数
3)当在某个周期内超过必定次数说明触发了bug,此时须要进行从新创建Selector,并赋值新值,将原来的进行关闭。
调用rebuildSelector方法。
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);    //当前时间延时1分钟
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
                
                //当轮询到新的事件或者wakenUp或者有任务时跳出轮询的循环
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
                    // Selected something,
                    // waken up by user, or
                    // the task queue has a pending task.
                    break;
                }
                
                //不然,可能已经触发jdk的epoll bug,经过下面各类策略退出
                if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                            selectCnt);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = System.nanoTime();
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
            }
            // Harmless exception - log anyway
        }
    }

对比3--IO 事件的处理

在 NioEventLoop.run() 方法中, 第一步是经过 select/selectNow 调用查询当前是否有就绪的 IO 事件. 那么当有 IO 事件就绪时, 第二步天然就是处理这些 IO 事件啦.
首先让咱们来看一下 NioEventLoop.run 中循环的剩余部分:

//run()方法的中间一段
//...
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys();

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
//...

上面列出的代码中, 有两个关键的调用, 第一个是 processSelectedKeys() 调用, 根据字面意思, 咱们能够猜出这个方法确定是查询就绪的 IO 事件, 而后处理它; 第二个调用是 runAllTasks(), 这个方法咱们也能够一眼就看出来它的功能就是运行 taskQueue 中的任务。
这里的代码还有一个十分有意思的地方, 即 ioRatio. 那什么是 ioRatio呢? 它表示的是此线程分配给 IO 操做所占的时间比(即运行 processSelectedKeys 耗时在整个循环中所占用的时间). 例如 ioRatio 默认是 50, 则表示 IO 操做和执行 task 的所占用的线程执行时间比是 1 : 1. 当知道了 IO 操做耗时和它所占用的时间比, 那么执行 task 的时间就能够很方便的计算出来了。

此时会去调用processSelectedKeysPlain方法,默认没有开启SelectedKey的优化方法。这里执行的方法以下:
设 IO 操做耗时为 ioTime, ioTime 占的时间比例为 ioRatio, 则:
    ioTime / ioRatio = taskTime / taskRatio
    taskRatio = 100 - ioRatio
    => taskTime = ioTime * (100 - ioRatio) / ioRatio

根据上面的公式, 当咱们设置 ioRate = 70 时, 则表示 IO 运行耗时占比为70%, 即假设某次循环一共耗时为 100ms, 那么根据公式, 咱们知道 processSelectedKeys() 方法调用所耗时大概为70ms(即 IO 耗时), 而 runAllTasks() 耗时大概为 30ms(即执行 task 耗时).
当 ioRatio 为 100 时, Netty 就不考虑 IO 耗时的占比, 而是分别调用 processSelectedKeys()runAllTasks(); 而当 ioRatio 不为 100时, 则执行到 else 分支, 在这个分支中, 首先记录下 processSelectedKeys() 所执行的时间(即 IO 操做的耗时), 而后根据公式, 计算出执行 task 所占用的时间, 而后以此为参数, 调用 runAllTasks().

咱们这里先分析一下 processSelectedKeys() 方法调用, runAllTasks() 咱们留到下一节再分析。


processSelectedKeys() 方法的源码以下:

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

这个方法中, 会根据 selectedKeys 字段是否为空, 而分别调用 processSelectedKeysOptimized 或 processSelectedKeysPlainselectedKeys 字段是在调用 openSelector() 方法时, 根据 JVM 平台的不一样, 而有设置不一样的值, 在我所调试这个值是不为 null 的. 其实 processSelectedKeysOptimized 方法 processSelectedKeysPlain 没有太大的区别,先看processSelectedKeysPlain():

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        //判断selectedkey是否为空,为空直接返回,若是不为空就去获取selectedkey上的channel
        if (selectedKeys.isEmpty()) {
            return;
        }
        
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {//为NioServerSocketChannel或是NioSocketChannel
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
这里会去判断selectedkey是否为空,若是不为空就去获取selectedkey上的channel,获取到(NioServerSocketChannel或是NioSocketChannel)channel后,判断其类型,这里netty的都是AbstractNioChannel类,就会调用processSelectedKey()方法。
 
processSelectedKey()方法

processSelectedKey 中处理了三个事件, 分别是:

  • OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取。

  • OP_WRITE, 可写事件, 即上层能够向 Channel 写入数据。

  • OP_CONNECT, 链接创建事件, 即 TCP 链接已经创建, Channel 处于 active 状态。

简单的说就是对网络位判断,当网络位为写的时候,则说明有半包消息没有发送完成,须要继续调用flush方法进行发送。后面的若是网络操做位为链接状态,则须要对链接结果进行判断。

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        //看选择键是否可用
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            //可用而后进行位运算来判断当前状态
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//若是是读或者是链接操做,则调用Unsafe的read方法
                //此处的Unsafe的实现是一个多态。(多是调用NioServerSocketChannel或是NioSocketChannel的doReadBytes方法)
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }
这里会看选择键是否可用,可用而后对位进行判断,若是是读或者是链接操做,则调用Unsafe的read方法。此处的Unsafe的实现是一个多态。(多是调用NioServerSocketChannel或是NioSocketChannel的doReadBytes方法),
Unsafe接口的read()方法有的实现有:

对于服务端处理链接的请求以下:

 因为NioMessageUnsafe是AbstractNioMessageChannel的内部类,调用外部类doReadMessages()方法

下面是服务端NioServerSocketChannel中的该方法

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
对于客户端的调用以下:
NioSocketChannel中的doReadBytes方法以下:
    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
    }
 
再回到processSelectedKeys()方法中的第二个分支 processSelectedKeysOptimized():
    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                for (;;) {
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                    i++;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

上面代码关键的点就两个:迭代 selectedKeys 获取就绪的 IO 事件, 而后为每一个事件都调用 processSelectedKey 来处理它。
这里正好完美对应上了咱们提到的 Selector 的使用流程中的第三步里操做。
还有一点须要注意的是, 咱们能够调用 selectionKey.attach(object) 给一个 selectionKey 设置一个附加的字段, 而后能够经过 Object attachedObj = selectionKey.attachment() 获取它. 上面代代码正是经过了 k.attachment() 来获取一个附加在 selectionKey 中的对象, 那么这个对象是什么呢? 它又是在哪里设置的呢? 咱们再来回忆一下 SocketChannel 是如何注册到 Selector 中的:

在客户端的 Channel 注册过程当中, 会有以下调用链:

Bootstrap.initAndRegister -> 
    AbstractBootstrap.initAndRegister -> 
        MultithreadEventLoopGroup.register -> 
            SingleThreadEventLoop.register -> 
                AbstractUnsafe.register ->
                    AbstractUnsafe.register0 ->
                        AbstractNioChannel.doRegister

最后的 AbstractNioChannel.doRegister 方法会调用 SocketChannel.register 方法注册一个 SocketChannel 到指定的 Selector:

@Override
protected void doRegister() throws Exception {
    // 省略错误处理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

特别注意一下 register 的第三个参数, 这个参数是设置 selectionKey 的附加对象的, 和调用 selectionKey.attach(object) 的效果同样. 而调用 register 所传递的第三个参数是 this, 它其实就是一个 NioSocketChannel 的实例. 那么这里就很清楚了, 咱们在将 SocketChannel 注册到 Selector 中时, 将 SocketChannel 所对应的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中.
再回到 processSelectedKeysOptimized 方法中, 当咱们获取到附加的对象后, 咱们就调用 processSelectedKey 来处理这个 IO 事件:

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
} else {
    @SuppressWarnings("unchecked")
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
}
 
netty中特有的:非io的系统task和定时任务的处理
 再回到NioEventLoop的run()方法,处理完网络的io后,Eventloop要执行一些非io的系统task和定时任务,任务的权重上面有算法介绍,代码以下:
                final long ioTime = System.nanoTime() - ioStartTime;

                final int ioRatio = this.ioRatio;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
 
因为要同时执行io和非io的操做,为了充分使用cpu,会按必定的比例去进行执行,若是io的任务大于定时任务和task,则能够将io比例调大。反之调小,默认是50%,其执行方法以下:
    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromDelayedQueue();
        //获取SingleThreadEventExecutor中的taskQueue
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            //当循环次数为64时,这时候会去比较上次执行时间和延时的关系,若是大于延时那么就退出,这里获取naotime是每64次进行获取一次。
            //这么作的目的是一来是获取naotime比较耗时,另外也不能长时间执行task,让io阻塞,因此通常每64个任务就会返回
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

一、获取SingleThreadEventExecutor中的taskQueue,若是没有任务就退出;
二、当循环次数为64时,这时候会去比较上次执行时间和延时的关系,若是大于延时那么就退出,这里获取naotime是每64次进行获取一次。这么作的目的是一来是获取naotime比较耗时,另外也不能长时间执行task,让io阻塞,因此通常每64个任务就会返回;

最后eventloop的run方法,会判断是否优雅关闭,若是是优雅关闭会执行closeAll方法,以下:
    private void closeAll() {
        selectAgain();
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, k, null);
            }
        }

        for (AbstractNioChannel ch: channels) {
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }
相关文章
相关标签/搜索