netty中的发动机--EventLoop及其实现类NioEventLoop的源码分析

EventLoop

在以前介绍Bootstrap的初始化以及启动过程时,咱们屡次接触了NioEventLoopGroup这个类,关于这个类的理解,还须要了解netty的线程模型。NioEventLoopGroup能够理解为一组线程,这些线程每个均可以独立地处理多个channel产生的io事件。java

NioEventLoopGroup初始化

咱们看其中一个参数比较多的构造方法,其余一些参数较少的构造方法使用了一些默认值,使用的默认参数以下:git

  • SelectorProvider类型,用于建立socket通道,udp通道,建立Selector对象等,默认值是SelectorProvider.provider(),大部分状况下使用默认值就行,这个方法最终建立的是一个WindowsSelectorProvider对象
  • SelectStrategyFactory,Select策略类的工厂类,它的默认值是DefaultSelectStrategyFactory.INSTANCE,就是一个SelectStrategyFactory对象自己,而SelectStrategyFactory工厂产生的是DefaultSelectStrategy策略类。
  • RejectedExecutionHandler,拒绝任务的策略类,决定在任务队列已满时采起什么样的策略,相似于jdk线程池的RejectedExecutionHandler的做用

接下来,咱们看一下其中的一个经常使用的构造方法,github

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
    final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

可见,当前类中并无什么初始化逻辑,直接调用了父类的构造方法,因此咱们接着看父类MultithreadEventLoopGroup的构造方法:设计模式

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

一样,并未作任务处理,直接调用父类构造方法,因此咱们接着看MultithreadEventExecutorGroup构造方法,初始化逻辑的实如今这个类中,api

MultithreadEventExecutorGroup构造方法

经过上一小结的分析,咱们知道NioEventLoopGroup的构造方法的主要逻辑的实现是在MultithreadEventExecutorGroup类中,而且在调用构造方法的过程当中加上了一个参数的默认值,即EventExecutorChooserFactory类型参数的默认值DefaultEventExecutorChooserFactory.INSTANCE,这个类以轮询(roundrobin)的方式从多个线程中依次选出线程用于注册channel。
总结一下这段代码的主要步骤:数组

  • 首先是一些变量的非空检查和合法性检查
  • 而后根据传入的线程数量,建立若干个子执行器,每一个执行器对应一个线程
  • 最后以子执行器数组为参数,使用选择器工厂类建立一个选择器
  • 最后给每一个子执行器添加一个监听器,以监听子执行器的终止,作一些簿记工做,使得在全部子执行器所有终止后将当前的执行器组终止promise

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
    EventExecutorChooserFactory chooserFactory, Object... args) {
    // 首先是变量的非空检查以及合法性判断,
    // nThreads在MultithreadEventLoopGroup的构造方法中已经通过一些默认值处理,
    if (nThreads <= 0) {
    throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }安全

    // 这里通常都会使用默认值,
      // ThreadPerTaskExecutor的做用即字面意思,一个任务一个线程
      if (executor == null) {
          executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
      }
    
      // 子执行器的数组,一个子执行器对应一个线程
      children = new EventExecutor[nThreads];
    
      // 根据传入的线程数量建立多个自执行器
      // 注意,这里子执行器建立好后并不会当即运行起来
      for (int i = 0; i < nThreads; i ++) {
          boolean success = false;
          try {
              children[i] = newChild(executor, args);
              success = true;
          } catch (Exception e) {
              // TODO: Think about if this is a good exception type
              throw new IllegalStateException("failed to create a child event loop", e);
          } finally {
              // 若是建立子执行器不成功,那么须要将已经建立好的子执行器也所有销毁
              if (!success) {
                  for (int j = 0; j < i; j ++) {
                      children[j].shutdownGracefully();
                  }
    
                  // 等待因此子执行器中止后在退出
                  for (int j = 0; j < i; j ++) {
                      EventExecutor e = children[j];
                      try {
                          while (!e.isTerminated()) {
                              e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                          }
                      } catch (InterruptedException interrupted) {
                          // Let the caller handle the interruption.
                          Thread.currentThread().interrupt();
                          break;
                      }
                  }
              }
          }
      }
    
      // 建立一个子执行器的选择器,选择器的做用是从子执行器中选出一个
      // 默认使用roundRobin的方式
      chooser = chooserFactory.newChooser(children);
    
      final FutureListener<Object> terminationListener = new FutureListener<Object>() {
          @Override
          public void operationComplete(Future<Object> future) throws Exception {
              if (terminatedChildren.incrementAndGet() == children.length) {
                  terminationFuture.setSuccess(null);
              }
          }
      };
    
      // 给每一个子执行器添加监听器,在子执行器终止的时候作一些工做
      // 每有一个子执行器终止时就将terminatedChildren变量加一
      // 当全部子执行器所有终止时,当前这个执行器组就终止了
      for (EventExecutor e: children) {
          e.terminationFuture().addListener(terminationListener);
      }
    
      // 包装一个不可变的集合
      Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
      Collections.addAll(childrenSet, children);
      readonlyChildren = Collections.unmodifiableSet(childrenSet);

    }网络

NioEventLoopGroup.newChild

上面的方法中调用了newChild方法来建立一个子执行器,而这个方法是一个抽象方法,咱们看NioEventLoopGroup类的实现:并发

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

可见仅仅是简单地建立了一个NioEventLoop对象。

小结

到这里,咱们就把NioEventLoopGroup的初始化过程分析完了。咱们不由思考,既然NioEventLoopGroup是一个执行器组,说白了就是一组线程,那这些线程是何时跑起来的呢?若是读者还有印象,应该能记得咱们在分析Bootstrap创建链接过程时,channel初始化以后须要注册到EventLoopGroup中,实际上是注册到其中的一个EventLoop上,注册逻辑最终是在AbstractChannel.AbstractUnsafe.register方法中实现的,其中有一段代码:

if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

首先调用eventLoop.inEventLoop()判断执行器的线程与当前线程是不是同一个,若是是则直接执行注册的代码,若是不是就调用eventLoop.execute将注册逻辑封装成一个任务放到执行器的任务队列中,接下里咱们就以这个方法为切入点,探究一会儿执行器线程的启动过程。

AbstractEventExecutor.inEventLoop

首先,让咱们来看一下这个方法,这个方法的做用是判断当前线程与执行器的线程是否同一个线程。

public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

SingleThreadEventExecutor.inEventLoop

代码很简单,就很少说了。
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}

SingleThreadEventExecutor.execute

方法很简单,核心逻辑在startThread方法中,

public void execute(Runnable task) {
    // 非空检查
    if (task == null) {
        throw new NullPointerException("task");
    }

    // 执行到这里通常都是外部调用者,
    boolean inEventLoop = inEventLoop();
    // 向任务队列中添加一个任务
    addTask(task);
    // 若是当前线程不是执行器的线程,那么须要检查执行器线程是否已经运行,
    // 若是还没在运行,就须要启动线程
    if (!inEventLoop) {
        startThread();
        // 检查线程是否被关闭
        if (isShutdown()) {
            boolean reject = false;
            try {
                // 将刚刚添加的任务移除
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    // addTaskWakesUp不知道这个变量意义是什么,NioEventLoop传进来的是false
    // 向任务队列中添加一个空任务,这样就可以唤醒阻塞的执行器线程
    // 有些状况下执行器线程会阻塞在taskQueue上,
    // 因此向阻塞队列中添加一个元素可以唤醒哪些由于队列空而被阻塞的线程
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

SingleThreadEventExecutor.startThread

这个方法的主要做用是维护内部的状态量state,使用cas指令并发状况下对状态量的修改是线程安全的,而且对于状态量的判断保证启动逻辑只被执行一次

private void startThread() {
    // 状态量的维护
    if (state == ST_NOT_STARTED) {
        // 这里使用了jdk中的原子更新器AtomicIntegerFieldUpdater类,
        // 使用cpu的cas指令保证并发状况下可以安全地维护状态量
        // 保证只有一个线程可以执行启动逻辑,保证启动逻辑只被执行一次
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // 实际启动线程的逻辑
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

SingleThreadEventExecutor.doStartThread

这个方法我就不贴代码了,说一下它的主要做用:

  • 使用内部的Executor对象(通常是一个ThreadPerTaskExecutor)启动一个线程,并执行任务
  • 维护执行器的运行状态,主要是经过内部的状态量和cas指令来保证线程安全;此外维护内部的一些簿记量,例如线程自己的引用,线程启动时间等
  • 线程结束时作一些收尾和清理工做,例如将剩余的任务跑完,运行关闭钩子,关闭底层的selector(这个是具体的子类的清理逻辑),同时更新状态量

具体的业务逻辑仍然是在子类中实现的,也就是SingleThreadEventExecutor.run()方法的具体实现。

NioEventLoop.run

咱们仍然以NioEventLoop为例,看一下它实现的run方法。还大概讲一下它的主要逻辑:

  • 首选这个方法是一个循环,不断地经过调用jdk底层的selector接收io事件,并对不一样的io事件作处理,同时也会处理任务队列中的任务,以及定时调度或延迟调度的任务
  • 调用jdk的api, selector接收io事件
  • 处理各类类型的io事件
  • 处理任务

这里,我就不贴代码了,其中比较重要的是对一些并发状况的考虑和处理,如selector的唤醒时机。接下来,主要看一下对于各类io事件的处理,至于任务队列以及调度队列中任务的处理比较简单,就不展开了。

NioEventLoop.processSelectedKeysOptimized

这个方法会遍历全部接受到的io事件对应的selectionKey,而后依次处理。

private void processSelectedKeysOptimized() {
    // 遍历全部的io事件的SelectionKey
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // 处理事件
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        // 若是须要从新select,那么把后面的selectionKey所有置0,而后再次调用selectNow方法
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

NioEventLoop.processSelectedKey

这个方法首先对SelectionKey无效的状况作了处理,分为两种状况:channel自己无效了;channel仍然是正常的,只不过是被从当前的selector上注销了,可能在其余的selector中仍然是正常运行的

  • 对于第一种状况,须要关闭channel,即关闭底层的链接
  • 对于第二种状况则不须要作任何处理。

接下来,咱们着重分析一下对于四种事件的处理逻辑。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 若是selectionKey是无效的,那么说明相应的channel是无效的,此时须要关闭这个channel
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        // 只关闭注册在当前EventLoop上的channel,
        // 理论上来讲,一个channel是能够注册到多个Eventloop上的,
        // SelectionKey无效多是由于channel从当前EventLoop上注销了,
        // 可是channel自己依然是正常的,而且注册在其余的EventLoop中
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        // 到这里说明channel已经无效了,关闭它
        unsafe.close(unsafe.voidPromise());
        return;
    }

    // 下面处理正常状况
    try {
        // 准备好的io事件
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        // 处理connect事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        // 处理write事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // 处理read和accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

connect事件处理

从代码中能够看出,connect事件的处理时经过调用NioUnsafe.finishConnect完成的,咱们看一下AbstractNioUnsafe.finishConnect的实现:

public final void finishConnect() {
        // Note this method is invoked by the event loop only if the connection attempt was
        // neither cancelled nor timed out.

        assert eventLoop().inEventLoop();

        try {
            // 是否已经处于链接成功的状态
            boolean wasActive = isActive();
            // 抽象方法,有子类实现
            doFinishConnect();
            // 处理future对象,将其标记为成功
            fulfillConnectPromise(connectPromise, wasActive);
        } catch (Throwable t) {
            fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
        } finally {
            // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
            // See https://github.com/netty/netty/issues/1770
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false);
            }
            connectPromise = null;
        }
    }

能够看出,主要是经过调用doFinishConnect实现完成链接的逻辑,具体到子类中,NioSocketChannel.doFinishConnect的实现是:

protected void doFinishConnect() throws Exception {
    if (!javaChannel().finishConnect()) {
        throw new Error();
    }
}

write事件处理

对于的write事件的处理时经过调用NioUnsafe.forceFlush方法完成,最终的实如今AbstractChannel.AbstractUnsafe.flush0中:
大致上看,这个方法的逻辑比较简单,可是实际上最复杂也是最核心的写入逻辑在子类实现的doWrite方法中。因为本篇的重点在于把NioEventLoop的主干逻辑梳理一下,因此这里再也不继续展开,后面会单独来分析这一块的源码,这里涉及到netty中对缓冲区的封装,其中涉及到一些比较复杂的逻辑。

protected void flush0() {
        // 若是正在写数据,直接返回
        if (inFlush0) {
            // Avoid re-entrance
            return;
        }

        // 输出的缓冲区
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }

        inFlush0 = true;

        // Mark all pending write requests as failure if the channel is inactive.
        if (!isActive()) {
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }

        try {
            // 将缓冲区的数据写入到channel中
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                /**
                 * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                 * failing all flushed messages and also ensure the actual close of the underlying transport
                 * will happen before the promises are notified.
                 *
                 * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                 * may still return {@code true} even if the channel should be closed as result of the exception.
                 */
                initialCloseCause = t;
                close(voidPromise(), t, newClosedChannelException(t), false);
            } else {
                try {
                    shutdownOutput(voidPromise(), t);
                } catch (Throwable t2) {
                    initialCloseCause = t;
                    close(voidPromise(), t2, newClosedChannelException(t), false);
                }
            }
        } finally {
            inFlush0 = false;
        }
    }

read事件和accept事件处理

乍看会比较奇怪,为何这两个事件要放到一块儿处理呢,他们明明是不一样的事件。这里主要仍是考虑到编码的统一,由于read事件只有NioSocketChannel才会有,而accept事件只有NioServerSocketChannel才会有,因此这里经过抽象方法,让不一样的子类去实现各自的逻辑,是的代码结构上更统一。咱们这里看一下NioScketChannel的实现,而对于NioServerSocketChannel的实现我会在后续分析netty服务端的启动过程时在具体讲到,即ServerBootstrap的启动过程。

NioByteUnsafe.read

总结一下这个方法的主要逻辑:

  • 首先会获取缓冲分配器和相应的处理器RecvByteBufAllocator.Handle对象
  • 循环读取数据,每次分配一个必定大小(大小可配置)的缓冲,将channel中待读取的数据读取到缓冲中
  • 以装载有数据的缓冲为消息体,向channel的处理流水线(即pipeline)中触发一个读取的事件,让读取到的数据在流水线中传播,被各个处理器处理
  • 重复此过程,知道channel中没有可供读取的数据
  • 最后向pipeline中触发一个读取完成的事件
  • 最后还要根据最后一次读取到的数据量决定是否关闭通道,若是最后一次读取到的数据量小于0,说明对端已经关闭了输出,因此这里须要将输入关闭,即通道处于半关闭状态。

    public final void read() {
    final ChannelConfig config = config();
    // 若是通道已经关闭,那么就不须要再读取数据,直接返回
    if (shouldBreakReadReady(config)) {
    clearReadPending();
    return;
    }
    final ChannelPipeline pipeline = pipeline();
    // 缓冲分配器
    final ByteBufAllocator allocator = config.getAllocator();
    // 缓冲分配的处理器,处理缓冲分配,读取计数等
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
          boolean close = false;
          try {
              do {
                  // 分配一个缓冲
                  byteBuf = allocHandle.allocate(allocator);
                  // 将通道的数据读取到缓冲中
                  allocHandle.lastBytesRead(doReadBytes(byteBuf));
                  // 若是没有读取到数据,说明通道中没有待读取的数据了,
                  if (allocHandle.lastBytesRead() <= 0) {
                      // nothing was read. release the buffer.
                      // 由于没读取到数据,因此应该释放缓冲
                      byteBuf.release();
                      byteBuf = null;
                      // 若是读取到的数据量是负数,说明通道已经关闭了
                      close = allocHandle.lastBytesRead() < 0;
                      if (close) {
                          // There is nothing left to read as we received an EOF.
                          readPending = false;
                      }
                      break;
                  }
    
                  // 更新Handle内部的簿记量
                  allocHandle.incMessagesRead(1);
                  readPending = false;
                  // 向channel的处理器流水线中触发一个事件,
                  // 让取到的数据可以被流水线上的各个ChannelHandler处理
                  pipeline.fireChannelRead(byteBuf);
                  byteBuf = null;
                  // 这里根据以下条件判断是否继续读:
                  // 上一次读取到的数据量大于0,而且读取到的数据量等于分配的缓冲的最大容量,
                  // 此时说明通道中还有待读取的数据
              } while (allocHandle.continueReading());
    
              // 读取完成
              allocHandle.readComplete();
              // 触发一个读取完成的事件
              pipeline.fireChannelReadComplete();
    
              if (close) {
                  closeOnRead(pipeline);
              }
          } catch (Throwable t) {
              handleReadException(pipeline, byteBuf, t, close, allocHandle);
          } finally {
              // Check if there is a readPending which was not processed yet.
              // This could be for two reasons:
              // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
              // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
              //
              // See https://github.com/netty/netty/issues/2254
              // 这里isAutoRead默认是true, 因此正常状况下会继续监听read事件
              if (!readPending && !config.isAutoRead()) {
                  removeReadOp();
              }
          }
      }

    }

总结

本篇主要分析了EventLoop的事件监听以及处理逻辑,此外处理处理io事件,也会处理添加进来的任务和定时调度任务和延迟调度任务。EventLoop就像是整个框架的发动机或者说是心脏,它经过jdk api进而简介地调用系统调用,不断地监听各类io事件,同时对不一样的io事件分门别类采用不一样的处理方式,对于read事件则会将网络io数据读取到缓冲中,并将读取到的数据传递给用户的处理器进行链式处理。Channelpipeline就像一个流水线同样,对触发的的各类事件进行处理。

遗留问题

  • NioSocketChannel.doWrite方法的写入逻辑的,待进一步分析
  • ChannelPipeline的详细分析,各类事件是怎么在处理器之间传播的,设计模式,代码结构等
  • 缓冲分配器和缓冲处理器的分析,它们是怎么对内存进行管理的,这也是netty高性能的缘由之一。
相关文章
相关标签/搜索