Netty 系列目录(http://www.javashuo.com/article/p-hskusway-em.html)html
上文提到在启动 NioEventLoop 线程时会执行 SingleThreadEventExecutor#doStartThread(),在这个方法中调用 SingleThreadEventExecutor.this.run(),NioEventLoop 重写了 run() 方法。NioEventLoop#run() 代码以下:java
@Override protected void run() { for (;;) { try { // 1. select 策略选择 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { // 1.1 非阻塞的 select 策略。实际上,默认状况下,不会返回 CONTINUE 的策略 case SelectStrategy.CONTINUE: continue; // 1.2 阻塞的 select 策略 case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } // 1.3 不须要 select,目前已经有能够执行的任务了 default: } // 2. 执行网络 IO 事件和任务调度 cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { // 2.1. 处理网络 IO 事件 processSelectedKeys(); } finally { // 2.2. 处理系统 Task 和自定义 Task runAllTasks(); } } else { // 根据 ioRatio 计算非 IO 最多执行的时间 final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // 3. 关闭线程 try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
NioEventLoop#run() 作了记下事情:git
当 taskQueue 中没有任务时,那么 Netty 能够阻塞地等待 IO 就绪事件。而当 taskQueue 中有任务时,咱们天然地但愿所提交的任务能够尽快地执行 ,所以 Netty 会调用非阻塞的 selectNow() 方法,以保证 taskQueue 中的任务尽快能够执行。github
(1) hasTasks网络
首先,在 run 方法中,第一步是调用 hasTasks() 方法来判断当前任务队列中是否有任务并发
protected boolean hasTasks() { assert inEventLoop(); return !taskQueue.isEmpty(); }
这个方法很简单,仅仅是检查了一下 taskQueue 是否为空。至于 taskQueue 是什么呢,其实它就是存放一系列的须要由此 EventLoop 所执行的任务列表。关于 taskQueue,咱们这里暂时不表,等到后面再来详细分析它。ide
(2) DefaultSelectStrategyoop
// NioEventLoop#selectNowSupplier private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); } }; // 非阻塞的 select 策略。实际上,默认状况下,不会返回 CONTINUE 的策略 SelectStrategy.SELECT = -1; // 阻塞的 select 策略 SelectStrategy.CONTINUE = -2; // DefaultSelectStrategy public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; }
显然当 taskQueue 为空时,执行的是 select(oldWakenUp) 方法。那么 selectNow() 和 select(oldWakenUp) 之间有什么区别呢? 来看一下,selectNow() 的源码以下fetch
(3) selectNow优化
int selectNow() throws IOException { try { return selector.selectNow(); } finally { // restore wakeup state if needed if (wakenUp.get()) { selector.wakeup(); } } }
调用 JDK 底层的 selector.selectNow()。selectNow() 方法会检查当前是否有就绪的 IO 事件,若是有,则返回就绪 IO 事件的个数;若是没有,则返回 0。注意,selectNow() 是当即返回的,不会阻塞当前线程。当 selectNow() 调用后,finally 语句块中会检查 wakenUp 变量是否为 true,当为 true 时,调用 selector.wakeup() 唤醒 select() 的阻塞调用。
(4) select(boolean oldWakenUp)
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectedKeys = selector.select(timeoutMillis); } catch (CancelledKeyException e) { } }
在这个 select 方法中,调用了 selector.select(timeoutMillis),而这个调用是会阻塞住当前线程的,timeoutMillis是阻塞的超时时间。到来这里,咱们能够看到,当 hasTasks() 为真时,调用的的 selectNow() 方法是不会阻塞当前线程的,而当 hasTasks() 为假时,调用的 select(oldWakenUp) 是会阻塞当前线程的。
在 NioEventLoop.run() 方法中,第一步是经过 select/selectNow 调用查询当前是否有就绪的 IO 事件,那么当有 IO 事件就绪时,第二步天然就是处理这些 IO 事件啦。首先让咱们来看一下 NioEventLoop.run 中循环的剩余部分:
final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { // 2.1. 处理网络 IO 事件 processSelectedKeys(); } finally { // 2.2. 处理系统 Task 和自定义 Task runAllTasks(); } } else { // 根据 ioRatio 计算非 IO 最多执行的时间 final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } }
上面列出的代码中,有两个关键的调用:
这里的代码还有一个十分有意思的地方,即 ioRatio。那什么是 ioRatio 呢?它表示的是此线程分配给 IO 操做所占的时间比(即运行 processSelectedKeys 耗时在整个循环中所占用的时间)。例如 ioRatio 默认是 50,则表示 IO 操做和执行 task 的所占用的线程执行时间比是 1 : 1。当知道了 IO 操做耗时和它所占用的时间比,那么执行 task 的时间就能够很方便的计算出来了。
咱们这里先分析一下 processSelectedKeys() 方法调用,runAllTasks() 留到下面再分析。processSelectedKeys() 方法的源码以下:
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
因为默认未开启 selectedKeys 优化功能,因此会进入 processSelectedKeysPlain 分支执。下面继续分析 processSelectedKeysPlain 的代码实现。
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { // https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { // NioSocketChannel 或 NioServerSocketChannel 进行 IO 读写相关的操做 processSelectedKey(k, (AbstractNioChannel) a); } else { // 用户自行注册的 Task 任务,通常状况下不会执行 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } // 省略... } }
processSelectedKey 方法源码以下:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 省略... try { int readyOps = k.readyOps(); // 1. OP_CONNECT 读写前要先处理链接,不然可能抛 NotYetConnectedException 异常 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // 2. OP_WRITE if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // 3. OP_READ 或 OP_ACCEPT if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
这个代码是否是很熟悉啊?彻底是 Java NIO 的 Selector 的那一套处理流程嘛!processSelectedKey 中处理了三个
事件,分别是:
OP_READ
可读事件,即 Channel 中收到了新数据可供上层读取.OP_WRITE
可写事件,即上层能够向 Channel 写入数据.OP_CONNECT
链接创建事件,即 TCP 链接已经创建,Channel 处于 active 状态.下面咱们分别根据这三个事件来看一下 Netty 是怎么处理的吧。
当就绪的 IO 事件是 OP_READ,代码会调用 unsafe.read() 方法。unsafe 咱们已见过屡次,NioSocketChannel 的 Unsafe 是在 AbstractNioByteChannel 中实现的,而 NioServerSocketChannel 的 Unsafe 是在 NioMessageUnsafe 中实现。
public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 1. 分配缓冲区 ByteBuf byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 2. 从 NioSocketChannel 中读取数据 if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; // 3. 调用 pipeline.fireChannelRead 发送一个 inbound 事件 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
上面 read 方法其实概括起来,能够认为作了以下工做:
OP_WRITE 可写事件代码以下。这里代码比较简单,没有详细分析的必要了。
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
最后一个事件是 OP_CONNECT,即 TCP 链接已创建事
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 已链接后就须要注销 OP_CONNECT 事件 See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }
OP_CONNECT 事件的处理中,只作了两件事情:
正如代码中的注释所言, 咱们须要将 OP_CONNECT 从就绪事件集中清除, 否则会一直有 OP_CONNECT 事件。
调用 unsafe.finishConnect() 通知上层链接已创建
unsafe.finishConnect() 调用最后会调用到 pipeline().fireChannelActive(),产生一个 inbound 事件,通知 pipeline 中的各个 handler TCP 通道已创建(即 ChannelInboundHandler.channelActive 方法会被调用)
到了这里,咱们整个 NioEventLoop 的 IO 操做部分已经了解完了,接下来的一节咱们要重点分析一下 Netty 的任务
队列机制。
咱们已经提到过,在 Netty 中,一个 NioEventLoop 一般须要肩负起两种任务,第一个是做为 IO 线程,处理 IO 操做;第二个就是做为任务线程,处理 taskQueue 中的任务。这一节的重点就是分析一下 NioEventLoop 的任务队列机制
的。
// SingleThreadEventExecutor private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(maxPendingTasks); protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (!offerTask(task)) { reject(task); } } final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } return taskQueue.offer(task); }
所以实际上,taskQueue 是存放着待执行的任务的队列。
除了经过 execute 添加普通的 Runnable 任务外,咱们还能够经过调用 eventLoop.scheduleXXX 之类的方法来添加
一个定时任务。schedule 功能的实现是在 SingleThreadEventExecutor 的父类,即 AbstractScheduledEventExecutor 中实现的。
// SingleThreadEventExecutor PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
scheduledTaskQueue 是一个队列(Queue),其中存放的元素是 ScheduledFutureTask。而ScheduledFutureTask 咱们很容易猜到,它是对 Schedule 任务的一个抽象。咱们来看一下 AbstractScheduledEventExecutor 所实现的 schedule 方法:
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { fetchedAll = fetchFromScheduledTaskQueue(); if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } afterRunningAllTasks(); return ranAtLeastOne; }
咱们前面已经提到过,EventLoop 能够经过调用 EventLoop.execute 来将一个 Runnable 提交到 taskQueue 中,
也能够经过调用 EventLoop.schedule 来提交一个 schedule 任务到 scheduledTaskQueue 中。在此方法的一开
始调用的 fetchFromScheduledTaskQueue() 其实就是将 scheduledTaskQueue 中已经能够执行的(即定时时
间已到的 schedule 任务) 拿出来并添加到 taskQueue 中,做为可执行的 task 等待被调度执行。代码以下:
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } scheduledTask = pollScheduledTask(nanoTime); } return true; }
接下来 runAllTasks() 方法就会不断调用 task = pollTask() 从 taskQueue 中获取一个可执行的 task,而后调用它
的 run() 方法来运行此 task。
注意: 由于 EventLoop 既须要执行 IO 操做,又须要执行 task,所以咱们在调用 EventLoop.execute 方法提交
任务时,不要提交耗时任务,更不能提交一些会形成阻塞的任务,否则会致使咱们的 IO 线程得不到调度,影响整
个程序的并发量。
天天用心记录一点点。内容也许不重要,但习惯很重要!