结合线程池细说FutureTask及Future源码

1.FutureTask继承体系结构

2.从简单示例开始分析

多线程Runnable和Callable接口这里就很少说了,Callable有返回值,Runnable无返回值。node

public class FutureTaskTest {

    public static void main(String[] args) {
        ExecutorService executor = null;


        try {
            //线程池提交Runnable接口任务
            executor.execute(new MyRunnable());
            //线程池提交Callable接口任务
            executor = Executors.newFixedThreadPool(2);
            Future f = executor.submit(new MyCallLable<Integer>());
            System.out.println(f.get());

            //单线程方式
            FutureTask ft = new FutureTask(new MyCallLable<Integer>());
            Thread t = new Thread(ft);
            t.start();
            System.out.println(ft.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            if (executor != null) {
                executor.shutdown();
            }
        }

    }

    static class MyCallLable<Integer> implements Callable {
        @Override
        public Object call() throws Exception {
            return 1;
        }
    }

    static class MyRunnable implements  Runnable {
        @Override
        public void run() {
            System.out.println(2);
        }
    }

}

3.以线程池提交方式分析FutureTask源码

3.1.Executors.newFixedThreadPool分析

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

该方法建立了一个核心线程和最大线程数同样的线程池,使用LinkedBlockingQueue这种无界队列存储多余的任务,也就是说,若是咱们使用这种jdk自带的线程提交任务的时候,因为队列是无界的,当任务达到必定数量会形成内存溢出。这里再也不分析ThreadPoolExecutor代码,有兴趣的能够看个人另外一篇博文专门分析ThreadPoolExecutor源码的。该方法返回一个ExecutorService。面试

ThreadPoolExecutor继承体系以下图:spring

3.2.ExecutorService.submit方法分析

该方法实际调用的是实现类AbstractExecutorService.submit方法数据结构

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

这里的newTaskFor方法就会将Callable任务传递到FutureTask类中,并封装到其Callable属性中多线程

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

3.3.FutureTask属性分析

 /* 线程状态可能的转换:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
//当前任务状态
private volatile int state;
//新建立
private static final int NEW          = 0;
//即将结束,但尚未结束
private static final int COMPLETING   = 1;
//正常结束
private static final int NORMAL       = 2;
//异常状态:Callable接口的Call方法中具体业务逻辑出现异常
private static final int EXCEPTIONAL  = 3;
//任务被取消
private static final int CANCELLED    = 4;
//任务处于中断中
private static final int INTERRUPTING = 5;
//任务被中断
private static final int INTERRUPTED  = 6;

//任务提交传入的Callable,用来调用call方法
private Callable<V> callable;
//Call方法返回值
//1.若是任务正常结束,返回call方法的返回值
//2.若是call方法发生异常,返回具体的异常信息
private Object outcome;
//当前执行的线程
private volatile Thread runner;
//一个栈结构的数据类型,存储被get方法阻塞的线程的引用
private volatile WaitNode waiters;

3.4.FutureTask构造方法分析

public FutureTask(Callable<V> callable) {
    //外部须要传入Callable接口的实现
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    //将线程状态设置为先建立
    this.state = NEW;
}

3.5.FutureTask执行过程分析

3.5.1.使用线程池提交Callable接口状况下调用过程分析

从示例的线程池提交Calllable接口的案例中一步步分析:
1.executor.submit(new MyCallLable<Integer>())方法提交一个Callable实现;
2.第一步实际会调用AbstractExecutorService.submit方法;
3.AbstractExecutorService.submit内部调用newTaskFor方法生成一个FutureTask对象,并将MyCallLable任务封装到其Calllable属性中;
4.AbstractExecutorService.submit方法内部调用ThreadPoolExecutor.execute方法提交FutureTask对象到线程池;
5-6-7-8.实际就是线程池提交一个任务的执行过程,具体源码能够看个人另外一篇博客,这里比较复杂,概况的说了下;
9-10.线程池execute实际会执行FutureTask的run方法,在run方法中调用Callable.call,这就是线程池提交Callable执行的流程;

3.5.2.FutureTask.run方法分析

public void run() {
    //条件1:当前任务状态不是新建状态
    //条件2:当前线程不是FutureTask持有的线程
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        //退出执行
        return;
    try {
        //当前FutureTask持有的callable
        Callable<V> c = callable;
        //条件1:当前提交的Callable不能为空
        //条件2:当前线程任务状态为新建立
        if (c != null && state == NEW) {
            //Callable的返回值
            V result;
            //任务是否成功执行
            boolean ran;
            try {
                //调用用户自定义call方法的逻辑
                result = c.call();
                //任务成功执行
                ran = true;
            } catch (Throwable ex) {
                //发生异常
                result = null;
                ran = false;
                setException(ex);
            }
            //任务成功执行设置返回值
            if (ran)
                set(result);
        }
    } finally {
        //run方法结束持有线程设置为空,help gc
        //这里可能正常执行完run方法也可能出现异常退出
        runner = null;
        //当前任务执行状态
        int s = state;
        //若是处于中断的状态,包含中断中和已中断,释放cpu资源
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

3.5.3.FutureTask.set方法分析

该方法设置任务成功执行后的执行结果状态和返回值,将返回值封装到outcome属性中,因为get方法是阻塞的,还须要唤醒阻塞的线程。ide

protected void set(V v) {
    //将状态重新建设置为结束中
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        //返回值赋值
        outcome = v;
        //设置任务状态为正常结束
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); 
        //唤醒被get方法阻塞的线程
        finishCompletion();
    }
}

3.5.6.FutureTask静态内部类WaitNode分析

在分析finishCompletion方法前,先介绍下WaitNode类。为何会有这个类?咱们知道FutureTask.get方法是阻塞的,若是咱们在一个线程内屡次调用get方法,这个从理论上考虑其实不须要WaitNode的;若是咱们又屡次建立了线程在其余线程内部调用get方法呢?因为FutureTask.get方法内部会调用LockSupport.park(Thread)或LockSupport.parkNanos阻塞线程,因此就须要唤醒;而LockSupport.unpark(Thread)解除线程阻塞也须要指定线程,因此这里就须要一种数据结构来存储当前线程的引用了。这里就设计了WaitNode这个类,它是一个单链表,并且采用的是头插法,在遍历的时候也是从前日后遍历的,这就是一个典型的栈的结构,先进后出,后进先出。这里为何又是一个单链表结构呢?这是为了方便在任务结束的时候遍历。学习

static final class WaitNode {
    //当前线程的引用
    volatile Thread thread;
    //指向下一个节点
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

3.5.7.FutureTask.finishCompletion方法分析

用于唤醒被get方法阻塞的线程this

private void finishCompletion() {
    // assert state > COMPLETING;
    //从头开始遍历
    for (WaitNode q; (q = waiters) != null;) {
        //使用cas方式设置当前waiters为空,防止外部线程调用cancel致使finishCompletion该方法被调用
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                //获取当前WaitNode对应的线程
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null; //help gc
                    //唤醒当前节点对应的线程
                    LockSupport.unpark(t);
                }
                //获取当前节点的下一个节点
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;//help gc
                //将q指向下要给节点
                q = next;
            }
            break;
        }
    }

    done();
 //将callable置为空,help gc
    callable = null; 
}

3.5.8.FutureTask.setException方法分析

该方法将返回值设置为抛出的异常,将任务状态设置为EXCEPTIONAL状态,并调用finishCompletion方法唤醒被get阻塞的线程。spa

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

3.5.9.FutureTask.handlePossibleCancellationInterrupt方法分析线程

private void handlePossibleCancellationInterrupt(int s) {
    //若是任务状态处于中断中,释放cpu资源
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

3.5.9.FutureTask.get和FutureTask.get(long timeout, TimeUnit unit)方法分析

两个方法区别不大,惟一的区别是阻塞线程的时候使用的LockSupport.parkNanos(this, nanos)和LockSupport.park(this),当有时间条件的时候LockSupport.parkNanos(this, nanos)会在指定时间内结束后自动唤醒线程。

这里讲讲sleep和LockSupport.parkNanos区别:sleep在指定时间到期后会判断中断状态,根据中断状态来判断是否须要抛出异常,而LockSupport.parkNanos不会根据中断状态作出响应。
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    //unit.toNanos(timeout)将指定时间格式转化为对应的毫微秒
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

3.6.0.FutureTask.awaitDone方法分析

t.interrupted()也是能够唤醒被LockSupport.park()阻塞的线程的
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? S
        ystem.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    //自旋
    for (;;) {
        //条件成立说明当前线程是被其余线程调用t.interrupted()这种中断方式唤醒
        if (Thread.interrupted()) {
            //从队列中移除线程被中断的节点
            removeWaiter(q);
            throw new InterruptedException();
        }
  
        int s = state;
        //(4).s>COMPLETING成立,说明当前任务已经执行完,结果可能有好有坏
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            //返回当前任务状态
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //(1).第一次自旋,q=null,建立当前线程对应的WaitNode对象
        else if (q == null)
            q = new WaitNode();
        //(2).第二次自旋,queued为false,q.next = waiters采用头插法将当前节点入栈
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //(3).第三次自旋,会走到这里,将线程阻塞,等待后续唤醒后继续自旋调用,也可能由于超时后自动唤醒
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //从队列中移除get超时的节点
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

3.6.1.FutureTask.removeWaiter方法分析

每次调用get方法都会将线程封装成WaitNode入栈,当调用get方法的线程因为被中断唤醒或者超时自动唤醒的都须要从队列中移除, 并从新组装栈结构。

一张图概况该方法作的事情:

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

3.6.2.FutureTask.report方法分析

将返回值封装到outcome属性中返回,多是正常的值也多是一个异常信息

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

3.6.3.FutureTask.cancel方法分析

public boolean cancel(boolean mayInterruptIfRunning) {
    //条件1:说明当前任务处于运行中
    //条件2:任务状态修改
    //条件1和条件2成立则执行下面cancel的核心处理逻辑,不然返回false表明取消失败
    //可能会有多个线程调用cancel方法致使cancel失败的状况
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        //mayInterruptIfRunning是否中断线程
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    //中断线程
                    t.interrupt();
            } finally { // final state
                //设置任务为中断状态
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //唤醒全部get阻塞的线程
        finishCompletion();
    }
    return true;
}

4.回顾

常见问题,FutureTask的get方法的执行过程:

get方法会将当前线程封装到WaitNode属性中,并采用头插法插入到waiters这个栈结构中。get方法会使用LockSupport阻塞线程,等待任务结束后调用finishCompletion唤醒调用get方法的对应线程。特别的是get使用超时方法获取返回值的时候,在时间结束后也会自动唤醒线程,不过超时的线程对应的WaitNode会调用removeWaiter方法被回收,从新构建waiters这个栈结构。固然,若是调用get方法的线程被外部线程使用中断方法中断的时候,也会调用removeWaiter方法回收当前WaitNode,从新构建waiters这个栈结构。

5.推荐

分享一个朋友的公众号,有不少干货,包含netty,spring,线程,spring cloud等详细讲解,也有详细的学习规划图,面试题整理等,我感受在讲课这块比我讲的清楚:

相关文章
相关标签/搜索