Netty 源码分析之 四 Promise 与 Future: 双子星的秘密

永顺大牛写的系列教程《源码之下无秘密 ── 作最好的 Netty 源码分析教程》是目前我读过最好的netty源码分析文章。但不知道什么缘由,做者在写到第三章的时候停更了。所以,我想尝试凭着我的的理解,续写后边几个章节。java

写在最前

永顺前辈已经写完章节有以下:git

续写章节:github

本文使用的netty版本为4.1.33编程

Future<V>和Promise<V>的关系

Netty内部的io.netty.util.concurrent.Future<V> 继承自java.util.concurrent.Future<V>,而Promise<V>是前者的一个特殊实现。
Future和Promise类图.pngbootstrap

Java原生Future<V>

Java并发编程包下提供了Future<V>接口。Future在异步编程中表示该异步操做的结果,经过Future<V>的内部方法能够实现状态检查、取消执行、获取执行结果等操做。内部的方法以下:segmentfault

// 尝试取消执行
    boolean cancel(boolean mayInterruptIfRunning);
    // 是否已经被取消执行
    boolean isCancelled();
    // 是否已经执行完毕
    boolean isDone();
    // 阻塞获取执行结果
    V get() throws InterruptedException, ExecutionException;
    // 阻塞获取执行结果或超时后返回
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

Netty对Future<V>的扩展

原生的Future<V>功能比较有限,Netty扩展了Future<V>并增长了如下方法:数组

  • 增长了更加丰富的状态判断方法
// 判断是否执行成功
    boolean isSuccess();
    // 判断是否能够取消执行
    boolean isCancellable();
  • 支持获取致使I/O操做异常
Throwable cause();
  • 增长了监听回调有关方法,支持future完成后执行用户指定的回调方法
// 增长回调方法
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 增长多个回调方法
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 删除回调方法
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 删除多个回调方法
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  • 增长了更丰富的阻塞等待结果返回的两类方法。其中一类是sync方法,阻塞等待结果且若是执行失败后向外抛出致使失败的异常;另一类是await方法,仅阻塞等待结果返回,不向外抛出异常。
// 阻塞等待,且若是失败抛出异常
    Future<V> sync() throws InterruptedException;
    // 同上,区别是不可中断阻塞等待过程
    Future<V> syncUninterruptibly();

    // 阻塞等待
    Future<V> await() throws InterruptedException;
    // 同上,区别是不可中断阻塞等待过程
    Future<V> awaitUninterruptibly();

Promise<V>

Promise<V>接口继续继承了Future<V>,并增长若干个设置状态并回调的方法:promise

// 设置成功状态并回调
    Promise<V> setSuccess(V result);
    boolean trySuccess(V result);
    // 设置失败状态并回调
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    // 设置为不可取消状态
    boolean setUncancellable();

可见,Promise<V>做为一个特殊的Future<V>,只是增长了一些状态设置方法。因此它经常使用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。缓存

经过Promise设置I/O执行结果

以客户端链接的注册过程为例,调用链路以下:服务器

io.netty.bootstrap.Bootstrap.connect()
--> io.netty.bootstrap.Bootstrap.doResolveAndConnect()
---->io.netty.bootstrap.AbstractBootstrap.initAndRegister()
------>io.netty.channel.MultithreadEventLoopGroup.register()
-------->io.netty.channel.SingleThreadEventLoop.register()

一直跟踪到SingleThreadEventLoop中,会看到这段代码:

@Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

此处新建了一个DefaultChannelPromise,构造函数传入了当前的channel以及当前所在的线程this。从第一节的类图咱们知道,DefaultChannelPromise同时实现了Future和Promise,具备上述提到的全部方法。

而后继续将该promise传递进另一个register方法中:

@Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

在该register方法中,继续将promise传递到Unsafe的register方法中,而当即返回了以ChannelFuture的形式返回了该promise。显然这里是一个异步回调处理:上层的业务能够拿到返回的ChannelFuture阻塞等待结果或者设置回调方法,而继续往下传的Promise能够用于设置执行状态而且回调设置的方法。

咱们继续往下debug能够看到:

// io.netty.channel.AbstractChannel.AbstractUnsafe.java
 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                // 若是已经注册过,则置为失败
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                // 若是线程类型不兼容,则置为失败
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    // 出现异常状况置promise为失败
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // 注册以前,先将promise置为不可取消转态
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();
                // promise置为成功
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
             
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                // 出现异常状况置promise为失败
                safeSetFailure(promise, t);
            }
        }

可见,底层的I/O操做成功与否均可以经过Promise设置状态,并使得外层的ChannelFuture能够感知获得I/O操做的结果。

经过ChannelFuture获取I/O执行结果

咱们再来看看被返回的ChannelFuture的用途:

// io.netty.bootstrap.AbstractBootstrap.java

    final ChannelFuture initAndRegister() {
        //...
        ChannelFuture regFuture = config().group().register(channel);
        // 若是异常不为null,则意味着底层的I/O已经失败,而且promise设置了失败异常
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

这里经过检查失败异常栈是否为空,能够提早检查到I/O是否失败。继续回溯,还能够看到:

// io.netty.bootstrap.AbstractBootstrap.java

 private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // 若是注册已经成功
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
       // 若是注册还没有完成
       // ...
    }
}

此处,经过ChannelFuture#isDone()方法能够知道底层的注册是否完成,若是完成,则继续进行bind操做。

可是由于注册是个异步操做,若是此时注册可能还没完成,那就会进入以下逻辑:

// io.netty.bootstrap.AbstractBootstrap.java

//...
else {
    // Registration future is almost always fulfilled already, but just in case it's not.
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.
                promise.setFailure(cause);
            } else {
                // Registration was successful, so set the correct executor to use.
                // See https://github.com/netty/netty/issues/2586
                promise.registered();

                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    return promise;
}

这里新建了一个新的PendingRegistrationPromise,并为原来的ChannelFuture对象添加了一个回调方法,并在回调中更改PendingRegistrationPromise的状态,并且PendingRegistrationPromise会继续被传递到上层。当底层的Promise状态被设置而且回调,就会进入该回调方法。从而将I/O状态继续向外传递。

DefaultChannelPromise的结果传递实现原理

咱们已经了解清楚了Promise和Future的异步模型。再来看看底层是如何实现的。以最经常使用的DefaultChannelPromise为例,内部很是简单,咱们主要看它的父类DefaultPromise:

// result字段的原子更新器
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    // 缓存执行结果的字段
    private volatile Object result;
    // promise所在的线程
    private final EventExecutor executor;
    // 一个或者多个回调方法
    private Object listeners;
    // 阻塞线程数量计数器  
    private short waiters;

设置状态

以设置成功状态为例(setSuccess):

@Override
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            // 调用回调方法
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setValue0(Object objResult) {
        // 原子修改result字段为objResult
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            checkNotifyWaiters();
            return true;
        }
        return false;
    }

    private synchronized void checkNotifyWaiters() {
        if (waiters > 0) {
            // 若是有其余线程在等待该promise的结果,则唤醒他们
            notifyAll();
        }
    }

设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒全部在阻塞等待该promise返回结果的线程。

其余设置状态方法再也不赘言,基本上大同小异。

阻塞线程以等待执行结果

上文提到其余线程会阻塞等待该promise返回结果,具体实现以sync方法为例:

@Override
    public Promise<V> sync() throws InterruptedException {
        // 阻塞等待
        await();
        // 若是有异常则抛出
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            // 若是已经完成,直接返回
            return this;
        }
        // 能够被中断
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        //检查死循环
        checkDeadLock();

        synchronized (this) {
            while (!isDone()) {
                // 递增计数器(用于记录有多少个线程在等待该promise返回结果)
                incWaiters();
                try {
                    // 阻塞等待结果
                    wait();
                } finally {
                    // 递减计数器
                    decWaiters();
                }
            }
        }
        return this;
    }

全部调用sync方法的线程,都会被阻塞,直到promise被设置为成功或者失败。这也解释了为什么Netty客户端或者服务端启动的时候通常都会调用sync方法,本质上都是阻塞当前线程而异步地等待I/O结果返回,以下:

Bootstrap bootstrap = new Bootstrap();
    ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10))
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 管道中添加基于换行符分割字符串的解析器
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    // 管道中添加字符串编码解码器
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                    // 管道中添加服务端处理逻辑
                    ch.pipeline().addLast(new MyClientEchoHandler());
                }
            }).connect("127.0.0.1", 9898).sync();

    future.channel().closeFuture().sync();

回调机制

@Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            // 添加回调方法
            addListener0(listener);
        }

        if (isDone()) {
            // 若是I/O操做已经结束,直接触发回调
            notifyListeners();
        }

        return this;
    }

    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            // 只有一个回调方法直接赋值
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            // 将回调方法添加到DefaultFutureListeners内部维护的listeners数组中
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            // 若是有多个回调方法,新建一个DefaultFutureListeners以保存更多的回调方法
            listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
        }
    }

从上边能够看到,添加回调方法完成以后,会当即检查promise是否已经完成;若是promise已经完成,则立刻调用回调方法。

总结

Netty的Promise和Future机制是基于Java并发包下的Future<V>开发的。其中Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操做经过Promise改变执行状态,咱们能够经过同步等待的Future当即获得结果。

所以,就像永顺大牛标题所言,在Netty的异步模型里,Promise和Future就像是双子星通常紧密相连。但我以为这二者更像是量子纠缠里的两个电子,由于改变其中一个方的状态,另一方可以立刻感知。

至此,Promise和Future的核心原理已经分析完毕。

相关文章
相关标签/搜索