Netty: DefaultPromise源码解读

1、为何须要 io.netty.util.concurrent.Promise ?

若是你有一个阻塞的方法,好比 Thread.sleep(1000),而又不想阻塞当前线程 A,只须要把该方法包装成一个任务由另外一个线程 B 执行便可。java

ExecutorService pool = Executors.newFixedThreadPool(3);
Future<Integer> future = pool.submit(() -> {
    Thread.sleep(1000);
    return 1;
});
复制代码

若是你须要在任务结束以后执行其余逻辑,一种方式是 A 线程先经过调用 future.get() 获取值,而后执行其余代码;可是 get 方法自己也是一个阻塞方法,在这期间 A 线程阻塞。数组

另一种方法是 B 线程执行完任务后,继续执行后续逻辑。Netty 中的 Future,io.netty.util.concurrent.Future,经过回调方法 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); 实现了该功能。promise

Promise 接口继承了 Future 接口,在增长了 listener 的状况下,提供了 Promise<V> setSuccess(V result) 方法,能够在任务中手动设置返回值,并当即通知 listeners。ide

2、实例程序

private static NioEventLoopGroup loopGroup = new NioEventLoopGroup(8);

public void methodA() {
    Promise promise = methodA("ceee...eeeb");
    promise.addListener(future -> {		// 1
        Object ret = future.get();      // 4. 此时能够直接拿到结果
      	// 后续逻辑由 B 线程执行
        System.out.println(ret);
    });
  	// A 线程不阻塞,继续执行其余代码...
}

public Promise<ResponsePacket> methodB(String name) {
    Promise<ResponsePacket> promise = new DefaultPromise<>(loopGroup.next());
    loopGroup.schedule(() -> {		// 2
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("scheduler thread: " + Thread.currentThread().getName());
        promise.setSuccess("hello " + name);	// 3
    }, 0, TimeUnit.SECONDS);

    return promise;
}
复制代码

简单的使用 Promise 包括:oop

  1. 给 promise 增长 listener,promise.addListener()
  2. 分配执行任务的线程,loopGroup.schedule()
  3. 在任务执行过程当中,设置结果,promise.setSuccess()

3、源码分析

1. addListener

// class: DefaultPromise
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");

    synchronized (this) {  // 1. 增长 listener
        addListener0(listener);	
    }

    if (isDone()) {  // 2. 若是任务执行完了,通知全部 listener
        notifyListeners();
    }

    return this;
}
复制代码

继续看 addListener0:源码分析

private Object listeners;
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
 	// 1. 添加第 1 个 listener 时,直接赋值便可
    if (listeners == null) {
        listeners = listener;
    } 
  	// 3. 添加第 3 个以及更多 listener 时,直接加入数组便可
  	else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } 
  	// 2. 添加第 2 个 listener 时,listeners 类型更改成 DefaultFutureListeners,内部实现为一个数组
  	else {
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
}
复制代码

因为能够添加多个 listener,很容易想到经过一个数组保存全部 listener。而实现类里面 listeners 类型为 Object,多是考虑到大部分都只有 1 个 listener,节省内存空间。this

2. schedule

将任务加入队列,由线程池执行。spa

3. setSuccess

// class: DefaultPromise
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {	// 若是设置成功,返回;不然抛异常
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

private boolean setSuccess0(V result) {
  	// 设置 result
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    // cas 操做
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        if (checkNotifyWaiters()) {
            notifyListeners();
        }
        return true;
    }
    return false;
}

private synchronized boolean checkNotifyWaiters() {
      /** * 有些线程不是经过增长 listener 的方式获取结果,而是经过 promise.get() 方法获取, * 那么这些线程为阻塞状态;当设置了 result 后,须要唤醒这些线程 */
    if (waiters > 0) {
        notifyAll();
    }
    return listeners != null;  // 只要存在 listener,就返回 true
}
复制代码

继续查看 notifyListeners:.net

// class: DefaultPromise
private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
      	// TODO 嵌套监听
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
              	// 1. 若是是 promise 绑定的线程,直接执行
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }

    // 2. 不然,加入任务调度, 所以 listener 方法最终仍是由 promise 绑定的线程执行的
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

private void notifyListenersNow() {
    Object listeners;
    synchronized (this) {
        if (notifyingListeners || this.listeners == null) {
            return;
        }
        notifyingListeners = true;
        listeners = this.listeners;
        this.listeners = null;
    }
    for (;;) {
      	// 依次通知全部 listener
        if (listeners instanceof DefaultFutureListeners) {
            notifyListeners0((DefaultFutureListeners) listeners);
        } else {
            notifyListener0(this, (GenericFutureListener<?>) listeners);
        }
        synchronized (this) {
            if (this.listeners == null) {
                notifyingListeners = false;
                return;
            }
            // 通知原先的 listeners 时,有可能有新的 listener 在此期间注册, 也须要通知到
            listeners = this.listeners;
            this.listeners = null;
        }
    }
}

private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);	// 执行 listener 中的方法
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
}
复制代码
相关文章
相关标签/搜索