NioEventLoop是netty及其重要的组成部件,它的首要职责就是为注册在它上的channels服务,发现这些channels上发生的新链接、读写等I/O事件,而后将事件转交 channel 流水线处理。使用netty时,咱们首先要作的就是建立NioEventLoopGroup,这是一组NioEventLoop的集合,相似线程池与线程池组。一般,服务端会建立2个group,一个叫作bossGroup,一个叫作workerGroup。bossGroup负责监听绑定的端口,发现端口上的新链接,初始化后交由workerGroup处理后续的读写事件。java
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
咱们先看看bossGroup和workerGroup的构造方法。windows
public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } 除此以外,还有多达8种构造方法,这些构造方法能够指定5种参数: 一、最大线程数量。若是指定为0,那么Netty会将线程数量设置为CPU逻辑处理器数量的2倍 二、线程工厂。要求线程工厂类必须实现java.util.concurrent.ThreadFactory接口。若是没有指定线程工厂,那么默认DefaultThreadFactory。 三、SelectorProvider。若是没有指定SelectorProvider,那么默认的SelectorProvider为SelectorProvider.provider()。 四、SelectStrategyFactory。若是没有指定则默认为DefaultSelectStrategyFactory.INSTANCE 五、RejectedExecutionHandler。拒绝策略处理类,若是这个EventLoopGroup已被关闭,那么以后提交的Runnable任务会默认调用RejectedExecutionHandler的reject方法进行处理。若是没有指定,则默认调用拒绝策略。
最终,NioEventLoopGroup会重载到父类mUltiThreadEventExecutorGroup的构造方法上,这里省略了一些健壮性代码。数组
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) { // 步骤1 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 步骤2 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); } // 步骤3 chooser = chooserFactory.newChooser(children); // 步骤4 final FutureListener<Object> terminationListener = future -> { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } // 步骤5 Set<EventExecutor> childrenSet = new LinkedHashSet<>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
第一个步骤是建立线程池executor。从workerGroup构造方法可知,默认传进来的executor为null,因此首先建立executor。newDefaultThreadFactory的做用是设置线程的前缀名和线程优先级,默认状况下,前缀名是nioEventLoopGroup-x-y这样的命名规则,而线程优先级则是5,处于中间位置。
建立完newDefaultThreadFactory后,进入到ThreadPerTaskExecutor。它直接实现了juc包的线程池顶级接口,从构造方法能够看到它只是简单的把factory赋值给本身的成员变量。而它实现的接口方法调用了threadFactory的newThread方法。从名字能够看出,它构造了一个thread,并当即启动thread。app
public ThreadPerTaskExecutor(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); }
那么咱们回过头来看下DefaultThreadFactory的newThread方法,发现他建立了一个FastThreadLocalThread。这是netty自定义的一个线程类,顾名思义,netty认为它的性能更快。关于它的解析留待之后。这里步骤1建立线程池就完成了。总的来讲他与咱们一般使用的线程池不太同样,不设置线程池的线程数和任务队列,而是来一个任务启动一个线程。(问题:那任务一多岂不是直接线程爆炸?)负载均衡
@Override public Thread newThread(Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); return t; } protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); }
步骤2是建立workerGroup中的NioEventLoop。在示例代码中,传进来的线程数是0,显然不可能真正只建立0个nioEventLoop线程。在调用父类MultithreadEventLoopGroup构造函数时,对线程数进行了判断,若为0,则传入默认线程数,该值默认为2倍CPU核心数ide
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } // 静态代码块初始化DEFAULT+EVENT_LOOP_THREADS static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); }
接下来是经过newChild方法为每个EventExecutor建立一个对应的NioEventLoop。这个方法传入了一些args到NioEventLoop中,它们分别是:函数
进入NioEventLoop的构造函数,以下:oop
NioEventLoop构造函数 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } // 父类构造函数 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; this.executor = ThreadExecutorMap.apply(executor, this); this.taskQueue = ObjectUtil.checkNotNull(taskQueue, taskQueue"); rejectedExecutionHandler = ObjectUtil.checkNotNullrejectedHandler, "rejectedHandler"); }
首先调用一个newTaskQueue方法建立一个任务队列。这是一个mpsc即多生产者单消费者的无锁队列。以后调用父类的构造函数,在父类的构造函数中,将NioEventLoopGroup设置为本身的parent,并经过匿名内部类建立了这样一个Executor————经过ThreadPerTaskExecutor执行传进来的任务,而且在执行时将当前线程与NioEventLoop绑定。其余属性也一一设置。
在nioEventLoop构造函数中,咱们发现建立了一个selector,不妨看一看netty对它的包装。性能
unwrappedSelector = provider.openSelector(); if (DISABLE_KEY_SET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); }
首先看到netty定义了一个常量DISABLE_KEY_SET_OPTIMIZATION,若是这个常量设置为true,也即不对keyset进行优化,则直接返回未包装的selector。那么netty对selector进行了哪些优化?优化
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { SelectionKey[] keys; int size; SelectedSelectionKeySet() { keys = new SelectionKey[1024]; } }
往下,咱们看到了一个叫作selectedSelectionKeySet的类,点进去能够看到,它继承了AbstractSet,然而它的成员变量却让咱们想到了ArrayList,再看看它定义的方法,除了不支持remove和contains外,活脱脱一个简化版的ArrayList,甚至也支持扩容。
没错,netty确实经过反射的方式,将selectionKey从Set替换为了ArrayList。仔细一想,却又以为此番作法有些道理。众所周知,虽然HashSet和ArrayList随机查找的时间复杂度都是o(1),但相比数组直接经过偏移量定位,HashSet因为须要Hash运算,时间消耗上又稍稍逊色了些。再加上使用场景上,都是获取selectionKey集合而后遍历,Set去重的特性彻底用不上,也无怪乎追求性能的netty想要替换它了。
建立完workerGroup的NioEventLoop后,如何挑选一个nioEventLoop进行工做是netty接下来想要作的事。通常来讲轮询是一个很容易想到的方案,为此须要建立一个相似负载均衡做用的线程选择器。固然追求性能到丧心病狂的netty是不会轻易知足的。咱们看看netty在这样常见的方案里又作了哪些操做。
public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } // PowerOfTwo public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } // Generic public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; }
能够看到,netty根据workerGroup内线程的数量采起了2种不一样的线程选择器,当线程数x是2的幂次方时,能够经过&(x-1)来达到对x取模的效果,其余状况则须要直接取模。这与hashmap强制设置容量为2的幂次方有殊途同归之妙。
步骤4就是添加一些保证健壮性而添加的监听器了,这些监听器会在EventLoop被关闭后获得通知。
建立一个只读的NioEventLoop线程组
到此NioEventLoopGroup及其包含的NioEventLoop组就建立完成了