今天是猿灯塔“365篇原创计划”第五篇。 接下来的时间灯塔君持续更新Netty系列一共九篇面试
Netty 源码解析(一): 开始数组
Netty 源码解析(二): Netty 的 Channel安全
Netty 源码解析(三): Netty的 Future 和 Promise微信
Netty 源码解析(四): Netty 的 ChannelPipelineapp
当前:Netty 源码解析(五): Netty 的线程池分析异步
Netty 源码解析(六): Channel 的 register 操做ide
Netty 源码解析(七): NioEventLoop 工做流程函数
Netty 源码解析(八): 回到 Channel 的 register 操做oop
Netty 源码解析(九): connect 过程和 bind 过程分析优化
今天呢!灯塔君跟你们讲:
Netty 的线程池分析
接下来,咱们来分析 Netty 中的线程池。Netty 中的线程池比较很差理解,由于它的类比较多,并且它们之间的关系错综复杂。看下图,感觉下 NioEventLoop 类和 NioEventLoopGroup 类的继承结构:
这张图我按照继承关系整理而来,你们仔细看一下就会发现,涉及到的类确实挺多的。本节来给你们理理清楚这部份内容。
首先,咱们说的 Netty 的线程池,指的就是 NioEventLoopGroup 的实例;线程池中的单个线程,指的是右边 NioEventLoop 的实例。
回顾下咱们第一节介绍的 Echo 例子,客户端和服务端的启动代码中,最开始咱们老是先实例化 NioEventLoopGroup:
// EchoClient 代码最开始:EventLoopGroup group = new NioEventLoopGroup();
// EchoServer 代码最开始:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
下面,咱们就从 NioEventLoopGroup 的源码开始进行分析。 咱们打开 NioEventLoopGroup 的源码,能够看到,NioEventLoopGroup 有多个构造方法用于参数设置,最简单地,咱们采用无参构造函数,或仅仅设置线程数量就能够了,其余的参数采用默认值。
好比上面的代码中,咱们只在实例化 bossGroup 的时候指定了参数,表明该线程池须要一个线程。
publicNioEventLoopGroup(){ this(0); } publicNioEventLoopGroup(intnThreads){ this(nThreads,(Executor)null); } ... //参数最全的构造方法 publicNioEventLoopGroup(intnThreads,Executorexecutor,EventExecutorChooserFactorychooserFactory, finalSelectorProviderselectorProvider, finalSelectStrategyFactoryselectStrategyFactory, finalRejectedExecutionHandlerrejectedExecutionHandler){ //调用父类的构造方法 super(nThreads,executor,chooserFactory,selectorProvider,selectStrategyFactory,rejectedExecutionHandler); } 复制代码
咱们来稍微看一下构造方法中的各个参数:
这里介绍这些参数是但愿你们有个印象而已,你们发现没有,在构造 NioEventLoopGroup 实例时的好几个参数,都是用来构造 NioEventLoop 用的。下面,咱们从 NioEventLoopGroup 的无参构造方法开始,跟着源码走:
publicNioEventLoopGroup(){ this(0); } publicNioEventLoopGroup(){ this(0); } 复制代码
而后一步步走下去,到这个构造方法:
publicNioEventLoopGroup(intnThreads,ThreadFactorythreadFactory,finalSelectorProviderselectorProvider,finalSelectStrategyFactoryselectStrategyFactory){ super(nThreads,threadFactory,selectorProvider,selectStrategyFactory,RejectedExecutionHandlers.reject()); }复制代码
你们本身要去跟一下源码,这样才知道中间设置了哪些默认值,下面这几个参数都被设置了默认值:
跟着源码走,咱们会来到父类 MultithreadEventLoopGroup 的构造方法中:
protectedMultithreadEventLoopGroup(intnThreads,ThreadFactorythreadFactory,Object...args){ super(nThreads==0?DEFAULT_EVENT_LOOP_THREADS:nThreads,threadFactory,args); }复制代码
这里咱们发现,若是采用无参构造函数,那么到这里的时候,默认地 nThreads 会被设置为 CPU 核心数 *2。你们能够看下 DEFAULT_EVENT_LOOP_THREADS 的默认值,以及 static 代码块的设值逻辑。咱们继续往下走:
protectedMultithreadEventExecutorGroup(intnThreads,ThreadFactorythreadFactory,Object...args){ this(nThreads,threadFactory==null?null:newThreadPerTaskExecutor(threadFactory),args);复制代码
到这一步的时候,new ThreadPerTaskExecutor(threadFactory)
会构造一个 executor。
咱们如今还不知道这个 executor 怎么用。这里咱们先看下它的源码:public
final
class
ThreadPerTaskExecutor
implements
Executor
{
private
final
`ThreadFactorythreadFactory;pu
blic Th
readPerTaskExecutor(ThreadFactorythreadFactory) {` `i`f(`th
readFactory==null){th
row` n`ewNul`l`Poi
nterException("threadFactory"); } this.
threadF
actory=threadFactory; } @Override` p`u`bl`ic void e`x`ecute(`R`unna`b`lecomma
nd) { //为每一个任务新建一个线程 t
hreadFactory.ne
wThread(command).start(); } } Executor 做为线程池的最
顶层
接`口, 咱们知道,它只有一个 execute(runnable) 方法,从上面咱们能够看到,实现类 ThreadPerTaskExecutor 的逻辑就是每来一个任务,新建一个线程。咱们先记住这个,前面也说了,它是给 NioEventLoop 用的,不是给 NioEventLoopGroup 用的。
.一步设置完了 executor,咱们继续往下看:
protectedMultithreadEventExecutorGroup(intnThreads,Executorexecutor,Object...args){ this(nThreads,executor,DefaultEventExecutorChooserFactory.INSTANCE,args); } 这一步设置了 chooserFactory,用来实现从线程池中选择一个线程的选择策略。 复制代码
ChooserFactory 的逻辑比较简单,咱们看下 DefaultEventExecutorChooserFactory 的实现:Override
public
`EventExecutorChoosernewChooser(E
ventExecutor[]executors) {i
f(
isPowerOfTwo(executors.length)){ ret`ur`n newP`o`wer
OfTwoEventExecutorChooser(executors); }else{
return` n`ewGene`r`icE
ventExecutorChooser(executors); } } 这里设置的
策略
也很简单:一、若是线程池的线程数量是 2^n,采用下面的方式会高效一些:@Override
publicEve
ntExecutornext() { re
turnex`e`c`u`tors[i
dx.getAndIncrement()&executors.length-1]; } 二、若是不是,用取模的方式:
@Override public
EventExecu
tornext() { returnexe
cutors`\[`M`a`th.abs
(idx.getAndIncrement()%executors.length)]; }`
走了这么久,咱们终于到了一个干实事的构造方法中了:
protectedMultithreadEventExecutorGroup(intnThreads,Executorexecutor, EventExecutorChooserFactorychooserFactory,Object...args){ if(nThreads<=0){ thrownewIllegalArgumentException(String.format("nThreads:%d(expected:>0)",nThreads)); } // executor 若是是 null,作一次和前面同样的默认设置。 if(executor==null){ executor=newThreadPerTaskExecutor(newDefaultThreadFactory()); } //这里的children数组很是重要,它就是线程池中的线程数组,这么说不太严谨,可是就大概这个意思 children=newEventExecutor[nThreads]; //下面这个for循环将实例化children数组中的每个元素 for(inti=0;i<nThreads;i++){ booleansuccess=false; try{ //实例化!!!!!! children[i]=newChild(executor,args); success=true; }catch(Exceptione){ //TODO:Thinkaboutifthisisagoodexceptiontype thrownewIllegalStateException("failedtocreateachildeventloop",e); }finally{ //若是有一个child实例化失败,那么success就会为false,而后进入下面的失败处理逻辑 if(!success){ //把已经成功实例化的“线程”shutdown,shutdown是异步操做 for(intj=0;j<i;j++){ children[j].shutdownGracefully(); } //等待这些线程成功shutdown for(intj=0;j<i;j++){ EventExecutore=children[j]; try{ while(!e.isTerminated()){ e.awaitTermination(Integer.MAX_VALUE,TimeUnit.SECONDS); } }catch(InterruptedExceptioninterrupted){ //把中断状态设置回去,交给关心的线程来处理. Thread.currentThread().interrupt(); break; } } } } } //================================================ //===到这里,就是表明上面的实例化全部线程已经成功结束=== //================================================ //经过以前设置的chooserFactory来实例化Chooser,把线程池数组传进去, //这就没必要再说了吧,实现线程选择策略 chooser=chooserFactory.newChooser(children); //设置一个Listener用来监听该线程池的termination事件 //下面的代码逻辑是:给池中每个线程都设置这个 listener,当监听到全部线程都 terminate 之后,这个线程池就算真正的 terminate 了。 finalFutureListener<Object>terminationListener=newFutureListener<Object>(){ @Override publicvoidoperationComplete(Future<Object>future)throwsException{ if(terminatedChildren.incrementAndGet()==children.length){ terminationFuture.setSuccess(null); } } }; for(EventExecutore:children){ e.terminationFuture().addListener(terminationListener); } //设置readonlyChildren,它是只读集合,之后用到再说 Set<EventExecutor>childrenSet=newLinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet,children); readonlyChildren=Collections.unmodifiableSet(childrenSet); }复制代码
上面的代码很是简单吧,没有什么须要特别说的,接下来,咱们来看看 newChild() 这个方法,这个方法很是重要,它将建立线程池中的线程。
我上面已经用过不少次"线程"这个词了,它可不是 Thread 的意思,而是指池中的个体,后面咱们会看到每一个"线程"在何时会真正建立 Thread 实例。反正每一个 NioEventLoop 实例内部都会有一个本身的 Thread 实例,因此把这两个概念混在一块儿也无所谓吧。
newChild(…)
方法在 NioEventLoopGroup 中覆写了,上面说的"线程"其实就是 NioEventLoop:
@Override protectedEventLoopnewChild(Executorexecutor,Object...args)throwsException{ returnnewNioEventLoop(this,executor,(SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(),(RejectedExecutionHandler)args[2]); } 它调用了 NioEventLoop 的构造方法:复制代码
NioEventLoop(NioEventLoopGroupparent,Executorexecutor,SelectorProviderselectorProvider, SelectStrategystrategy,RejectedExecutionHandlerrejectedExecutionHandler){ //调用父类构造器 super(parent,executor,false,DEFAULT_MAX_PENDING_TASKS,rejectedExecutionHandler); if(selectorProvider==null){ thrownewNullPointerException("selectorProvider"); } if(strategy==null){ thrownewNullPointerException("selectStrategy"); } provider=selectorProvider; //开启 NIO 中最重要的组件:Selector finalSelectorTupleselectorTuple=openSelector(); selector=selectorTuple.selector; unwrappedSelector=selectorTuple.unwrappedSelector; selectStrategy=strategy; }复制代码
咱们先粗略观察一下,而后再往下看:
这个时候,咱们来看一下 NioEventLoop 类的属性都有哪些,咱们先忽略它继承自父类的属性,单单看它本身的:
privateSelectorselector; privateSelectorunwrappedSelector; privateSelectedSelectionKeySetselectedKeys; privatefinalSelectorProviderprovider; privatefinalAtomicBooleanwakenUp=newAtomicBoolean(); privatefinalSelectStrategyselectStrategy; privatevolatileintioRatio=50; privateintcancelledKeys; privatebooleanneedsToSelectAgain;复制代码
结合它的构造方法咱们来总结一下:
而后咱们继续走它的构造方法,咱们看到上面的构造方法调用了父类的构造器,它的父类是 SingleThreadEventLoop。
protectedSingleThreadEventLoop(EventLoopGroupparent,Executorexecutor, booleanaddTaskWakesUp,intmaxPendingTasks, RejectedExecutionHandlerrejectedExecutionHandler){ super(parent,executor,addTaskWakesUp,maxPendingTasks,rejectedExecutionHandler); //咱们能够直接忽略这个东西,之后咱们也不会再介绍它 tailTasks=newTaskQueue(maxPendingTasks); } 复制代码
SingleThreadEventLoop 这个名字很诡异有没有?而后它的构造方法又调用了父类 SingleThreadEventExecutor 的构造方法:
protectedSingleThreadEventExecutor(EventExecutorGroupparent,Executorexecutor, booleanaddTaskWakesUp,intmaxPendingTasks, RejectedExecutionHandlerrejectedHandler){ super(parent); this.addTaskWakesUp=addTaskWakesUp; this.maxPendingTasks=Math.max(16,maxPendingTasks); this.executor=ObjectUtil.checkNotNull(executor,"executor"); //taskQueue,这个东西很重要,提交给NioEventLoop的任务都会进入到这个taskQueue中等待被执行 //这个queue的默认容量是16 taskQueue=newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler=ObjectUtil.checkNotNull(rejectedHandler,"rejectedHandler"); }复制代码
到这里就更加诡异了,NioEventLoop 的父类是 SingleThreadEventLoop,而 SingleThreadEventLoop 的父类是 SingleThreadEventExecutor,它的名字告诉咱们,它是一个 Executor,是一个线程池,并且是 Single Thread 单线程的。也就是说,线程池 NioEventLoopGroup 中的每个线程 NioEventLoop 也能够当作一个线程池来用,只不过它只有一个线程。这种设计虽然看上去很巧妙,不过有点反人类的样子。上面这个构造函数比较简单:
还记得默认策略吗:抛出RejectedExecutionException 异常。 在 NioEventLoopGroup 的默认构造中,它的实现是这样的:private
static
final
`RejectedExecutionHandlerREJECT=newReje
ctedExecutionHandler(){ @Overr
ide public
voidr
eject
ed(Runnabletask,SingleThreadEventExecutorexecutor) { throw` `n`ew`Rejec`t`edE
xecutionException(); } };`
而后,咱们再回到 NioEventLoop 的构造方法:
NioEventLoop(NioEventLoopGroupparent,Executorexecutor,SelectorProviderselectorProvider, SelectStrategystrategy,RejectedExecutionHandlerrejectedExecutionHandler){ //咱们刚刚说完了这个 super(parent,executor,false,DEFAULT_MAX_PENDING_TASKS,rejectedExecutionHandler); if(selectorProvider==null){ thrownewNullPointerException("selectorProvider"); } if(strategy==null){ thrownewNullPointerException("selectStrategy"); } provider=selectorProvider; //建立selector实例 finalSelectorTupleselectorTuple=openSelector(); selector=selectorTuple.selector; unwrappedSelector=selectorTuple.unwrappedSelector; selectStrategy=strategy; }复制代码
能够看到,最重要的方法其实就是 openSelector() 方法,它将建立 NIO 中最重要的一个组件 Selector。在这个方法中,Netty 也作了一些优化,这部分咱们就不去分析它了。到这里,咱们的线程池 NioEventLoopGroup 建立完成了,而且实例化了池中的全部 NioEventLoop 实例。同时,你们应该已经看到,上面并无真正建立 NioEventLoop 中的线程(没有建立 Thread 实例)。提早透露一下,建立线程的时机在第一个任务提交过来的时候,那么第一个任务是什么呢?是咱们立刻要说的 channel 的 register 操做。
365天干货不断微信搜索「猿灯塔」第一时间阅读,回复【资料】【面试】【简历】有我准备的一线大厂面试资料和简历模板