咱们都是经过new Thread来建立一个线程,因为线程的建立和销毁都须要消耗必定的CPU资源,因此在高并发下这种建立线程的方式将严重影响代码执行效率。而线程池的做用就是让一个线程执行结束后不立刻销毁,继续执行新的任务,这样就节省了不断建立线程和销毁线程的开销。编程
建立Java线程池最为核心的类为ThreadPoolExecutor:并发
它提供了四种构造函数来建立线程池,其中最为核心的构造函数以下所示:dom
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
复制代码
这7个参数的含义以下:ide
corePoolSize 线程池核心线程数。即线程池中保留的线程个数,即便这些线程是空闲的,也不会被销毁,除非经过ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法开启了核心线程的超时策略;函数
maximumPoolSize 线程池中容许的最大线程个数;高并发
keepAliveTime 用于设置那些超出核心线程数量的线程的最大等待时间,超过这个时间尚未新任务的话,超出的线程将被销毁;this
unit 超时时间单位;spa
workQueue 线程队列。用于保存经过execute方法提交的,等待被执行的任务;线程
threadFactory 线程建立工程,即指定怎样建立线程;3d
handler 拒绝策略。即指定当线程提交的数量超出了maximumPoolSize后,该使用什么策略处理超出的线程。
在经过这个构造方法建立线程池的时候,这几个参数必须知足如下条件,不然将抛出IllegalArgumentException异常:
corePoolSize不能小于0;
keepAliveTime不能小于0;
maximumPoolSize 不能小于等于0;
maximumPoolSize不能小于corePoolSize;
此外,workQueue、threadFactory和handler不能为null,不然将抛出空指针异常。
下面举些例子来深刻理解这几个参数的含义。
使用上面的构造方法建立一个线程池:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
System.out.println("线程池建立完毕");
int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != threadPoolExecutor.getActiveCount()
|| queueSize != threadPoolExecutor.getQueue().size()) {
System.out.println("活跃线程个数 " + threadPoolExecutor.getActiveCount());
System.out.println("核心线程个数 " + threadPoolExecutor.getCorePoolSize());
System.out.println("队列线程个数 " + threadPoolExecutor.getQueue().size());
System.out.println("最大线程数 " + threadPoolExecutor.getMaximumPoolSize());
System.out.println("------------------------------------");
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
}
}
复制代码
上面的代码建立了一个核心线程数量为1,容许最大线程数量为2,最大活跃时间为10秒,线程队列长度为1的线程池。
假如咱们经过execute方法向线程池提交1个任务,看看结果如何:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
System.out.println("线程池建立完毕");
threadPoolExecutor.execute(() -> sleep(100));
int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != threadPoolExecutor.getActiveCount()
|| queueSize != threadPoolExecutor.getQueue().size()) {
System.out.println("活跃线程个数 " + threadPoolExecutor.getActiveCount());
System.out.println("核心线程个数 " + threadPoolExecutor.getCorePoolSize());
System.out.println("队列线程个数 " + threadPoolExecutor.getQueue().size());
System.out.println("最大线程数 " + threadPoolExecutor.getMaximumPoolSize());
System.out.println("------------------------------------");
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
}
}
复制代码
ThreadPoolExecutor的execute和submit方法均可以向线程池提交任务,区别是,submit方法可以返回执行结果,返回值类型为Future
sleep方法代码:
private static void sleep(long value) {
try {
System.out.println(Thread.currentThread().getName() + "线程执行sleep方法");
TimeUnit.SECONDS.sleep(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
复制代码
启动程序,控制台输出以下:
线程池核心线程数量为1,经过execute提交了一个任务后,因为核心线程是空闲的,因此任务被执行了。因为这个任务的逻辑是休眠100秒,因此在这100秒内,线程池的活跃线程数量为1。此外,由于提交的任务被核心线程执行了,因此并无线程须要被放到线程队列里等待,线程队列长度为0。
假如咱们经过execute方法向线程池提交2个任务,看看结果如何:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
复制代码
线程池核心线程数量为1,经过execute提交了2个任务后,一开始核心线程是空闲的,Thread-0被执行。因为这个任务的逻辑是休眠100秒,因此在这100秒内,线程池的活跃线程数量为1。由于核心线程数量为1,因此另一个任务在这100秒内不能被执行,因而被放到线程队列里等待,线程队列长度为1。
假如咱们经过execute方法向线程池提交3个任务,看看结果如何:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
复制代码
这三个任务都是休眠100秒,因此核心线程池中第一个任务正在被执行,第二个任务被放入到了线程队列。而当第三个任务被提交进来时,线程队列满了(咱们定义的长度为1),因为该线程池容许的最大线程数量为2,因此线程池还能够再建立一个线程来执行另一个任务,因而乎以前在线程队列里的线程被取出执行(FIFO),第三个任务被放入到了线程队列。
改变第二个和第三个任务的睡眠时间,观察输出:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(5));
threadPoolExecutor.execute(() -> sleep(5));
复制代码
第二个任务提交5秒后,任务执行完毕,因此线程队列里的任务被执行,因而队列线程个数为0,活跃线程数量为2(第一个和第三个任务)。再过5秒后,第三个任务执行完毕,因而活跃线程数量为1(第一个100秒还没执行完毕)。
在第三个任务结束的瞬间,咱们观察线程快照:
能够看到,线程池中有两个线程,Thread-0在执行第一个任务(休眠100秒,还没结束),Thread-1执行完第三个任务后并无立刻被销毁。过段时间后(10秒钟后)再观察线程快照:
能够看到,Thread-1这个线程被销毁了,由于咱们在建立线程池的时候,指定keepAliveTime 为10秒,10秒后,超出核心线程池线程外的那些线程将被销毁。
假如一次性提交4个任务,看看会怎样:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
复制代码
由于咱们设置的拒绝策略为AbortPolicy,因此最后提交的那个任务直接被拒绝了。更多拒绝策略下面会介绍到。
线程池包含如下几个状态:
当线程池中全部任务都处理完毕后,线程并不会本身关闭。咱们能够经过调用shutdown和shutdownNow方法来关闭线程池。二者的区别在于:
shutdown方法将线程池置为shutdown状态,拒绝新的任务提交,但线程池并不会立刻关闭,而是等待全部正在折行的和线程队列里的任务都执行完毕后,线程池才会被关闭。因此这个方法是平滑的关闭线程池。
shutdownNow方法将线程池置为stop状态,拒绝新的任务提交,中断正在执行的那些任务,而且清除线程队列里的任务并返回。因此这个方法是比较“暴力”的。
举两个例子观察下二者的区别:
shutdown例子:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 4, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.shutdown();
System.out.println("已经执行了线程池shutdown方法");
}
static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程当中被打断" + e.getMessage());
}
}
}
static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程当中被打断" + e.getMessage());
}
}
}
复制代码
启动程序,控制台输出以下:
能够看到,虽然在任务都被提交后立刻执行了shutdown方法,可是并不会立刻关闭线程池,而是等待全部被提交的任务都执行完了才关闭。
shutdownNow例子:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 4, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());
List<Runnable> runnables = threadPoolExecutor.shutdownNow(); // 立刻关闭,并返回还未被执行的任务
System.out.println(runnables);
System.out.println("已经执行了线程池shutdownNow方法");
}
static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程当中被打断" + e.getMessage());
}
}
}
static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程当中被打断" + e.getMessage());
}
}
}
复制代码
启动程序,控制台输出以下:
能够看到,在执行shutdownNow方法后,线程池立刻就被关闭了,正在执行中的两个任务被打断,而且返回了线程队列中等待被执行的两个任务。
经过上面两个例子咱们还能够看到shutdown和shutdownNow方法都不是阻塞的。常与shutdown搭配的方法有awaitTermination。
awaitTermination方法接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,不然返回false。该方法是阻塞的:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 4, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.shutdown();
boolean isShutdown = threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
if (isShutdown) {
System.out.println("线程池在3秒内成功关闭");
} else {
System.out.println("等了3秒还没关闭,不等了╰(‵□′)╯");
}
System.out.println("------------");
}
static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程当中被打断" + e.getMessage());
}
}
}
static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程当中被打断" + e.getMessage());
}
}
}
复制代码
启动程序输出以下:
当线程池没法再接收新的任务的时候,可采起以下四种策略:
CallerRunsPolicy CallerRunsPolicy策略:由调用线程处理该任务:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 3, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.execute(new shortTask("任务1"));
threadPoolExecutor.execute(new longTask("任务2"));
threadPoolExecutor.execute(new longTask("任务3"));
threadPoolExecutor.execute(new shortTask("任务4"));
threadPoolExecutor.execute(new shortTask("任务5"));
threadPoolExecutor.shutdown();
}
static class shortTask implements Runnable {
private String name;
public shortTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程当中被打断" + e.getMessage());
}
}
}
static class longTask implements Runnable {
private String name;
public longTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程当中被打断" + e.getMessage());
}
}
}
复制代码
上面的线程池最多只能一次性提交4个任务,第5个任务提交后会被拒绝策略处理。启动程序输出以下:
能够看到,第5个提交的任务由调用线程(即main线程)处理该任务。
AbortPolicy AbortPolicy策略:丢弃任务,并抛出RejectedExecutionException异常。前面的例子就是使用该策略,因此再也不演示。
DiscardOldestPolicy DiscardOldestPolicy策略:丢弃最先被放入到线程队列的任务,将新提交的任务放入到线程队列末端:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 3, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.DiscardOldestPolicy());
threadPoolExecutor.execute(new shortTask("任务1"));
threadPoolExecutor.execute(new longTask("任务2"));
threadPoolExecutor.execute(new longTask("任务3"));
threadPoolExecutor.execute(new shortTask("任务4"));
threadPoolExecutor.execute(new shortTask("任务5"));
threadPoolExecutor.shutdown();
}
static class shortTask implements Runnable {
private String name;
public shortTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程当中被打断" + e.getMessage());
}
}
}
static class longTask implements Runnable {
private String name;
public longTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程当中被打断" + e.getMessage());
}
}
}
复制代码
启动程序输出以下:
能够看到最后提交的任务被执行了,而第3个任务是第一个被放到线程队列的任务,被丢弃了。
DiscardPolicy DiscardPolicy策略:直接丢弃新的任务,不抛异常:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 3, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.DiscardPolicy());
threadPoolExecutor.execute(new shortTask("任务1"));
threadPoolExecutor.execute(new longTask("任务2"));
threadPoolExecutor.execute(new longTask("任务3"));
threadPoolExecutor.execute(new shortTask("任务4"));
threadPoolExecutor.execute(new shortTask("任务5"));
threadPoolExecutor.shutdown();
}
static class shortTask implements Runnable {
private String name;
public shortTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程当中被打断" + e.getMessage());
}
}
}
static class longTask implements Runnable {
private String name;
public longTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程当中被打断" + e.getMessage());
}
}
}
复制代码
启动程序,输出以下:
第5个任务直接被拒绝丢弃了,而没有抛出任何异常。
除了使用ThreadPoolExecutor的构造方法建立线程池外,咱们也可使用Executors提供的工厂方法来建立不一样类型的线程池:
newFixedThreadPool 查看newFixedThreadPool方法源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
能够看到,经过newFixedThreadPool建立的是一个固定大小的线程池,大小由nThreads参数指定,它具备以下几个特色:
由于corePoolSize和maximumPoolSize的值都为nThreads,因此线程池中线程数量永远等于nThreads,不可能新建除了核心线程数的线程来处理任务,即keepAliveTime实际上在这里是无效的。
LinkedBlockingQueue是一个无界队列(最大长度为Integer.MAX_VALUE),因此这个线程池理论是能够无限的接收新的任务,这就是为何上面没有指定拒绝策略的缘由。
newCachedThreadPool 查看newCachedThreadPool方法源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
这是一个理论上无限大小的线程池:
核心线程数为0,SynchronousQueue队列是没有长度的队列,因此当有新的任务提交,若是有空闲的还未超时的(最大空闲时间60秒)线程则执行该任务,不然新增一个线程来处理该任务。
由于线程数量没有限制,理论上能够接收无限个新任务,因此这里也没有指定拒绝策略。
newSingleThreadExecutor 查看newSingleThreadExecutor源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
核心线程数和最大线程数都为1,每次只能有一个线程处理任务。
LinkedBlockingQueue队列能够接收无限个新任务。
newScheduledThreadPool 查看newScheduledThreadPool源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
复制代码
因此newScheduledThreadPool理论是也是能够接收无限个任务,DelayedWorkQueue也是一个无界队列。
使用newScheduledThreadPool建立的线程池除了能够处理普通的Runnable任务外,它还具备调度的功能:
1.延迟指定时间后执行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延迟5秒执行
executorService.schedule(() -> System.out.println("hello"), 5, TimeUnit.SECONDS);
复制代码
2.按指定的速率执行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延迟1秒执行,而后每5秒执行一次
executorService.scheduleAtFixedRate(
() -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
);
复制代码
3.按指定的时延执行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(
() -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
);
复制代码
乍一看,scheduleAtFixedRate和scheduleWithFixedDelay没啥区别,实际它们仍是有区别的:
scheduleAtFixedRate按照固定速率执行任务,好比每5秒执行一个任务,即便上一个任务没有结束,5秒后也会开始处理新的任务;
scheduleWithFixedDelay按照固定的时延处理任务,好比每延迟5秒执行一个任务,不管上一个任务处理了1秒,1分钟仍是1小时,下一个任务老是在上一个任务执行完毕后5秒钟后开始执行。
对于这些线程池工厂方法的使用,阿里巴巴编程规程指出:
由于这几个线程池理论是均可以接收无限个任务,因此这就有内存溢出的风险。实际上只要咱们掌握了ThreadPoolExecutor构造函数7个参数的含义,咱们就能够根据不一样的业务来建立出符合需求的线程池。通常线程池的建立能够参考以下规则:
IO密集型任务,线程池线程数量能够设置为2 X CPU核心数;
计算密集型任务,线程池线程数量能够设置为CPU核心数 + 1。
一些API的用法 ThreadPoolExecutor提供了几个判断线程池状态的方法:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPoolExecutor.shutdown();
System.out.println("线程池为shutdown状态:" + threadPoolExecutor.isShutdown());
System.out.println("线程池正在关闭:" + threadPoolExecutor.isTerminating());
System.out.println("线程池已经关闭:" + threadPoolExecutor.isTerminated());
threadPoolExecutor.awaitTermination(6, TimeUnit.SECONDS);
System.out.println("线程池已经关闭" + threadPoolExecutor.isTerminated());
}
复制代码
程序输出以下:
前面咱们提到,线程池核心线程即便是空闲状态也不会被销毁,除非使用allowCoreThreadTimeOut设置了容许核心线程超时:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.allowCoreThreadTimeOut(true);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("任务执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
复制代码
程序输出以下所示:
5秒后任务执行完毕,核心线程处于空闲的状态。由于经过allowCoreThreadTimeOut方法设置了容许核心线程超时,因此3秒后(keepAliveTime设置为3秒),核心线程被销毁。核心线程被销毁后,线程池也就没有做用了,因而就自动关闭了。
值得注意的是,若是一个线程池调用了allowCoreThreadTimeOut(true)方法,那么它的keepAliveTime不能为0。
ThreadPoolExecutor提供了一remove方法,查看其源码:
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
复制代码
可看到,它删除的是线程队列中的任务,而非正在被执行的任务。举个例子:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("任务执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Runnable r = () -> System.out.println("看看我是否会被删除");
threadPoolExecutor.execute(r);
threadPoolExecutor.remove(r);
threadPoolExecutor.shutdown();
}
复制代码
执行程序,输出以下:
可看到任务并无被执行,已经被删除,由于惟一一个核心线程已经在执行任务了,因此后提交的这个任务被放到了线程队列里,而后经过remove方法删除。
默认状况下,只有当往线程池里提交了任务后,线程池才会启动核心线程处理任务。咱们能够经过调用prestartCoreThread方法,让核心线程即便没有任务提交,也处于等待执行任务的活跃状态:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 2, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
}
复制代码
程序输出以下所示:
该方法返回boolean类型值,若是因此核心线程都启动了,返回false,反之返回true。
还有一个和它相似的prestartAllCoreThreads方法,它的做用是一次性启动全部核心线程,让其处于活跃地等待执行任务的状态。
ThreadPoolExecutor的invokeAny方法用于随机执行任务集合中的某个任务,并返回执行结果,该方法是同步方法:
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
// 任务集合
List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
return i;
}).collect(Collectors.toList());
// 随机执行结果
Integer result = threadPoolExecutor.invokeAny(tasks);
System.out.println("-------------------");
System.out.println(result);
threadPoolExecutor.shutdownNow();
}
复制代码
启动程序,输出以下:
ThreadPoolExecutor的invokeAll则是执行任务集合中的全部任务,返回Future集合:
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
return i;
}).collect(Collectors.toList());
List<Future<Integer>> futureList = threadPoolExecutor.invokeAll(tasks);
futureList.stream().map(f->{
try {
return f.get();
} catch (InterruptedException | ExecutionException e) {
return null;
}
}).forEach(System.out::println);
threadPoolExecutor.shutdownNow();
}
复制代码
输出以下:
方法 | 描述 |
---|---|
allowCoreThreadTimeOut(boolean value) | 是否容许核心线程空闲后超时,是的话超时后核心线程将销毁,线程池自动关闭 |
awaitTermination(long timeout, TimeUnit unit) | 阻塞当前线程,等待线程池关闭,timeout用于指定等待时间。 |
execute(Runnable command) | 向线程池提交任务,没有返回值 |
submit(Runnable task) | 向线程池提交任务,返回Future |
isShutdown() | 判断线程池是否为shutdown状态 |
isTerminating() | 判断线程池是否正在关闭 |
isTerminated() | 判断线程池是否已经关闭 |
remove(Runnable task) | 移除线程队列中的指定任务 |
prestartCoreThread() | 提早让一个核心线程处于活跃状态,等待执行任务 |
prestartAllCoreThreads() | 提早让全部核心线程处于活跃状态,等待执行任务 |
getActiveCount() | 获取线程池活跃线程数 |
getCorePoolSize() | 获取线程池核心线程数 |
threadPoolExecutor.getQueue() | 获取线程池线程队列 |
getMaximumPoolSize() | 获取线程池最大线程数 |
shutdown() | 让线程池处于shutdown状态,再也不接收任务,等待全部正在运行中的任务结束后,关闭线程池。 |
shutdownNow() | 让线程池处于stop状态,再也不接受任务,尝试打断正在运行中的任务,并关闭线程池,返回线程队列中的任务。 |