1、NioEventLoop的概述数组
NioEventLoop作为Netty线程模型的核心部分,从本质上讲是一个事件循环执行器,每一个NioEventLoop都会绑定一个对应的线程经过一个for(;;)
循环来处理与 Channel 相关的 IO 操做, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理等;其次做为任务队列, 执行 taskQueue 中的任务, 例如eventLoop.schedule 提交的定时任务也是这个线程执行的。而NioEventLoopGroup顾名思义,它是维护了一组这样的事件循环器,这也是Netty基于Reactor模式的具体设计体现。app
接下来咱们就结合具体的代码,对NioEventLoop的整个建立流程进行一个说明与总结ide
2、NioEventLoop的建立函数
咱们基于Netty构建服务端仍是客户端时,都首先须要建立NioEventLoopGroup 实例oop
// Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup 作为基于NIO的处理channle相关IO操做的事件循环器组,它的类层次结构以下优化
经过NioEventLoopGroup构造函数传入线程数量this
/** * Create a new instance using the specified number of threads, {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); }
NioEventLoopGroup最终的构造函数中会包含如下几个函数spa
一、nThreads:传入的线程数量.net
二、executor :线程执行器Executor接口,默认为空线程
三、selectorProvider:用于建立Selector的SelectorProvider
四、selectStrategyFactory:传入DefaultSelectStrategyFactory.INSTANCE, 一个使用默认选择策略的工厂。
五、RejectedExecutionHandlers.reject():Netty自定义线程拒绝策略
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
在父类MultithreadEventLoopGroup中,会根据你传入nThreads大小,肯定初始化的线程数量,为0且没有设置io.netty.eventLoopThreads参数项,则会以当前系统的核心线程数*2作为默认的线程数量
static { //若是没有设置io.netty.eventLoopThreads参数项,则会以当前运行系统的核心线程数*2做为线程数 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } /** * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...) */ protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
接下来在MultithreadEventExecutorGroup的构造函数中咱们会根据传入的线程数,去初始化和建立一组NioEventLoop
首先咱们看下NioEventLoop的类层次结构
下面在MultithreadEventExecutorGroup构造函数中主要完成如下几个功能:
一、初始化ThreadPerTaskExecutor线程执行器,并传入一个线程建立工厂,用于NioEventLoop对应线程的建立
二、根据传入的线程数,初始化一个EventExecutor数组,用于放置建立的NioEventLoop对象
三、循环数组,经过newChild方法建立NioEventLoop对象。
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param chooserFactory the {@link EventExecutorChooserFactory} to use. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { // 建立线程工厂,netty根据须要指定了线程的命名方式、优先级、是不是守护线程等属性 // 该线程池没有任何队列,提交任务后,建立任何线程类型都是 FastThreadLocalRunnable, 而且当即start。 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //初始化一组事件循环执行器 children = new EventExecutor[nThreads]; //根据传入的线程数,初始化一个线程数组 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 建立 new NioEventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } }
继续跟踪进入newChild(executor, args)内部,看到它会返回一个NioEventLoop对象
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
继续查看NioEventLoop构造函数和他的父类构造函数
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
父类构造函数
/** * Create a new instance * * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it * @param executor the {@link Executor} which will be used for executing * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * executor thread * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. * @param rejectedHandler the {@link RejectedExecutionHandler} to use. */ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
经过上面的代码咱们能够看到,初始化NioEventLoop主要完成了如下的功能
一、保存线程执行器ThreadPerTaskExecutor
二、建立一个selector
三、基于LinkedBlockingQueue建立一个taskQueue任务队列,用于保存要执行的任务
这些都是为了后续的循环执行Channel 相关事件所作准备。
到这里其实咱们建立了一组NioEventLoop,也就是一组事件循环执行器,每一个NioEventLoop中都有对应的一个线程和一个selector ;建立完毕以后,天然就是要为每个链接分配对应的NioEventLoop。Netty中经过
实现EventLoopGroup接口中的next()方法来返回一个可使用的的NioEventLoop
public interface EventLoopGroup extends EventExecutorGroup { /** * Return the next {@link EventLoop} to use */ @Override EventLoop next(); }
在MultithreadEventExecutorGroup中咱们能够查看它的具体实现方式
chooser = chooserFactory.newChooser(children); @Override public EventExecutor next() { return chooser.next(); }
进入代码内部咱们能够看到Netty针对数组大小,对数组下标的计算方式进行了优化
/** * Default implementation which uses simple round-robin to choose next {@link EventExecutor}. */ @UnstableApi public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { //判断是不是二的次幂,若是为true返回PowerOfTwoEventExecutorChooser,反之GenericEventExecutorChooser if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } //经过&运算的方式循环获取数组下标 @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } //经过取模的方式循环获取数组下标 @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } }
到此咱们基本把Netty中NioEventLoop及NioEventLoopGroup的建立流程及核心代码梳理了一遍。NioEventLoop作为Netty线程模型的核心部分包含的内容比较多,上面只是初始化及建立的一部份内容,后续的部分我会陆续的补齐,其中有错误和不足之处还请指正与海涵。