我相信你们在项目中或多或少的都使用过线程,而线程是宝贵的资源,不能频繁的建立,应当给其余任务进行复用,因此就有了咱们的线程池。java
你知道咱们如何建立线程池吗?程序员
这我固然知道了,JDK主要提供了三种建立线程池的方法web
线程池如何使用缓存
ExecutorService threadPool = Executors.newFixedThreadPool(5);
threadPool.execute(() -> {
System.out.println("执行任务");
});
threadPool.shutdown();
复制代码
你给我讲讲线程池的原理呢?多线程
上面说的建立线程池的方法实际上都是经过建立ThreadPoolExecutor这个类来实现的,因此咱们直接看这个类的实现原理便可。
首先来看看它的构造方法app
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
复制代码
先说下它这几个核心参数的含义测试
固然只是知道这几个参数也没有什么太大的做用,咱们仍是要着眼全局来看ThreadPoolExecutor类。this
首先来认识下线程池中定义的状态,它们一直贯穿在整个主体spa
// ctl存储了两个值,一个是线程池的状态,
// 另外一个是活动线程数(workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池最多容许2^29-1个(大概5亿)线程存在,
// 固然首先要你的系统能新建这么多个
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
从上面的成员变量的定义咱们能够知道,线程池最多容许5亿个(2^29-1)个线程活动,那么为何不是2^31-1呢?由于设计者以为这个值已经够大了,若是未来以为这是一个瓶颈的话,会把这个换成Long类型。线程
同时线程池这里也存在了五个状态,它们解决着线程池的生命周期。
状态流转图以下:
咱们知道当咱们执行一个task的时候,调用的是execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 若是工做线程数小于核心线程数(corePoolSize),则建立一个工做线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 若是当前是running状态,而且任务队列可以添加任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 若是不处于running状态了(使用者可能调用了shutdown方法),
// 则将刚才添加到任务队列的任务移除
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 若是当前没有工做线程,
// 则新建一个工做线程来执行任务(任务已经被添加到了任务队列)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 队列已经满了的状况下,则新启动一个工做线程来执行任务
else if (!addWorker(command, false))
reject(command);
}
复制代码
而在addWorker方法中还存在有一些必要的判断逻辑,好比当前线程池是不是非running状态,队列是否为空等条件,固然最主要的逻辑仍是判断当前工做线程数量是否大于maximumPoolSize以及启动工做线程执行任务。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
for (;;) {
int wc = workerCountOf(c);
// 1. 判断当前工做线程是否知足条件
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 2. 增长工做线程数量
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
// 3. 建立工做线程
w = new Worker(firstTask);
final Thread t = w.thread;
workers.add(w);
if (workerAdded) {
// 4. 运行工做线程
t.start();
workerStarted = true;
}
return workerStarted;
}
复制代码
因此,总结下线程池的工做流程以下:
若是你以为上面很差记,我给你讲个火锅店的故事你就更加明白了。
之前有个火锅店,叫作朱帅帅火锅,老板是个刚辞掉程序员工做出来创业的帅小伙子,火锅店不大,只能摆上10张桌子(corePoolSize),若是吃火锅的来得早就能够去店里面坐(店里有空调),来晚了,店里面坐满了,后面来的人就要排队了(workQueue)。
排队的人数愈来愈多,朱帅帅一看不是办法,就给外面摆了几张临时桌子(非核心工做线程),让客人在外面吃。若是店里面有人吃完了或者外面临时桌子吃完了就让排队的人去吃。后面时间晚了,没有排队的人了,老板就让人撤了外面的临时桌子,毕竟摆在外面也不太好,并且还怕城管来。若是生意特别好,又来了特别多的人,已经超出火锅店的服务能力了,就只能喊他们去别家了。
上面的故事,你要品,细细的品,最后你会发现,代码来源于生活。
上面一直说到工做线程,工做线程究竟是个什么鬼?其实工做线程指的就是咱们的Worker类,它是ThreadPoolExecutor中的私有类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// 运行worker的线程(new Thread(this))
final Thread thread;
// 须要执行的任务
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
复制代码
能够看到Woker不只继承了AbstractQueuedSynchronizer(实现独占锁功能),还实现了Runnable接口。
实际上线程池中的每个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象
Worker用本身做为task构造一个线程,同时把外层任务赋值给本身的task成员变量,至关于对task作了一个包装。
addWorker()方法中执行了worker.thread.start(),实际上执行的就是Worker的runWorker方法。
final void runWorker(Worker w) {
// 1. 获取任务开始执行任务,若是获取不到任务,当前的worker就会被JVM回收
while (task != null || (task = getTask()) != null) {
task.run();
}
}
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 判断线程池是否关闭
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 2. 判断是否须要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不容许进行超时;
// wc > corePoolSize,表示当前线程池中的工做线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,须要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 3. wc > maximumPoolSize的状况是由于可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
// timed && timedOut 若是为true,表示当前操做须要进行超时控制,而且上次从阻塞队列中获取任务发生了超时
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 4. timed若是为true,则经过阻塞队列的poll方法进行超时控制,
//若是在keepAliveTime时间内没有获取到任务,则返回null。若是为false,则直接阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
}
}
复制代码
上面的代码中尤为须要注意的是getTask()中的第3点,它的目的是控制线程池有效的工做线程数量。 从以前的分析咱们能够知道,若是当前线程池的工做线程数量超过了corePoolSize且小于maximumPoolSize,而且workQueue已满,则能够增长工做线程,但这时若是超时没有获取到任务,也就是timedOut为true的状况,说明workQueue已经为空了,也就说明了线程池中不须要那么多线程来执行任务了,能够把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize便可。
你刚才说到拒绝策略,都有哪些拒绝策略呀?
主要有下面4种拒绝策略
你给我说说你开头说的经过Executors建立的线程池三者有何不一样吗?
固定线程数量的线程池,corePoolSize等于maximumPoolSize,采用的阻塞队列是LinkedBlockingQueue,是一个无界队列,当任务量忽然很大,线程池来不及处理,就会将任务一直添加到队列中,就有可能致使内存溢出。
建立单个线程的线程池,corePoolSize = maximumPoolSize = 1,也采用的LinkedBlockingQueue这个无界队列,当任务量很大,线程池来不及处理,就有可能会致使内存溢出。
建立可缓存的线程池,corePoolSize = 1,maximumPoolSize = Interger.MAX_VALUE;可是使用的是SynousQueue,这个队列比较特殊,内部没有结构来存储任何元素,因此若是任务数很大,而建立的那个线程(corePoolSize=1)迟迟没有处理完成任务,就会一直建立线程来处理,也有OOM的可能。
cacheThreadPool中的cache其实指的就是SynousQueue,当往这个队列插入数据的时候,若是没有任务来取,插入这个过程会被阻塞。
你既然说了都有可能OOM,那么应该如何建立线程池呢?
实际使用中不建议经过Executors来建立线程池,而是经过 new ThreadPoolExecutor
的方式来建立,而队列也不建议使用无界队列,而要使用有界队列,好比ArrayBlockingQueue。而拒绝策略这个就看你本身需求了(系统提供的若是不知足,就本身写一个)
同时对于核心线程数的设置也不是越大越好,只能说根据你的需求来设置这个值,通常来说能够根据下面两点来进行合理配置
固然啦,这个考虑是不少方面的,不只仅和程序有关,还和硬件等资源有关,总之就是在测试的时候多多调试。
你们不要觉得我上面说的是句废话,请你自信一点,把觉得去掉。