深刻分析java线程池的实现原理

前言java

线程是稀缺资源,若是被无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有如下好处:数组

一、下降资源消耗;缓存

二、提升响应速度;并发

三、提升线程的可管理性。 Java1.5中引入的Executor框架把任务的提交和执行进行解耦,只须要定义好任务,而后提交给线程池,而不用关心该任务是如何执行、被哪一个线程执行,以及何时执行。框架

demo异步

一、Executors.newFixedThreadPool(10)初始化一个包含10个线程的线程池executor;性能

二、经过executor.execute方法提交20个任务,每一个任务打印当前的线程名;this

三、负责执行任务的线程的生命周期都由Executor框架进行管理;线程

ThreadPoolExecutor设计

Executors是java线程池的工厂类,经过它能够快速初始化一个符合业务需求的线程池,如Executors.newFixedThreadPool方法能够生成一个拥有固定线程数的线程池。

其本质是经过不一样的参数初始化一个ThreadPoolExecutor对象,具体参数描述以下:

corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池建立一个新线程执行任务,直到当前线程数等于corePoolSize;若是当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;若是执行了线程池的prestartAllCoreThreads()方法,线程池会提早建立并启动全部核心线程。

maximumPoolSize

线程池中容许的最大线程数。若是当前阻塞队列满了,且继续提交任务,则建立新的线程执行任务,前提是当前线程数小于maximumPoolSize;

keepAliveTime

线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认状况下,该参数只在线程数大于corePoolSize时才有用;

unit

keepAliveTime的单位;

workQueue

用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了以下阻塞队列:

一、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;

二、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量一般要高于ArrayBlockingQuene;

三、SynchronousQuene:一个不存储元素的阻塞队列,每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于LinkedBlockingQuene;

四、priorityBlockingQuene:具备优先级的无界阻塞队列;

threadFactory

建立线程的工厂,经过自定义的线程工厂能够给每一个新建的线程设置一个具备识别度的线程名。

handler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工做线程,若是继续提交任务,必须采起一种策略处理该任务,线程池提供了4种策略:

一、AbortPolicy:直接抛出异常,默认策略;

二、CallerRunsPolicy:用调用者所在的线程来执行任务;

三、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

四、DiscardPolicy:直接丢弃任务; 固然也能够根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

Exectors

Exectors工厂类提供了线程池的初始化接口,主要有以下几种:

newFixedThreadPool

初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene做为阻塞队列,不过当线程池没有可执行任务时,也不会释放线程。

newCachedThreadPool

一、初始化一个能够缓存线程的线程池,默认缓存60s,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue做为阻塞队列;

二、和newFixedThreadPool建立的线程池不一样,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,若是没有空闲线程,则建立新线程执行任务,会致使必定的系统开销;

因此,使用该线程池时,必定要注意控制并发的任务数,不然建立大量的线程可能致使严重的性能问题。

newSingleThreadExecutor

初始化的线程池中只有一个线程,若是该线程异常结束,会从新建立一个新的线程继续执行任务,惟一的线程能够保证所提交任务的顺序执行,内部使用LinkedBlockingQueue做为阻塞队列。

newScheduledThreadPool

初始化的线程池能够在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可使用该线程池按期的同步数据。

实现原理

除了newScheduledThreadPool的内部实现特殊一点以外,其它几个线程池都是基于ThreadPoolExecutor类实现的。

线程池内部状态

其中AtomicInteger变量ctl的功能很是强大:利用低29位表示线程池中线程数,经过高3位表示线程池的运行状态:

一、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;

二、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;

三、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,并且会中断正在运行的任务;

四、TIDYING : 2 << COUNT_BITS,即高3位为010;

五、TERMINATED: 3 << COUNT_BITS,即高3位为011;

任务提交

线程池框架提供了两种方式提交任务,根据不一样的业务需求选择不一样的方式。

Executor.execute()

经过Executor.execute()方法提交的任务,必须实现Runnable接口,该方式提交的任务不能获取返回值,所以没法判断任务是否执行成功。

ExecutorService.submit()

经过ExecutorService.submit()方法提交的任务,能够获取任务执行完的返回值。

任务执行 当向线程池中提交一个任务,线程池会如何处理该任务?

execute实现

具体的执行流程以下:

一、workerCountOf方法根据ctl的低29位,获得线程池的当前线程数,若是线程数小于corePoolSize,则执行addWorker方法建立新的线程执行任务;不然执行步骤(2);

二、若是线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中,则执行步骤(3),不然执行步骤(4);

三、再次检查线程池的状态,若是线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务;

四、执行addWorker方法建立新的线程执行任务,若是addWoker执行失败,则执行reject方法处理任务;

addWorker实现

从方法execute的实现能够看出:addWorker主要负责建立新的线程并执行任务,代码实现以下:

这只是addWoker方法实现的前半部分:

一、判断线程池的状态,若是线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回;

二、经过参数core判断当前须要建立的线程是否为核心线程,若是core为true,且当前线程数小于corePoolSize,则跳出循环,开始建立新的线程,具体实现以下:

线程池的工做线程经过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程,其中Worker类设计以下:

一、继承了AQS类,能够方便的实现工做线程的停止操做;

二、实现了Runnable接口,能够将自身做为一个任务在工做线程中执行;

三、当前提交的任务firstTask做为参数传入Worker的构造方法;

从Woker类的构造方法实现能够发现:线程工厂在建立线程thread时,将Woker实例自己this做为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。

runWorker实现

runWorker方法是线程池的核心:

一、线程启动以后,经过unlock方法释放锁,设置AQS的state为0,表示运行中断;

二、获取第一个任务firstTask,执行任务的run方法,不过在执行任务以前,会进行加锁操做,任务执行完会释放锁;

三、在执行任务的先后,能够根据业务场景自定义beforeExecute和afterExecute方法;

四、firstTask执行完成以后,经过getTask方法从阻塞队列中获取等待的任务,若是队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

getTask实现

整个getTask操做在自旋下完成:

一、workQueue.take:若是阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;

二、workQueue.poll:若是在keepAliveTime时间内,阻塞队列仍是没有任务,则返回null;

因此,线程池中实现的线程能够一直执行由用户提交的任务。

Future和Callable实现

经过ExecutorService.submit()方法提交的任务,能够获取任务执行完的返回值。

在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。

一、Callable接口相似于Runnable,只是Runnable没有返回值。

二、Callable任务除了返回正常结果以外,若是发生异常,该异常也会被返回,即Future能够拿到异步执行任务各类结果;

三、Future.get方法会致使主线程阻塞,直到Callable任务执行完成;

submit实现

经过submit方法提交的Callable任务会被封装成了一个FutureTask对象。

FutureTask

一、FutureTask在不一样阶段拥有不一样的状态state,初始化为NEW;

二、FutureTask类实现了Runnable接口,这样就能够经过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;

FutureTask.get实现

内部经过awaitDone方法对主线程进行阻塞,具体实现以下:

一、若是主线程被中断,则抛出中断异常;

二、判断FutureTask当前的state,若是大于COMPLETING,说明任务已经执行完成,则直接返回;

三、若是当前state等于COMPLETING,说明任务已经执行完,这时主线程只需经过yield方法让出cpu资源,等待state变成NORMAL;

四、经过WaitNode类封装当前线程,并经过UNSAFE添加到waiters链表;

五、最终经过LockSupport的park或parkNanos挂起线程; FutureTask.run实现

FutureTask.run方法是在线程池中被执行的,而非主线程

一、经过执行Callable任务的call方法;

二、若是call执行成功,则经过set方法保存结果;

三、若是call执行有异常,则经过setException保存异常;

set

setException

set和setException方法中,都会经过UnSAFE修改FutureTask的状态,并执行finishCompletion方法通知主线程任务已经执行完成;

finishCompletion

一、执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中;

二、FutureTask任务执行完成后,经过UNSAFE设置waiters的值,并经过LockSupport类unpark方法唤醒主线程;

相关文章
相关标签/搜索