类图以下,EventLoop接口继承了EventLoopGroup的接口,EventLoop和EventLoopGroup是io.netty.channel包中的类,为了与Channel的事件进行交互,EventExecutorGroup是io.netty.util.concurrent包中的类,他直接继承了JDK的java.util.concurrent包的ScheduledExecutorService接口,对ScheduledExecutorService接口进行加强,用于提供线程执行器。
在EventLoop接口中,只定义了一个方法parent(),用于返回他所属的EventLoopGroup。java
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup { @Override EventLoopGroup parent(); }
EventLoop和EventLoopGroup以及Thread、Channel的关系以下图:
这些关系是:数组
以NioEventLoopGroup为例,下面是他的部分类结构图,能够看出它实现了LoopGroup接口。
咱们从new NioEventLoopGroup()
开始。
NioEventLoopGroup类ide
// 这边没传线程数,就默认0 public NioEventLoopGroup() { this(0); } // 。。。部分略 // 这边会调用父类MultithreadEventLoopGroup的构造函数 public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
NioEventLoopGroup的几个构造参数:函数
MultithreadEventLoopGroup类,对线程数的处理。oop
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { // 判断传过来的线程数量是否等于0,等于0从DEFAULT_EVENT_LOOP_THREADS取值 // 在静态代码块中,从系统属性中io.netty.eventLoopThreads获取,缺省值为CPU核心数*2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
MultithreadEventExecutorGroup类,建立EventExecutor数组,并监听是否terminated源码分析
// 传递EventExecutorChooserFactory,这个是用来从EventExecutor数组选择一个EventExecutor protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { // 线程判断 if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } // 设置executor,ThreadPerTaskExecutor中的execute方法以下 // threadFactory.newThread(command).start(),说明每次有任务,就建立一个线程 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 设置子类,children至关于线程池的数组 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 经过executor和传过来的参数实例化 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { // 若是失败了,前面已经实例化的线程都要关闭 for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 根据线程的数量建立chooser // 若是线程数是2的幂次方,则使用PowerOfTwoEventExecutorChooser // 不然使用GenericEventExecutorChooser chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { // terminated的EventExecutor个数若是等于EventExecutor数组的个数,这个线程池就terminated if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; // 为每一个EventExecutor都添加监听 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } // 设置只读的EventExecutor数组 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
newChild方法,在NioEventLoopGroup类中,这个后面的args参数,就是咱们以前传过来的,其中Selector对象就是在这个方法里经过SelectorProvider的openSelector()获取的,EventLoop都有本身的Selector对象。this
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
总结一下,MultithreadEventExecutorGroup有个EventExecutor(NioEventLoop继承了SingleThreadEventLoop,SingleThreadEventLoop实现了EventLoop接口,而EventLoop继承了EventExecutor)类型的children,因此至关于NioEventLoopGroup内部有个EventLoop数组,这个EventLoop数组的大小,就是线程数。这些EventLoop的parent都是同一个Executor。spa
在Channel注册的时候(这个后面讲),会调用SingleThreadEventExecutor的execute方法。这个方法主要限制线程的数量,taskQueue队列的大小最小为16,若是系统属性io.netty.eventLoop.maxPendingTasks没有设置,就默认Integer.MAX_VALUE
。.net
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); // 当前调用线程是不是支撑 EventLoop 的线程 boolean inEventLoop = inEventLoop(); // 加入到队列,若是队列满了,则根据策略抛出异常 addTask(task); // 若是不是当前线程 if (!inEventLoop) { // 启动一个线程 startThread(); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
startThread方法,先判断是否知足启动的条件,实际的启动在doStartThread方法里。线程
private void startThread() { // 还没启动 if (state == ST_NOT_STARTED) { // 经过cas判断是否正确更改状态,这边可能有多个线程进行启动,可是只要一个启动就行了 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { // 开启线程 doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); } } } }
doStartThread,经过ThreadPerTaskExecutor线程池建立一个线程。
private void doStartThread() { assert thread == null; // 这个executor就是ThreadPerTaskExecutor,上面提过,每次有任务就建立一个线程 executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; // 更新最后执行时间 updateLastExecutionTime(); try { // 主要调用run方法,在这里执行队列的任务 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { // 省略 } } }); }
总结一下,一个EventLoop只有一个线程,并维护一个任务队列,经过这个线程来处理任务。