Netty 系列目录(http://www.javashuo.com/article/p-hskusway-em.html)html
Netty 基于事件驱动模型,使用不一样的事件来通知咱们状态的改变或者操做状态的改变。它定义了在整个链接的生命周期里当有事件发生的时候处理的核心抽象。java
Channel 为 Netty 网络操做抽象类,EventLoop 主要是为 Channel 处理 I/O 操做,二者配合参与 I/O 操做。算法
EventLoopGroup 是一个 EventLoop 的分组,它能够获取到一个或者多个 EventLoop 对象,所以它提供了迭代出 EventLoop 对象的方法。设计模式
其中 Executor、ExecutorService、AbstractExecutorService、ScheduledExecutorService 属于 JDK 定义的规范,Netty 实现了本身的自定义线程池。网络
EventExecutorGroup
提供了 next() 方法,除此以外是线程池的生命周期方法,如 shutdownGracefully。EventLoopGroup
提供了 register 方法,将 channel 绑定到线程上。EventExecutor
继承自 EventExecutorGroup,提供了 inEventLoop 方法。EventLoop
继承自 EventLoopGroup,提供了 register 注册和 parent 方法。MultithreadEventExecutorGroup
基于多线程的 EventExecutor (事件执⾏行器)的分组抽象类ThreadPerTaskExecutor
实现 Executor 接⼝口,每一个任务一个线程的执行器实现类(1) NioEventLoopGroup多线程
NioEventLoopGroup 构造方法中最重要的一件事是建立子线程 NioEventLoop,建立完成后子线程并未启动,该线程在 channel 注册时启动。app
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory, final RejectedExecutionHandler rejectedExecutionHandler) { super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler); }
nThreads, threadFactory
前两个参数用于建立线程池,若是 nThreads=0 则默认为 CPU 核数的 2 倍chooserFactory
用于循环获取 NioEventLoopGroup 中的下一个 NioEventLoop 的算法。2 的幂次方使用位运算selectorProvider, selectStrategyFactory
后二个参数用于建立 selectorrejectedExecutionHandler
异常处理的 Handler(2) MultithreadEventExecutorGroupide
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { // 1. executor 用于建立一个子线程 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 2. 建立全部的子线程 children children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { // 有异常则须要销毁资源 } } } // 3. chooser 用于循环获取 children 中的下一个元素的算法。2 的幂次方使用位运算 chooser = chooserFactory.newChooser(children); // 4. children 初始化完成则设置 setSuccess 为 true final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } // 5. 返回一个只读的 children 暴露给开发者 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
MultithreadEventExecutorGroup 建立了全部的子线程,其中最重要的方法是 newChild(executor, args)oop
(3) newChildthis
// NioEventLoopGroup 建立子线程 NioEventLoop protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
(4) NioEventLoop
NioEventLoop 构造时就建立了一个 selector 对象。下面看一个 NioEventLoop 的建立过程。
// 建立了一个 selector 对象 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } // 负责 channel 的注册 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); tailTasks = newTaskQueue(maxPendingTasks); } // 负责执行任务 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
经过以上步骤就建立了 NioEventLoop 对象,但这个线程并未启动。很显然在 channel 注册到 NioEventLoop 时会启动该线程。
(1) execute
在 eventLoop 执行 execute 方法时,若是线程还未启动则须要先启动线程。
// SingleThreadEventExecutor public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } // 是否在 EventLoop 线程中 boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); // 启动线程 addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } // 唤醒线程 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
!addTaskWakesUp 表示“添加任务时,是否唤醒线程”?!可是,怎么使⽤用取反了。这样反倒变成了,“添加任务时,是否【不】唤醒线程”。具体的缘由是为何呢?
真正的意思是,“添加任务后,任务是否会自动致使线程唤醒”。为何呢?
对于 Nio 使用的 NioEventLoop ,它的线程执行任务是基于 Selector 监听感兴趣的事件,因此当任务添加到 taskQueue 队列中时,线程是无感知的,因此须要调用 #wakeup(boolean inEventLoop) 方法,进行主动的唤醒。
对于 Oio 使用的 ThreadPerChannelEventLoop,它的线程执行是基于 taskQueue 队列列监听(阻塞拉取)事件和任务,因此当任务添加到 taskQueue 队列中时,线程是可感知的,至关于说,进行被动的唤醒。
(2) startThread 启动线程
private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); } } } } // 真正启动线程,执行的是 NioEventLoop 中的 run 任务 private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { // run 方法由子类 NioEventLoop 实现,是一个死循环 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { // 若是执行到这里,则说明须要关闭该线程 } } }); }
executor 默认是在 MultithreadEventExecutorGroup 的构造方法完成初始化的,代码以下:
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }
ThreadPerTaskExecutor 经过线程工厂 threadFactory 建立一个线程并启动,至此 NioEventLoop 开始工做了。在这个简单的类中使用的代理模式和命令模式两种设计模式。
(3) 线程状态变化
SingleThreadEventExecutor 维护了线程的状态字段 state。
天天用心记录一点点。内容也许不重要,但习惯很重要!