在当今计算机的CPU计算速度很是快的状况下,为了可以充分利用CPU性能提升程序运行效率咱们在程序中使用了线程。可是在高并发状况下会频繁的建立和销毁线程,这样就变相的阻碍了程序的执行速度,因此为了管理线程资源和减小线程建立以及销毁的性能消耗就引入了线程池。数据库
corePoolSize:线程池在建立完时,里面并无线程,只有当任务到来时再去建立线程。缓存
maxPoolSize:线程池可能会在核心线程数的基础上额外增长一些线程,可是线程数量的上限是maxPoolSize
。好比第一天执行的任务很是多,次日执行的任务很是少,可是有了maxPoolSize
参数,就能够加强任务处理的灵活性。bash
corePoolSize
即便线程没有在执行任务,也会建立新的线程。corePoolSize
,但小于maxPoolSize
则将任务放入队列。maxPoolSize
,则建立新的线程运行任务。maxPoolSize
,则拒绝该任务。执行流程:服务器
corePoolSize
和maxPoolSize
设置为相同的值,那么就会建立固定大小的线程池。maxPoolSize
参数设置为很大的值,例如Integer.MAX_VALUE
,能够容许线程池容纳任意数量的并发任务。corePoolSize
的线程,因此若是使用了无界队列(如:LinkedBlockingQueue
)就不会建立到超过corePoolSize
的线程数。若是线程池当前的线程数大于corePoolSize
,那么若是多余的线程的空闲时间大于keepAliveTime
,它们就会被终止。网络
keepAliveTime
参数的使用能够减小线程数过多冗余时的资源消耗。并发
新的线程由ThreadFactory
建立,默认使用Executors.defaultThreadFactory()
,建立出来的线程都在同一个线程组,拥有一样的NORM_PRIORITY
优先级而且都不是守护线程。若是本身指定ThreadFactory
,那么就能够改变线程名、线程组、优先级、是不是守护线程等。一般状况下直接使用defaultThreadFactory
就行。ide
SynchronousQueue
):任务很少时,只须要用队列进行简单的任务中转,这种队列没法存储任务,在使用这种队列时,须要将maxPoolSize
设置的大一点。LinkedBlockingQueue
):若是使用无界队列看成workQueue
,将maxQueue
设置的多大都没有用,使用无界队列的优势是能够防止流量突增,缺点是若是处理任务的速度跟不上提交任务的速度,这样就会致使无界队列中的任务愈来愈多,从而致使OOM
异常。ArrayBlockingQueue
):使用有界队列能够设置队列大小,让线程池的maxPoolSize
有意义。手动建立更好,由于这样可让咱们更加了解线程池的运行规则,避免资源耗尽的风险。函数
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
newFixedThreadPool
线程池经过传入相同的corePoolSize
和maxPoolSize
能够保证线程数量固定,0L
的keepAliveTime
表示时刻被销毁,workQueue
使用的是无界队列。这样潜在的问题就是当处理任务的速度赶不上任务提交的速度的时候,就可能会让大量任务堆积在workQueue
中,从而引起OOM
异常。高并发
/**
* 演示newFixedThreadPool线程池OOM问题
*/
public class FixedThreadPoolOOM {
private static ExecutorService executorService = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executorService.execute(new SubThread());
}
}
}
class SubThread implements Runnable {
@Override
public void run() {
try {
//延长任务时间
Thread.sleep(1000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
更改JVM
参数工具
运行结果
使用线程池打印线程名
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}
复制代码
newSingleThreadExecutor
源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
从源码能够看出newSingleThreadExecutor
和newFixedThreadPool
基本相似,不一样的只是corePoolSize
和maxPoolSize
的值,因此newSingleThreadExecutor
也存在内存溢出问题。
newCachedThreadPool
也被称为可缓存线程池,它是一个
无界线程池,具备
自动回收多余线程的功能。
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}
复制代码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
newCachedThreadPool
的maxPoolSize
设置的值为Integer.MAX_VALUE
,因此可能会致使线程被无限建立,最终致使OOM
异常。
该线程池支持周期性任务的执行
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(10);
// scheduledExecutorService.schedule(new Task(), 5, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
}
}
复制代码
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
复制代码
根据业务场景不一样,本身设置线程池参数,例如内存有多大,本身取线程名子等。
线程数=CPU核心数 × (1+平均等待时间/平均工做时间)
corePoolSize
和maxPoolSize
,以固定的线程数来执行任务corePoolSize
和maxPoolSize
默认都是1,全程只以1条线程执行任务corePoolSize
是经过手动传入的,它的maxPoolSize
为Integer.MAX_VALUE
,而且具备自动回收线程的功能。由于这两个线程池的核心线程数和最大线程数都是相同的,也就没法预估任务量,因此须要在自身进行改进,就使用了无界队列。
由于缓存线程池的最大线程数是“无上限”的,每当任务来的时候直接建立线程进行执行就行了,因此不须要使用队列来存储任务。这样避免了使用队列进行任务的中转,提升了执行效率。
由于ScheduledThreadPool
是延迟任务线程池,因此使用延迟队列有利于对执行任务的时间作延迟。
workStealingPool
适用于执行产生子任务的环境,例如进行二叉树的遍历。workStealingPool
具备窃取能力。shutdown()
方法不必定会当即中止,这个方法仅仅是初始整个关闭过程。由于线程池中的线程有可能正在运行,而且队列中也有待处理的任务,不可能说停就停。因此每当调用该方法时,线程池会把正在执行的任务和队列中等待的任务都执行完毕再关闭,而且在此期间若是接收到新的任务会被拒绝。/**
* 演示关闭线程池
*/
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
//再次提交任务
executorService.execute(new ShutDownTask());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
复制代码
/**
* 演示关闭线程池
*/
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
System.out.println(executorService.isShutdown());
executorService.shutdown();
System.out.println(executorService.isShutdown());
//再次提交任务
// executorService.execute(new ShutDownTask());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
复制代码
/**
* 演示关闭线程池
*/
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
System.out.println(executorService.isShutdown());
executorService.shutdown();
System.out.println(executorService.isShutdown());
System.out.println(executorService.isTerminated());
//再次提交任务
// executorService.execute(new ShutDownTask());
}
}
复制代码
isTerminated
方法的地方休眠10s
//在3s后判断线程池是否被终止,返回boolean值
System.out.println(executorService.awaitTermination(3L, TimeUnit.SECONDS));
复制代码
Executor
关闭时,新提交的任务会被拒绝。Executor
对最大线程数和工做队列容量使用有限边界而且已经饱和时。总结:第四种拒绝策略相对于前三种更加“机智”一些,能够避免前面三种策略产生的损失。在第四种策略下能够下降提交的速度,达到负反馈的效果。
/**
* 演示每一个任务执行的先后放钩子函数
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private boolean isPaused;
private final ReentrantLock lock = new ReentrantLock();
private Condition unPaused = lock.newCondition();
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unPaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
public void resume() {
lock.lock();
try {
isPaused = false;
//唤醒所有
unPaused.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");
}
}
复制代码
Executor
,是Executor
的子接口,在接口内部增长了一些新的方法,例如第6小节讲到的几个方法利用相同线程执行不一样任务
public void execute(Runnable command) {
// 判断任务是否为空,为空就抛出异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 若是当前线程数小于核心线程数,就增长Worker
if (workerCountOf(c) < corePoolSize) {
// command就是任务,点击addWorker方法
// 第二个参数用于判断当前线程数是否小于核心线程数
if (addWorker(command, true))
return;
c = ctl.get();
}
// 此时线程数大于等于核心线程数
// 判断线程池是否是正在运行并将任务放到工做队列中
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查线程状态
int recheck = ctl.get();
// 若是线程不是正在运行的,就删除掉任务而且拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0) //这里用于避免已经提交的任务没有线程进行执行
addWorker(null, false);
}
// 若是任务没法添加或者大于最大线程数就拒绝任务
else if (!addWorker(command, false))
reject(command);
}
复制代码
由于要查看的是Worker
因此进入到addWorker()
方法后点击Worker
类查看runWorker()
方法
w = new Worker(firstTask);
复制代码
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
复制代码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取到任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 只要任务不为空或者可以获取到任务就执行下面的方法
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// task是一个Runnable类型,调用run()方法就是运行线程
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
复制代码
总结:核心原理就是获取到task
,若是task
不为空就调用run()
方法,这样就实现了线程的复用,达到让相同的线程执行不一样任务的目的。
shutdownNow()
带来的效果workerCount
为零时,线程会转换到TIDYING状态,并将运行terminate()
钩子方法terminate()
运行完成