Future Promise 模式(netty源码9)

netty源码死磕9  html

Future Promise 模式详解java

1. Future/Promise 模式

1.1. ChannelFuture的由来


因为Netty中的Handler 处理都是异步IO操做,结果是未知的。编程

Netty继承和扩展了JDK Future的API,定义了自身的Future系列类型,实现异步操做结果的获取和监控。promise


其中,最为重要的是ChannelFuture 。网络

代码以下:异步

public interface ChannelFuture extends Future<Void> {

    //...

    Channel channel();

    @Override

    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

      @Override

    ChannelFuture sync() throws InterruptedException;

    //...

}


之因此命名为ChannelFuture,表示跟Channel的操做有关。socket

ChannelFuture用于获取Channel相关的操做结果,添加事件监听器,取消IO操做,同步等待。ide


1.2. 来自Netty的官方建议


java.util.concurrent.Future是Java提供的接口,提供了对异步操做的简单干预。oop

Future接口定义了isDone()、isCancellable(),用来判断异步执行状态。Future接口的get方法,能够用来获取结果。get方法首先会判断任务是否执行完成,若是完成就返回结果,不然阻塞线程,直到任务完成。学习


Netty官方文档直接说明——Netty的网络操做都是异步的,Netty源码上大量使用了Future/Promise模式。


若是用户操做调用了sync或者await方法,会在对应的future对象上阻塞用户线程,例如future.channel().closeFuture().sync()。


Netty 的Future 接口,在继承了java.util.concurrent.Future的基础上,增长了一系列监听器方法,好比addListener()、removeListener() 等等。Netty强烈建议,经过添加监听器的方式获取IO结果,而不是经过JDK Future的同步等待的方式去获取IO结果。


1.3. Netty 的 Future 接口


Netty扩展了Java的Future,增长了监听器Listener接口,经过监听器可让异步执行更加有效率,不须要经过get来等待异步执行结束,而是经过监听器回调来精确地控制异步执行结束的时间点。

这一点,正好是Netty在Future模式的最主要的改进。

public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();

    boolean isCancellable();

    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> sync() throws InterruptedException;

    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    boolean awaitUninterruptibly(long timeoutMillis);

    V getNow();

    boolean cancel(boolean mayInterruptIfRunning);

}

1.4. ChannelFuture使用的实例

Netty的出站和入站操做,都是异步的。

以最为经典的NIO出站操做——write出站为例,说一下ChannelFuture的使用。

代码以下:

ChannelFuture future = ctx.channel().write(msg);
future.addListener(
        new ChannelFutureListener()
        {
            @Override
            public void operationComplete(ChannelFuture future)
            {
                // write操做完成后的回调代码
            }
        });


在write操做调用后,Netty并无完成对Java NIO底层链接的写入操做,出站操做是异步执行的。

若是须要获取IO结果,可使用回调的方式。


使用ChannelFuture的异步完成后的回调,须要搭配使用另外的一个接口ChannelFutureListener ,他从父接口哪里继承了一个被回调到的operationComplete操做完成的方法。

ChannelFutureListener 的父亲接口是GenericFutureListener 接口。

定义以下:

public interface GenericFutureListener
                 <F extends Future<?>> extends EventListener
{
      void operationComplete(F future) throws Exception;
}


异步操做完成后的回调代码,放在operationComplete方法中的实现中,就能够了。


1.5. Netty的 Promise接口


Netty的Future,只是增长了监听器。整个异步的状态,是不能进行设置和修改的。

换句话说,Future是只读的,是不能够写的。

因而,Netty的 Promise接口扩展了Netty的Future接口,它表示一种可写的Future,就是能够设置异步执行的结果。

部分源码以下:

public interface Promise<V> extends Future<V> {

    Promise<V> setSuccess(V result);

    Promise<V> setFailure(Throwable cause);

    boolean setUncancellable();

   //....

}
在IO操做过程,若是顺利完成、或者发生异常,均可以设置Promise的结果,而且通知Promise的Listener们。


而ChannelPromise接口,则继承扩展了Promise和ChannelFuture。因此,ChannelPromise既绑定了Channel,又具有了监听器的功能,还能够设置IO操做的结果,是Netty实际编程使用的最多的接口。


在AbstratChannel的代码中,至关多的IO操做,都会返回ChannelPromise类型实例做为调用的返回值。 经过这个返回值,客户程序能够用于读取IO操做的结果,执行IO操做真正完成后的回调。


1.6. ChannelPromise的监控流程

在AbstractChannel中,定义了几个对Channel的异步状态进行监控的Promise和Future成员,用于监控Channel的链接是否成功,链接是否关闭。

源码以下:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

        //链接成功的监控
        private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);

        //链接关闭的监控
        private final CloseFuture closeFuture = new CloseFuture(this);

//...
}
对于每一个Channel对象,都会有惟一的一个CloseFuture 成员,用来表示关闭的异步干预。若是要监控Channel的关闭,或者同步等待Channel关闭。

通常状况下,在应用程序中使用以下的代码:

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();


通常来讲,编写以上代码的都是在Main线程中用来启动ServerBootStrap的,因此Main线程会被阻塞,保证服务端Channel的正常运行。

上面的代码中,channel.closeFuture()不作任何操做,只是简单的返回channel对象中的closeFuture对象。而CloseFuture的sync方法,会将当前线程阻塞在CloseFuture上。


那么,f.channel().closeFuture().sync() 实际是如何工做的呢?


1.7. CloseFuture的sync 同步方法


CloseFuture继承了DefaultPromise的sync同步方法。

DefaultPromise的代码以下:

public class DefaultPromise<V> extends AbstractFuture<V>
                                                 implements Promise<V>
{

     private volatile Object result;
      //...
    @Override
    public Promise<V>  sync() throws InterruptedException {

       await();
        //...
    }

   @Override
    public Promise<V> await() throws InterruptedException {

        //...
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
               try {
                    wait();  //阻塞了,死等
                } finally {
                   decWaiters();
                }
            }
        }
        return this;
    }
//...

}


从源码能够看出,sync方法,调用了await方法。

在await方法中,CloseFuture 使用java 基础的synchronized 方法进行线程同步;而且,使用CloseFuture.wait / notify 这组来自Object根类中的古老方法进行线程之间的等待和唤醒。


在await方法,不断的自旋,判断当前的 CloseFuture 实例的结果是否已经完成,若是没有完成 !isDone() ,就不断的等待。一直到 isDone() 的值为true。

isDone() 的源码以下:

@Override
public boolean isDone() {
        return isDone0(result);
}

private static boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
}


CloseFuture的 isDone() 的条件是否可以知足,和Channel的close 关闭链接的出站操做有关。


下一步,咱们来看 isDone() 的条件,如何才可以知足?



1.8. close 出站处理流程


在Netty中,close 关闭链接的操做,属于全部的出站操做的一种。关于Netty出站处理的流程,在前面的文字中,已经很是详细的介绍了。这里再也不赘述,只是简单的列出一个流程图。

close 关闭链接的出站操做,其流程以下图所示:

wps6221.tmp


一溜儿下来,最终会落地到unsafe.doClose 方法。

看看unsafe.doClose,是如何与CloseFuture的 isDone() 的条件进行关联的。


1.9. unsafe.doClose


unsafe.doClose 方法中,设置了CloseFuture 的result值。

unsafe.doClose 源码以下:


protected abstract class AbstractUnsafe implements Unsafe

{

   private void close(final ChannelPromise promise,…) {
        //…
        try {
            // Close the channel
            doClose0(promise);
        } finally {
            // Fail all the queued messages.
            outboundBuffer.failFlushed(cause, notify);
            outboundBuffer.close(closeCause);
        }
         //……
    }
    }
   private void doClose0(ChannelPromise promise) {
    try {
        doClose();
        closeFuture.setClosed();
        safeSetSuccess(promise);
    } catch (Throwable t) {
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
  }

  //……

}



1.10. closeFuture.setClosed()


在closeFuture.setClosed() 设置关闭的结果的过程当中,主要完成如下三个工做:

1  设置result的值

2  notifyAll,唤醒在本Promise上等待的线程

3  回调listener

closeFuture.setClosed()的主要源码以下:

boolean setClosed() {
     return super.trySuccess();
}

@Override

//这个定义在父类中
public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
         return true;
    }
    return false;
}

private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}


上面的 notifyListeners()调用,就是用来唤醒了等待在closeFuture 实例对象上的等待线程。

到了这里,终于松了一口气了。

以前经过 f.channel().closeFuture().sync()  同步操做,阻塞在哪儿的Main线程,终于经过channel.close() 方法,给唤醒了。


1.11. 警戒死锁:Reactor线程不能sync


在上面的源码中,最终触发future对象的notify动做的线程,都是eventLoop线程(Reactor线程)。

通常状况下,Channel的出站和入站操做,也都是在eventLoop线程的轮询任务中完成的。


例如由于不管是用户直接关闭channel,或者eventLoop的轮询状态关闭channel,都会在eventLoop的线程内完成notify动做。notify那些经过sync操做,正在等待CloseFuture的哪些阻塞线程。


因此不要在Reactor线程内调用future对象的sync或者await方法。若是在Reactor线程进行sync或者await,会有可能引发死锁。


为何呢?

在Reactor线程进行sync时,会进入等待状态,等待Future(DefaultPromise)的 isDone 的条件知足。经过前面的例子,咱们已经看到了,而Future的isDone的条件,又须要Reactor线程的出站或者入站操做来知足。这是,Reactor线程既然已经处于等待状态,怎么可能再进行其余的出站或者入站操做呢?至关于本身等本身,这就是典型的死锁。


在实际开发中,因为应用程序代码都是编写在自定义的channelHandler处理器中,而channelHandler是在eventLoop线程(Reactor线程)内执行的。因此,不能在channelHandler中调用Future(DefaultPromise)的sync或者await两个同步方法。


正确的作法是:经过给Future(DefaultPromise) 增长listeners监听器 的方式,来干预异步操做的过程,处理异步操做的结果

这样,能够避免使用Future带来的死锁。



无编程不创客,无案例不学习。疯狂创客圈,一大波高手正在交流、学习中!

疯狂创客圈 Netty 死磕系列 10多篇深度文章博客园 总入口】  QQ群:104131248