java多线程系列之future机制

java多线程系列之future机制

future是什么?

  • 在执行比较耗时的任务的时候,咱们常常会采起新开线程执行的方式,好比在netty中,若是在io线程中处理耗cpu的计算任务,那么就会形成io线程的堵塞,致使吞吐率的降低(比较好理解,原本io线程能够去处理io的,如今却在等待cpu执行计算任务),这严重影响了io的效率。
  • 通常咱们采用线程池来执行异步任务,通常状况下不须要获取返回值,可是特殊状况下是须要获取返回值的,也就是须要拿到异步任务的执行结果,举个例子来讲:对大量整数进行求和,若是采用多线程来求解,就须要一个汇总线程和多个计算线程,计算线程执行具体的计算任务而且返回求和值,汇总线程进行多个求和值最后的汇总。
  • 那么若是咱们本身要实现这个异步计算的程序的话能够采用什么方式呢?这其实是线程之间的通讯机制,即咱们的汇总线程须要拿到全部计算线程执行完毕的结果,那么咱们能够采用共享内存来实现,定义一个全局的map,每一个计算线程执行完毕的结果都放到到map中,而后汇总线程从全局map中取出结果进行累加汇总,这样就搞定了,这里面虽然思想很简单,可是仍是有一些细节须要考虑的,好比汇总线程怎么判断全部的任务都执行完毕呢?能够经过计算任务的总数和已经完成计算任务的数目进行比较。总之咱们确定能够实现一套这样的异步计算框架。
  • 那么进一步抽象,在上面的实现过程当中,实际上咱们关心的就是每一个任务执行的结果,以及任务是否执行完毕,对应到上面提到的计算框架,就是咱们关心是否计算完毕和计算完毕后的值,有了这两部分的值,咱们的汇总线程就可以很方便的进行计算总的结果了。
  • 其实仔细观察,对于几乎全部的异步执行线程,咱们都是关心这两部分值的,即任务是否执行完毕,执行完后的结果(若是不须要结果能够返回null),那么这两部分的东西确定能够抽象出来,避免咱们每次编写线程执行的run方法的时候都要本身提交结果和设置完成标志,因而java就是设计了这么一套future机制来帮助开发者

上面就是我结合本身的理解分析的future机制的设计思想,可能说的不够全,但愿有人能够补充。下面会讲解java future的具体实现java

总结一句话:咱们须要异步执行任务而且知道异步任务的执行结果和执行状态,咱们能够本身来实现,可是因为这部分比较通用,因此java经过一种future机制来为咱们实现了这些功能,这就是future。

下面分析java里面future机制的具体实现

  • execute方式:咱们知道一个类若是实现了runnable接口,它就可以被线程来执行,由于实现了runnable接口就拥有了run方法,因此可以被执行。因此最简单的异步线程执行方式以下:利用Executors框架来建立一个线程池,而后调用execute方法来提交异步任务,注意这里的execute方法是没有返回的,也就是说咱们无法知道提交的任务的执行结果。node

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(()->System.out.println("异步执行!"));复制代码
  • submit方式:前面提到的java给咱们提供的线程池接口ExecutorService提供了两种提交异步任务的方式,一种就是没有返回值的execute方法(因为ExecutorService接口是extends了Executor接口的,因此拥有了execute方法),还有一种是带有返回值的submit方法。在submit方法中,提供了三个重载方法:多线程

    <T> Future<T> submit(Callable<T> task);
        Future<?> submit(Runnable task);
    <T> Future<T> submit(Runnable task, T result);复制代码

    能够看到,submit方法支持实现了callable和runnable的task,不一样于runnable只有没有返回值的run方法,callable提供了一个带返回值的call方法,能够有返回值。正是由于runnable没有返回值,因此第二个重载方法返回值为null,第三个重载方法里面能够从外部设置一个返回值,这个返回值将会做为runnable的返回值。具体代码以下:并发

    public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }复制代码

    两个方法都调用newTaskFor方法来建立了一个RunnableFuture的对象,而后调用execute方法来执行这个对象,说明咱们线程池真正执行的对象就是这个RunnableFuture对象。框架

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }复制代码

    由上面代码看出就是建立了一个futureTask对象,这个对象封装了咱们提供的runnable和callable对象。futuretask实现了runnablefuture接口,这就是说明futuretask具有了runnable的功能(能被线程执行)和future功能(可以获取自身执行的结果和状态)。能被线程执行功能是咱们本身经过实现runnable接口或者callable接口来完成的。future功能前面咱们提过是很通用的功能,因此java给咱们实现了。下面就进入futuretask查看。异步

  • futuretask对象:futuretask是真正的future功能实现的地方。前面说过这个一个RunnableFuture对象,因此咱们看看它的run方法this

    private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;    
        /** 封装的callable对象 */
        private Callable<V> callable;
        /** task的执行结果 */
        private Object outcome; 
        /** 当前线程池的哪一个线程正在执行这个task */
        private volatile Thread runner;
        /** 等待的线程列表 */
        private volatile WaitNode waiters;
    
        public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;// 1. 内部包装的一个callable对象
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();// 2. 调用包装的call方法
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);//3. 设置返回值
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }复制代码

    前面提到futuretask是封装了runnable和callable的,但是为何内部只有一个callable呢,其实是由于futuretask本身调用适配器转换了一下:代码以下,采用了java的适配器模式。spa

    public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    
        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }复制代码

    futuretask的run方法调用了内部封装的callable对象的call方法,获取返回值,而且设置到本身outcome中,state表明执行的状态,这样就经过代理的方式代理了咱们的callable的call方法,帮助咱们获取执行的结果和状态,因此咱们本身编写业务逻辑的时候就不用去管这层通用的逻辑了。这里面还有一个waitnode咱们单独讲线程

  • WaitNode: 经过前面的分析咱们知道,实际上咱们submit任务以后返回的future对象就是线程池为咱们建立的runnablefuture对象,也就是futuretask这个对象。future接口为咱们提供了一系列的方法,以下设计

    V get() throws InterruptedException, ExecutionException;
        boolean cancel(boolean mayInterruptIfRunning);复制代码

    上面是主要的两个方法,get和cancel,cancel的时候调用runner的interrupt方法便可

    public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW &&
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }复制代码

    其中unsafe是用于cas操做的,在java并发包中大量用到,后续会讲解。

    get方法的设计是阻塞的,也就是说若是结果没有返回时须要等待的,因此才会有waitnode这个对象的产生,当多个线程都调用futuretask的get方法的时候,若是结果还没产生,就都须要等待,这时候全部等待的线程就会造成一个链表,因此waitnode实际上就是线程的链表。

    static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }复制代码

    再看get方法:若是任务没有完成就调用awaitDone进入阻塞,若是完成了直接调用report返回结果

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }复制代码
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {//1. 若是等待过程当中,被中断过了,那么就移除本身
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)//2. cas更新链表节点
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);//3. locksupport原语让线程进入休眠
                }
                else
                    LockSupport.park(this);
            }
        }复制代码

    仍是比较好看懂,其中LockSupport是原语,让线程进行休眠。若是线程在休眠中醒来了,有多是多种状况,好比get的时间到了,也就是从3中醒来了,这样的话下一次循环就会判断时间到了,从而remove掉节点退出。还有可能等待的线程被interrupt了,这时候就会走到1的逻辑,经过判断中断标记将其remove掉。

    既然有了waitnode这个等待链表,那么确定会有相应的唤醒机制,当执行完毕以后就会将waitnode链表上的线程一次唤醒,以下。

    private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }复制代码

实际上java的future接口所提供的功能比较有限,好比listen机制就没有,都须要异步任务发起者主动去查询状态和结果,并且没有提供非阻塞的等待机制。可是咱们能够本身灵活的实现,后续将参照netty中的future机制进行详细讲解。

相关文章
相关标签/搜索