【转载】从使用到原理学习Java线程池

线程池的技术背景

在面向对象编程中,建立和销毁对象是很费时间的,由于建立一个对象要获取内存资源或者其它更多资源。在Java中更是如此,虚拟机将试图跟踪每个对象,以便可以在对象销毁后进行垃圾回收。css

因此提升服务程序效率的一个手段就是尽量减小建立和销毁对象的次数,特别是一些很耗资源的对象建立和销毁。如何利用已有对象来服务就是一个须要解决的关键问题,其实这就是一些”池化资源”技术产生的缘由。html

例如Android中常见到的不少通用组件通常都离不开”池”的概念,如各类图片加载库,网络请求库,即便Android的消息传递机制中的Meaasge当使用Meaasge.obtain()就是使用的Meaasge池中的对象,所以这个概念很重要。本文将介绍的线程池技术一样符合这一思想。java

线程池的优势:编程

  • 重用线程池中的线程,减小因对象建立,销毁所带来的性能开销;
  • 能有效的控制线程的最大并发数,提升系统资源利用率,同时避免过多的资源竞争,避免堵塞;
  • 可以多线程进行简单的管理,使线程的使用简单、高效。

线程池框架Executor

java中的线程池是经过Executor框架实现的,Executor 框架包括类:Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable和Future、FutureTask的使用等。bash

Executor: 全部线程池的接口,只有一个方法。markdown

public interface Executor { void execute(Runnable command); }

ExecutorService: 增长Executor的行为,是Executor实现类的最直接接口。网络

Executors: 提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService 接口。多线程

ThreadPoolExecutor:线程池的具体实现类,通常用的各类线程池都是基于这个类实现的。
构造方法以下:并发

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
  • corePoolSize:线程池的核心线程数,线程池中运行的线程数也永远不会超过 corePoolSize 个,默认状况下能够一直存活。能够经过设置allowCoreThreadTimeOut为True,此时 核心线程数就是0,此时keepAliveTime控制全部线程的超时时间。
  • maximumPoolSize:线程池容许的最大线程数;
  • keepAliveTime: 指的是空闲线程结束的超时时间;
  • unit :是一个枚举,表示 keepAliveTime 的单位;
  • workQueue:表示存听任务的BlockingQueue<Runnable队列。
  • BlockingQueue:阻塞队列(BlockingQueue)是java.util.concurrent下的主要用来控制线程同步的工具。若是BlockQueue是空的,从BlockingQueue取东西的操做将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒。一样,若是BlockingQueue是满的,任何试图往里存东西的操做也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操做。
    阻塞队列经常使用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。具体的实现类有LinkedBlockingQueue,ArrayBlockingQueued等。通常其内部的都是经过Lock和Condition(显示锁(Lock)及Condition的学习与使用)来实现阻塞和唤醒。

线程池的工做过程以下:框架

  1. 线程池刚建立时,里面没有一个线程。任务队列是做为参数传进来的。不过,就算队列里面有任务,线程池也不会立刻执行它们。
  2. 当调用 execute() 方法添加一个任务时,线程池会作以下判断:
    • 若是正在运行的线程数量小于 corePoolSize,那么立刻建立线程运行这个任务;
    • 若是正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
    • 若是这时候队列满了,并且正在运行的线程数量小于 maximumPoolSize,那么仍是要建立非核心线程马上运行这个任务;
    • 若是队列满了,并且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
  4. 当一个线程无事可作,超过必定的时间(keepAliveTime)时,线程池会判断,若是当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。因此线程池的全部任务完成后,它最终会收缩到 corePoolSize 的大小。

线程池的建立和使用

生成线程池采用了工具类Executors的静态方法,如下是几种常见的线程池。

SingleThreadExecutor:单个后台线程 (其缓冲队列是无界的)

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

建立一个单线程的线程池。这个线程池只有一个核心线程在工做,也就是至关于单线程串行执行全部任务。若是这个惟一的线程由于异常结束,那么会有一个新的线程来替代它。此线程池保证全部任务的执行顺序按照任务的提交顺序执行。

FixedThreadPool:只有核心线程的线程池,大小固定 (其缓冲队列是无界的) 。

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

建立固定大小的线程池。每次提交一个任务就建立一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,若是某个线程由于执行异常而结束,那么线程池会补充一个新线程。

CachedThreadPool:无界线程池,能够进行自动线程回收。

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

若是线程池的大小超过了处理任务所须要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增长时,此线程池又能够智能的添加新线程来处理任务。此线程池不会对线程池大小作限制,线程池大小彻底依赖于操做系统(或者说JVM)可以建立的最大线程大小。SynchronousQueue是一个是缓冲区为1的阻塞队列。

ScheduledThreadPool:核心线程池固定,大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

public static ExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPool(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }

建立一个周期性执行任务的线程池。若是闲置,非核心线程池会在DEFAULT_KEEPALIVEMILLIS时间内回收。

线程池最经常使用的提交任务的方法有两种:

execute:

ExecutorService.execute(Runnable runable);

submit:

FutureTask task = ExecutorService.submit(Runnable runnable);

FutureTask<T> task = ExecutorService.submit(Runnable runnable,T Result); FutureTask<T> task = ExecutorService.submit(Callable<T> callable);

submit(Callable callable)的实现,submit(Runnable runnable)同理。

public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); FutureTask<T> ftask = newTaskFor(task); execute(ftask); return ftask; }

能够看出submit开启的是有返回结果的任务,会返回一个FutureTask对象,这样就能经过get()方法获得结果。submit最终调用的也是execute(Runnable runable),submit只是将Callable对象或Runnable封装成一个FutureTask对象,由于FutureTask是个Runnable,因此能够在execute中执行。关于Callable对象和Runnable怎么封装成FutureTask对象,见Callable和Future、FutureTask的使用

线程池实现的原理

若是只讲线程池的使用,那这篇博客没有什么大的价值,充其量也就是熟悉Executor相关API的过程。线程池的实现过程没有用到Synchronized关键字,用的都是Volatile,Lock和同步(阻塞)队列,Atomic相关类,FutureTask等等,由于后者的性能更优。理解的过程能够很好的学习源码中并发控制的思想。

在开篇提到过线程池的优势是可总结为如下三点:

  1. 线程复用
  2. 控制最大并发数
  3. 管理线程

1.线程复用过程

理解线程复用原理首先应了解线程生命周期。

在线程的生命周期中,它要通过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)5种状态。

Thread经过new来新建一个线程,这个过程是是初始化一些线程信息,如线程名,id,线程所属group等,能够认为只是个普通的对象。调用Thread的start()后Java虚拟机会为其建立方法调用栈和程序计数器,同时将hasBeenStarted为true,以后调用start方法就会有异常。

处于这个状态中的线程并无开始运行,只是表示该线程能够运行了。至于该线程什么时候开始运行,取决于JVM里线程调度器的调度。当线程获取cpu后,run()方法会被调用。不要本身去调用Thread的run()方法。以后根据CPU的调度在就绪——运行——阻塞间切换,直到run()方法结束或其余方式中止线程,进入dead状态。

因此实现线程复用的原理应该就是要保持线程处于存活状态(就绪,运行或阻塞)。接下来来看下ThreadPoolExecutor是怎么实现线程复用的。

在ThreadPoolExecutor主要Worker类来控制线程的复用。看下Worker类简化后的代码,这样方便理解:

private final class Worker implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null){ task.run(); } }

Worker是一个Runnable,同时拥有一个thread,这个thread就是要开启的线程,在新建Worker对象时同时新建一个Thread对象,同时将Worker本身做为参数传入TThread,这样当Thread的start()方法调用时,运行的其实是Worker的run()方法,接着到runWorker()中,有个while循环,一直从getTask()里获得Runnable对象,顺序执行。getTask()又是怎么获得Runnable对象的呢?

依旧是简化后的代码:

private Runnable getTask() { if(一些特殊状况) { return null; } Runnable r = workQueue.take(); return r; }

这个workQueue就是初始化ThreadPoolExecutor时存听任务的BlockingQueue队列,这个队列里的存放的都是将要执行的Runnable任务。由于BlockingQueue是个阻塞队列,BlockingQueue.take()获得若是是空,则进入等待状态直到BlockingQueue有新的对象被加入时唤醒阻塞的线程。因此通常状况Thread的run()方法就不会结束,而是不断执行从workQueue里的Runnable任务,这就达到了线程复用的原理了。

2.控制最大并发数

那Runnable是何时放入workQueue?Worker又是何时建立,Worker里的Thread的又是何时调用start()开启新线程来执行Worker的run()方法的呢?有上面的分析看出Worker里的runWorker()执行任务时是一个接一个,串行进行的,那并发是怎么体现的呢?

很容易想到是在execute(Runnable runnable)时会作上面的一些任务。看下execute里是怎么作的。

execute:

简化后的代码

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 当前线程数 < corePoolSize if (workerCountOf(c) < corePoolSize) { // 直接启动新的线程。 if (addWorker(command, true)) return; c = ctl.get(); } // 活动线程数 >= corePoolSize // runState为RUNNING && 队列未满 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次检验是否为RUNNING状态 // 非RUNNING状态 则从workQueue中移除任务并拒绝 if (!isRunning(recheck) && remove(command)) reject(command);// 采用线程池指定的策略拒绝任务 // 两种状况: // 1.非RUNNING状态拒绝新的任务 // 2.队列满了启动新的线程失败(workCount > maximumPoolSize) } else if (!addWorker(command, false)) reject(command); }

addWorker:

简化后的代码

private boolean addWorker(Runnable firstTask, boolean core) { int wc = workerCountOf(c); if (wc >= (core ? corePoolSize : maximumPoolSize)) { return false; } w = new Worker(firstTask); final Thread t = w.thread; t.start(); }

根据代码再来看上面提到的线程池工做过程当中的添加任务的状况:

* 若是正在运行的线程数量小于 corePoolSize,那么立刻建立线程运行这个任务;   
* 若是正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列; * 若是这时候队列满了,并且正在运行的线程数量小于 maximumPoolSize,那么仍是要建立非核心线程马上运行这个任务; * 若是队列满了,并且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。

这就是Android的AsyncTask在并行执行是在超出最大任务数是抛出RejectExecutionException的缘由所在,详见基于最新版本的AsyncTask源码解读及AsyncTask的黑暗面

经过addWorker若是成功建立新的线程成功,则经过start()开启新线程,同时将firstTask做为这个Worker里的run()中执行的第一个任务。

虽然每一个Worker的任务是串行处理,但若是建立了多个Worker,由于共用一个workQueue,因此就会并行处理了。

因此根据corePoolSize和maximumPoolSize来控制最大并发数。大体过程可用下图表示。

上面的讲解和图来能够很好的理解的这个过程。

若是是作Android开发的,而且对Handler原理比较熟悉,你可能会以为这个图挺熟悉,其中的一些过程和Handler,Looper,Meaasge使用中,很类似。Handler.send(Message)至关于execute(Runnuble),Looper中维护的Meaasge队列至关于BlockingQueue,只不过须要本身经过同步来维护这个队列,Looper中的loop()函数循环从Meaasge队列取Meaasge和Worker中的runWork()不断从BlockingQueue取Runnable是一样的道理。

3.管理线程

经过线程池能够很好的管理线程的复用,控制并发数,以及销毁等过程,线程的复用和控制并发上面已经讲了,而线程的管理过程已经穿插在其中了,也很好理解。

在ThreadPoolExecutor有个ctl的AtomicInteger变量。经过这一个变量保存了两个内容:

  • 全部线程的数量
  • 每一个线程所处的状态

其中低29位存线程数,高3位存runState,经过位运算来获得不一样的值。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //获得线程的状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //获得Worker的的数量 private static int workerCountOf(int c) { return c & CAPACITY; } // 判断线程是否在运行 private static boolean isRunning(int c) { return c < SHUTDOWN; }

这里主要经过shutdown和shutdownNow()来分析线程池的关闭过程。首先线程池有五种状态来控制任务添加与执行。主要介绍如下三种:

  • RUNNING状态:线程池正常运行,能够接受新的任务并处理队列中的任务;
  • SHUTDOWN状态:再也不接受新的任务,可是会执行队列中的任务;
  • STOP状态:再也不接受新任务,不处理队列中的任务

shutdown这个方法会将runState置为SHUTDOWN,会终止全部空闲的线程,而仍在工做的线程不受影响,因此队列中的任务人会被执行。shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止全部的线程,因此队列中的任务也不会被执行了。

总结

经过对ThreadPoolExecutor源码的分析,从整体上了解了线程池的建立,任务的添加,执行等过程,熟悉这些过程,使用线程池就会更轻松了。

而从中学到的一些对并发控制,以及生产者——消费者模型任务处理的使用,对之后理解或解决其余相关问题会有很大的帮助。好比Android中的Handler机制,而Looper中的Messager队列用一个BlookQueue来处理一样是能够的,这写就是读源码的收获吧。

相关文章
相关标签/搜索