1.计算机的基础知识
位逻辑运算符: &: 位与运算符,只有两个操做数都是true,结果才是true。 |: 位或运算符,只有两个操做数都是false,结果才是false。 ~: 位非运算符:若是位为0,结果是1,若是位为1,结果是0. ^: 位异或运算:两个数转为二进制,而后从高位开始比较,若是相同则为0,不相同则为1。
位移运算: <<: 无符号左移 >>: 无符号右移 >>>:带符号右移(没有带符号左移这种操做)
二进制: 二进制都是以补码的形式表示的 正数的原码,反码,补码都同样; 要获得负数的补码,必须先求负数的反码,负数的反码;负数的反码按位1改为0,0改为1;负数的补码等于反码+1
2.ThreadPoolExecutor简单示例
public class ThreadPoolExecutorTest { public static void main(String[] args) { BlockingQueue b = new ArrayBlockingQueue(100); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 500000, TimeUnit.SECONDS, b, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { } }); threadPoolExecutor.execute(() -> { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(111111); }); threadPoolExecutor.execute(() -> { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(2222222); }); } }
3.ThreadPoolExecutor属性分析
public class ThreadPoolExecutor extends AbstractExecutorService { //用于保存线程运行状态和当前线程池线程运行的数量 //高3位用于表明线程的运行状态,低29位表明线程池的线程最大数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //32-3=29 private static final int COUNT_BITS = Integer.SIZE - 3; //00011111111111111111111111111111,高3位为0,低29位为1,表明线程池的线程最大数量 //参与运算用于获得线程的运行状态和线程池线程的数量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //线程池运行的状态,后面单独分析 //线程池处于运行状态,11100000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //线程池处于shutdown状态,00000000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //线程池处于结束状态,00100000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //线程池运行任务为空的时候的状态,01000000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //线程处于完全终止的状态,01100000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; //获取线程池运行状态,c表明ctl值 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取线程池工做线程数量,c表明ctl值 private static int workerCountOf(int c) { return c & CAPACITY; } //获取ctl值,rs表明线程的运行状态,wc表明线程池工做线程数量,造成一个32位二进制数 //高三位表明线程池运行状态,低29位表明线程池工做线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; } //传入ctl值和线程某个运行状态,比较ctl值是否小于传入的线程的某个运行状态 private static boolean runStateLessThan(int c, int s) { return c < s; } //传入ctl值和线程运行状态,比较ctl值是否大于传入的线程的某个运行状态 private static boolean runStateAtLeast(int c, int s) { return c >= s; } //判断线程运行状态是不是运行状态,由于RUNNING=-1是最小的状态值 private static boolean isRunning(int c) { return c < SHUTDOWN; } //经过CAS操做将工做线程数+1 private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } //经过CAS操做将工做线程数-1 private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } //do-while循环能够强制让工做线程数-1 private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); } //线程池的工做队列,在构造方法中初始化 private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); //保存worker的池子 private final HashSet<Worker> workers = new HashSet<Worker>(); private final Condition termination = mainLock.newCondition(); //记录线程池生命周期中,线程池运行的线程的最大数量 private int largestPoolSize; //线程池完成任务数量 private long completedTaskCount; //建立线程工厂 private volatile ThreadFactory threadFactory; //线程中断策列 private volatile RejectedExecutionHandler handler; //在指定时间单位下,线程存活时间 private volatile long keepAliveTime; //核心线程数 private volatile int corePoolSize; //最大线程数 private volatile int maximumPoolSize; //线程池满了后的中断策列 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); private final AccessControlContext acc; //当从工做队列中取不到任务时的时候,是否须要回收核心线程 private volatile boolean allowCoreThreadTimeOut; }
4.ThreadPoolExecutor构造方法分析
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { //会检查参数,不符合条件抛出异常 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); //工做队列,线程工厂,拒绝策列不能为空 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
5.线程池建立线程顺序分析
当咱们执行execute方法提交一个线程的时候,首先会判断当前线程池线程数是否超过核心线程数corePoolSize,如果没有超过,则建立新线程,如果超过,则尝试将Runnable提交到工做队列workQueue中。若是工做队列workQueue没有超过容量,则Runnable提交到工做队列中,若是超过了workQueue的容量,则尝试建立线程。若是此时建立的线程小于最大线程数maximumPoolSize,则建立线程,若是超过了maximumPoolSize,则执行拒绝策列。面试
6.ThreadPoolExecutor.execute方法分析
public void execute(Runnable command) { //若是runnable为空,抛出异常 if (command == null) throw new NullPointerException(); //获取ctl值,该值高3位表明线程池运行状态,低29位表明线程池当前运行线程数量 int c = ctl.get(); //CASE1:获取线程池运行线程数量,若是小于核心线程数,则建立线程,addWorker传入参数为core //也就是说,线程池不是一上来就把核心线程建立了,必须在提交runnable任务到线程池的时候才一个一个建立 if (workerCountOf(c) < corePoolSize) { //addWorker是建立线程的核心方法,关键点在Worker类的构造方法和runWorker方法的while循环 if (addWorker(command, true)) return; c = ctl.get(); } //CASE2:条件1不成立说明核心线程数已满,将任务添加到阻塞队列中。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //CASE3:条件1和2都不成立,说明核心线程已建立彻底而且任务队列已满 //调用addWorker建立非核心线程,若是返回false,说明线程数达到最大线程数,执行拒绝策列 else if (!addWorker(command, false)) reject(command); }
6.1.Worker类分析
每一个线程的建立都离不开Worker类,该类彻底包装了线程运行的所须要的属性,并提供了初始化线程和从阻塞队列中获取被阻塞的线程并执行的一系列方法。spring
观察该类代码发现,该类继承了AbstractQueuedSynchronizer并实现了Runnable接口,在建立线程的时候,实际Thread类的构造方法包装的就是Worker类本身(咱们知道通常Runnable须要被传入到Thread里面的,如:Thread t = new Thread(runnable), t.start()启动线程)。而从execute方法传过来的Runnable实现只是被封装到了firstTask中,建立出来的Thread在执行的时候,调用的start方法也只是启动了该类的runWorker方法,而真正封装咱们执行逻辑的firstTask这个Runnable类在后续调用中也只是执行本身的run方法而已,并再也不被Thread封装。安全
worker为何要继承AbstractQueuedSynchronizer呢?ide
由于在runWork的方法内,在调用firstTask处理业务逻辑前,会给代码加上独占锁,加这个独占锁的目的是什么呢?由于咱们在调用shutDown方法的时候,并不会终止处于运行中的线程。shutDown方法会根据独占锁来判断当前worker是否正在工做。学习
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; //须要建立的线程,该线程封装的Runnable是本身 final Thread thread; //execute传入的Runnable,该runnable并不被Thread包装,后续调用本身的run方法。 Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { //设置线程池处于运行状态 setState(-1); //封装execute传入进来的包含实际逻辑的Runnable this.firstTask = firstTask; //建立一个线程,这里注意,线程封装的Runnable是本身 //示例使用Executors.defaultThreadFactory() this.thread = getThreadFactory().newThread(this); } //被thread属性封装后调用start方法后,会自动启动该run方法,执行后续逻辑 //后续逻辑会调用firstTask.run()方法启动实际业务逻辑 public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
6.2.addWorker方法分析
addWorker是建立线程的核心方法,一个worker表明一个线程,而workers这个全局变量能够表明线程池,只有向workers里面添加worker成功的时候,才能表明建立线程成功了。addWorker在执行过程当中,会根据线程池状态和线程池数量判断是否能建立线程,建立线程成功会将记录线程池状态和数量的ctl值+1,并将worker加入到workers里面,更新线程池生命周期内线程池线程的最大数量,而后启动线程执行任务。ui
addWorker的core参数表明是不是在建立核心线程,core为true表明建立核心线程,false表明阻塞队列已满,建立非核心线程。this
返回值: true表明建立线程并启动成功,false表明建立线程失败。spa
何时返回false呢? CASE1.线程池状态rs>SHUTDOWN; CASE2.线程池状态为SHUTDOWN的时候,阻塞队列没有任务了,为空; CASE3.线程池状态为SHUTDOWN的时候,execute提交的Runnable(被封装到firstTask里面)不为空; CASE4.若是是建立核心线程,此时已经超过核心线程数;若是是建立非核心线程,此时已经超过最大线程数; CASE5.ThreadFactory建立线程为空,这里通常是咱们自定义线程工厂的时候出的问题;
private boolean addWorker(Runnable firstTask, boolean core) { //retry代码除了检查是否能建立线程外,还负责将ctl值+1,若是不能建立线程,则返回false; retry: for (;;) { //获取当前ctl值 int c = ctl.get(); //获取当前线程池运行状态 int rs = runStateOf(c); //CASE1.线程池状态rs>SHUTDOWN;返回false; //CASE2.线程池状态为SHUTDOWN的时候,阻塞队列没有任务了,为空; //CASE3.线程池状态为SHUTDOWN的时候,execute提交的Runnable(被封装到firstTask里面)不为空; if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //循环判断是否能将ctl+1设置成功 for (;;) { //获取当前运行中的线程数量 int wc = workerCountOf(c); //条件1:wc >= CAPACITY基本不可能,CAPACITY为理论上的最大线程数,一个5亿级的数字 //CASE4.根据core参数,若是是建立核心线程,此时已经超过核心线程数,则返回false //若是是建立非核心线程,此时已经超过最大线程数,则返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //条件成立,说明给ctl修改+1成功了,表明给线程数+1设置成功了,能够退出循环建立线程了 //若是在执行这段代码的时候,线程池状态正巧被改了,这里也会失败,由于ctl的高3位表明的是线程状态 if (compareAndIncrementWorkerCount(c)) break retry; //若是上面设置线程数+1失败,则实时获取线程状态并和当前的比较 c = ctl.get(); //状态被改变了跳到retry再次判断是否容许建立线程 if (runStateOf(c) != rs) continue retry; } } //代码走到这里表明已经容许建立线程了 //表示建立的worker是否已经启动,启动也表明线程建立成功了 boolean workerStarted = false; //添加worker到worker队列是否成功的状态 boolean workerAdded = false; //局部变量 Worker w = null; try { //构建work对象 w = new Worker(firstTask); //获取worker的构造方法建立的线程 final Thread t = w.thread; //这里加了这段判断是为了防止本身实现的TreadFactory有bug致使建立线程失败 if (t != null) { //向works这个hashset里面添加works的时候,须要全局加锁,如下代码线程并不安全 //该段代码个人理解就是为了维护largestPoolSize这个值,记录线程池生命周期中, //线程池运行的线程的最大数量 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //获取当前线程池状态 int rs = runStateOf(ctl.get()); //检查线程池状态必须是RUNNING或者处于SHUTDOWN的时候,并无提交具体的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //防止人为定义线程工厂建立线程并启动了start方法的状况 if (t.isAlive()) throw new IllegalThreadStateException(); //向线程池添加worker workers.add(w); //获取线程池线程数量 int s = workers.size(); //若是线程池线程数量>记录的值,更新线程池生命周期内最大线程记录数 if (s > largestPoolSize) largestPoolSize = s; //表示向线程池中添加线程成功了 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动线程,该方法实际会启动worker类的run方法,而后执行runWorker方法 t.start(); //设置线程启动状态为成功 workerStarted = true; } } } finally { if (! workerStarted) //后面再分析 addWorkerFailed(w); } return workerStarted; }
6.3.runWorker方法分析
addWorker方法建立线程成功,Worker类的Thread会调用start方法启动本身的run方法,由于Worker类实现了Runnable接口,run方法里面调用了runWorker方法。实际咱们execute方法传入的Runnable被封装到了Worker类的firstTask属性里面。而后在runWorker里面调用run方法启动具体的逻辑,注意这里并没用再用Thrad封装Runnable了。线程启动后,会一直运行While循环,循环第一次运行本身传入的Runnable,第二次及以后则经过getTask方法从任务队列种获取具体的Runnable任务了。一旦While循环内发生异常或者getTask返回空,则会调用processWorkerExit执行线程销毁逻辑。getTask方法获取不到具体任务的线程均可被认为是空闲线程。线程
final void runWorker(Worker w) { //wt=w.thread Thread wt = Thread.currentThread(); //execute实际传入的包含业务逻辑的Runnable,该Runnable再也不被Thread包装,调用本身的run方法 Runnable task = w.firstTask; //引用设置为null,帮助gc w.firstTask = null; //先调用unlock方法设置当前独占线程为空,线程运行状态为0 w.unlock(); //线程退出状态,true表明线程由于异常退出了 boolean completedAbruptly = true; try { //线程被建立并启动后就一直执行while循环,直到发生异常或者退出 //条件1:task != null,线程初创task不为空 //条件2:条件1不成立说明线程非初创而且核心线程数已满,说明已经建立好线程,从队列中取task任务 while (task != null || (task = getTask()) != null) { //设置独占锁,由于shutDown方法调用的时候不会马上终止运行中的线程, //会根据是否持有独占锁来判断当前worker是否处于运行状态 w.lock(); //线程池处于STOP/TIDYING/TERMINATION且当前线程没有设置中断状态 //给当前线程设一个中断状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //钩子方法,留给子类实现,在执行实际业务代码以前 beforeExecute(wt, task); Throwable thrown = null; try { //调用实际业务方法的逻辑 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //钩子方法,留给子类实现,在执行实际业务代码以后 afterExecute(task, thrown); } } finally { //将task设置为空 task = null; //每一个worker完成任务数量+1 w.completedTasks++; //释放掉锁,正常状况下会回到while循环继续执行getTask,发生异常会执行下面的finally //getTask为空也会退出while循环 w.unlock(); } } //若是getTask()返回空的时候,执行退出逻辑 completedAbruptly = false; } finally { //线程退出逻辑 //completedAbruptly=true由于异常退出 //completedAbruptly=false正常退出 processWorkerExit(w, completedAbruptly); } }
6.4.getTask方法分析
当getTask返回空的时候,线程能够执行销毁逻辑了。netty
getTask何时返回空?
1.线程池处于SHUTDOWN状态,工做队列为空的时候; 2.线程池处于STOP状态以上的时候,将线程池线程数-1并返回空; 3.当工做队列为空的时候;
注意,线程池的allowCoreThreadTimeOut属性会影响getTask方法,致使getTask方法一直阻塞在workQueue.take()这里的,这样就不会销毁线程。
1.allowCoreThreadTimeOut=true,使用非阻塞方法从队列获取任务 2.allowCoreThreadTimeOut=false,线程池线程数还未达到核心线程数上限,使用阻塞方法获取任务,这样就可使得核心线程不会被销毁,getTask方法一直阻塞等待获取队列种的任务。 3.allowCoreThreadTimeOut=false,线程池线程数达到核心线程数,使用非阻塞方法获取任务
private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); //成立1:线程池处于SHUTDOWN状态且工做队列为空的时候 //成立2:线程池处于STOP状态以上的时候,将线程池线程数-1并返回空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //获取线程池线程数 int wc = workerCountOf(c); //1.allowCoreThreadTimeOut=true,使用非阻塞方法从队列获取任务 //2.allowCoreThreadTimeOut=false,线程池线程数还未达到核心线程数,使用阻塞方法获取任务, //3.allowCoreThreadTimeOut=false,线程池线程数达到核心线程数,使用非阻塞方法获取任务 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //条件1-1:wc > maximumPoolSize,可能最大线程数设置的比核心线程数小,此时没有空闲线程能够接手 //条件1-2:timed && timedOut的timedOut在poll(keepAliveTime, TimeUnit.NANOSECONDS) //方法获取超时的时候,循环第二次执行的时候才可能致使条件为true //条件2:线程池还有其余线程,工做队列为空返回true //以上1和2条件成立了,任务返回为空。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //尝试将线程数-1,并不必定会成功,可能被其余线程改过,失败则继续循环尝试-1 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //当timed=true的时候,使用poll获取超时候致使r=null的时候,timedOut=true, //再次执行循环 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();//阻塞方式从队列获取,防止线程线程执行销毁逻辑 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
6.5.processWorkerExit方法分析
该方法用于执行线程销毁的逻辑。
1.加锁,从workers队列中移除一个worker,将线程池完成任务总数量+1, 释放锁; 2.判断是否须要关闭线程池; 3.若是线程正常完成并退出,根据allowCoreThreadTimeOut判断是否须要回收核心线程, 若allowCoreThreadTimeOut=true,只须要保证线程池最少有一个线程便可,也就是说超过1的空闲线程必定会被销毁。 若allowCoreThreadTimeOut=false,在线程数未达到核心线程数上限的状况下,因为getTask方法的阻塞,不会执行线程销毁的逻辑;当线程数达到核心线程数上限的状况,且队列也达到上限数,这以后建立的任何线程在getTask方法获取不到具体任务的状况下都会销毁
private void processWorkerExit(Worker w, boolean completedAbruptly) { //由于异常退出,线程运行数-1 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //取出每一个worker执行的任务数量,并汇总到全局的任务执行数量中 completedTaskCount += w.completedTasks; //将worker从池中移除 workers.remove(w); } finally { mainLock.unlock(); } //尝试关闭线程池并处理空闲线程 tryTerminate(); //获取线程ctl值 int c = ctl.get(); //若是线程池当前状态小于STOP状态,说明线程池处于RUNNING,SHUTDOWN状态 if (runStateLessThan(c, STOP)) { //若是线程池是正常退出返回false走下面的流程 if (!completedAbruptly) { //allowCoreThreadTimeOut表示是否容许核心线程销毁 //min表示线程池容许的最小线程数,最少为1 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //条件1:min==0说明容许核心线程销毁 //条件2:工做队列不为空 if (min == 0 && ! workQueue.isEmpty()) //设置线程池最小线程数 min = 1; //若是当前线程池线程数大于min的值,返回,这里无论min是核心线程数仍是1 //也就是说,超过核心线程的线程数在getTask方法从队列取不到的时候必定会回收 //而核心线程是否回收会根据allowCoreThreadTimeOut属性来判断 if (workerCountOf(c) >= min) return; } //上面从workers池中删除了一个worker,这里添加进去一个空任务的worker //核心线程数=0的状况会执行到这里,会维持核心线程数最少为1 addWorker(null, false); } }
6.6.tryTerminate方法分析
尝试关闭线程池方法并处理空闲线程,interruptIdleWorkers方法处理空闲线程,设置中断状态。每一个线程退出都会单独调用该方法。
final void tryTerminate() { //自旋 for (;;) { //获取线程ctl值 int c = ctl.get(); //条件1:线程池处于RUNNING状态说明线程池当前正常,直接返回 //条件2:runStateAtLeast(c, TIDYING)说明已经有线程使得线程池由TIDYING -> TERMINATED状态 //转换了,当前线程直接返回 //条件3:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()) //说明线程池虽然处于SHUTDOWN状态,但工做队列不为空,得等队列处理完再尝试关闭线程池的逻辑。 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //能走到这里两种状况 //1.线程池状态>=STOP了 //2.线程池状态于SHUTDOWN状态,但队列为空了 if (workerCountOf(c) != 0) { //回收空闲的线程,由于执行runworer方法的时候worker会加锁,因此没加锁的都是空闲的 interruptIdleWorkers(ONLY_ONE); return; } //workerCountOf(c) == 0 时,会来到这里,说明线程都已经销毁了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //设置线程池状态为TIDYING状态。 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //钩子方法,等用户实现 terminated(); } finally { //钩子方法设置线程池状态为TERMINATED状态 ctl.set(ctlOf(TERMINATED, 0)); //唤醒调用 awaitTermination() 方法的线程。 termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }
6.7.interruptIdleWorkers方法分析
处理一个空闲线程方法。全部处于执行中的线程都会加锁(w.lock())。上面咱们提过,核心线程被take方法阻塞的时候,咱们这里设置线程t.interrupt(), 会解除take的阻塞。
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; //线程没有中断且尝试加锁成功,由于全部处于执行中的线程都会加锁(w.lock()) //未加锁的说明处于空闲中了。 if (!t.isInterrupted() && w.tryLock()) { try { //设置线程中断 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
6.8.awaitTermination方法分析
该方法是判断线程池状态状态是不是TERMINATED
,若是是则直接返回true
,不然会await
挂起当前线程指定的时间
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
6.9.shutDown和shutDownNow方法分析
shutDown方法会优雅的关闭线程池,设置线程池状态为SHUTDOWN,已经处于队列中的任务会继续等待执行完。
shutDownNow方法会当即关闭线程池,设置线程池状态为STOP。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; //获取线程池全局锁 mainLock.lock(); try { checkShutdownAccess(); //设置线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); //中断空闲线程 interruptIdleWorkers(); //空方法,子类能够扩展 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { //释放线程池全局锁 mainLock.unlock(); } tryTerminate(); }
public List<Runnable> shutdownNow() { //返回值引用 List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; //获取线程池全局锁 mainLock.lock(); try { checkShutdownAccess(); //设置线程池状态为STOP advanceRunState(STOP); //中断线程池中全部线程 interruptWorkers(); //导出未处理的task tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); //返回当前任务队列中 未处理的任务。 return tasks; }
7.ThreadPoolExecutor拒绝策列
默认有如下4中拒绝策列,用户也能够实现RejectedExecutionHandler接口自定义。
CallerRunsPolicy将任务交给调用者执行 AbortPolicy抛出异常 DiscardPolicy什么都不作,直接丢弃 DiscardOldestPolicy丢弃老的,执行新的
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } //交给主线程执行 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } //中断拒绝 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } //直接抛弃什么都不作 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } //丢弃老的 ,执行新的 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
8.扩展:改变线程池的初始化过程
若是咱们想让线程按核心线程,最大线程,最后再进队列的方式初始化,应该怎么作?
public void execute(Runnable command) { //若是runnable为空,抛出异常 if (command == null) throw new NullPointerException(); //获取ctl值,该值高3位表明线程池运行状态,低29位表明线程池当前运行线程数量 int c = ctl.get(); //CASE1:获取线程池运行线程数量,若是小于核心线程数,则建立线程,addWorker传入参数为core //也就是说,线程池不是一上来就把核心线程建立了,必须在提交runnable任务到线程池的时候才一个一个建立 if (workerCountOf(c) < corePoolSize) { //addWorker是建立线程的核心方法,关键点在Worker类的构造方法和runWorker方法的while循环 if (addWorker(command, true)) return; c = ctl.get(); } //CASE2:条件1不成立说明核心线程数已满,将任务添加到阻塞队列中。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //CASE3:条件1和2都不成立,说明核心线程已建立彻底而且任务队列已满 //调用addWorker建立非核心线程,若是返回false,说明线程数达到最大线程数,执行拒绝策列 else if (!addWorker(command, false)) reject(command); }
咱们在说execute方法初始化线程池过程当中。CASE2:workQueue.offer(command)会将任务加入到队列。因此,咱们这里只须要自定义BlockingQueue,改造offer方法,在里面判断,当线程池线程数还未达到最大线程数的时候返回false便可。
Dubbo
的EagerThreadPool
自定义了一个BlockingQueue
,在offer()
方法中,若是当前线程池数量小于最大线程池时,直接返回false
,这里就达到了调节线程池执行顺序的目的。
9.推荐
分享一个朋友的公众号,有不少干货,包含netty,spring,线程,spring cloud等详细讲解,也有详细的学习规划图,面试题整理等,我感受在讲课这块比我讲的清楚: