第7节初步学习了一下Java本来的线程池是如何工做的,以及Future的为何可以达到其效果,这些知识对于理解本章有很大的帮助,不了解的能够先看上一节。java
Netty为何会高效?回答就是良好的线程模型,和内存管理。在Java的NIO例子中就我将客户端的操做单独放在一个线程中处理了,这么作的缘由在于若是将客户端链接串起来,后来的链接就要等前一个处理完,固然这并不意味着多线程比单线程有优点,而是在于每一个客户端都须要进行读取准备好的缓存数据,再执行一些业务逻辑。若是业务逻辑耗时好久,那么顺序执行的方式没有多线程优点大。另外一个方面目前多核CPU很常见了,多线程是个不错的选择。这些在第一节就说明过,也提到过NIO并非提高了IO操做的速度,而是减小了CPU的浪费时间,这些概念不能搞混。数组
本节不涉及内存管理,只介绍相关的线程模型。promise
上图就是咱们须要关注的体系内容了,主要从EventExecutorGroup开始往下看,再上层的父接口是JDK提供的并发包内的内容,基础是线程池中能够执行周期任务的线程池服务。因此从这咱们能够知道Netty能够实现周期任务,好比心跳检测。接口定义下面将逐一介绍。缓存
isShuttingDown():是否正在关闭,或者是已经关闭。安全
shutdownGracefully():优雅停机,等待全部执行中的任务执行完成,并再也不接收新的任务。多线程
terminationFuture():返回一个该线程池管理的全部线程都terminated的时候触发的future。并发
shutdown():废弃了的关闭方法,被shutdownGracefully取代。app
next():返回一个被该Group管理的EventExecutor。ide
iterator():全部管理的EventExecutor的迭代器。oop
submit():提交一个线程任务。
schedule():周期执行一个任务。
上述方法基本上是对周期线程池的一个封装,可是扩展了EventExecuotr概念,即分了若干个小组,处理事件。另一个比较实用的就是优雅停机了。
EventLoopGroup中的方法不多,其主要是和channel结合了,就多了一个将channel注册到线程池中的方法。
EventExecutor继承自EventExecutorGroup,这个以前也提到过该类,至关于Group中的一个子集。
next():就是找group中下一个子集
parent():就是所属group
inEventLoop():当前线程是不是在该子集中
newXXX():这个是下一节内容,此处不介绍。
该接口就一个方法,就是parent();EventLoop和EventLoopGroup与EventExecutor和EventExecutorGroup是一组类似的概念。了解这些就能够了。
EventLoop和EventLoopGroup的实现十分简单,简单看下就能够了,这里介绍几个重要的实现类。
该类继承自上节说过的AbstractExecutorService,其最重要的是execute方法未实现。该类是对AbstractExecutorService的一个进一步加工,添加了group的概念,和不一样的Future建立方法。这里不要被以前的Java线程池模型所干扰,其不必定是线程池。回到上一节线程池的介绍,最终的样子都是Execute方法决定的。
该类是对AbstractEventExecutor的一个进一步实现,其实现了周期任务的执行。原理是内部持有一个优先队列ScheduledFutureTask。全部周期任务都添加到这个队列中,也实现了取出周期任务的方法,可是该抽象类并无具体执行周期任务的实现。
该类是对AbstractScheduledEventExecutor的一个实现,其基本上是咱们最终的一个EventLoop的雏形了,不少不一样协议的EventLoop都是基于它实现的。
虽然名字叫作单线程执行器,可是其不必定是单个线程。Executor默认使用的是ThreadPerTaskExecutor,其executor会为每个任务建立一个线程并执行,固然你也能够传入本身的executor。Queue使用的是LinkedBlockingQueue,无容量限制的任务队列。其提供了添加任务到任务队列,从任务队列中获取任务的方法。
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { fetchedAll = fetchFromScheduledTaskQueue(); if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } afterRunningAllTasks(); return ranAtLeastOne; } protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { Runnable task = pollTaskFrom(taskQueue); if (task == null) { return false; } for (;;) { safeExecute(task); task = pollTaskFrom(taskQueue); if (task == null) { return true; } } }
执行过程如上:1.先获取全部的周期任务,放入taskQueue;2.不断的执行taskQueue中的任务;3.afterRunningAllTasks就是一个自由发挥的方法。safeExecute就是直接执行run方法。
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } }); }
上面是该Executor初始化过程,run方法又是交给子类进行初始化了。
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { if (quietPeriod < 0) { throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); } if (timeout < quietPeriod) { throw new IllegalArgumentException( "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); } if (unit == null) { throw new NullPointerException("unit"); } if (isShuttingDown()) { return terminationFuture(); } boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; for (;;) { if (isShuttingDown()) { return terminationFuture(); } int newState; wakeup = true; oldState = state; if (inEventLoop) { newState = ST_SHUTTING_DOWN; } else { switch (oldState) { case ST_NOT_STARTED: case ST_STARTED: newState = ST_SHUTTING_DOWN; break; default: newState = oldState; wakeup = false; } } if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { break; } } gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); gracefulShutdownTimeout = unit.toNanos(timeout); if (oldState == ST_NOT_STARTED) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_TERMINATED); terminationFuture.tryFailure(cause); if (!(cause instanceof Exception)) { // Also rethrow as it may be an OOME for example PlatformDependent.throwException(cause); } return terminationFuture; } } if (wakeup) { wakeup(inEventLoop); } return terminationFuture(); }
上面是一个优雅停机的过程,改变该Executor的状态成ST_SHUTTING_DOWN,这里要注意addTask的时候只有shutdown状态才会拒绝,因此此时这里的逻辑还不会拒绝新任务添加,而后返回了一个terminationFuture,这里不作介绍。
此类继承自上面讲解的SingleThreadEventExecutor,这里多了一个tailTask队列,用于每次事件循环后置任务处理,暂且无论。重要的在于很早提到了register方法,将channel注册到线程中。
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
实际上就是生成了一个DefaultChannelPromise,将channel和线程绑定,最后都放入了unsafe对象中。
上面讲了一些杂乱无章的内容,这里借助NioEventLoop好好梳理一下整个设计流程。NioEventLoop继承自SingleThreadEventLoop,先对以前的相关研究进行总结:
1.EventExecutorGroup接口是继承自Java的周期任务接口,是一个事件处理器组的概念,其相关方法有:
是否正在关闭;优雅停机;获取一个事件处理器;提交一个任务;提交一个周期任务
2.EventExecutor接口是事件处理器,其继承自EventExecutorGroup,目的并非说每个事件处理器都是一个事件处理器组,而是为了复用接口定义的方法。每一个处理器都应该具有优雅停机,提交任务,判断是否关闭的方法,其它方法有:
获取处理器组;获取下一个处理器(覆盖了父接口的next方法);判断是否在事件循环内;建立Promise、ProgressivePromise、SucceededFuture、FailedFuture
3.EventLoopGroup继承自EventExecutorGroup,更新了父接口的含义,EventExecutor的定位是处理单个事件,group就是处理事件组了。EventLoop的定位是处理一个链接的生命周期过程当中的周期事件,group是多个EventLoop的集合了。这里又有一个尴尬的地方,group按照定义本不须要定义其它方法,可是因为Server端的设计(以前说过服务端的channel也是一个线程),使用的是group,因此group必须承担单个EventLoop的职责。最终添加了额外的方法:
获取下一个EventLoop;注册channel;
4.EventLoop,事件循环,其也是一个处理器,最终继承自EventExecutor和EventLoopGroup,方法只有一个:
获取父事件循环组EventLoopGroup。
上述接口光看名称很容易陷入误解,实际上定义是想将单个loop和group分离,可是实现上因为Server端包含一个服务端监听链接线程,一个客户端链接线程,其Group承担了单个的职责,因此定义了一些本该由单个执行器处理的方法,又为了复用方法,致使loop继承了group,这样看起来怪怪的,接口理解起来就混乱了。结合上面的描述,再看一遍继承图就更清楚了:
理解了上面的设定,咱们再来看看客户端的事件处理是如何设计的,即总结上诉抽象类作了哪些事情。
1. AbstractEventExecutor:入参就一个parent,该类完成了一个基本处理:
a.将next设置成本身(上面说过继承的group,这个操做就和group区分开了)。
b.优雅停机调用的是带有超时的停机方案,超时为15秒
c.覆盖了Java提供的newTask包装成FutureTask的方法,使用了本身的PromiseTask
d.提供安全执行方法:safeExecute,直接调用的run方法
该类是最基础的一个抽象类,基本做用就是与group在定义混乱上作了一个区分。提供了执行器与Future关联方法和一个基本的执行任务的方法。
2.AbstractScheduledEventExecutor:入参也是一个parent,该类对AbstractEventExecutor未处理的周期任务提供了具体的完成方法:
a.提供计算当前距服务启动的时间
b.提供存储ScheduledFutureTask的优先队列
c.提供了取消全部周期任务的方法
d.提供了获取一个符合的周期任务的方法,要知足时间,并获取后移除
e.提供了获取距最近一个周期任务的时间多久
f.提供了移除一个周期任务的方法
g.提供添加周期任务的方法
该类提供了周期任务执行的一些基本方法,涉及添加周期任务,移除,获取等方法。
3.SingleThreadEventExecutor:入参包括parent,addTaskWakesUp标志,maxPendingTasks最大任务队列数(16和io.netty.eventexecutor.maxPendingTasks(default Integer.MAX_VALUE)参数更大的那个值),executor执行器(默认是ThreadPerTaskExecutor,每一个任务建立一个线程执行),taskQueue任务队列(默认是LinkedBlockingQueue),rejectedExecutionHandler拒绝任务的处理类(默认直接抛出RejectedExecutionException)。该类主要完成了一个单线程的EventExecutor的基本操做:
a.建立一个taskQueue
b.中断线程
c.从任务队列中获取一个任务,takeTask连同周期任务也会获取
d.添加任务到任务队列
e.移除任务队列中指定任务
f.运行全部任务,会先将周期任务存入taskQueue,再使用safeExecute方法执行任务
g.实现了execute方法,会添加任务到任务队列,若是当前线程不是事件循环线程,开启一个线程。经过的就是持有的executor来开启的线程任务。execute方法调用了run方法,该类没有实现run方法。任务的添加都不是经过execute直接执行了,而是走的添加任务到taskQueue,由未实现的run线程来处理这些事件。
h.优雅停机
这样就有了一个基础的单线程模型了,开启线程,保存,取出任务的方法都有了,只有在开启线程中执行任务的run()方法还未实现。
4.SingleThreadEventLoop:入参和SingleThreadEventExecutor一致,不一样的是多了一个tailTasks。该类主要是针对netty自身的事件循环的定义来实现方法了:
a.注册channel,其实是生成了一个DefaultChannelPromise对象,持有了channel,和运行该channel的EventExecutor,而后将该对象交给最底层的unsafe处理。
b.添加一个事件周期结束后执行的尾任务tailTasks
c.执行尾任务
d.删除指定尾任务
该类就很简单,没有过多的内容,只是增长了一个每一个事件周期后执行的任务而已。
回顾完了,上面4个父类构建了一个基本的带定时任务,普通任务,事件循环后置任务的EventLoop,每一个channel绑定了一个线程执行器,经过DefaultChannelPromise持有二者,最终交给Unsafe操做。子类只须要实现run方法,处理任务队列中的任务。下面就是重头戏NioEventLoop这个客户端的线程是如何设计的了:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
调用的就是父类方法,不过在建立EventLoop的时候建立了selector,这个是NIO中也提到过的。该EventLoop是在Group中newChild建立的。
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
上面是咱们须要关注的run方法,该方法被单独的线程执行。经过一个策略来判断执行哪一个,默认的策略是任务队列中有任务,select执行一下,返回当前的事件数,没任务返回SelectStrategy.SELECT。简单的说就是有任务就补充新到来的事件执行全部的任务,没任务就执行新到的事件。先处理IO读写任务,再处理其余任务,ioRation设置的耗时比例是IO任务占一个执行周期的百分比,默认50,意思是IO执行了50秒,其余任务也会获得50秒的执行时间。后续操做就是获取全部select的key,执行全部的任务了。这里就有一个判断若是是停机状态,就会closeAll(),以前说优雅停机的时候就是设置了一个这个标志,最后是在执行任务以后判断。processSelectedKey环节都交给unsafe类完成了,这里就挂上了handler相关触发,handler的执行也就说明都在该线程内了。
上面的描述虽然把整个过程都关联上了,可是最主要的问题仍是混乱的:如何作到一个channel建立一个线程的?上面只是说明了channel和EventExecutor是绑定在DefaultChannelPromise并交给了Unsafe类,并无看到是如何建立线程的。并且另外一个问题在于,processSelectedKey是选择了全部的key,这不是全部的channel共享了一个线程吗?
要解决该问题要回到Bootstrap的channel创建过程:initAndRegister()方法中,经过channelFactory建立了一个channel对象,然后ChannelFuture regFuture = config().group().register(channel);主要就是观察register方法,该类是设置的线程池NioEventLoopGroup提供的方法,其继承的是MultithreadEventLoopGroup,是调用了next方法获取的EventLoop,最后接上上面channel和eventloop绑定的内容。next中获取的EventLoop早在类初始化的时候就生成了,在构造方法中MultithreadEventExecutorGroup,children就是Eventloop,next不过是挑选了一个线程池而已,默认数量是CPU核数的2倍。这个也就是前面说的,线程数量不是越多越好。
这样咱们明白了,客户端注册的时候是分配了一个线程给它。客户端并不须要多线程,可是仍是继续看后面的内容:AbstractUnsafe的register方法给出了相关解答。channel持有了该EventLoop,此时线程仍是未运行状态,只是有了这么一个对象而已。
if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
这里execute方法,这个是以前咱们讲过的一个方法,其会判断当前线程是不是该channel的线程,目前都没有初始化线程确定不是,将任务放入任务队列,开启一个线程(线程处于未启动状态才会开启,不然不执行),这个设计使得execute的时候只会开启一次线程,而全部的任务都会被放入任务队列,由这个线程执行。再回到run方法,这个是channel线程执行的方法,目前每一个NioEventLoop都执行了Select等方法啊,这不是处理了全部的channel的工做吗?并无达到一个channel生命周期控制在一个线程中啊。
这里其实是JAVA NIO的例子带来的误解,认为必须一个线程来使用select,而后遍历事件分配线程给channel执行读写操做。实际上在Netty中不同,Netty全部线程都在执行select并获取相关事件,可是实际上其并无执行全部的事件。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop != this || eventLoop == null) { return; } unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }
看这里,if (eventLoop != this || eventLoop == null) ,若是channel的eventLoop不是当前的eventLoop就不执行,这个就一小段代码,可是就直接决定了EventLoop只对本身所绑定的channel感兴趣,最终达到了只处理本身相关的任务的目的。
该类没有太多须要说明的内容,前面已经讲解了不少。
1.AbstractEventExecutorGroup,实现了基本的方法,全部方法都是调用next()即挑选出一个线程执行器完成的。
2.MultithreadEventExecutorGroup,实现了一个基本的线程池,持有子线程,主要工做是初始化了子线程数组,提供了next方法。
3.MultithreadEventLoopGroup,实现了Netty的channel线程池,提供了register方法,虽然也是调用next的register方法。
4.NioEventLoopGroup,实现了建立子线程数组时newChild方法,全部的EventLoop都是这个方法建立的。
group就是两个重点,一个next()挑选事件执行器,一个newChild()建立线程执行器对象。
本节耗费了大量的篇幅讲解了Netty的线程模型的设计思路,主要看点以下:
1.解释了EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup这四者之间的关系,和这么复杂混乱的继承关系的缘由。
2.解释了group是如何初始化线程,并绑定channel的,next().register()
3.解释了eventloop为何和channel绑定了,execute()开启线程,以及每一个eventloop都在获取IO事件,可是经过channel的eventloop是否等于当前过滤掉其它的事件,只处理本身绑定的channel事件。
因为每一个线程都在获取IO事件,因此这段逻辑变的很是复杂,这也就是我以前说的写好很IO很困难。
最后附上一张对前面全部内容总结的一个图,清醒一下头脑,从复杂的代码中脱身:
这个图就是一个基本的执行过程图了,可能有遗漏的地方,可是大致状况如图所示。