目录java
JAVA经过多线程的方式实现并发,为了方便线程池的管理,JAVA采用线程池的方式对线线程的整个生命周期进行管理。1.5后引入的Executor框架的最大优势是把任务的提交和执行解耦。编程
要执行任务的人只需把Task描述清楚,而后提交便可。这个Task是怎么被执行的,被谁执行的,何时执行的,提交的人就不用关心了。数组
线程池同时能够避免建立大量线程的开销,提升响应速度。最近在阅读JVM相关的东西,一个对象的建立须要如下过程:安全
若是每次都是如此的建立线程->执行任务->销毁线程,会形成很大的性能开销。复用已建立好的线程能够提升系统的性能,借助池化技术的思想,经过预先建立好多个线程,放在池中,这样能够在须要使用线程的时候直接获取,避免屡次重复建立、销毁带来的开销。服务器
前面提到一个名词——池化技术,那么到底什么是池化技术呢?池化技术简单点来讲,就是提早保存大量的资源,以备不时之需。在机器资源有限的状况下,使用池化技术能够大大的提升资源的利用率,提高性能等。多线程
在编程领域,比较典型的池化技术有:并发
线程池、链接池、内存池、对象池等。框架
在Java中建立线程池可使用ThreadPoolExecutor,其继承关系以下图函数
其构造函数为:源码分析
代码块
Java
public ThreadPoolExecutor(int corePoolSize, //核心线程的数量 int maximumPoolSize, //最大线程数量 long keepAliveTime, //超出核心线程数量之外的线程空余存活时间 TimeUnit unit, //存活时间的单位 BlockingQueue<Runnable> workQueue, //保存待执行任务的队列 ThreadFactory threadFactory, //建立新线程使用的工厂 RejectedExecutionHandler handler // 当任务没法执行时的处理器 ) {...}
在线程数少于核心数量时,有新任务进来就新建一个线程,即便有的线程没事干
等超出核心数量后,就不会新建线程了,空闲的线程就得去任务队列里取任务执行了
包括核心线程池数量 + 核心之外的数量
若是任务队列满了,而且池中线程数小于最大线程数,会再建立新的线程执行任务
若是给线程池设置 allowCoreThreadTimeOut(true),则核心线程在空闲时头上也会响起死亡的倒计时
若是任务是多而容易执行的,能够调大这个参数,那样线程就能够在存活的时间里有更大可能接受新任务
不一样的任务类型有不一样的选择,下一小节介绍
能够给线程起个好听的名字,设置个优先级啥的
handler:饱和策略,你们都很忙,咋办呢,有四种策略
若是把线程比做员工,那么线程池能够比做一个团队,核心池比做团队中正式员工数,核心池外的比做外包员工。
经过Executors静态工厂也能够构建经常使用的线程池,在详细介绍以前,还须要先了解线程池中任务的执行顺序
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
从注释中能够看处处理逻辑,从判断条件中能够看到核心模块
概略图:
详细流程图:
按照上面的总结,能够逐一分析Executors工厂类提供的现成的线程池:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
不招外包,有固定数量核心成员的正常互联网团队。
能够看到,FixedThreadPool 的核心线程数和最大线程数都是指定值,也就是说当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中。
此外 keepAliveTime 为 0,也就是多余的空余线程会被当即终止(因为这里没有多余线程,这个参数也没什么意义了)。
而这里选用的阻塞队列是 LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE,至关于没有上限。
所以这个线程池执行任务的流程以下:
线程数少于核心线程数,也就是设置的线程数时,新建线程执行任务
线程数等于核心线程数后,将任务加入阻塞队列
因为队列容量很是大,能够一直加加加
执行完任务的线程反复去队列中取任务执行
FixedThreadPool 用于负载比较重的服务器,为了资源的合理利用,须要限制当前线程数量。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
不招外包,只有一个核心成员的创业团队。
从参数能够看出来,SingleThreadExecutor 至关于特殊的 FixedThreadPool,它的执行流程以下:
线程池中没有线程时,新建一个线程执行任务
有一个线程之后,将任务加入阻塞队列,不停加加加
惟一的这一个线程不停地去队列里取任务执行
听起来很可怜的样子 - -。
SingleThreadExecutor 用于串行执行任务的场景,每一个任务必须按顺序执行,不须要并发执行。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
所有外包,没活最多待 60 秒的外包团队。
能够看到,CachedThreadPool 没有核心线程,非核心线程数无上限,也就是所有使用外包,可是每一个外包空闲的时间只有 60 秒,超事后就会被回收。
CachedThreadPool 使用的队列是 SynchronousQueue,这个队列的做用就是传递任务,并不会保存。
所以当提交任务的速度大于处理任务的速度时,每次提交一个任务,就会建立一个线程。极端状况下会建立过多的线程,耗尽 CPU 和内存资源。
它的执行流程以下:
没有核心线程,直接向 SynchronousQueue 中提交任务
若是有空闲线程,就去取出任务执行;若是没有空闲线程,就新建一个
执行完任务的线程有 60 秒生存时间,若是在这个时间内能够接到新任务,就能够继续活下去,不然就拜拜
因为空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。
CachedThreadPool 用于并发执行大量短时间的小任务,或者是负载较轻的服务器。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
按期维护的 2B 业务团队,核心与外包成员都有。
ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor, 最多线程数为 Integer.MAX_VALUE ,使用 DelayedWorkQueue 做为任务队列。
ScheduledThreadPoolExecutor 添加任务和执行任务的机制与ThreadPoolExecutor 有所不一样。
ScheduledThreadPoolExecutor 添加任务提供了另外两个方法:
scheduleAtFixedRate() :按某种速率周期执行
scheduleWithFixedDelay():在某个延迟后执行
它俩的代码以下:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period), sequencer.getAndIncrement()); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay), sequencer.getAndIncrement()); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
能够看到,这两种方法都是建立了一个 ScheduledFutureTask 对象,调用 decorateTask() 方法转成 RunnableScheduledFuture 对象,而后添加到队列中。
看下 ScheduledFutureTask 的主要属性:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { //添加到队列中的顺序 private final long sequenceNumber; //什么时候执行这个任务 private volatile long time; //执行的间隔周期 private final long period; //实际被添加到队列中的 task RunnableScheduledFuture<V> outerTask = this; //在 delay queue 中的索引,便于取消时快速查找 int heapIndex; //... }
DelayQueue 中封装了一个优先级队列,这个队列会对队列中的 ScheduledFutureTask 进行排序,两个任务的执行 time 不一样时,time 小的先执行;不然比较添加到队列中的顺序 sequenceNumber ,先提交的先执行。
ScheduledThreadPoolExecutor 的执行流程以下:
调用上面两个方法添加一个任务
线程池中的线程从 DelayQueue 中取任务
而后执行任务
具体执行任务的步骤也比较复杂:
线程从 DelayQueue 中获取 time 大于等于当前时间的 ScheduledFutureTask
DelayQueue.take()
执行完后修改这个 task 的 time 为下次被执行的时间
而后再把这个 task 放回队列中
DelayQueue.add()
ScheduledThreadPoolExecutor 用于须要多个后台线程执行周期任务,同时须要限制线程数量的场景。
阿里巴巴Java开发手册中明确指出,『不容许』使用Executors建立线程池。
经过上面的例子,咱们知道了Executors建立的线程池存在OOM的风险,那么究竟是什么缘由致使的呢?咱们须要深刻Executors的源码来分析一下。
其实,在上面的报错信息中,咱们是能够看出蛛丝马迹的,在以上的代码中其实已经说了,真正的致使OOM的实际上是LinkedBlockingQueue.offer方法。
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371) at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
若是对Java中的阻塞队列有所了解的话,看到这里或许就可以明白缘由了。
Java中的BlockingQueue主要有两种实现,分别是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一个用数组实现的有界阻塞队列,必须设置容量。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列,容量能够选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。
这里的问题就出在:不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。也就是说,若是咱们不设置LinkedBlockingQueue的容量的话,其默认容量将会是Integer.MAX_VALUE。
而newFixedThreadPool中建立LinkedBlockingQueue时,并未指定容量。此时,LinkedBlockingQueue就是一个无边界队列,对于一个无边界队列来讲,是能够不断的向队列中加入任务的,这种状况下就有可能由于任务过多而致使内存溢出问题。
上面提到的问题主要体如今newFixedThreadPool和newSingleThreadExecutor两个工厂方法上,并非说newCachedThreadPool和newScheduledThreadPool这两个方法就安全了,这两种方式建立的最大线程数多是Integer.MAX_VALUE,而建立这么多线程,必然就有可能致使OOM。
从方法execute的实现能够看出:addWorker主要负责建立新的线程并执行任务,代码以下(这里代码有点长,不要紧,也是分块的,总共有5个关键的代码块):
第一个红框:作是否可以添加工做线程条件过滤:
第二个红框:作自旋,更新建立线程数量:
接着看后面的代码:
第一个红框:获取线程池主锁。
接下来,咱们看看workers是什么。
一个hashSet。因此,线程池底层的存储结构其实就是一个HashSet。
这两个钩子(beforeExecute,afterExecute)容许咱们本身继承线程池,作任务执行先后处理。
到这里,源代码分析到此为止。接下来作一下简单的总结。
所谓线程池本质是一个hashSet。多余的任务会放在阻塞队列中。
只有当阻塞队列满了后,才会触发非核心线程的建立。因此非核心线程只是临时过来打杂的。直到空闲了,而后本身关闭了。
线程池提供了两个钩子(beforeExecute,afterExecute)给咱们,咱们继承线程池,在执行任务先后作一些事情。
线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池)