Java中的线程池是运用场景最多的并发框架,几乎全部须要异步或并发执行任务的程序均可以使用线程池。java
在开发过程当中,合理地使用线程池可以带来3个好处:git
下降资源消耗:经过重复利用已建立的线程下降线程建立和销毁形成的消耗。github
提升响应速度:当任务到达时,任务能够不须要等到线程建立就能当即执行。数据库
提升线程的可管理性:线程是稀缺资源,若是无限制地建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一分配、调优和监控。可是,要作到合理利用线程池,必须对其实现原理了如指掌。数组
一、使用Executors
工厂类提供的静态方法来建立线程池,方法以下:服务器
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
二、经过ThreadPoolExecutor
的构造函数建立,代码以下:多线程
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
参数说明:并发
一、corePoolSize框架
线程池中的核心线程数,当提交一个任务时,线程池建立一个新线程执行任务,直到当前线程数等于 corePoolSize;若是当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;若是执行了线程池的prestartAllCoreThreads()方法,线程池会提早建立并启动全部核心线程。异步
二、maximumPoolSize
线程池中容许的最大线程数。若是当前阻塞队列满了,且继续提交任务,则建立新的线程执行任务,前提是当前线程数小于maximumPoolSize。
三、keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认状况下,该参数只在线程数大于corePoolSize时才有用。
四、unit
keepAliveTime的单位
五、workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口。
在JDK中提供了以下阻塞队列:
ArrayBlockingQueue
:基于数组结构的有界阻塞队列,按FIFO排序任务;
LinkedBlockingQuene
:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量一般要高于ArrayBlockingQuene;
SynchronousQuene
:一个不存储元素的阻塞队列,每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于LinkedBlockingQuene;
priorityBlockingQuene
:具备优先级的无界阻塞队列;
六、threadFactory
建立线程的工厂,经过自定义的线程工厂能够给每一个新建的线程设置一个具备识别度的线程名。
七、handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工做线程,若是继续提交任务,必须采起一种策略处理该任务,线程池提供了4种策略:
AbortPolicy
:直接抛出异常,默认策略;CallerRunsPolicy
:用调用者所在的线程来执行任务;DiscardOldestPolicy
:丢弃阻塞队列中靠最前的任务,并执行当前任务;DiscardPolicy
:直接丢弃任务;固然也能够根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
使用示例
public class ThreadPoolExecutorExample {
// 一、经过threadPoolExecutor的构造函数建立线程池
private static ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 二、使用execute方法执行没有返回结果的任务
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
new Task().doSomething();
}
});
// 三、使用submit方法执行有返回结果的任务且须要实现Callable接口
Future future = threadPoolExecutor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return new Task().doOtherthing();
}
}
);
System.out.println(future.get());
}
}
class Task {
public void doSomething() {
System.out.println("doSomeThing ...");
}
public int doOtherthing() {
System.out.println("doOtherthing ..., and return 10.");
return 10;
}
}
复制代码
execute方法和submit方法的区别
execute方法
用于提交不须要返回值的任务,因此没法判断任务是否被线程池执行成功。
submit方法
用于提交须要返回值的任务。线程池会返回一个future类型的对象,经过这个future对象能够判断任务是否执行成功。
get()
方法会阻塞当前线程直到任务完成;get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后当即返回,这时候有可能任务没有执行完。
当向线程池提交一个任务以后,线程池是如何处理这个任务的呢?线程池的主要处理流程以下:
corePoolSize
,则建立新线程来执行;corePoolSize
,则将任务加入BlockingQueue;BlockingQueue
(队列已满),则建立新的线程来处理任务;maximumPoolSize
,任务将被拒绝,并调用handler.rejectedExecution(command, this)
方法。ThreadPoolExecutor
采起上述步骤的整体设计思路,是为了在执行executor()方法时,尽量地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor
完成预热以后(当前运行的线程数大于等于corePoolSize
),几乎全部的execute()
方法调用都是执行步骤2,而步骤2不须要获取全局锁。
// 初始化状态和数量,状态为RUNNING,线程数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 前3位表示状态,全部线程数占29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池容量大小为 1 << 29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态
// RUNNING状态:11100000000000000000000000000000(前3位为111)
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN状态:00000000000000000000000000000000(前3位为000)
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP状态:00100000000000000000000000000000(前3位为001)
private static final int STOP = 1 << COUNT_BITS;
// TIDYING状态:01000000000000000000000000000000(前3位为010)
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED状态:01100000000000000000000000000000(前3位为011)
private static final int TERMINATED = 3 << COUNT_BITS;
// 获得状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获得线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
ThreadPoolExecutor线程池有5个状态,分别是:
terminated
方法。terminated
方法调用完成之后的状态。线程池的状态转换过程:
Threads waiting in awaitTermination() will return when the state reaches TERMINATED.
使用ThreadPoolExecutor执行任务的时候,可使用execute或submit方法,submit方法以下:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
复制代码
经过源码可知submit
方法一样也是由execute()完成的,execute()方法源码以下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 过程1
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 过程2
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查线程池状态
if (! isRunning(recheck) && remove(command))
reject(command);
// 线程池中没有可用的工做线程时
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 过程3
// 过程4
reject(command);
}
复制代码
addWorker
方法的主要工做就是建立一个工做线程执行任务,代码以下:
/* * firstTask参数:用于指定新增的线程执行的第一个任务。 * core为true:表示在新增线程时会判断当前活动线程数是否少于corePoolSize, * false表示新增线程前须要判断当前活动线程数是否少于maximumPoolSize。 */
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/** * 只有当下面两种状况会继续执行,其余直接返回false(添加失败) * 一、rs == RUNNING * 二、rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty() *(执行了shutdown方法,可是阻塞队列还有任务没有执行) */
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 判断工做线程的数量是否超过线程池的限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// workerCount加1成功,跳出两层循坏。
if (compareAndIncrementWorkerCount(c))
break retry;
/** * 能执行到这里,都是由于多线程竞争,只有两种状况 * 一、workCount发生变化,compareAndIncrementWorkerCount失败, * 这种状况不须要从新获取ctl,继续for循环便可。 * 二、runState发生变化,可能执行了shutdown或者shutdownNow, * 这种状况从新走retry,取得最新的ctl并判断状态。 */
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
// worker是否执行标识
boolean workerStarted = false;
// worker是否添加成功标识
boolean workerAdded = false;
// 保存建立的worker变量
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
// 检查线程是否建立成功
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 加锁成功,从新检查线程池的状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将w存储到workers容器中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 添加成功标识
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 失败回退,从 wokers 移除 w, 线程数减一,尝试结束线程池(调用tryTerminate 方法)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
在分析t.start()
以前,须要了解Worker类。其源码以下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 工做线程
final Thread thread;
// 初始化任务
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
// 禁止中断,直到runWorker
setState(-1);
this.firstTask = firstTask;
// 很重要,worker实例被包装成thread执行的任务。
// 这样t.start启动后,将运行Worker的run方法。
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// 实现AQS的相关方法
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
复制代码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 自旋操做,获取队列中的任务
while (task != null || (task = getTask()) != null) {
// 加锁的做用是线程池关闭时,防止正在执行工做线程被中断。
w.lock();
/* * 在执行任务以前先作一些处理。 * 1. 若是线程池已经处于STOP状态而且当前线程没有被中断,中断线程。 * 2. 若是线程池还处于RUNNING或SHUTDOWN状态,而且当前线程已经被中断了, * 从新检查一下线程池状态,若是处于* * STOP状态而且没有被中断,那么中断线程。 */
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// hook method
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正的开始执行任务,调用的是run方法,而不是start方法。
// 这里run的时候可能会被中断,好比线程池调用了shutdownNow方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// hook method
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 回收woker
processWorkerExit(w, completedAbruptly);
}
}
复制代码
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 计算从队列获取任务的方式( poll or take)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当工做线程超过其最大值或者timed = true时其workQueue.isEmpty()时,返回null。
// 这意味为该worker将被回收。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
由上可知,当allowCoreThreadTimeOut
为true时,若是队列长时间没有任务,工做线程最终都会被销毁。
能够经过调用线程池的shutdown
或shutdownNow
方法来关闭线程池。它们的原理是遍历线程池中的工做线程,而后逐个调用线程的interrupt方法来中断线程,因此没法响应中断的任务可能永远没法终止。
可是它们存在必定的区别,shutdownNow首先将线程池的状态设置成STOP,而后尝试中止全部的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,而后中断全部没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,
isShutdown
方法就会返回true。当全部的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed
方法会返回true。
要想合理地配置线程池,就必须首先分析任务特性,能够从如下几个角度来分析。
任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
任务的优先级:高、中和低。
任务的执行时间:长、中和短。
任务的依赖性:是否依赖其余系统资源,如数据库链接。
性质不一样的任务能够用不一样规模的线程池分开处理:
1)CPU密集型任务应配置尽量小的线程,如配置N cpu +1个线程的线程池。
2)IO密集型任务线程并非一直在执行任务,则应配置尽量多的线程,如2*N cpu 。
3)混合型的任务,若是能够拆分,将其拆分红一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。若是这两个任务执行时间相差太大,则不必进行分解。
优先级不一样的任务可使用优先级队列PriorityBlockingQueue来处理。它可让优先级高的任务先获得执行,须要注意的是若是一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不一样的任务能够交给不一样规模的线程池来处理,或者也可使用优先级队列,让执行时间短的任务先执行。
依赖数据库链接池的任务,由于线程提交SQL后须要等待数据库返回结果,若是等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
能够经过Runtime.getRuntime().availableProcessors()方法得到当前设备的CPU个数。
建议使用有界队列,有界队列能增长系统的稳定性和预警能力,能够根据须要设大一点,好比几千。有一次咱们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,经过排查发现是数据库出现了问题,致使执行SQL变得很是缓慢,由于后台任务线程池里的任务全是须要向数据库查询和插入数据的,因此致使线程池里的工做线程所有阻塞住,任务积压在线程池里。若是当时咱们设置成无界队列,线程池的队列就会愈来愈多,有可能会撑满内存,致使整个系统不可用,而不仅是后台任务出现问题。固然咱们的系统全部的任务是用的单独的服务器部署的,而咱们使用不一样规模的线程池跑不一样类型的任务,可是出现这样问题时也会影响到其余任务。
若是在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,能够根据线程池的使用情况快速定位问题。能够经过线程池提供的参数进行监控,在监控线程池的时机可使用如下属性:
经过扩展线程池进行监控。能够经过继承线程池来自定义线程池,重写线程池的beforeExecute
、afterExecute
和terminated
方法,也能够在任务执行前、执行后和线程池关闭以前执行一些代码来进行监控。
若是读完以为有收获的话,欢迎点赞、关注、加公众号【牛觅技术】,查阅更多精彩历史!!!: