本文主要讲解了Java
里面线程池的接口以及实现类,以及它们的基本使用方法,内容包括:java
Executor
/Executors
ExecutorService
ThreadPoolExecutor
ScheduledThreadPoolExecutor
Executor
+ExecutorService
Executor
是一个接口,里面只是定义了一个简单的任务提交方法:shell
//Executor package java.util.concurrent; public interface Executor { void execute(Runnable var1); }
而ExecutorService
也是一个接口,继承了Executor
,而且提供了更多用于任务提交和管理的一些方法,好比中止任务的执行等:安全
//ExecutorService package java.util.concurrent; import java.util.Collection; import java.util.List; public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException; <T> Future<T> submit(Callable<T> var1); <T> Future<T> submit(Runnable var1, T var2); Future<?> submit(Runnable var1); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException; }
下面将详细讲述ExecutorService
的两个重要实现:bash
ThreadPoolExecutor
ScheduledThreadPoolExecutor
ThreadPoolExecutor
这就是一般所说的线程池类,一般来讲,一个线程池有以下特征:并发
先来看一个简单的例子:框架
public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); //执行没有返回值的任务 executor.execute(()-> System.out.println(" Execute the runnable task.")); //执行带返回值的任务,用到了Future泛型类 Future<String> future = executor.submit(()->" Execute the callable task and this is the result."); //经过get()获取任务结果,get()会在任务未完成时一直阻塞 System.out.println(future.get()); //手动关闭线程池 executor.shutdown(); } }
从这个简单的例子能够看到,线程池能够执行带返回值以及不带返回值的任务,带返回值的话须要使用get()
方法阻塞获取。另外,运行完毕后须要手动关闭线程池,不然JVM
不会退出,由于线程池中有指定数量的活跃线程数量,而JVM
正常退出的条件是JVM
进程中不存在任何运行着的非守护进程。异步
构造方法的源码以下:ide
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
虽然提供了四个构造方法,但本质上调用的是最后一个构造方法,该构造方法带有7个参数,分别是:高并发
corePoolSize
:核心线程数量,即便当线程池中的核心线程不工做,核心线程的数量也不会减小。该参数的最小值为0,且小于等于maximumPoolSize
maximumPoolSize
:用于设置线程池中容许的线程数量的最大值keepAliveTime
:当线程池中的线程数量超过核心线程数而且处于空闲时,线程池将会回收一部分线程让出系统资源,该参数可用于设置超过corePoolSize
数量的线程在多长时间后被回收,与后一个表示时间单位的参数unit
配合使用unit
:用于设定keepAliveTime
的时间单位workQueure
:用于存放已提交至线程池但未被执行的任务threadFactory
:用于建立线程的工厂,开发者能够自定义ThreadFactory
来建立线程handler
:拒绝策略,当任务超过阻塞队列的边界时,线程池会拒绝新增的任务,主要用于设置拒绝策略线程池被成功建立后,内部的运行线程并不会被当即建立,ThreadPoolExecutor
会采用一种Lazy
的方式去建立而且运行。首次调用执行任务方法时才会建立线程,好比:this
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); assert executor.getActiveCount() == 0; assert executor.getMaximumPoolSize() == 4; assert executor.getCorePoolSize() == 2; executor.execute(()-> System.out.println(" Execute the runnable task.")); assert executor.getActiveCount() == 1; assert executor.getMaximumPoolSize() == 4; assert executor.getCorePoolSize() == 2;
(运行的时候请加上-ea
参数)
下面看一下任务的具体执行流程:
RejectedEcecutionHandler
keepAliveTime
指定时间,会回收线程,直到保留corePoolSize
个核心线程为止(不过核心线程也能够设置被超时回收,默认不开启核心线程超时)线程工厂ThreadFactory
是一个接口:
package java.util.concurrent; public interface ThreadFactory { Thread newThread(Runnable var1); }
使用线程工厂能够在建立线程时加入自定义配置,好比指定名字、优先级、是否为守护线程等,好比下面是线程工厂的一个简单实现:
public class TestThreadFactory implements ThreadFactory { private final static String PREFIX = "Test thread["; private final static String SUFFIX = "]"; private final static AtomicInteger THREAD_NUM = new AtomicInteger(); @Override public Thread newThread(Runnable runnable) { ThreadGroup group = new ThreadGroup("My pool"); Thread thread = new Thread(group,runnable,PREFIX+THREAD_NUM.getAndIncrement()+SUFFIX); thread.setPriority(5); return thread; } }
默认状况下,ThreadPoolExecutor
提供了四种拒绝策略:
DiscardPolicy
:丢弃策略,直接丢弃任务AbortPolicy
:终止策略,抛出RejectedExecutionException
DiscardOldestPolicy
:丢弃队列中最老任务的策略(严格意义来讲须要根据任务队列去选择,由于不是全部的队列都是FIFO
的)CallerRunsPolicy
:调用者线程执行策略,任务会在当前线程中阻塞执行固然,若是不能知足须要,能够实现RejectedExecutionHandler
接口去自定义策略:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable var1, ThreadPoolExecutor var2); }
若是不须要线程池,那么须要手动对线程池关闭。线程池提供了以下三种方式:
shutdown()
shutdownNow()
shutdown()+shutdownNow()
shutdown()
提供了一种有序关闭的方式去关闭线程池,调用该方法后,会等待当前执行的任务所有执行完成而后关闭,同时新提交任务将会被拒绝。注意该方法是非阻塞,当即返回的。若是须要查看关闭状态,可使用:
isShutdown()
:返回是否调用了shutdown()
的结果isTerminating()
:返回是否正在结束中isTerminated()
:返回是否已经结束shutdownNow()
方法首先将线程池状态修改成shutdown
状态,而后将未被执行的任务挂起,接着将尝试中断运行中的线程,最后返回未执行的任务:
public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); IntStream.range(0,10).forEach(i-> executor.execute(()-> { try{ TimeUnit.SECONDS.sleep(5); }catch (Exception e){ e.printStackTrace(); } })); List<Runnable> runnables = executor.shutdownNow(); System.out.println(runnables.size()); }
输出:
8 BUILD SUCCESSFUL in 326ms 2 actionable tasks: 2 executed java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:339) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446) at com.company.Main.lambda$main$0(Main.java:29) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:339) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446) at com.company.Main.lambda$main$0(Main.java:29) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 3:14:36 AM: Task execution finished 'Main.main()'.
为了确保安全关闭线程池,通常会使用组合方式关闭,确保正在运行的任务被正常执行的同时又能提升线程池被关闭的成功率,例子以下:
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); IntStream.range(0,10).forEach(i-> executor.execute(()-> { try{ TimeUnit.SECONDS.sleep(5); }catch (Exception e){ e.printStackTrace(); } })); //首先调用shutdown()尝试关闭 executor.shutdown(); try{ //若是等待一段时间后还没关闭 if(!executor.awaitTermination(10,TimeUnit.SECONDS)){ //强制关闭 executor.shutdownNow(); //若是强制关闭失败,好比运行的线程异常耗时且不能被中断 if(!executor.awaitTermination(10,TimeUnit.SECONDS)){ //其余处理,这里只是输出中断失败的信息 System.out.println("Terminate failed."); } } }catch (InterruptedException e){ //若是当前线程被中断,而且捕获了异常,执行当即关闭方法 executor.shutdownNow(); //从新抛出中断信号 Thread.currentThread().interrupt(); }
ScheduledThreadPoolExecutor
ScheduledExecutorService
继承了ExecutorService
,而且提供了任务被定时执行的特性,可使用ScheduledThreadPoolExecutor
去实现某些特殊的任务执行。固然实现固定任务的方法或者框架有不少,有原生的shell
实现,老式的Timer/TimerTask
实现,或者专门的框架Quartz
实现,这里要说的是JDK
内部的实现ScheduledThreadPoolExecutor
。
ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor
,除了具有ThreadPoolExecutor
的全部方法外,还定义了4个与schedule
有关的方法:
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
:一个one-shot
(只执行一次)的方法, 任务(callable
)会在单位时间(delay
)后被执行,而且当即返回ScheduledFuture
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
:也是一个one-shot
方法,任务会在单位时间后被执行,与第一个方法不一样的是返回的ScheduledFuture
不包含任何执行结果,可是能够经过返回的ScheduledFuture
判断任务是否执行结束ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
:任务会根据固定的速率在initialDelay
后不断被执行ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
:任务将以固定延迟单位时间的方式执行任务关于后二者的区别以下:
public static void main(String[] args) throws Exception { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); Runnable runnable = ()->{ long startTimestamp = System.currentTimeMillis(); System.out.println("current timestamp: "+startTimestamp); try{ TimeUnit.MILLISECONDS.sleep(current().nextInt(100)); }catch (Exception e){ e.printStackTrace(); } System.out.println("elapsed time: "+(System.currentTimeMillis() - startTimestamp)); }; executor.scheduleAtFixedRate(runnable,10,1000,TimeUnit.MILLISECONDS); // executor.scheduleWithFixedDelay(runnable,10,1000,TimeUnit.MILLISECONDS); }
输出:
current timestamp: 1619351675438 elapsed time: 97 current timestamp: 1619351676438 elapsed time: 85 current timestamp: 1619351677438 elapsed time: 1 current timestamp: 1619351678438 elapsed time: 1 current timestamp: 1619351679438 elapsed time: 68 current timestamp: 1619351680438 elapsed time: 99
能够看到任务始终以一种固定的速率运行,每次运行的开始时间始终相隔1000ms
。
而使用FixedDelay
的输出以下:
current timestamp: 1619351754890 elapsed time: 53 current timestamp: 1619351755944 elapsed time: 30 current timestamp: 1619351756974 elapsed time: 13 current timestamp: 1619351757987 elapsed time: 80 current timestamp: 1619351759068 elapsed time: 94 current timestamp: 1619351760162 elapsed time: 29
每次开始的时间为上一次执行完成后的时间再加上时间间隔(1000ms
)。
Executors
中的线程池Executors
类提供了六种建立线程池的静态方法:
FixedThreadPool
SingleThreadExecutor
CachedThreadPool
ScheduledThreadPool
SingleThreadScheduledExecutor
WorkStealingPool
下面分别来看一下。
FixedThreadPool
源码以下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); }
FixedThreadPool
底层调用的是ThreadPoolExecutor
,默认建立的核心线程数与最大线程数相等,任务队列为无边界的LinkedBlockingQueue
。
SingleThreadExecutor
相关源码以下:
public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory)); } private static class FinalizableDelegatedExecutorService extends Executors.DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
能够看到SingleThreadPool
其实是内部类FinalizableDelegatedExecutorService
的包装,核心线程与最大线程数均为1,任务队列为无边界的LinkedBlockingQueue
。发生GC
的时候,会调用shutdown()
方法。
CachedThreadPool
源码以下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); }
CachedThreadPool
会根据须要建立新线程,一般用于执行量大的,耗时较短的异步任务。未被使用且空闲时间超过60s
的线程会被回收。
ScheduledThreadPool
源码以下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
建立指定核心数ScheduledThreadPoolExecutor
。
SingleThreadScheduledExecutor
源码以下:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory)); } private static class DelegatedScheduledExecutorService extends Executors.DelegatedExecutorService implements ScheduledExecutorService { private final ScheduledExecutorService e; DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super(executor); this.e = executor; } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { return this.e.schedule(command, delay, unit); } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { return this.e.schedule(callable, delay, unit); } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return this.e.scheduleAtFixedRate(command, initialDelay, period, unit); } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { return this.e.scheduleWithFixedDelay(command, initialDelay, delay, unit); } }
其实就是SingelThreadPool
+ScheduledThreadPool
。
WorkStealingPool
源码以下:
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true); } public static ExecutorService newWorkStealingPool() { return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true); }
WorkStealingPool
是JDK8
引入的线程池,返回的是ForkJoinPool
。在WorkStealingPool
中,若是每一个线程处理的任务执行比较耗时,那么它负责的任务会被其余线程“窃取”,进而提升并发处理的效率。