自顶向下深刻分析Netty(五)--Future

再次回顾这幅图,在上一章中,咱们分析了Reactor的完整实现。因为Java NIO事件驱动的模型,要求Netty的事件处理采用异步的方式,异步处理则须要表示异步操做的结果。Future正是用来表示异步操做结果的对象,Future的类签名为:javascript

public interface Future<V>; 

其中的泛型参数V即表示异步结果的类型。java

5.1 总述

也许你已经使用过JDK的Future对象,该接口的方法以下:数组

    // 取消异步操做
    boolean cancel(boolean mayInterruptIfRunning);
    // 异步操做是否取消
    boolean isCancelled();
    // 异步操做是否完成,正常终止、异常、取消都是完成
    boolean isDone();
    // 阻塞直到取得异步操做结果
    V get() throws InterruptedException, ExecutionException;
    // 同上,但最长阻塞时间为timeout
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

 

咱们的第一印象会以为这样的设计并不坏,但仔细思考,便会发现问题:
(1).接口中只有isDone()方法判断一个异步操做是否完成,可是对于完成的定义过于模糊,JDK文档指出正常终止、抛出异常、用户取消都会使isDone()方法返回真。在咱们的使用中,咱们极有多是对这三种状况分别处理,而JDK这样的设计不能知足咱们的需求。
(2).对于一个异步操做,咱们更关心的是这个异步操做触发或者结束后可否再执行一系列动做。好比说,咱们浏览网页时点击一个按钮后实现用户登陆。在javascript中,处理代码以下:缓存

    $("#login").click(function(){
        login();
    });

 

可见在这样的状况下,JDK中的Future便不能处理,因此,Netty扩展了JDK的Future接口,使其能解决上面的两个问题。扩展的方法以下(相似方法只列出一个):多线程

    // 异步操做完成且正常终止
    boolean isSuccess();
    // 异步操做是否能够取消
    boolean isCancellable();
    // 异步操做失败的缘由
    Throwable cause();
    // 添加一个监听者,异步操做完成时回调,类比javascript的回调函数
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 阻塞直到异步操做完成
    Future<V> await() throws InterruptedException;
    // 同上,但异步操做失败时抛出异常
    Future<V> sync() throws InterruptedException;
    // 非阻塞地返回异步结果,若是还没有完成返回null
    V getNow();

 

若是你对Future的状态还有疑问,放上代码注释中的ascii图打消你的疑虑:异步

 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+

 

可知,Future对象有两种状态还没有完成和已完成,其中已完成又有三种状态:成功、失败、用户取消。各状态的状态断言请在此图中查找。
仔细看完上面的图并联系Future接口中的方法,你是否是也会和我有相同的疑问:Future接口中的方法都是getter方法而没有setter方法,也就是说这样实现的Future子类的状态是不可变的,若是咱们想要变化,那该怎么办呢?Netty提供的解决方法是:使用可写的Future即Promise。Promise接口扩展的方法以下:ide

    // 标记异步操做结果为成功,若是已被设置(无论成功仍是失败)则抛出异常IllegalStateException
    Promise<V> setSuccess(V result);
    // 同上,只是结果已被设置时返回False
    boolean trySuccess(V result);

    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);

   // 设置结果为不可取消,结果已被取消返回False
    boolean setUncancellable();

 

须要注意的是:Promise接口继承自Future接口,它提供的setter方法与常见的setter方法大为不一样。Promise从Uncompleted-->Completed的状态转变有且只能有一次,也就是说setSuccess和setFailure方法最多只会成功一个,此外,在setSuccess和setFailure方法中会通知注册到其上的监听者。为了加深对Future和Promise的理解,咱们能够将Future类比于定额发票,Promise类比于机打发票。当商户拿到税务局的发票时,若是是定额发票,则已经肯定好金额是100仍是50或其余,商户不再能更改;若是是机打发票,商户至关于拿到了一个发票模板,须要多少金额按实际状况填到模板指定处。显然,不能两次使用同一张机打发票打印,这会使发票失效,而Promise作的更好,它使第二次调用setter方法失败。
至此,咱们从整体上了解了Future和Promise的原理。咱们再看一下类图:
函数

Future类图
Future类图

 

类图给咱们的第一印象是:繁杂。咱们抓住关键点:Future和Promise两条分支,分而治之。咱们使用自顶向下的方法分析其实现细节,使用两条线索:oop

    AbstractFuture<--CompleteFuture<--CompleteChannelFuture<--Succeeded/FailedChannelFuture
    
    DefaultPromise<--DefaultChannelPromise

 

5.2 Future

5.2.1 AbstractFuture

AbstractFuture主要实现Future的get()方法,取得Future关联的异步操做结果:this

    @Override
    public V get() throws InterruptedException, ExecutionException {
        await();    // 阻塞直到异步操做完成

        Throwable cause = cause();
        if (cause == null) {
            return getNow();    // 成功则返回关联结果
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;    // 由用户取消
        }
        throw new ExecutionException(cause);    // 失败抛出异常
    }

 

其中的实现简单明了,但关键调用方法的具体实现并无,咱们将在子类实现中分析。对应的加入超时时间的get(long timeout, TimeUnit unit)实现也相似,再也不列出。

5.2.2 CompleteFuture

Complete表示操做已完成,因此CompleteFuture表示一个异步操做已完成的结果,由此可推知:该类的实例在异步操做完成时建立,返回给用户,用户则使用addListener()方法定义一个异步操做。若是你熟悉javascript,将Listener类比于回调函数callback()可方便理解。
咱们首先看其中的字段和构造方法:

    // 执行器,执行Listener中定义的操做
    private final EventExecutor executor;
    
    // 这有一个构造方法,可知executor是必须的
    protected CompleteFuture(EventExecutor executor) {
        this.executor = executor;
    }

 

CompleteFuture类定义了一个EventExecutor,可视为一个线程,用于执行Listener中的操做。咱们再看addListener()和removeListener()方法:

    public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        // 因为这是一个已完成的Future,因此当即通知Listener执行
        DefaultPromise.notifyListener(executor(), this, listener);
        return this;
    }
    
    public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
        // 因为已完成,Listener中的操做已完成,没有须要删除的Listener
        return this;
    }

 

其中的实现也很简单,咱们看一下GenericFutureListener接口,其中只定义了一个方法:

    // 异步操做完成是调用
    void operationComplete(F future) throws Exception;
关于Listener咱们再关注一下ChannelFutureListener,它并无扩展GenericFutureListener接口,因此相似于一个标记接口。咱们看其中实现的三个通用ChannelFutureListener:

    ChannelFutureListener CLOSE = (future) --> {
        future.channel().close();   //操做完成时关闭Channel
    };
    
    ChannelFutureListener CLOSE_ON_FAILURE = (future) --> {
        if (!future.isSuccess()) {
            future.channel().close();   // 操做失败时关闭Channel
        }
    };
    
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = (future) --> {
        if (!future.isSuccess()) {
            // 操做失败时触发一个ExceptionCaught事件
            future.channel().pipeline().fireExceptionCaught(future.cause());
        }
    };

 

这三个Listener对象定义了对Channel处理时经常使用的操做,若是符合需求,能够直接使用。
因为CompleteFuture表示一个已完成的异步操做,因此可推知sync()和await()方法都将当即返回。此外,可推知线程的状态以下,再也不列出代码:

    isDone() = true; isCancelled() = false; 

 

5.2.3 CompleteChannelFuture

CompleteChannelFuture的类签名以下:

    abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture

 

ChannelFuture是否是以为很亲切?你确定已经使用过ChannelFuture。ChannelFuture接口相比于Future只扩展了一个方法channel()用于取得关联的Channel对象。CompleteChannelFuture还继承了CompleteFuture<Void>,尖括号中的泛型表示Future关联的结果,此结果为Void,意味着CompleteChannelFuture不关心这个特定结果即get()相关方法返回null。也就是说,咱们能够将CompleteChannelFuture纯粹的视为一种回调函数机制。
CompleteChannelFuture的字段只有一个:

    private final Channel channel; // 关联的Channel对象

 

CompleteChannelFuture的大部分方法实现中,只是将方法返回的Future覆盖为ChannelFuture对象(ChannelFuture接口的要求),代码不在列出。咱们看一下executor()方法:

    @Override
    protected EventExecutor executor() {
        EventExecutor e = super.executor(); // 构造方法指定
        if (e == null) {
            return channel().eventLoop();   // 构造方法未指定使用channel注册到的eventLoop
        } else {
            return e;
        }
    }

 

5.2.4 Succeeded/FailedChannelFuture

Succeeded/FailedChannelFuture为特定的两个异步操做结果,回忆总述中关于Future状态的讲解,成功意味着

    Succeeded: isSuccess() == true, cause() == null;
    Failed:    isSuccess() == false, cause() == non-null     

 

代码中的实现也很简单,再也不列出。须要注意的是,其中的构造方法不建议用户调用,通常使用Channel对象的方法newSucceededFuture()和newFailedFuture(Throwable)代替。

5.3 Promise

5.3.1 DefaultPromise

咱们首先看其中的static字段:

    // 能够嵌套的Listener的最大层数,可见最大值为8
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    // result字段由使用RESULT_UPDATER更新
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
    // 此处的Signal是Netty定义的类,继承自Error,异步操做成功且结果为null时设置为改值
    private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
    // 异步操做不可取消
    private static final Signal UNCANCELLABLE = Signal.valueOf(...);
    // 异步操做失败时保存异常缘由
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(...);

 

嵌套的Listener,是指在listener的operationComplete方法中,能够再次使用future.addListener()继续添加listener,Netty限制的最大层数是8,用户可以使用系统变量io.netty.defaultPromise.maxListenerStackDepth设置。
再看其中的私有字段:

    // 异步操做结果
    private volatile Object result;
    // 执行listener操做的执行器
    private final EventExecutor executor;
    // 监听者
    private Object listeners;
    // 阻塞等待该结果的线程数
    private short waiters;
    // 通知正在进行标识
    private boolean notifyingListeners;

 

也许你已经注意到,listeners是一个Object类型。这彷佛不合常理,通常状况下咱们会使用一个集合或者一个数组。Netty之因此这样设计,是由于大多数状况下listener只有一个,用集合和数组都会形成浪费。当只有一个listener时,该字段为一个GenericFutureListener对象;当多余一个listener时,该字段为DefaultFutureListeners,能够储存多个listener。明白了这些,咱们分析关键方法addListener():

    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        synchronized (this) {
            addListener0(listener); // 保证多线程状况下只有一个线程执行添加操做
        }

        if (isDone()) {
            notifyListeners();  // 异步操做已经完成通知监听者
        }
        return this;
    }
    
    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;   // 只有一个
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener); // 大于两个
        } else {
            // 从一个扩展为两个
            listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);   
        }
    }

 

从代码中能够看出,在添加Listener时,若是异步操做已经完成,则会notifyListeners():

    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {   //执行线程为指定线程
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth(); // 嵌套层数
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                // 执行前增长嵌套层数
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);   
                try {
                    notifyListenersNow();
                } finally {
                    // 执行完毕,不管如何都要回滚嵌套层数
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }
        // 外部线程则提交任务给执行线程
        safeExecute(executor, () -> { notifyListenersNow(); });
    }
    
    private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }

 

因此,外部线程不能执行监听者Listener中定义的操做,只能提交任务到指定Executor,其中的操做最终由指定Executor执行。咱们再看notifyListenersNow()方法:

    private void notifyListenersNow() {
        Object listeners;
        // 此时外部线程可能会执行添加Listener操做,因此须要同步
        synchronized (this) { 
            if (notifyingListeners || this.listeners == null) {
                // 正在通知或已没有监听者(外部线程删除)直接返回
                return; 
            }
            notifyingListeners = true;  
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) { // 通知单个
                notifyListeners0((DefaultFutureListeners) listeners);
            } else { // 通知多个(遍历集合调用单个)
                notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
            }
            synchronized (this) {
                // 执行完毕且外部线程没有再添加监听者
                if (this.listeners == null) {
                    notifyingListeners = false; 
                    return; 
                }
                // 外部线程添加了监听者继续执行
                listeners = this.listeners; 
                this.listeners = null;
            }
        }
    }
    
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }

 

到此为止,咱们分析完了Promise最重要的addListener()和notifyListener()方法。在源码中还有static的notifyListener()方法,这些方法是CompleteFuture使用的,对于CompleteFuture,添加监听者的操做不须要缓存,直接执行Listener中的方法便可,执行线程为调用线程,相关代码可回顾CompleteFuture。addListener()相对的removeListener()方法实现简单,咱们再也不分析。
回忆result字段,修饰符有volatile,因此使用RESULT_UPDATER更新,保证更新操做为原子操做。Promise不携带特定的结果(即携带Void)时,成功时设置为静态字段的Signal对象SUCCESS;若是携带泛型参数结果,则设置为泛型一致的结果。对于Promise,设置成功、设置失败、取消操做,三个操做至多只能调用一个且同一个方法至多生效一次,再次调用会抛出异常(set)或返回失败(try)。这些设置方法原理相同,咱们以setSuccess()为例分析:

    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();  // 能够设置结果说明异步操做已完成,故通知监听者
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
    
    private boolean setSuccess0(V result) {
        // 为空设置为Signal对象Success
        return setValue0(result == null ? SUCCESS : result);
    }
    
    private boolean setValue0(Object objResult) {
        // 只有结果为null或者UNCANCELLABLE时才可设置且只能够设置一次
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            checkNotifyWaiters();   // 通知等待的线程
            return true;
        }
        return false;
    }

 

checkNotifyWaiters()方法唤醒调用await()和sync()方法等待该异步操做结果的线程,代码以下:

    private synchronized void checkNotifyWaiters() {
        // 确实有等待的线程才notifyAll
        if (waiters > 0) {  
            notifyAll();    // JDK方法
        }
    }

 

有了唤醒操做,那么sync()和await()的实现是怎么样的呢?咱们首先看sync()的代码:

    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();  // 异步操做失败抛出异常
        return this;
    }

 

可见,sync()和await()很相似,区别只是sync()调用,若是异步操做失败,则会抛出异常。咱们接着看await()的实现:

    public Promise<V> await() throws InterruptedException {
        // 异步操做已经完成,直接返回
        if (isDone()) {
            return this;    
        }
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        // 死锁检测
        checkDeadLock();
        // 同步使修改waiters的线程只有一个
        synchronized (this) {
            while (!isDone()) { // 等待直到异步操做完成
                incWaiters();   // ++waiters;
                try {
                    wait(); // JDK方法
                } finally {
                    decWaiters(); // --waiters
                }
            }
        }
        return this;
    }

 

其中的实现简单明了,其余await()方法也相似,再也不分析。咱们注意其中的checkDeadLock()方法用来进行死锁检测:

    protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
    }

 

也就是说,不能在同一个线程中调用await()相关的方法。为了更好的理解这句话,咱们使用代码注释中的例子来解释。Handler中的channelRead()方法是由Channel注册到的eventLoop执行的,其中的Future的Executor也是这个eventLoop,因此不能在channelRead()方法中调用await这一类(包括sync)方法

    // 错误的例子
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ChannelFuture future = ctx.channel().close();
        future.awaitUninterruptibly();
        // ...
    }

    // 正确的作法
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ChannelFuture future = ctx.channel().close();
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                // ... 使用异步操做
            }
        });
    }

 

到了这里,咱们已经分析完Future和Promise的主要实现。剩下的DefaultChannelPromise、VoidChannelPromise实现都很简单,咱们再也不分析。ProgressivePromise表示异步的进度结果,也再也不进行分析。

做者:Hypercube 连接:https://www.jianshu.com/p/a06da3256f0c 来源:简书 简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。
相关文章
相关标签/搜索