一方面当执行大量异步任务时候线程池可以提供较好的性能,在不使用线程池的时候,每当须要执行异步任务时候是直接 new 一线程进行运行,而线程的建立和销毁是须要开销的。使用线程池时候,线程池里面的线程是可复用的,不会每次执行异步任务时候都从新建立和销毁线程。java
另外一方面线程池提供了一种资源限制和管理的手段,好比能够限制线程的个数,动态新增线程等,每一个 ThreadPoolExecutor
也保留了一些基本的统计数据,好比当前线程池完成的任务数目等。数组
如上类图,Executors 实际上是个工具类,里面提供了好多静态方法,根据用户选择返回不一样的线程池实例。安全
ThreadPoolExecutor
继承了 AbstractExecutorService
,成员变量 ctl 是个 Integer 的原子变量用来记录线程池状态 和 线程池中线程个数,相似于 ReentrantReadWriteLock
使用一个变量存放两种信息。多线程
这里假设 Integer 类型是 32 位二进制标示,则其中高 3 位用来表示线程池状态,后面 29 位用来记录线程池线程个数。并发
//用来标记线程池状态(高3位),线程个数(低29位) //默认是RUNNING状态,线程个数为0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //线程个数掩码位数,并非全部平台int类型是32位,因此准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数, private static final int COUNT_BITS = Integer.SIZE - 3; //线程最大个数(低29位)00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//(高3位):11100000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //(高3位):00000000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //(高3位):00100000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //(高3位):01000000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //(高3位):01100000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // 获取高三位 运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取低29位 线程个数 private static int workerCountOf(int c) { return c & CAPACITY; } //计算ctl新值,线程状态 与 线程个数 private static int ctlOf(int rs, int wc) { return rs | wc; }
RUNNING:接受新任务而且处理阻塞队列里的任务;异步
SHUTDOWN:拒绝新任务可是处理阻塞队列里的任务;ide
STOP:拒绝新任务而且抛弃阻塞队列里的任务,同时会中断正在处理的任务;函数
TIDYING:全部任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为 0,将要调用 terminated
方法;工具
TERMINATED:终止状态,terminated方法调用完成之后的状态。源码分析
1.RUNNING -> SHUTDOWN:显式调用 shutdown()
方法,或者隐式调用了 finalize()
,它里面调用了 shutdown()
方法。
2.RUNNING or SHUTDOWN -> STOP:显式调用 shutdownNow()
方法时候。
3.SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候。
4.STOP -> TIDYING:当线程池为空的时候。
5.TIDYING -> TERMINATED:当 terminated() hook
方法执行完成时候。
corePoolSize:线程池核心线程个数;
workQueue:用于保存等待执行的任务的阻塞队列;好比基于数组的有界 ArrayBlockingQueue
,基于链表的无界 LinkedBlockingQueue
,最多只有一个元素的同步队列 SynchronousQueue
,优先级队列 PriorityBlockingQueue
等。
maximunPoolSize:线程池最大线程数量。
ThreadFactory:建立线程的工厂。
RejectedExecutionHandler:饱和策略,当队列满了而且线程个数达到 maximunPoolSize
后采起的策略,好比 AbortPolicy
(抛出异常),CallerRunsPolicy
(使用调用者所在线程来运行任务),DiscardOldestPolicy
(调用 poll 丢弃一个任务,执行当前任务),DiscardPolicy
(默默丢弃,不抛出异常)。
keeyAliveTime:存活时间。若是当前线程池中的线程数量比核心线程数量要多,而且是闲置状态的话,这些闲置的线程能存活的最大时间。
TimeUnit,存活时间的时间单位。
1.newFixedThreadPool:建立一个核心线程个数和最大线程个数都为 nThreads 的线程池,而且阻塞队列长度为 Integer.MAX_VALUE
,keeyAliveTime=0
说明只要线程个数比核心线程个数多而且当前空闲则回收。代码以下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //使用自定义线程建立工厂 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
2.newSingleThreadExecutor:建立一个核心线程个数和最大线程个数都为1的线程池,而且阻塞队列长度为 Integer.MAX_VALUE
,keeyAliveTime=0
说明只要线程个数比核心线程个数多而且当前空闲则回收。代码以下:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } //使用本身的线程工厂 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
3.newCachedThreadPool:建立一个按需建立线程的线程池,初始线程个数为 0,最多线程个数为 Integer.MAX_VALUE
,而且阻塞队列为同步队列,keeyAliveTime=60
说明只要当前线程 60s 内空闲则回收。这个特殊在于加入到同步队列的任务会被立刻被执行,同步队列里面最多只有一个任务。代码以下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //使用自定义的线程工厂 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
如类图,其中 mainLock 是独占锁,用来控制新增 Worker 线程时候的原子性,termination 是该锁对应的条件队列,在线程调用 awaitTermination
时候用来存放阻塞的线程。
Worker 继承 AQS 和 Runnable 接口,是具体承载任务的对象,Worker 继承了 AQS,本身实现了简单不可重入独占锁,其中 status=0
标示锁未被获取状态,state=1
标示锁已经被获取的状态,state=-1
是建立 Worker 时候默认的状态,建立时候状态设置为 -1 是为了不在该线程在运行 runWorker()
方法前被中断,下面会具体讲解到。其中变量 firstTask 记录该工做线程执行的第一个任务,thread 是具体执行任务的线程。
DefaultThreadFactory
是线程工厂,newThread
方法是对线程的一个修饰,其中 poolNumber
是个静态的原子变量,用来统计线程工厂的个数,threadNumber
用来记录每一个线程工厂建立了多少线程,这两个值也做为线程池和线程的名称的一部分。
1 public void execute(Runnable command):execute
方法是提交任务 command 到线程池进行执行,用户线程提交任务到线程池的模型图以下所示:
如上图可知 ThreadPoolExecutor
的实现实际是一个生产消费模型,其中当用户添加任务到线程池时候至关于生产者生产元素,workers 线程工做集中的线程直接执行任务或者从任务队列里面获取任务至关于消费者消费元素。用户线程提交任务的 execute
方法具体代码以下:
public void execute(Runnable command) { //(1) 若是任务为null,则抛出NPE异常 if (command == null) throw new NullPointerException(); //(2)获取当前线程池的状态+线程个数变量的组合值 int c = ctl.get(); //(3)当前线程池线程个数是否小于corePoolSize,小于则开启新线程运行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //(4)若是线程池处于RUNNING状态,则添加任务到阻塞队列 if (isRunning(c) && workQueue.offer(command)) { //(4.1)二次检查 int recheck = ctl.get(); //(4.2)若是当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); //(4.3)否者若是当前线程池线程空,则添加一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //(5)若是队列满了,则新增线程,新增失败则执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
1.代码(3)判断若是当前线程池线程个数小于 corePoolSize,如上图会在 workers 里面新增一个核心线程(core 线程)执行该任务。
2.若是当前线程池线程个数大于等于 corePoolSize 执行代码(4),若是当前线程池处于 RUNNING 状态则添加当前任务到任务队列,这里须要判断线程池状态是由于有可能线程池已经处于非 RUNNING 状态,而非 RUNNING 状态下是抛弃新任务的。
3.若是任务添加任务队列成功,则代码(4.2)对线程池状态进行二次校验,这是由于添加任务到任务队列后,执行代码(4.2)前有可能线程池的状态已经变化了,这里进行二次校验,若是当前线程池状态不是 RUNNING 了则把任务从任务队列移除,移除后执行拒绝策略;若是二次校验经过,则执行代码(4.3)从新判断当前线程池里面是否还有线程,若是没有则新增一个线程。
4.若是代码(4)添加任务失败,则说明任务队列满了,则执行代码(5)尝试新开启线程(如上图 thread 3 和 thread 4)来执行该任务,若是当前线程池线程个数 > maximumPoolSize
则执行拒绝策略。
接下来看新增线程的 addWorkder
方法的源码,以下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //(6) 检查队列是否只在必要时为空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //(7)循环cas增长线程个数 for (;;) { int wc = workerCountOf(c); //(7.1)若是线程个数超限则返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //(7.2)cas增长线程个数,同时只有一个线程成功 if (compareAndIncrementWorkerCount(c)) break retry; //(7.3)cas失败了,则看线程池状态是否变化了,变化则跳到外层循环重试从新获取线程池状态,否者内层循环从新cas。 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } //(8)到这里说明cas成功了 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //(8.1)建立worker final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //(8.2)加独占锁,为了workers同步,由于可能多个线程调用了线程池的execute方法。 mainLock.lock(); try { //(8.3)从新检查线程池状态,为了不在获取锁前调用了shutdown接口 int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //(8.4)添加任务 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //(8.5)添加成功则启动任务 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
如上代码主要分两部分,第一部分的双重循环目的是经过 cas 操做增长线程池线程数,第二部分主要是并发安全的把任务添加到 workers 里面,而且启动任务执行。
先看第一部分的代码(6),以下所示:
rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
这样看很差理解,咱们展开!运算符后,至关于:
s >= SHUTDOWN && (rs != SHUTDOWN ||//(1) firstTask != null ||//(2) workQueue.isEmpty())//(3)
如上代码,也就是说代码(6)在下面几种状况下会返回 false:
1.当前线程池状态为 STOP,TIDYING,TERMINATED;
2.当前线程池状态为 SHUTDOWN 而且已经有了第一个任务;
3.当前线程池状态为 SHUTDOWN 而且任务队列为空。
回到上面看新增线程的 addWorkder
方法,发现内层循环做用是使用 cas 增长线程,代码(7.1)若是线程个数超限则返回 false,否者执行代码(7.2)执行 CAS 操做设置线程个数,cas 成功则退出双循环,CAS 失败则执行代码(7.3)看当前线程池的状态是否变化了,若是变了,则从新进入外层循环从新获取线程池状态,否者进入内层循环继续进行 cas 尝试。
执行到第二部分的代码(8)说明使用 CAS 成功的增长了线程个数,可是如今任务还没开始执行,这里使用全局的独占锁来控制把新增的 Worker 添加到工做集 workers。代码(8.1)建立了一个工做线程 Worker。
代码(8.2)获取了独占锁,代码(8.3)从新检查线程池状态,这是为了不在获取锁前其余线程调用了 shutdown 关闭了线程池,若是线程池已经被关闭,则释放锁,新增线程失败,否者执行代码(8.4)添加工做线程到线程工做集,而后释放锁,代码(8.5)若是判断若是工做线程新增成功,则启动工做线程。
3.2 工做线程 Worker 的执行
当用户线程提交任务到线程池后,具体是使用 worker 来执行的,先看下 Worker 的构造函数:
Worker(Runnable firstTask) { setState(-1); // 在调用runWorker前禁止中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);//建立一个线程 }
如上代码构造函数内首先设置 Worker 的状态为 -1,是为了不当前 worker 在调用 runWorker
方法前被中断(当其它线程调用了线程池的 shutdownNow 时候,若是 worker 状态 >= 0 则会中断该线程)。这里设置了线程的状态为 -1,因此该线程就不会被中断了。以下代码运行 runWorker 的代码(9)时候会调用 unlock 方法,该方法把 status 变为了 0,因此这时候调用 shutdownNow 会中断 worker 线程了。
接着咱们再看看runWorker方法,代码以下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); //(9)status设置为0,容许中断 boolean completedAbruptly = true; try { //(10) while (task != null || (task = getTask()) != null) { //(10.1) w.lock(); ... try { //(10.2)任务执行前干一些事情 beforeExecute(wt, task); Throwable thrown = null; try { task.run();//(10.3)执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //(10.4)任务执行完毕后干一些事情 afterExecute(task, thrown); } } finally { task = null; //(10.5)统计当前worker完成了多少个任务 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //(11)执行清工做 processWorkerExit(w, completedAbruptly); } }
如上代码(10)若是当前 task==null 或者调用 getTask 从任务队列获取的任务返回 null,则跳转到代码(11)执行。若是 task 不为 null 则执行代码(10.1)获取工做线程内部持有的独占锁,而后执行扩展接口代码(10.2)在具体任务执行前作一些事情,代码(10.3)具体执行任务,代码(10.4)在任务执行完毕后作一些事情,代码(10.5)统计当前 worker 完成了多少个任务,并释放锁。
这里在执行具体任务期间加锁,是为了不任务运行期间,其余线程调用了 shutdown 或者 shutdownNow 命令关闭了线程池。
其中代码(11)执行清理任务,其代码以下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { ...代码太长,这里就不展现了 //(11.1)统计整个线程池完成的任务个数,并从工做集里面删除当前woker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //(11.2)尝试设置线程池状态为TERMINATED,若是当前是shutdonw状态而且工做队列为空 //或者当前是stop状态当前线程池里面没有活动线程 tryTerminate(); //(11.3)若是当前线程个数小于核心个数,则增长 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
如上代码(11.1)统计线程池完成任务个数,可知在统计前加了全局锁,把当前工做线程中完成的任务累加到全局计数器,而后从工做集中删除当前 worker。
代码(11.2)判断若是当前线程池状态是 shutdonw 状态而且工做队列为空或者当前是 stop 状态当前线程池里面没有活动线程则设置线程池状态为 TERMINATED,若是设置为了 TERMINATED 状态还须要调用条件变量 termination 的 signalAll()
方法激活全部由于调用线程池的 awaitTermination
方法而被阻塞的线程
代码(11.3)则判断当前线程里面线程个数是否小于核心线程个数,若是是则新增一个线程。
3.3 shutdown 操做:调用 shutdown 后,线程池就不会在接受新的任务了,可是工做队列里面的任务仍是要执行的,该方法马上返回的,并不等待队列任务完成在返回。代码以下:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //(12)权限检查 checkShutdownAccess(); //(13)设置当前线程池状态为SHUTDOWN,若是已是SHUTDOWN则直接返回 advanceRunState(SHUTDOWN); //(14)设置中断标志 interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } //(15)尝试状态变为TERMINATED tryTerminate(); }
如上代码(12)检查若是设置了安全管理器,则看当前调用 shutdown 命令的线程是否有关闭线程的权限,若是有权限则还要看调用线程是否有中断工做线程的权限,若是没有权限则抛出 SecurityException
或者 NullPointerException
异常。
其中代码(13)内容以下,若是当前状态 >= SHUTDOWN 则直接返回,否者设置当前状态为 SHUTDOWN:
private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
代码(14)内容以下,设置全部空闲线程的中断标志,这里首先加了全局锁,同时只有一个线程能够调用 shutdown 设置中断标志,而后尝试获取 worker 本身的锁,获取成功则设置中断标识,因为正在执行的任务已经获取了锁,因此正在执行的任务没有被中断。这里中断的是阻塞到 getTask()
方法,企图从队列里面获取任务的线程,也就是空闲线程。
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; //若是工做线程没有被中断,而且没有正在运行则设置设置中断 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
代码(15)判断若是当前线程池状态是 shutdonw 状态而且工做队列为空或者当前是 stop 状态当前线程池里面没有活动线程则设置线程池状态为 TERMINATED,若是设置为了 TERMINATED 状态还须要调用条件变量 termination 的 signalAll()
方法激活全部由于调用线程池的 awaitTermination 方法而被阻塞的线程
3.4 shutdownNow 操做
调用 shutdownNow 后,线程池就不会在接受新的任务了,而且丢弃工做队列里面里面的任务,正在执行的任务会被中断,该方法是马上返回的,并不等待激活的任务执行完成在返回。返回值为这时候队列里面被丢弃的任务列表。代码以下:
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();//(16)权限检查 advanceRunState(STOP);//(17) 设置线程池状态为stop interruptWorkers();//(18)中断全部线程 tasks = drainQueue();//(19)移动队列任务到tasks } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
如上代码首先调用代码(16)检查权限,而后调用代码(17)设置当前线程池状态为 stop,而后执行代码(18)中断全部的工做线程,这里须要注意的是中断全部的线程,包含空闲线程和正在执行任务的线程,代码以下:
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
而后代码(19)移动当前任务队列里面任务到 tasks 列表。
3.4 awaitTermination 操做
当线程调用 awaitTermination 方法后,当前线程会被阻塞,知道线程池状态变为了 TERMINATED 才返回,或者等待时间超时才返回,整个过程独占锁,代码以下:
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
如上代码首先获取了独占锁,而后无限循环内部首先判断当前线程池状态是否至少是 TERMINATED 状态,若是是则直接返回。否者说明当前线程池里面还有线程在执行,则看设置的超时时间 nanos 是否小于 0,小于 0 则说明不须要等待,则直接返回;若是大于0则调用条件变量 termination 的 awaitNanos 方法等待 nanos 时间,指望在这段时间内线程池状态内变为 TERMINATED 状态。
在讲解 shutdown 方法时候提到当线程池状态变为 TERMINATED 后,会调用 termination.signalAll()
用来激活调用条件变量 termination 的 await 系列方法被阻塞的全部线程,因此若是在调用了 awaitTermination
以后调用了 shutdown
方法,而且 shutdown 内部设置线程池状态为 TERMINATED
了,则 termination.awaitNanos
方法会返回。
另外在工做线程 Worker 的 runWorker 方法内当工做线程运行结束后,会调用 processWorkerExit 方法,processWorkerExit 方法内部也会调用 tryTerminate 方法测试当前是否应该把线程池设置为 TERMINATED 状态,若是是,则也会调用 termination.signalAll()
用来激活调用线程池的 awaitTermination
方法而被阻塞的线程
另外当等待时间超时后,termination.awaitNanos
也会返回,这时候会从新检查当前线程池状态是否为 TERMINATED,若是是则直接返回,否者继续阻塞挂起本身。
平常开发中当一个应用中须要建立多个线程池时候最好给线程池根据业务类型设置具体的名字,以便在出现问题时候方便进行定位,下面就经过实例来讲明不设置时候为什么难以定位问题,以及如何进行设置。
下面经过简单的代码来讲明不指定线程池名称为什么难定位问题,代码以下:
package com.hjc; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by cong on 2019/5/26. */ public class ThreadPoolExecutorTest { static ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>()); static ThreadPoolExecutor executorTwo = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>()); public static void main(String[] args) { //接受用户连接模块 executorOne.execute(new Runnable() { public void run() { System.out.println("接受用户连接线程"); throw new NullPointerException(); } }); //具体处理用户请求模块 executorTwo.execute(new Runnable() { public void run() { System.out.println("具体处理业务请求线程"); } }); executorOne.shutdown(); executorTwo.shutdown(); } }
运行代码输出以下结果:
同理咱们并不知道是那个模块的线程池抛出了这个异常,那么咱们看下这个 pool-1-thread-1
是如何来的。实际上是使用了线程池默认的 ThreadFactory,翻看线程池建立的源码以下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } static class DefaultThreadFactory implements ThreadFactory { //(1) private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; //(2) private final AtomicInteger threadNumber = new AtomicInteger(1); //(3) private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { //(4) Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
如上代码 DefaultThreadFactory
的实现可知:
1.代码(1)poolNumber 是 static 的原子变量用来记录当前线程池的编号,它是应用级别的,全部线程池公用一个,好比建立第一个线程池时候线程池编号为1,建立第二个线程池时候线程池的编号为2,这里 pool-1-thread-1
里面的 pool-1 中的 1 就是这个值。
2.代码(2)threadNumber 是线程池级别的,每一个线程池有一个该变量用来记录该线程池中线程的编号,这里 pool-1-thread-1
里面的 thread - 1 中的 1 就是这个值。
3.代码(3)namePrefix是线程池中线程的前缀,默认固定为pool。
4.代码(4)具体建立线程,可知线程的名称使用 namePrefix + threadNumber.getAndIncrement()
拼接的。
从上知道咱们只需对 DefaultThreadFactory
的代码中 namePrefix 的初始化作手脚,当须要建立线程池是传入与业务相关的 namePrefix 名称就能够了,代码以下:
package com.hjc; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * Created by cong on 2019/5/26. */ // 命名线程工厂 public class HjcThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; HjcThreadFactory(String name) { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); if (null == name || name.isEmpty()) { name = "pool"; } namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
而后建立线程池时候以下:
package com.hjc; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by cong on 2019/5/26. */ public class ThreadPoolExecutorTest { static ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),new HjcThreadFactory("ASYN-ACCEPT-POOL")); static ThreadPoolExecutor executorTwo = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),new HjcThreadFactory("ASYN-PROCESS-POOL")); public static void main(String[] args) { //接受用户连接模块 executorOne.execute(new Runnable() { public void run() { System.out.println("接受用户连接线程"); throw new NullPointerException(); } }); //具体处理用户请求模块 executorTwo.execute(new Runnable() { public void run() { System.out.println("具体处理业务请求线程"); } }); executorOne.shutdown(); executorTwo.shutdown(); } }
而后运行执行结果以下:
从运行结果抛出的异常,能够看到从 ASYN-ACCEPT-POOL-1-thread-1
就能够知道是接受连接线程池抛出的异常。
下面先看线程池中使用 ThreadLocal 的例子:
package com.hjc; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; /** * Created by cong on 2019/5/26. */ public class ThreadPoolTest { static class LocalVariable { private Long[] a = new Long[1024 * 1024]; } // (1) final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>()); // (2) final static ThreadLocal<LocalVariable> localVariable = new ThreadLocal<LocalVariable>(); public static void main(String[] args) throws InterruptedException { // (3) for (int i = 0; i < 50; ++i) { poolExecutor.execute(new Runnable() { public void run() { // (4) localVariable.set(new LocalVariable()); // (5) System.out.println("use local varaible"); //localVariable.remove(); } }); Thread.sleep(1000); } // (6) System.out.println("pool execute over"); } }
代码(1)建立了一个核心线程数和最大线程数为 5 的线程池,这个保证了线程池里面随时都有 5 个线程在运行。
代码(2)建立了一个 ThreadLocal 的变量,泛型参数为 LocalVariable,LocalVariable 内部是一个 Long 数组。
代码(3)向线程池里面放入 50 个任务
代码(4)设置当前线程的 localVariable 变量,也就是把 new 的 LocalVariable 变量放入当前线程的 threadLocals 变量。
因为没有调用线程池的 shutdown 或者 shutdownNow 方法因此线程池里面的用户线程不会退出,进而 JVM 进程也不会退出。
运行当前代码,使用 jconsole 监控堆内存变化以下图:
而后解开 localVariable.remove()
注释,而后在运行,观察堆内存变化以下:
从运行结果一可知,当主线程处于休眠时候进程占用了大概 77M 内存,运行结果二则占用了大概 25M 内存,可知运行代码一时候内存发生了泄露,下面分析下泄露的缘由。
运行结果一的代码,在设置线程的 localVariable 变量后没有调用 localVariable.remove()
方法,致使线程池里面的 5 个线程的 threadLocals 变量里面的 new LocalVariable()
实例没有被释放,虽然线程池里面的任务执行完毕了,可是线程池里面的 5 个线程会一直存在直到 JVM 进程被杀死。
这里须要注意的是因为 localVariable 被声明了 static,虽然线程的 ThreadLocalMap 里面是对localVariable的弱引用,localVariable也不会被回收。
运行结果二的代码因为线程在设置 localVariable 变量后及时调用了 localVariable.remove()
方法进行了清理,因此不会存在内存泄露。
总结:线程池里面设置了 ThreadLocal 变量必定要记得及时清理,由于线程池里面的核心线程是一直存在的,若是不清理,那么线程池的核心线程的 threadLocals 变量一直会持有 ThreadLocal 变量。