线程池小结(JDK8)

 一、线程池的好处

  1. 下降资源消耗(重复利用已建立的线程减小建立和销毁线程的开销)
  2. 提升响应速度(无须建立线程)
  3. 提升线程的可管理性

二、相关类图

JDK5之后将工做单元和执行机制分离开来,工做单元包括Runnable和Callable;执行机制由Executor框架提供,管理线程的生命周期,将任务的提交和如何执行进行解耦。Executors是一个快速获得线程池的工具类,相关的类图以下所示:java

 

三、Executor框架接口

Executor接口数组

Executor接口只有一个execute方法,用来替代一般建立或启动线程的方法。框架

public interface Executor {
    void execute(Runnable command);
}

ExecutorService接口异步

ExecutorService接口继承自Executor接口,加入了关闭方法、submit方法和对Callable、Future的支持。ide

ScheduledExecutorService接口工具

 ScheduledExecutorService扩展ExecutorService接口并加入了对定时任务的支持。oop

四、ThreadPoolExecutor分析

ThreadPoolExecutor继承自AbstractExecutorService,也是实现了ExecutorService接口。ui

 4.1 内部状态this

 1     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 2     private static final int COUNT_BITS = Integer.SIZE - 3;
 3     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 4 
 5     // runState is stored in the high-order bits
 6     private static final int RUNNING    = -1 << COUNT_BITS;
 7     private static final int SHUTDOWN   =  0 << COUNT_BITS;
 8     private static final int STOP       =  1 << COUNT_BITS;
 9     private static final int TIDYING    =  2 << COUNT_BITS;
10     private static final int TERMINATED =  3 << COUNT_BITS;
11 
12     // Packing and unpacking ctl
13     private static int runStateOf(int c)     { return c & ~CAPACITY; }
14     private static int workerCountOf(int c)  { return c & CAPACITY; }
15     private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl是对线程池的运行状态(高3位)和线程池中有效线程的数量(低29位)进行控制的一个字段。线程池有五种状态,分别是:atom

  1. RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  2. SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  3. STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,并且会中断正在运行的任务;
  4. TIDYING : 2 << COUNT_BITS,即高3位为010, 全部的任务都已经终止;
  5. TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()方法已经执行完成。

 

4.2 构造方法

构造方法有4个,这里只列出其中最基础的一个。

 1     public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue,
 6                               ThreadFactory threadFactory,
 7                               RejectedExecutionHandler handler) {
 8         if (corePoolSize < 0 ||
 9             maximumPoolSize <= 0 ||
10             maximumPoolSize < corePoolSize ||
11             keepAliveTime < 0)
12             throw new IllegalArgumentException();
13         if (workQueue == null || threadFactory == null || handler == null)
14             throw new NullPointerException();
15         this.corePoolSize = corePoolSize;
16         this.maximumPoolSize = maximumPoolSize;
17         this.workQueue = workQueue;
18         this.keepAliveTime = unit.toNanos(keepAliveTime);
19         this.threadFactory = threadFactory;
20         this.handler = handler;
21     }

构造方法中参数的含义以下:

  • corePoolSize:核心线程数量,线程池中应该常驻的线程数量
  • maximumPoolSize:线程池容许的最大线程数,非核心线程在超时以后会被清除
  • keepAliveTime:线程没有任务执行时能够保持的时间
  • unit:时间单位
  • workQueue:阻塞队列,存储等待执行的任务。JDK提供了以下4种阻塞队列:
    • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
    • LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量一般要高于ArrayBlockingQuene;
    • SynchronousQuene:一个不存储元素的阻塞队列,每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于LinkedBlockingQuene;
    • PriorityBlockingQuene:具备优先级的无界阻塞队列;
  • threadFactory:线程工厂,来建立线程
  • handler:线程池的饱和策略。若是阻塞队列满了而且没有空闲的线程,这时若是继续提交任务,就须要采起一种策略处理该任务。线程池提供了4种策略:
    • AbortPolicy:直接抛出异常,这是默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务。

4.3 execute方法

ThreadPoolExecutor.execute(task)实现了Executor.execute(task),用来提交任务,不能获取返回值,代码以下:

 1     public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         /*
 5          * Proceed in 3 steps:
 6          *
 7          * 1. If fewer than corePoolSize threads are running, try to
 8          * start a new thread with the given command as its first
 9          * task.  The call to addWorker atomically checks runState and
10          * workerCount, and so prevents false alarms that would add
11          * threads when it shouldn't, by returning false.
12          *
13          * 2. If a task can be successfully queued, then we still need
14          * to double-check whether we should have added a thread
15          * (because existing ones died since last checking) or that
16          * the pool shut down since entry into this method. So we
17          * recheck state and if necessary roll back the enqueuing if
18          * stopped, or start a new thread if there are none.
19          *
20          * 3. If we cannot queue task, then we try to add a new
21          * thread.  If it fails, we know we are shut down or saturated
22          * and so reject the task.
23          */
24         int c = ctl.get();
25     /*
26      * workerCountOf方法取出低29位的值,表示当前活动的线程数;
27      * 若是当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
28      * 并把任务添加到该线程中。
29      */
30     
31         if (workerCountOf(c) < corePoolSize) {
32         /*
33          * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断仍是maximumPoolSize来判断;
34          * 若是为true,根据corePoolSize来判断;
35          * 若是为false,则根据maximumPoolSize来判断
36          */
37             if (addWorker(command, true))
38                 return;
39         /*
40          * 若是添加失败,则从新获取ctl值
41          */
42             c = ctl.get();
43         }
44     /*
45      * 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
46      */
47         if (isRunning(c) && workQueue.offer(command)) {
48     // 从新获取ctl值
49             int recheck = ctl.get();
50         // 再次判断线程池的运行状态,若是不是运行状态,因为以前已经把command添加到workQueue中了,
51         // 这时须要移除该command
52         // 执行事后经过handler使用拒绝策略对该任务进行处理,整个方法返回
53             if (! isRunning(recheck) && remove(command))
54                 reject(command);
55         /*
56          * 获取线程池中的有效线程数,若是数量是0,则执行addWorker方法
57          * 这里传入的参数表示:
58          * 1. 第一个参数为null,表示在线程池中建立一个线程,但不去启动;
59          * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
60          * 若是判断workerCount大于0,则直接返回,在workQueue中新增的command会在未来的某个时刻被执行。
61          */
62             else if (workerCountOf(recheck) == 0)
63                 addWorker(null, false);
64         }
65     /*
66      * 若是执行到这里,有两种状况:
67      * 1. 线程池已经不是RUNNING状态;
68      * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize而且workQueue已满。
69      * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
70      * 若是失败则拒绝该任务
71      */
72         else if (!addWorker(command, false))
73             reject(command);
74     }

若是线程池状态一直是RUNNING,则执行过程以下:

  1. 若是workerCount < corePoolSize,则建立并启动一个线程来执行新提交的任务;
  2. 若是workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 若是workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则建立并启动一个线程来执行新提交的任务;
  4. 若是workerCount >= maximumPoolSize,而且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

 

4.4 addWorker方法

从executor的方法实现能够看出,addWorker主要负责建立新的线程并执行任务。线程池建立新线程执行任务时,须要获取全局锁:

 

 1     private boolean addWorker(Runnable firstTask, boolean core) {
 2         retry:
 3         for (;;) {
 4             int c = ctl.get();
 5             // 获取运行状态
 6             int rs = runStateOf(c);
 7         /*
 8          * 这个if判断
 9          * 若是rs >= SHUTDOWN,则表示此时再也不接收新任务;
10          * 接着判断如下3个条件,只要有1个不知足,则返回false:
11          * 1. rs == SHUTDOWN,这时表示关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保存的任务
12          * 2. firsTask为空
13          * 3. 阻塞队列不为空
14          *
15          * 首先考虑rs == SHUTDOWN的状况
16          * 这种状况下不会接受新提交的任务,因此在firstTask不为空的时候会返回false;
17          * 而后,若是firstTask为空,而且workQueue也为空,则返回false,
18          * 由于队列中已经没有任务了,不须要再添加线程了
19          */
20             // Check if queue empty only if necessary.
21             if (rs >= SHUTDOWN &&
22                 ! (rs == SHUTDOWN &&
23                    firstTask == null &&
24                    ! workQueue.isEmpty()))
25                 return false;
26 
27             for (;;) {
28                 // 获取线程数
29                 int wc = workerCountOf(c);
30               // 若是wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
31               // 这里的core是addWorker方法的第二个参数,若是为true表示根据corePoolSize来比较,
32               // 若是为false则根据maximumPoolSize来比较。
33                 if (wc >= CAPACITY ||
34                     wc >= (core ? corePoolSize : maximumPoolSize))
35                     return false;
36                // 尝试增长workerCount,若是成功,则跳出第一个for循环
37                 if (compareAndIncrementWorkerCount(c))
38                     break retry;
39               // 若是增长workerCount失败,则从新获取ctl的值
40                 c = ctl.get();  // Re-read ctl
41               // 若是当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
42                 if (runStateOf(c) != rs)
43                     continue retry;
44                 // else CAS failed due to workerCount change; retry inner loop
45             }
46         }
47 
48         boolean workerStarted = false;
49         boolean workerAdded = false;
50         Worker w = null;
51         try {
52             // 根据firstTask来建立Worker对象
53             w = new Worker(firstTask);
54             // 每个Worker对象都会建立一个线程
55             final Thread t = w.thread;
56             if (t != null) {
57                 final ReentrantLock mainLock = this.mainLock;
58                 mainLock.lock();
59                 try {
60                     // Recheck while holding lock.
61                     // Back out on ThreadFactory failure or if
62                     // shut down before lock acquired.
63                     int rs = runStateOf(ctl.get());
64                   // rs < SHUTDOWN表示是RUNNING状态;
65                   // 若是rs是RUNNING状态或者rs是SHUTDOWN状态而且firstTask为null,向线程池中添加线程。
66                   // 由于在SHUTDOWN时不会在添加新的任务,但仍是会执行workQueue中的任务
67                     if (rs < SHUTDOWN ||
68                         (rs == SHUTDOWN && firstTask == null)) {
69                         if (t.isAlive()) // precheck that t is startable
70                             throw new IllegalThreadStateException();
71                 // workers是一个HashSet
72                         workers.add(w);
73                         int s = workers.size();
74                 // largestPoolSize记录着线程池中出现过的最大线程数量
75                         if (s > largestPoolSize)
76                             largestPoolSize = s;
77                         workerAdded = true;
78                     }
79                 } finally {
80                     mainLock.unlock();
81                 }
82                 if (workerAdded) {
83             // 启动线程,执行任务(Worker.thread(firstTask).start());
84             //启动时会调用Worker类中的run方法,Worker自己实现了Runnable接口,因此一个Worker类型的对象也是一个线程。
85                     t.start();
86                     workerStarted = true;
87                 }
88             }
89         } finally {
90             if (! workerStarted)
91                 addWorkerFailed(w);
92         }
93         return workerStarted;
94     }

4.5 Worker类

线程池中的每个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象。Worker类设计以下:

  1. 继承了AQS类,用于判断线程是否空闲以及是否能够被中断,能够方便的实现工做线程的停止操做;
  2. 实现了Runnable接口,能够将自身做为一个任务在工做线程中执行;
  3. 当前提交的任务firstTask做为参数传入Worker的构造方法;
 1     private final class Worker
 2         extends AbstractQueuedSynchronizer
 3         implements Runnable
 4     {
 5         /**
 6          * This class will never be serialized, but we provide a
 7          * serialVersionUID to suppress a javac warning.
 8          */
 9         private static final long serialVersionUID = 6138294804551838833L;
10 
11         /** Thread this worker is running in.  Null if factory fails. */
12         final Thread thread;
13         /** Initial task to run.  Possibly null. */
14         Runnable firstTask;
15         /** Per-thread task counter */
16         volatile long completedTasks;
17 
18         /**
19          * Creates with given first task and thread from ThreadFactory.
20          * @param firstTask the first task (null if none)
21          */
22         Worker(Runnable firstTask) {
23             setState(-1); // inhibit interrupts until runWorker
24             this.firstTask = firstTask;
25             this.thread = getThreadFactory().newThread(this);
26         }
27 
28         /** Delegates main run loop to outer runWorker  */
29         public void run() {
30             runWorker(this);
31         }
32 
33         // Lock methods
34         //
35         // The value 0 represents the unlocked state.
36         // The value 1 represents the locked state.
37 
38         protected boolean isHeldExclusively() {
39             return getState() != 0;
40         }
41 
42         protected boolean tryAcquire(int unused) {
43             if (compareAndSetState(0, 1)) {
44                 setExclusiveOwnerThread(Thread.currentThread());
45                 return true;
46             }
47             return false;
48         }
49 
50         protected boolean tryRelease(int unused) {
51             setExclusiveOwnerThread(null);
52             setState(0);
53             return true;
54         }
55 
56         public void lock()        { acquire(1); }
57         public boolean tryLock()  { return tryAcquire(1); }
58         public void unlock()      { release(1); }
59         public boolean isLocked() { return isHeldExclusively(); }
60 
61         void interruptIfStarted() {
62             Thread t;
63             if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
64                 try {
65                     t.interrupt();
66                 } catch (SecurityException ignore) {
67                 }
68             }
69         }
70     }

4.6 runWorker方法

 Worker类中的run方法调用了runWorker方法来执行任务,执行过程以下:

  1. 线程启动以后,经过unlock方法释放锁,设置AQS的state为0,表示运行可中断;
  2. Worker执行firstTask或从workQueue中获取任务:
    1. 进行加锁操做,保证thread不被其余线程中断(除非线程池被中断)
    2. 检查线程池状态,假若线程池处于中断状态,当前线程将中断。
    3. 执行beforeExecute
    4. 执行任务的run方法
    5. 执行afterExecute方法
    6. 解锁操做
 1     final void runWorker(Worker w) {
 2         Thread wt = Thread.currentThread();
 3         // 获取第一个任务
 4         Runnable task = w.firstTask;
 5         w.firstTask = null;
 6         // 容许中断
 7         w.unlock(); // allow interrupts
 8         boolean completedAbruptly = true;
 9         try {
10         // 若是task为空,则经过getTask来获取任务
11             while (task != null || (task = getTask()) != null) {
12                 w.lock();
13                 // If pool is stopping, ensure thread is interrupted;
14                 // if not, ensure thread is not interrupted.  This
15                 // requires a recheck in second case to deal with
16                 // shutdownNow race while clearing interrupt
17                 if ((runStateAtLeast(ctl.get(), STOP) ||
18                      (Thread.interrupted() &&
19                       runStateAtLeast(ctl.get(), STOP))) &&
20                     !wt.isInterrupted())
21                     wt.interrupt();
22                 try {
23                     beforeExecute(wt, task);
24                     Throwable thrown = null;
25                     try {
26                         task.run();
27                     } catch (RuntimeException x) {
28                         thrown = x; throw x;
29                     } catch (Error x) {
30                         thrown = x; throw x;
31                     } catch (Throwable x) {
32                         thrown = x; throw new Error(x);
33                     } finally {
34                         afterExecute(task, thrown);
35                     }
36                 } finally {
37                     task = null;
38                     w.completedTasks++;
39                     w.unlock();
40                 }
41             }
42             completedAbruptly = false;
43         } finally {
44             processWorkerExit(w, completedAbruptly);
45         }
46     }

4.7 getTask方法

 getTask方法用来从阻塞队列中取等待的任务

 1     private Runnable getTask() {
 2         boolean timedOut = false; // Did the last poll() time out?
 3 
 4         for (;;) {
 5             int c = ctl.get();
 6             int rs = runStateOf(c);
 7 
 8             // Check if queue empty only if necessary.
 9             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
10                 decrementWorkerCount();
11                 return null;
12             }
13 
14             int wc = workerCountOf(c);
15 
16             // Are workers subject to culling?
17         // timed变量用于判断是否须要进行超时控制。
18         // allowCoreThreadTimeOut默认是false,也就是核心线程不容许进行超时;
19         // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
20         // 对于超过核心线程数量的这些线程,须要进行超时控制    
21             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
22 
23             if ((wc > maximumPoolSize || (timed && timedOut))
24                 && (wc > 1 || workQueue.isEmpty())) {
25                 if (compareAndDecrementWorkerCount(c))
26                     return null;
27                 continue;
28             }
29 
30             try {
31             /*
32              * 根据timed来判断,若是为true,则经过阻塞队列的poll方法进行超时控制,若是在keepAliveTime时间内没有获取到任务,则返回null;
33              * 不然经过take方法,若是这时队列为空,则take方法会阻塞直到队列不为空。
34              *
35              */
36                 Runnable r = timed ?
37                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
38                     workQueue.take();
39                 if (r != null)
40                     return r;
41                 timedOut = true;
42             } catch (InterruptedException retry) {
43                 timedOut = false;
44             }
45         }
46     }

5 任务的提交

  • submit任务,等待线程池execute
  • 执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中, 并阻塞等待运行结果;
  • FutureTask任务执行完成后,经过UNSAFE设置waiters相应的waitNode为null,并经过LockSupport类unpark方法唤醒主线程。
 1 public class Test{
 2 
 3     public static void main(String[] args) {
 4 
 5         ExecutorService es = Executors.newCachedThreadPool();
 6         Future<String> future = es.submit(new Callable<String>() {
 7             @Override
 8             public String call() throws Exception {
 9                 try {
10                     TimeUnit.SECONDS.sleep(2);
11                 } catch (InterruptedException e) {
12                     e.printStackTrace();
13                 }
14                 return "future result";
15             }
16         });
17         try {
18             String result = future.get();
19             System.out.println(result);
20         } catch (Exception e) {
21             e.printStackTrace();
22         }
23     }
24 }

在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。

  • Callable接口相似于Runnable,只是Runnable没有返回值。
  • Callable任务除了返回正常结果以外,若是发生异常,该异常也会被返回,即Future能够拿到异步执行任务各类结果;
  • Future.get方法会致使主线程阻塞,直到Callable任务执行完成;

 5.1 submit方法

AbstractExecutorService.submit()实现了ExecutorService.submit(),能够得到执行完的返回值。而ThreadPoolExecutor是AbstractExecutorService的子类,因此submit方法也是ThreadPoolExecutor的方法。

 1     public Future<?> submit(Runnable task) {
 2         if (task == null) throw new NullPointerException();
 3         RunnableFuture<Void> ftask = newTaskFor(task, null);
 4         execute(ftask);
 5         return ftask;
 6     }
 7     public <T> Future<T> submit(Runnable task, T result) {
 8         if (task == null) throw new NullPointerException();
 9         RunnableFuture<T> ftask = newTaskFor(task, result);
10         execute(ftask);
11         return ftask;
12     }
13     public <T> Future<T> submit(Callable<T> task) {
14         if (task == null) throw new NullPointerException();
15         RunnableFuture<T> ftask = newTaskFor(task);
16         execute(ftask);
17         return ftask;
18     }

经过submit方法提交的Callable或者Runnable任务会被封装成了一个FutureTask对象。经过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法。

5.2 FutureTask对象

类图

内部状态

    /**
     *...
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

内部状态的修改经过sun.misc.Unsafe修改。

get方法

1     public V get() throws InterruptedException, ExecutionException {
2         int s = state;
3         if (s <= COMPLETING)
4             s = awaitDone(false, 0L);
5         return report(s);
6     }

内部经过awaitDone方法对主线程进行阻塞,具体实现以下:

 1     /**
 2      * Awaits completion or aborts on interrupt or timeout.
 3      *
 4      * @param timed true if use timed waits
 5      * @param nanos time to wait, if timed
 6      * @return state upon completion
 7      */
 8     private int awaitDone(boolean timed, long nanos)
 9         throws InterruptedException {
10         final long deadline = timed ? System.nanoTime() + nanos : 0L;
11         WaitNode q = null;
12         boolean queued = false;
13         for (;;) {
14             if (Thread.interrupted()) {
15                 removeWaiter(q);
16                 throw new InterruptedException();
17             }
18 
19             int s = state;
20             if (s > COMPLETING) {
21                 if (q != null)
22                     q.thread = null;
23                 return s;
24             }
25             else if (s == COMPLETING) // cannot time out yet
26                 Thread.yield();
27             else if (q == null)
28                 q = new WaitNode();
29             else if (!queued)
30                 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
31                                                      q.next = waiters, q);
32             else if (timed) {
33                 nanos = deadline - System.nanoTime();
34                 if (nanos <= 0L) {
35                     removeWaiter(q);
36                     return state;
37                 }
38                 LockSupport.parkNanos(this, nanos);
39             }
40             else
41                 LockSupport.park(this);
42         }
43     }
  1. 若是主线程被中断,则抛出中断异常;
  2. 判断FutureTask当前的state,若是大于COMPLETING,说明任务已经执行完成,则直接返回;
  3. 若是当前state等于COMPLETING,说明任务已经执行完,这时主线程只需经过yield方法让出cpu资源,等待state变成NORMAL;
  4. 经过WaitNode类封装当前线程,并经过UNSAFE添加到waiters链表;
  5. 最终经过LockSupport的park或parkNanos挂起线程。

run方法

 1     public void run() {
 2         if (state != NEW ||
 3             !UNSAFE.compareAndSwapObject(this, runnerOffset,
 4                                          null, Thread.currentThread()))
 5             return;
 6         try {
 7             Callable<V> c = callable;
 8             if (c != null && state == NEW) {
 9                 V result;
10                 boolean ran;
11                 try {
12                     result = c.call();
13                     ran = true;
14                 } catch (Throwable ex) {
15                     result = null;
16                     ran = false;
17                     setException(ex);
18                 }
19                 if (ran)
20                     set(result);
21             }
22         } finally {
23             // runner must be non-null until state is settled to
24             // prevent concurrent calls to run()
25             runner = null;
26             // state must be re-read after nulling runner to prevent
27             // leaked interrupts
28             int s = state;
29             if (s >= INTERRUPTING)
30                 handlePossibleCancellationInterrupt(s);
31         }
32     }

FutureTask.run方法是在线程池中被执行的,而非主线程

  1. 经过执行Callable任务的call方法;
  2. 若是call执行成功,则经过set方法保存结果;
  3. 若是call执行有异常,则经过setException保存异常。

6 Executors类

 Exectors工厂类提供了线程池的初始化接口,主要有以下几种:

newFixedThreadPool

1     public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads,
3                                       0L, TimeUnit.MILLISECONDS,
4                                       new LinkedBlockingQueue<Runnable>());
5     }

建立一个固定大小、任务队列容量无界(Integer.MAX_VALUE)的线程池,其中corePoolSize =maximumPoolSize=nThreads,阻塞队列为LinkedBlockingQuene。

注意点:

  1. 线程池的线程数量达corePoolSize后,即便线程池没有可执行任务时,也不会释放线程;
  2. 线程池里的线程数量不超过corePoolSize,这致使了maximumPoolSizekeepAliveTime将会是个无用参数 ;
  3. 因为使用了无界队列, 因此FixedThreadPool永远不会拒绝, 即饱和策略失效。

newSingleThreadExecutor

1     public static ExecutorService newSingleThreadExecutor() {
2         return new FinalizableDelegatedExecutorService
3             (new ThreadPoolExecutor(1, 1,
4                                     0L, TimeUnit.MILLISECONDS,
5                                     new LinkedBlockingQueue<Runnable>()));
6     }

只有一个线程来执行无界任务队列的单一线程池。若是该线程异常结束,会从新建立一个新的线程继续执行任务,惟一的线程能够保证所提交任务的顺序执行。因为使用了无界队列, 因此SingleThreadPool永远不会拒绝,即饱和策略失效。与newFixedThreadPool(1)的区别在于单一线程池的大小不能再改变。

newCachedThreadPool

1     public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                       60L, TimeUnit.SECONDS,
4                                       new SynchronousQueue<Runnable>());
5     }

建立一个大小无界的缓冲线程池。任务队列是一个同步队列。缓冲线程池适用于执行耗时较小的异步任务。池的核心线程数=0 最大线程数=Integer.MAX_VLUE。与前两种稍微不一样的是:

  1. 任务加入到池中,若是池中有空闲线程,则用空闲线程执行,如无则建立新线程执行。
  2. 池中的线程空闲超过60秒,将被销毁释放。
  3. 池中的线程数随任务的多少变化。

newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

能定时执行任务的线程,池的核心线程数由参数指定。和前面3个线程池基于ThreadPoolExecutor类实现不一样的是,它基于ScheduledThreadPoolExecutor实现。

7 线程池的监控

 可使用ThreadPoolExecutor如下方法:

  • getTaskCount:线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
  • getLargestPoolSize:线程池曾经建立过的最大线程数量。经过这个数据能够知道线程池是否满过,也就是达到了maximumPoolSize;
  • getPoolSize:线程池当前的线程数量;
  • getActiveCount:当前线程池中正在执行任务的线程数量。
相关文章
相关标签/搜索