做为一名 Java 开发者,对线程池绝对不陌生,不管是平时工做,仍是面试都会,线程池都是必会的知识点.并且不能不能之知其表面,理解不透彻,很容易在实战中出现 OOM,也可能在面试中被问趴😄.java
其实,若是研究过线程池的话,其实并不难,他的参数并很少,java.util.concurrent.ThreadPoolExecutor
中的参数列举出来就是这些.面试
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
RejectedExecutionException
异常main
)来执行这个线程.咱们知道了参数的含义,那么这些参数在执行过程当中究竟是怎么运行的呢,咱们先用文字分几种状况来描述一下.多线程
在说以前,先来看一个例子.ide
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new BasicThreadFactory.Builder().namingPattern("name-%d").build());
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("test");
}
});
复制代码
不得不说,在很长一段时间内,我都有一个疑问或者说误区,明明是线程池,为何每次都须要我 new 一个 线程(错误)
呢? 由于咱们开始学线程的时候先学 new Thread()
,后来又学了new Runnable()
,慢慢的就把这两个混为一坛了,其实 new Runnable()
并无新起一个线程,只是新建了一个可运行的任务,就是一个普通的对象而已,哈哈,这应该是一个很傻的错误认知. 回到上面说的具体含义.oop
corePoolSize
,这时候会在线程池中新建一个线程用于执行这个新的任务.corePoolSize
,这个时候就须要将这个新的任务加入到线程队列workQueue
中,一旦线程中的线程执行完成了一个任务,就会立刻从队列中去一个任务来执行.maximumPoolSize
大于corePoolSize
,就会新建线程来处理这个新的任务,知道总运行线程数达到maximumPoolSize
.maximumPoolSize
,还来了新的任务怎么办呢?就须要执行上面所说的拒绝策略了handler
了,按照配置的策略进行处理,默认不配置的状况下,使用的是AbortPolicy
.private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
复制代码
怎么判断上面说的流程是正确的呢?咱们能够跟进源码来仔细查看一下上面的流程,其实线程池执行的代码比较简单,一看变更,看了源码,掌握得应该会更加深入一些.ui
首先来看看execute()
方法this
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl是一个原子的控制位,能够表示线程池的状态和运行的线程数;
int c = ctl.get();
// 1. 若是运行线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//直接新建 worker(线程)执行.
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 若是上面的addWorker 失败了,就须要加入线程队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 加入后,检查状态;
if (! isRunning(recheck) && remove(command))
//检查运行状态不经过,移除任务,执行拒绝策略
reject(command);
// 若是当前的运行线程为0
else if (workerCountOf(recheck) == 0)
//就是用核心线程执行刚刚添加到队列的线程
addWorker(null, false);
}
// 3. 若是队列也满了,就新建线程继续处理
else if (!addWorker(command, false))
// 4. 若是不容许新建了,就执行拒绝策略
reject(command);
}
复制代码
按照一个正常流程来讲,咱们只考虑一个理想的环境.咱们能够分为上面的4步,正好和上面的文字描述对应.spa
可能爱思考的同窗发现,第2步,加入队列后,何时执行这个新加入的呢,难道有一个定时任务吗?并非.咱们能够看看这个addWorker()
方法.线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//第一层循环
for (;;) {
int c = ctl.get();
//获取当前线程池的状态;
int rs = runStateOf(c);
...
//第二层循环
for (;;) {
//获取线程池的运行线程个数
int wc = workerCountOf(c);
// 大于了最大容许的线程个数,固然要返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//经过了检查,就把 正在运行的线程数加1
if (compareAndIncrementWorkerCount(c))
//跳出第一层循环
break retry;
c = ctl.get(); // Re-read ctl
//加1 失败,可能有多线程冲突,检查一下最新的状态,继续重试;
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一个线程包装咱们的 Runnable
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
...
//加入 hashSet 中管理存在于线程池中线程
workers.add(w);
workerAdded = true;
if (workerAdded) {
// 启动 worker,worker就是线程中真正执行的线程,包装了咱们提供的 Runnable
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
上面的addWorker()
方法中,就是靠t.start()
来启动线程的. Worker
这个类存在于java.util.concurrent.ThreadPoolExecutor.Worker
,定义以下 只保留了相对重要的代码.code
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
....
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
...
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
}
复制代码
因此当t.start()
的时候,实际上,新建了一个线程,执行了runWorker(this);
方法: 这个方法里面有一个while
循环,getTask()
是从队列中获取一个任务.因此说这里能够解答上面放到队列里面的任务何时执行了,等到任意一个核心线程空闲出来时候,他就会循环去取队列中的任务执行.每一个核心线程和新起来的线程都是同步来执行你传进来的Runnable
的run
方法.
整个流程应该就比较清楚了.
上面说了这么多,核心参数都说的差很少了,那么keepAliveTime 这个参数在源码怎么来用的呢?
上面说到一个getTask()
方法从队列中取一个任务,看一下这个方法的代码(省略非主要的).
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
...
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
主要就是用于取任务这里,poll()
不会阻塞,take()
是阻塞的,因此当使用poll
取数据的时候,到达设定的超时,就会继续往下执行,若是超过设定时间仍是没有任务进来,就会将timedOut
设置为 true,返回 null. 这个timedOut
会控制上面的 if
判断,最终控制compareAndDecrementWorkerCount()
方法,就是讲运行的线程数减1个,那么下次若是又满了,就会新建一个,因此这个 Alive 就失效了.
整体来讲,经过源码来看问题能比较权威的解答一些问题.有时候源码彷佛也没有那么高深😄