Netty系列-netty的Future 和 Promise

首先咱们来看看future和promise接口总体设计java

最顶层的future是jdk的,第二个是netty自定义的future,两个同名,继承关系promise

看看jdk的future接口异步

public interface Future<V> {
    // 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    // 任务是否取消
    boolean isCancelled();
    // 任务是否完成
    boolean isDone();
    // 阻塞的获取执行的结果
    V get() throws InterruptedException, ExecutionException;
    // 在必定时间内-超时-阻塞的获取执行的结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

瞅瞅netty的future接口async

public interface Future<V> extends java.util.concurrent.Future<V> {
    // 是否成功
    boolean isSuccess();
    // 是否取消
    boolean isCancellable();
    Throwable cause();
     // 添加listener进行回调
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // 阻塞的等待任务执行,若是失败则抛出失败缘由的异常
    Future<V> sync() throws InterruptedException;
    // 不响应中断等待异常
    Future<V> syncUninterruptibly();

    // 阻塞等待任务执行,失败不抛异常
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);

    // 立刻获取到任务的结果,不阻塞,而jdk的future是阻塞的
    V getNow();


    // 取消任务执行,若是取消成功,任务会由于 CancellationException 异常而致使失败
    //      也就是 isSuccess()==false,同时上面的 cause() 方法返回 CancellationException 的实例。
    // mayInterruptIfRunning 说的是:是否对正在执行该任务的线程进行中断(这样才能中止该任务的执行),
    //       彷佛 Netty 中 Future 接口的各个实现类,都没有使用这个参数
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

netty的future在jdk的基础上扩展了它须要的方法,sync和await的区别咱们放到下面看实现类的时候说ide

同时咱们也能够看到,这个future接口跟io操做是无关的函数

接下来咱们看看ChannelFuture接口,接口注释上写的很清楚,咱们来看看ui

* The result of an asynchronous {@link Channel} I/O operation.
* <p>
* All I/O operations in Netty are asynchronous. It means any I/O calls will
* return immediately with no guarantee that the requested I/O operation has
* been completed at the end of the call. Instead, you will be returned with
* a {@link ChannelFuture} instance which gives you the information about the
* result or status of the I/O operation.
* <p>
* A {@link ChannelFuture} is either <em>uncompleted</em> or <em>completed</em>.
* When an I/O operation begins, a new future object is created. The new future
* is uncompleted initially - it is neither succeeded, failed, nor cancelled
* because the I/O operation is not finished yet. If the I/O operation is
* finished either successfully, with failure, or by cancellation, the future is
* marked as completed with more specific information, such as the cause of the
* failure. Please note that even failure and cancellation belong to the
* completed state.
全部io操做都是异步的,一个io操做的调用会当即返回一个带有结果或者状态的io实例。
io操做要么是未完成的,要么是完成的。当它开始时,future会被建立,一开始是未完成的,未完成的时候没有成功、失败或者取消状态
当它是完成的时候,能够是失败或者取消的,失败或者取消缘由会被附加到future上。
* <pre> * +---------------------------+ * | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+ * </pre>

上面那个状态迁移图很清楚了,在两种过程的时候会有什么状态,咱们看看接口this

public interface ChannelFuture extends Future<Void> {
    // 返回future关联的channel
    Channel channel();
    // 重写下面几个方法,修改返回值为channelfuture
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture sync() throws InterruptedException;
    @Override
    ChannelFuture syncUninterruptibly();
    @Override
    ChannelFuture await() throws InterruptedException;
    @Override
    ChannelFuture awaitUninterruptibly();

    /**
     * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
     * following methods:
     * <ul>
     *     <li>{@link #addListener(GenericFutureListener)}</li>
     *     <li>{@link #addListeners(GenericFutureListener[])}</li>
     *     <li>{@link #await()}</li>
     *     <li>{@link #await(long, TimeUnit)} ()}</li>
     *     <li>{@link #await(long)} ()}</li>
     *     <li>{@link #awaitUninterruptibly()}</li>
     *     <li>{@link #sync()}</li>
     *     <li>{@link #syncUninterruptibly()}</li>
     * </ul>
       标记该future是void的,使不能使用上面的方法
     */
    boolean isVoid();
}

 netty实际上是强烈建议直接经过添加监听器的方式来获取io操做结果,或者进行后续操做的,ChannelFuture能够增长或者删除一个多个 GenericFutureListener,它定义以下spa

public interface GenericFutureListener<F extends Future<?>> extends EventListener {
    void operationComplete(F future) throws Exception;
}

执行完后会回调 operationComplete方法线程

注意一点,不要在ChannelHandler中调用ChannelFuture的await方法,会致使死锁。这是由于发起io操做后,由io线程负责异步通知发起io操做的用户线程,若是io线程和用户线程是同一个的话,就会致使io线程等待本身通知操做完成,这就会致使死锁,本身挂死本身。

咱们继续看promise接口,

public interface Promise<V> extends Future<V> {

    // 标记该future成功及设置结果,并通知全部listener
    // 若是失败的话抛异常
    Promise<V> setSuccess(V result);

     // 和setsuccess同样,只是失败的话返回false
    boolean trySuccess(V result);

     // 标记future失败,而后通知listener
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);

    // 标记该future 不可被取消
    boolean setUncancellable();

    // 下面跟ChannelFuture同样,都是覆盖重写方法
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> await() throws InterruptedException;
    @Override
    Promise<V> awaitUninterruptibly();
    @Override
    Promise<V> sync() throws InterruptedException;
    @Override
    Promise<V> syncUninterruptibly();
}

Promise是可写的future,Future自己并无写操做相关的接口,netty经过Promise对其进行扩展,用于设置io操做的结果。Promise 实例内部是一个任务,任务的执行每每是异步的,一般是一个线程池来处理任务。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 未来会被某个执行任务的线程在执行完成之后调用,同时那个线程在调用 setSuccess(result) 或 setFailure(t) 后会回调 listeners 的回调函数(固然,回调的具体内容不必定要由执行任务的线程本身来执行,它能够建立新的线程来执行,也能够将回调任务提交到某个线程池来执行)。并且,一旦 setSuccess(...) 或 setFailure(...) 后,那些 await() 或 sync() 的线程就会从等待中返回。

接下来咱们看看ChannelPromise

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    @Override
    Channel channel();
@Override ChannelPromise setSuccess(Void result); ChannelPromise setSuccess();
boolean trySuccess(); @Override ChannelPromise setFailure(Throwable cause);
@Override ChannelPromise addListener(GenericFutureListener
<? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override ChannelPromise sync()
throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); /** * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself. */ ChannelPromise unvoid(); }

看方法其实很清楚,基本都是覆写综合了ChannelFuture和Promise接口的,就返回值变了,看看咱们一开始的类继承图,ChannelPromise 接口同时继承了 ChannelFuture 和 Promise,最终继承的都是Future接口,接下来咱们看看具体的实现类DefaultPromise吧

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // 为了后面操做成功后经过cas来保存结果到result字段
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    // result为null的时候默认值
    private static final Object SUCCESS = new Object();
    // 操做成功后cas比对的值
    private static final Object UNCANCELLABLE = new Object();
   
    // 保存执行的结果
    private volatile Object result;
    // 线程执行器
    private final EventExecutor executor;
    // 监听者
    private Object listeners;
     /**
     * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
     */
    // 等待这个 promise 的线程数(调用sync()/await()进行等待的线程数量)
    private short waiters;

    /**
     * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
     * executor changes.
     */
     // 是否唤醒正在等待线程,用于防止重复执行唤醒,否则会重复执行 listeners 的回调方法
    private boolean notifyingListeners;

    ....
 }

 

属性看完了,咱们能够看看它主要的方法

    @Override
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    @Override
    public boolean trySuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return true;
        }
        return false;
    }

    @Override
    public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this, cause);
    }

    @Override
    public boolean tryFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return true;
        }
        return false;
    }

set和try的区别就是返回值不同而已,咱们看看底层的方法 setSuccess0

    private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            checkNotifyWaiters();
            return true;
        }
        return false;
    }

就是经过cas来把objResult保存到result属性上,而后Notify其余线程。其余方法都差很少,能够比对看看

咱们再看个await方法

public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        checkDeadLock();

        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }

若是当前Promise已被设置,则返回;若是碰到线程中断则响应中断;检查死锁,因为在IO线程中调用Promise的await方法或者sync方法会致使死锁,前面说过的,因此须要检验保护,断定当前线程是不是io线程;同步锁定当前Promise对象,循环断定是否设置完成,使用循环是避免伪唤醒,防止线程 被意外唤醒致使功能异常。

接下来咱们顺便也看下sync方法

@Override
    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();
        return this;
    }

首先调用await方法,而后看是否须要抛出异常,若是任务失败的话就从新抛出异常,这也是两方法区别了。

DefaultChannelPromise实现咱们就不看了,基本都是基于DefaultPromise的,只是返回值都是 ChannelPromise而已。

下面咱们来写个例子吧

public class ChannelPromiseExample extends Thread{

    private static final Object object = new Object();

    public static void main(String[] args) {
        final DefaultEventExecutor executor = new DefaultEventExecutor();
        final Promise<Integer> promise = executor.newPromise();

        // 任务seccess或者failure来回调operationComplete 方法
        promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future)
                    throws Exception {
                System.out.println(Thread.currentThread().getName() + " 第一个监听器");
                if (future.isSuccess()) {
                    System.out.println("任务成功,result:" + future.get());
                } else {
                    System.out.println("任务失败,result:" + future.cause());
                }
            }
        }).addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future)
                    throws Exception {
                System.out.println(Thread.currentThread().getName() + " 第二个监听器");
            }
        });
        // 提交任务
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //能够设置成功或者失败
                //promise.setSuccess(1);
                promise.setFailure(new Throwable("FAILURE"));
            }
        });


        try {
            System.out.println("promise wait begin");
            //promise.sync();
            promise.await();
            System.out.println("promise wait end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

能够体会下await和sync的区别

相关文章
相关标签/搜索