netty中的promise和future源码分析(一)

netty中的操做是异步操做,须要使用future来表示异步操做的结果。netty在jdk中的future的基础上进行了扩展。接下来采用自顶向下的方式对future和promise的源码进行分析:javascript

clipboard.png

(一)jdk中future和netty中future的比较

jdk中future:

// 取消异步操做
boolean cancel(boolean mayInterruptIfRunning);
// 异步操做是否取消
boolean isCancelled();
// 异步操做是否完成,正常终止、异常、取消都是完成
boolean isDone();
// 阻塞直到取得异步操做结果
V get() throws InterruptedException, ExecutionException;
// 同上,但最长阻塞时间为timeout
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

jdk中future的特色:
1.不管结果是成功仍是失败仍是取消,返回的都是isdone();
2.并且咱们在异步操做触发和结束的时候比较关心其余的一些操做,在jdk的future中没法进行补充。因此netty对future作了扩展。java

netty中future(如下为扩展内容):

// 异步操做完成且正常终止
boolean isSuccess();
// 异步操做是否能够取消
boolean isCancellable();
// 异步操做失败的缘由
Throwable cause();
// 添加一个监听者,异步操做完成时回调,类比javascript的回调函数
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 阻塞直到异步操做完成
Future<V> await() throws InterruptedException;
// 同上,但异步操做失败时抛出异常
Future<V> sync() throws InterruptedException;
// 非阻塞地返回异步结果,若是还没有完成返回null
V getNow();

netty中future的特色:
1.操做结果分为success,fail,canceled三种;
2.而且经过addlisteners()方法能够添加回调操做,即触发或者完成时须要进行的操做;
3.await()和sync(),能够以阻塞的方式等待异步完成;getnow()能够得到异步操做的结果,若是还未完成则返回Null;promise

综合以上的分析,给出一张表示future的状态图来增强对future的理解:

clipboard.png

ps:future只有两种状态,unconpleted和conpleted.异步

(二)abstractfuture

abstractfuture类实现future接口,在其中主要实现了get()方法,以阻塞的方式来取得异步操做的结果,其功能与jdk中future的get()方法相似。ide

abstractfuture源码:

@Override函数

public V get() throws InterruptedException, ExecutionException {
    //阻塞直到异步任务完成
    await();
    
    Throwable cause = cause();
    if (cause == null) {
    //得到异步操做结果
        return getNow();
    }
    //操做失败则抛出异常
    if (cause instanceof CancellationException) {
        throw (CancellationException) cause;
    }
    throw new ExecutionException(cause);

}
该类中get(long timeout, TimeUnit unit) 实现方式与get()大体相同。oop

(三)completefuture

completedfuture表示已经完成异步操做,该类在异步操做结束时建立,用户使用addlistener()方法提供异步操做方法。this

completefuture的状态

@Override
public boolean isDone() {
    return true;
}

completefuture表示已经完成异步操做,因此isdone()方法返回true;而且sync()方法和await()方法会当即返回。spa

@Override
public Future<V> sync() throws InterruptedException {
    return this;
}

触发操做的执行者(eventexecutor)

private final EventExecutor executor;

protected CompleteFuture(EventExecutor executor) {
    this.executor = executor;
}

protected EventExecutor executor() {
    return executor;
}

completefuture中维护了一个eventexecutor实例,用来执行listener中的任务。线程

触发操做的执行过程:

第一步addlistener():

#completefuture类
 public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listener == null) {
        throw new NullPointerException("listener");
    }
    DefaultPromise.notifyListener(executor(), this, listener);
    return this;

}

调用addlistener()方法后,DefaultPromise会调用静态方法notifyListener(),来执行listener中的操做。

#DefaultPromise类
protected static void notifyListener(
        EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
    //省略
    ……
    notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
}

而后再看看notifyListenerWithStackOverFlowProtection()

#DefaultPromise类
private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
                                                              final Future<?> future,
                                                              final GenericFutureListener<?> listener) {
    if (executor.inEventLoop()) {
        //省略,这段代码表示在本线程中执行
        ……
    }
    //在外部触发,则将其封装成runnable任务
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListener0(future, listener);
        }
    });
}

接下来的safeexecute()和notifylistener0()就很简单了,

#DefaultPromise类
private static void safeExecute(EventExecutor executor, Runnable task) {
    try {
    //将任务添加到任务队列中等待执行
        executor.execute(task);
    } 
    //省略
    ……
}

private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
    //能够清楚的看到,执行到了listener中的operationcomplete(future)方法
        l.operationComplete(future);
    } catch (Throwable t) {
        logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
    }
}

completefuture类总结:
1.conpletefuture中保存了eventexecutor的信息,用来执行listener中的任务。
2.调用了future的addlistener()方法后,将listener中的操做封装成runnble任务扔到eventexecutor中的任务队列中等待执行
3.completefuture表示已经完成异步操做,状态是isdone。

(四)channelfuture

这是一个继承future的接口,顾名思义,该接口与通道操做有关,因此在channelfuture接口中,除了覆盖future的功能外,只提供了一个channel()抽象方法。

Channel channel();

(五)completechannelfuture

completechannelfuture类实现channelfuture接口,继承completefuture类。

abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture

异步操做结果的获取

尖括号中的泛型表示返回结果的类型,此处是void,表示不关心返回的结果。

@Override
public Void getNow() {
    return null;
}

getnow()方法的返回结果为null,结合前面对abstractfuture类中get()方法的分析,能够得知在completechannelfuture跟get相关的方法返回的结果都是null.

completechannelfuture的初始化

类中的channel字段:

private final Channel channel;

类的初始化:

protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
    super(executor);
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;
}

eventexecutor的管理:

@Override
protected EventExecutor executor() {
    EventExecutor e = super.executor();
    if (e == null) {
        return channel().eventLoop();
    } else {
        return e;
    }
}

若是父类中eventexecutor不为空,则返回父类中的eventexecutor,不然返回channel中保存的eventexecutor.

其余

completechannelfuture中的addlistener()和await(),sync()等方法的特色和completefuture类类似。

(六)Succeededchannelfuture/FailedChannelFuture

这两个类继承自completechannelfuture,一个表示的状态是success,另外一个表示操做失败。

#FailedChannelFuture
@Override
public Throwable cause() {
    return cause;
}

#Succeededchannelfuture
@Override
public boolean isSuccess() {
    return true;
}

到此为止,netty中的future分析完毕。

相关文章
相关标签/搜索