Future模拟

1. 疑惑java

    Future f = executor.submit(callable);异步

    线程池执行任务的时候, 为了高效,使用的都是异步执行方式;上面这种形式是异步执行的,可是使用       f.get()方法获取结果时又是阻塞的, 这是怎么实现的呢?ide

2. 分析this

    从使用方式上咱们能够推测出几点:线程

           进入线程池中的任务(callable), 应该被放到Future中执行了;code

           Future执行任务是在新的线程中执行(这样才不会阻塞原来的线程);orm

           get方法调用时,阻塞主线程, 当任务执行完成后, 才取消主线程的阻塞;get

 

3. 实现it

/**
 *  future有几种状态:new(新建立), completing(正在执行), normal(执行完毕)
 */
public class MyFuture<T> implements Runnable {

    private int state;
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;

    private Callable<T> callable;
    private Object result;
    private WaitNode waiters;

    public MyFuture(Callable<T> callable) {
        this.callable = callable;
        this.state = NEW;
    }

    public T get() throws InterruptedException, ExecutionException{
        int s = state;
        //若不是‘完成’状态, 则阻塞当前线程
        if (s <= COMPLETING) {
            s = await();
        }
        return report(s);
    }

    private int await() {
        WaitNode q = null;
        boolean queued = false;
        for(;;) {
            if (state > COMPLETING) {
                //若是是完成状态, 则直接返回
                return state;
            } else if (q == null) {
                q = new WaitNode();
            } else if (!queued) {
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            }
            else {
                //阻塞当前线程
                LockSupport.park(this);
            }
        }
    }

    private T report(int s) throws ExecutionException{
        Object x = result;
        if (s == NORMAL) {
            return (T)x;
        }
        throw new ExecutionException((Throwable)x);
    }

    @Override
    public void run() {
        //此方法是有线程池中线程调用, 主线程阻塞, 不会影响这个方法的执行
        try {
            if (callable != null && state == NEW) {
                //任务的执行
                T v = callable.call();
                setResult(v);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void setResult(T result) {
        if (UNSAFE.compareAndSwapLong(this, stateOffset, state, COMPLETING)) {
            this.result = result;
            UNSAFE.compareAndSwapLong(this, stateOffset, state, NORMAL);
            finishComplete();
        }
    }

    private void finishComplete() {
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //取消阻塞
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null;
                    q = next;
                }
                break;
            }
        }
        callable = null;
    }

   static final class WaitNode {
       Thread thread;
       WaitNode next;
       WaitNode() {
           this.thread = Thread.currentThread();
       }
   }

    private static sun.misc.Unsafe UNSAFE;
    private static long stateOffset;
    private static long waitersOffset;
    static {

        try {
            Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe)f.get(null);

            Class myFutureClass = MyFuture.class;
            stateOffset = UNSAFE.objectFieldOffset(myFutureClass.getDeclaredField("state"));
            waitersOffset = UNSAFE.objectFieldOffset(myFutureClass.getDeclaredField("waiters"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
相关文章
相关标签/搜索