<!-- more -->java
本文原创地址,
个人博客
:
https://jsbintask.cn/2019/03/10/jdk/jdk8-threadpool/(食用效果最佳),转载请注明出处!
在实际工做中,线程
是一个咱们常常要打交道的角色,它能够帮咱们灵活利用资源,提高程序运行效率。可是咱们今天不是探讨线程
!咱们今天来聊聊另外一个与线程息息相关的角色:线程池
.本篇文章的目的就是全方位的解析线程池的做用,以及jdk中的接口,实现以及原理,另外对于某些重要概念,将从源码
的角度探讨。tip:本文较长,建议先码后看。
编程
首先咱们看一段建立线程而且运行的经常使用代码:数组
for (int i = 0; i < 100; i++) { new Thread(() -> { System.out.println("run thread->" + Thread.currentThread().getName()); //to do something, send email, message, io operator, network... }).start(); }
上面的代码很容易理解,咱们为了异步,或者效率考虑,将某些耗时操做放入一个新线程去运行,可是这样的代码却存在这样的问题:安全
时间
,资源
,对于线程的销毁一样须要系统资源。手动建立执行线程存在以上问题,而线程池就是用来解决这些问题的。怎么解决呢?咱们能够先粗略的定义一下线程池:多线程
线程池是一组已经建立好的,一直在等待任务执行的线程的集合。
由于线程池中线程是已经建立好的,因此对于任务的执行不会消耗掉额外的资源,线程池中线程个数由咱们自定义添加,可相对于资源,资源任务作出调整,对于某些任务,若是线程池还没有执行,可手动取消,线程任务变得可以管理!
因此,线程池的做用以下:并发
上面咱们已经知道了线程池的做用,而对于这样一个好用,重要的工具,jdk
固然已经为咱们提供了实现,这也是本篇文章的重点。
在jdk中,关于线程池的接口,类都定义在juc
(java.util.concurrent)包中,这是jdk专门为咱们提供用于并发编程的包,固然,本篇文章咱们只介绍与线程池有关的接口和类,首先咱们看下重点要学习的接口和类:
如图所示,咱们将一一讲解这6个类的做用而且分析。异步
首先咱们须要了解就是Executor
接口,它有一个方法,定义以下:
Executor自jdk1.5引入,这个接口只有一个方法execute
声明,它的做用以及定义以下:接收一个任务(Runnable
)而且执行。注意:同步执行仍是异步执行都可
!
由它的定义咱们就知道,它是一个线程池最基本的做用。可是在实际使用中,咱们经常使用的是另一个功能更多的子类ExecutorService
。工具
这个接口继承自Executor,它的方法定义就丰富多了,能够关闭,提交Future任务,批量提交任务,获取执行结果等,咱们一一讲解下每一个方法做用声明:学习
void shutdown()
: “优雅地”关闭线程池,为何是“优雅地”呢?由于这个线程池在关闭前会先等待线程池中已经有的任务执行完成,通常会配合方法awaitTermination
一块儿使用,调用该方法后,线程池中不能再加入新的任务。List<Runnable> shutdownNow();
: “尝试”终止正在执行的线程,返回在正在等待的任务列表,调用这个方法后,会调用正在执行线程的interrupt()
方法,因此若是正在执行的线程若是调用了sleep,join,await
等方法,会抛出InterruptedException
异常。boolean awaitTermination(long timeout, TimeUnit unit)
: 该方法是一个阻塞方法,参数分别为时间和时间单位。这个方法通常配合上面两个方法以后调用。若是先调用shutdown
方法,全部任务执行完成返回true,超时返回false,若是先调用的是shutdownNow
方法,正在执行的任务所有完成true,超时返回false。boolean isTerminated();
: 调用方法1或者2后,若是全部人物所有执行完毕则返回true,也就是说,就算全部任务执行完毕,可是不是先调用1或者2,也会返回false。<T> Future<T> submit(Callable<T> task);
: 提交一个可以返回结果的Callable
任务,返回任务结果抽象对象是Future
,调用Future.get()
方法能够阻塞等待获取执行结果,例如:result = exec.submit(aCallable).get();
,提交一个任务而且一直阻塞知道该任务执行完成获取到返回结果。this
<T> Future<T> submit(Runnable task, T result);
: 提交一个Runnable
任务,执行成功后调用Future.get()方法返回的是result(这是什么骚操做?)。Future<?> submit(Runnable task);
:和6不一样的是调用Future.get()
方法返回的是null
(这又是什么操做?)。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
: 提交一组任务,而且返回每一个任务执行结果的抽象对象List<Future<T>>
,Future做用同上,值得注意的是:当调用其中任一Future.isDone()
(判断任务是否完成,正常,异常终止都算)方法时,必须等到全部任务都完成时才返回true,简单说:所有任务完成才算完成
。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
: 同方法8,多了一个时间参数,不一样的是:若是超时,Future.isDone()一样返回true。<T> T invokeAny(Collection<? extends Callable<T>> tasks)
:这个看名字和上面对比就容易理解了,返回第一个正常完成的任务地执行结果,后面没有完成的任务将被取消。<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
:同10相比,多了一个超时参数。不一样的是:在超时时间内,一个任务都没有完成,将抛出TimeoutException
。到如今,咱们已经知道了一个线程池基本的全部方法,知道了每一个方法的做用,接下来咱们就来看看具体实现,首先咱们研究下ExecutorService的具体实现抽象类:AbstractExecutorService
。
AbstractExecutorService
是一个抽象类,继承自ExecutorService
,它实现了ExecutorService接口的submit, invokeAll, invokeAny
方法,主要用于将ExecutorService的公共实现封装,方便子类更加方便使用,接下来咱们看看具体实现:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
execute方法交由子类实现了,这里咱们主要分析newTaskFor
方法,看它是如何构建Future对象的:
首先,RunnableFuture
接口定义以下:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
他就是Future和Runnable的组合,它的实现是FutureTask
:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; // ① try { for (Callable<T> t : tasks) { // ② RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); // ③ if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; // ④ return futures; } finally { if (!done) // ⑤ for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
execute
方法添加每个任务。Future.cancel()
(实际是调用执行线程的interrupt
方法。上面代码分析和咱们一开始讲解ExecutorService
的invokeAll
一致。
invokeAny
实际调用doInvokeAny
:
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs = // ① new ExecutorCompletionService<T>(this); try { ExecutionException ee = null; final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator(); futures.add(ecs.submit(it.next())); // ② --ntasks; int active = 1; for (;;) { Future<T> f = ecs.poll(); // ③ if (f == null) { if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } else if (active == 0) break; else if (timed) { f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } else // ④ f = ecs.take(); } if (f != null) { // ⑤ --active; try { return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null) ee = new ExecutionException(); throw ee; } finally { for (int i = 0, size = futures.size(); i < size; i++) // ⑥ futures.get(i).cancel(true); } }
ExecutorCompletionService
ecs,这个对象实际是一个任务执行结果阻塞队列和线程池的结合,因此它能够加入任务,执行任务,将任务执行结果加入阻塞队列。上面代码分析和咱们一开始讲解ExecutorService
的invokeAny
一致。 到如今,咱们已经分析完了AbstractExecutorService
中的公共的方法,接下来就该研究最终的具体实现了:ThreadPoolExecutor
ThreadPoolExecutor
继承自AbstractExecutorService
,它是线程池的具体实现:
咱们首先分析下构造方法:`public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)`。
corePoolSize
:核心线程数,maximumPoolSize
:线程池最大容许线程数,workQueue
:任务队列,threadFactory
:线程建立工厂,handler
: 任务拒绝策,keepAliveTime, unit
:等待时长,它们的具体做用以下:提交一个task(Runnable)后(执行execute方法),检查总线程数是否小于corePoolSize,小于等于则使用threadFactory直接建立一个线程执行任务,大于则再次检查线程数量是否等于maximumPoolSize,等于则直接执行handler拒绝策略,小于则判断workQueue是否已经满了,没满则将任务加入等待线程执行,满了则使用threadFactory建立新线程执行队头任务。
经过流程图咱们知道每一个参数做用,这里值得注意的是,若是咱们将某些参数特殊化,则能够获得特殊的线程池:
allowCoreThreadTimeOut
使这个参数对线程池中全部线程都有效果。RejectedExecutionException
,这是线程池中的默认实现最后,咱们再分析下ThreadPoolExecutor核心方法execute
:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // ① if (workerCountOf(c) < corePoolSize) { // ② if (addWorker(command, true)) return; c = ctl.get(); // ③ } if (isRunning(c) && workQueue.offer(command)) { // ④ int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // ⑤ reject(command); else if (workerCountOf(recheck) == 0) // ⑥ addWorker(null, false); } else if (!addWorker(command, false)) // ⑦ reject(command); }
好了,到如今jdk中的线程池核心的实现,策略,分析咱们已经分析完成了。接下来我咱们就来看看关于线程池的另外的一些扩展,也就是图上的剩下的接口和类:
ScheduledExecutorService
继承自ExecutorService
,ExecutorService的分析上面咱们已经知道了,咱们来看看它扩展了哪些方法:
这个接口做为线程池的定义主要增长了能够定时执行任务
(执行一次)和按期执行任务
(重复执行),咱们来一一简述下每一个方法的做用。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
: 这个方法用于定时执行任务command,延迟的时间为delay*unit,它返回一个ScheduledFuture
对象用于获取执行结果或者剩余延时,调用Future.get()方法将阻塞当前线程最后返回null。public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
:同上,不一样的是,调用Future.get()方法将返回执行的结果,而不是null。public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,TimeUnit unit);
: 重复执行任务command,第一次执行时间为initialDelay延迟后,之后的执行时间将在initialDelay + period * n
,unit表明时间单位,值得注意的是,若是某次执行出现异常,后面该任务就不会再执行。或者经过返回对象Future手动取消,后面也将再也不执行。public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);
: 效果同上,不一样点:若是command耗时为 y,则上面的计算公式为initialDelay + period * n + y
,也就是说,它的定时时间会加上任务耗时,而上面的方法则是一个固定的频率,不会算上任务执行时间!这是它扩展的四个方法,其中须要注意的是scheduleAtFixedRate和scheduleWithFixedDelay的细微差异,最后,咱们来看下它的实现类:ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor
类,实现了ScheduledExecutorService
接口,上面均已经分析。
它的构造器以下:
看起来比它的父类构造器简洁,主要由于它的任务队列workQueue是默认的(DelayedWorkQueue
),而且最大的线程数为最大值。接着咱们看下DelayedWorkQueue实现:
它内部使用数组维护了一个二叉树,提升了任务查找时间,而之因此ScheduledThreadPoolExecutor可以实现延时的关键也在于DelayedWorkQueue的take()
方法:
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // ① RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
好了,到目前为止jdk中关于线程池的6个核心类已经所有分析完毕了。接下来还有最后一个小问题,咱们手动建立线程池参数也太了,不论是ThreadPoolExecutor
仍是ScheduledThreadPoolExecutor
,这对于用户来讲彷佛并不太友好,固然,jdk已经想到了这个问题,因此,咱们最后再介绍一个建立这些线程池的工具类:Executors
:
它的主要工具方法以下:
比起手动建立,它帮咱们加了不少默认值,用起来固然就方便多了,好比说newFixedThreadPool
建立一个线程数固定的线程池,其实就是核心线程数等于最大线程数,和咱们一开始分析的结果同样。
值得注意的是:为了咱们的程序安全可控性考虑,咱们应该尽可能考虑手动建立线程池,知晓每个参数的做用,下降不稳定性!
本次,咱们首先从代码出发,分析了线程池给咱们带来的好处以及直接使用线程的弊端,接着引出了jdk中的已经实现了的线程池。而后重点分析了jdk中关于线程池的六个最重要的接口和类,而且从源码角度讲解了关键点实现,最后,处于方便考虑,咱们还知道jdk给咱们留了一个建立线程池的工具类,简化了手动建立线程池的步骤。
真正作到了知其然,知其因此然
。
关注我,这里只有干货!