今天是猿灯塔“365篇原创计划”第三篇。 java
接下来的时间灯塔君持续更新Netty系列一共九篇编程
Netty 源码解析(一): 开始 promise
Netty 源码解析(二): Netty 的 Channel 异步
当前:Netty 源码解析(三): Netty 的 Future 和 Promiseide
Netty 源码解析(四): Netty 的 ChannelPipeline异步编程
Netty 源码解析(五): Netty 的线程池分析函数
Netty 源码解析(六): Channel 的 register 操做oop
Netty 源码解析(七): NioEventLoop 工做流程this
Netty 源码解析(八): 回到 Channel 的 register 操做spa
Netty 源码解析(九): connect 过程和 bind 过程分析
今天呢!灯塔君跟你们讲:
Netty 的 Future 和 Promise
Netty 中很是多的异步调用,因此在介绍更多 NIO 相关的内容以前,咱们来看看它的异步接口是怎么使用的。
前面咱们在介绍 Echo 例子的时候,已经用过了 ChannelFuture 这个接口了:
争取在看完本节后,读者能搞清楚上面的这几行划线部分是怎么走的。
关于 Future 接口,我想你们应该都很熟悉,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,经过它来获取提交的任务的执行状态和最终的执行结果,咱们最经常使用它的 isDone()
和 get()
方法。
下面是 JDK 中的 Future 接口 java.util.concurrent.Future:
public interface Future<V> { // 取消该任务 boolean cancel(boolean mayInterruptIfRunning); // 任务是否已取消 boolean isCancelled(); // 任务是否已完成 boolean isDone(); // 阻塞获取任务执行结果 V get() throws InterruptedException, ExecutionException; // 带超时参数的获取任务执行结果 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Netty 中的 Future 接口(同名)继承了 JDK 中的 Future 接口,而后添加了一些方法:
// io.netty.util.concurrent.Future
publicinterface Future<V\> extends java.util.concurrent.Future<V\> { // 是否成功 boolean isSuccess(); // 是否可取消 boolean isCancellable(); // 若是任务执行失败,这个方法返回异常信息 Throwable cause(); // 添加 Listener 来进行回调 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 阻塞等待任务结束,若是任务失败,将“致使失败的异常”从新抛出来 Future<V> sync() throws InterruptedException; // 不响应中断的 sync(),这个你们应该都很熟了 Future<V> syncUninterruptibly(); // 阻塞等待任务结束,和 sync() 功能是同样的,不过若是任务失败,它不会抛出执行过程当中的异常 Future<V> await() throws InterruptedException; Future<V> awaitUninterruptibly(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); // 获取执行结果,不阻塞。咱们都知道 java.util.concurrent.Future 中的 get() 是阻塞的 V getNow(); // 取消任务执行,若是取消成功,任务会由于 CancellationException 异常而致使失败 // 也就是 isSuccess()==false,同时上面的 cause() 方法返回 CancellationException 的实例。 // mayInterruptIfRunning 说的是:是否对正在执行该任务的线程进行中断(这样才能中止该任务的执行), // 彷佛 Netty 中 Future 接口的各个实现类,都没有使用这个参数 @Override boolean cancel(boolean mayInterruptIfRunning); }
看完上面的 Netty 的 Future 接口,咱们能够发现,它加了 sync() 和 await() 用于阻塞等待,还加了 Listeners,只要任务结束去回调 Listener 们就能够了,那么咱们就不必定要主动调用 isDone() 来获取状态,或经过 get() 阻塞方法来获取值。
因此它其实有两种使用范式
顺便说下 sync() 和 await() 的区别:sync() 内部会先调用 await() 方法,等 await() 方法返回后,会检查下这个任务是否失败,若是失败,从新将致使失败的异常抛出来。也就是说,若是使用 await(),任务抛出异常后,await() 方法会返回,可是不会抛出异常,而 sync() 方法返回的同时会抛出异常。
咱们也能够看到,Future 接口没有和 IO 操做关联在一块儿,仍是比较_纯净_的接口。
接下来,咱们来看 Future 接口的子接口 ChannelFuture,这个接口用得最多,它将和 IO 操做中的 Channel 关联在一块儿了,用于异步处理 Channel 中的事件。
publicinterface ChannelFuture extends Future<Void\> { // ChannelFuture 关联的 Channel Channel channel(); // 覆写如下几个方法,使得它们返回值为 ChannelFuture 类型 @Override ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture sync() throws InterruptedException; @Override ChannelFuture syncUninterruptibly(); @Override ChannelFuture await() throws InterruptedException; @Override ChannelFuture awaitUninterruptibly(); // 用来标记该 future 是 void 的, // 这样就不容许使用 addListener(...), sync(), await() 以及它们的几个重载方法 boolean isVoid(); }
咱们看到,ChannelFuture 接口相对于 Future 接口,除了将 channel 关联进来没有增长什么东西。还有个 isVoid() 方法算是不那么重要的存在吧。其余几个都是方法覆写,为了让返回值类型变为 ChannelFuture,而不是原来的 Future。
这里有点跳,咱们来介绍下 Promise 接口,它和 ChannelFuture 接口无关,而是和前面的 Future 接口相关,Promise 这个接口很是重要。
Promise 接口和 ChannelFuture 同样,也继承了 Netty 的 Future 接口,而后加了一些 Promise 的内容:
publicinterface Promise<V\> extends Future<V\> { // 标记该 future 成功及设置其执行结果,而且会通知全部的 listeners。 // 若是该操做失败,将抛出异常(失败指的是该 future 已经有告终果了,成功的结果,或者失败的结果) Promise<V> setSuccess(V result); // 和 setSuccess 方法同样,只不过若是失败,它不抛异常,返回 false boolean trySuccess(V result); // 标记该 future 失败,及其失败缘由。 // 若是失败,将抛出异常(失败指的是已经有告终果了) Promise<V> setFailure(Throwable cause); // 标记该 future 失败,及其失败缘由。 // 若是已经有结果,返回 false,不抛出异常 boolean tryFailure(Throwable cause); // 标记该 future 不能够被取消 boolean setUncancellable(); // 这里和 ChannelFuture 同样,对这几个方法进行覆写,目的是为了返回 Promise 类型的实例 @Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> await() throws InterruptedException; @Override Promise<V> awaitUninterruptibly(); @Override Promise<V> sync() throws InterruptedException; @Override Promise<V> syncUninterruptibly(); }
可能有些读者对 Promise 的概念不是很熟悉,这里简单说两句。
我以为只要明白一点,Promise 实例内部是一个任务,任务的执行每每是异步的,一般是一个线程池来处理任务。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 未来会被某个执行任务的线程在执行完成之后调用,同时那个线程在调用 setSuccess(result) 或 setFailure(t) 后会回调 listeners 的回调函数(固然,回调的具体内容不必定要由执行任务的线程本身来执行,它能够建立新的线程来执行,也能够将回调任务提交到某个线程池来执行)。并且,一旦 setSuccess(...) 或 setFailure(...) 后,那些 await() 或 sync() 的线程就会从等待中返回。
因此这里就有两种编程方式,一种是用 await(),等 await() 方法返回后,获得 promise 的执行结果,而后处理它;另外一种就是提供 Listener 实例,咱们不太关心任务何时会执行完,只要它执行完了之后会去执行 listener 中的处理方法就行。
接下来,咱们再来看下 ChannelPromise,它继承了前面介绍的 ChannelFuture 和 Promise 接口。
ChannelPromise 接口在 Netty 中使用得比较多,由于它综合了 ChannelFuture 和 Promise 两个接口:
/\*\* \* Special {@link ChannelFuture} which is writable. \*/ publicinterface ChannelPromise extends ChannelFuture, Promise<Void\> { // 覆写 ChannelFuture 中的 channel() 方法,其实这个方法一点没变 @Override Channel channel(); // 下面几个方法是覆写 Promise 中的接口,为了返回值类型是 ChannelPromise @Override ChannelPromise setSuccess(Void result); ChannelPromise setSuccess(); boolean trySuccess(); @Override ChannelPromise setFailure(Throwable cause); // 到这里你们应该都熟悉了,下面几个方法的覆写也是为了获得 ChannelPromise 类型的实例 @Override ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise sync() throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); /\*\* \* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself. \*/ // 咱们忽略这个方法吧。 ChannelPromise unvoid(); }
咱们能够看到,它综合了 ChannelFuture 和 Promise 中的方法,只不过经过覆写将返回值都变为 ChannelPromise 了而已,没有增长什么新的功能。
小结一下,咱们上面介绍了几个接口,Future 以及它的子接口 ChannelFuture 和 Promise,而后是 ChannelPromise 接口同时继承了 ChannelFuture 和 Promise。
我把这几个接口的主要方法列在一块儿,这样你们看得清晰些:
接下来,咱们须要来一个实现类,这样才能比较直观地看出它们是怎么使用的,由于上面的这些都是接口定义,具体还得看实现类是怎么工做的。
下面,咱们来介绍下 DefaultPromise 这个实现类,这个类很经常使用,它的源码也不短,咱们先介绍几个关键的内容,而后介绍一个示例使用。
首先,咱们看下它有哪些属性:
publicclass DefaultPromise<V\> extends AbstractFuture<V\> implements Promise<V\> { // 保存执行结果 privatevolatile Object result; // 执行任务的线程池,promise 持有 executor 的引用,这个其实有点奇怪了 // 由于“任务”其实不必知道本身在哪里被执行的 privatefinal EventExecutor executor; // 监听者,回调函数,任务结束后(正常或异常结束)执行 private Object listeners; // 等待这个 promise 的线程数(调用sync()/await()进行等待的线程数量) privateshort waiters; // 是否正在唤醒等待线程,用于防止重复执行唤醒,否则会重复执行 listeners 的回调方法 privateboolean notifyingListeners; ...... }
能够看出,此类实现了 Promise,可是没有实现 ChannelFuture,因此它和 Channel 联系不起来。别急,咱们后面会碰到另外一个类 DefaultChannelPromise 的使用,这个类是综合了 ChannelFuture 和 Promise 的,可是它的实现其实大部分都是继承自这里的 DefaultPromise 类的。
说完上面的属性之后,你们能够看下 setSuccess(V result)
、trySuccess(V result)
和 setFailure(Throwable cause)
、 tryFailure(Throwable cause)
这几个方法:
看出 setSuccess(result) 和 trySuccess(result) 的区别了吗?
上面几个方法都很是简单,先设置好值,而后执行监听者们的回调方法。notifyListeners() 方法感兴趣的读者也能够看一看,不过它还涉及到 Netty 线程池的一些内容,咱们尚未介绍到线程池,这里就不展开了。上面的代码,在 setSuccess0 或 setFailure0 方法中都会唤醒阻塞在 sync() 或 await() 的线程
另外,就是能够看下 sync() 和 await() 的区别,其余的我以为随便看看就行了。
@Override public Promise<V> sync() throws InterruptedException { await(); // 若是任务是失败的,从新抛出相应的异常 rethrowIfFailed(); returnthis; }
接下来,咱们来写个实例代码吧:
public static void main(String\[\] args) { // 构造线程池 EventExecutor executor = new DefaultEventExecutor(); // 建立 DefaultPromise 实例 Promise promise = new DefaultPromise(executor); // 下面给这个 promise 添加两个 listener promise.addListener(new GenericFutureListener<Future<Integer>>() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { System.out.println("任务结束,结果:" + future.get()); } else { System.out.println("任务失败,异常:" + future.cause()); } } }).addListener(new GenericFutureListener<Future<Integer>>() { @Override public void operationComplete(Future future) throws Exception { System.out.println("任务结束,balabala..."); } }); // 提交任务到线程池,五秒后执行结束,设置执行 promise 的结果 executor.submit(new Runnable() { @Override public void run() { try { Thread.sleep(5000); } catch (InterruptedException e) { } // 设置 promise 的结果 // promise.setFailure(new RuntimeException()); promise.setSuccess(123456); } }); // main 线程阻塞等待执行结果 try { promise.sync(); } catch (InterruptedException e) { } }
运行代码,两个 listener 将在 5 秒后将输出:
任务结束,结果:123456
任务结束,balabala...
读者这里能够试一下 sync() 和 await() 的区别,在任务中调用 promise.setFailure(new RuntimeException()) 试试看。
上面的代码中,你们可能会对线程池 executor 和 promise 之间的关系感到有点迷惑。读者应该也要清楚,具体的任务不必定就要在这个 executor 中被执行。任务结束之后,须要调用 promise.setSuccess(result) 做为通知。
一般来讲,promise 表明的 future 是不须要和线程池搅在一块儿的,future 只关心任务是否结束以及任务的执行结果,至因而哪一个线程或哪一个线程池执行的任务,future 实际上是不关心的。
不过 Netty 毕竟不是要建立一个通用的线程池实现,而是和它要处理的 IO 息息相关的,因此咱们只不过要理解它就行了。
这节就说这么多吧,咱们回过头来再看一下这张图,看看你们是否是看懂了这节内容:
咱们就说说上图左边的部分吧,虽然咱们还不知道 bind() 操做中具体会作什么工做,可是咱们应该能够猜出一二。
显然,main 线程调用 b.bind(port) 这个方法会返回一个 ChannelFuture,bind() 是一个异步方法,当某个执行线程执行了真正的绑定操做后,那个执行线程必定会标记这个 future 为成功(咱们假定 bind 会成功),而后这里的 sync() 方法(main 线程)就会返回了。
若是 bind(port) 失败,咱们知道,sync() 方法会将异常抛出来,而后就会执行到 finally 块了。
一旦绑定端口 bind 成功,进入下面一行,f.channel() 方法会返回该 future 关联的 channel。
channel.closeFuture() 也会返回一个 ChannelFuture,而后调用了 sync() 方法,这个 sync() 方法返回的条件是:有其余的线程关闭了 NioServerSocketChannel,每每是由于须要停掉服务了,而后那个线程会设置 future 的状态( setSuccess(result) 或 setFailure(cause) ),这个 sync() 方法才会返回。
这篇文章就到这里,但愿你们对 Netty 中的异步编程有些了解,后续碰到源码的时候能知道是怎么使用的了。