若是你有一个阻塞的方法,好比 Thread.sleep(1000),而又不想阻塞当前线程 A,只须要把该方法包装成一个任务由另外一个线程 B 执行便可。java
ExecutorService pool = Executors.newFixedThreadPool(3);
Future<Integer> future = pool.submit(() -> {
Thread.sleep(1000);
return 1;
});
复制代码
若是你须要在任务结束以后执行其余逻辑,一种方式是 A 线程先经过调用 future.get()
获取值,而后执行其余代码;可是 get 方法自己也是一个阻塞方法,在这期间 A 线程阻塞。数组
另一种方法是 B 线程执行完任务后,继续执行后续逻辑。Netty 中的 Future,io.netty.util.concurrent.Future,经过回调方法 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
实现了该功能。promise
Promise 接口继承了 Future 接口,在增长了 listener 的状况下,提供了 Promise<V> setSuccess(V result)
方法,能够在任务中手动设置返回值,并当即通知 listeners。ide
private static NioEventLoopGroup loopGroup = new NioEventLoopGroup(8);
public void methodA() {
Promise promise = methodA("ceee...eeeb");
promise.addListener(future -> { // 1
Object ret = future.get(); // 4. 此时能够直接拿到结果
// 后续逻辑由 B 线程执行
System.out.println(ret);
});
// A 线程不阻塞,继续执行其余代码...
}
public Promise<ResponsePacket> methodB(String name) {
Promise<ResponsePacket> promise = new DefaultPromise<>(loopGroup.next());
loopGroup.schedule(() -> { // 2
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("scheduler thread: " + Thread.currentThread().getName());
promise.setSuccess("hello " + name); // 3
}, 0, TimeUnit.SECONDS);
return promise;
}
复制代码
简单的使用 Promise 包括:oop
promise.addListener()
;loopGroup.schedule()
;promise.setSuccess()
;// class: DefaultPromise
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) { // 1. 增长 listener
addListener0(listener);
}
if (isDone()) { // 2. 若是任务执行完了,通知全部 listener
notifyListeners();
}
return this;
}
复制代码
继续看 addListener0:源码分析
private Object listeners;
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
// 1. 添加第 1 个 listener 时,直接赋值便可
if (listeners == null) {
listeners = listener;
}
// 3. 添加第 3 个以及更多 listener 时,直接加入数组便可
else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
}
// 2. 添加第 2 个 listener 时,listeners 类型更改成 DefaultFutureListeners,内部实现为一个数组
else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
复制代码
因为能够添加多个 listener,很容易想到经过一个数组保存全部 listener。而实现类里面 listeners 类型为 Object,多是考虑到大部分都只有 1 个 listener,节省内存空间。this
将任务加入队列,由线程池执行。spa
// class: DefaultPromise
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) { // 若是设置成功,返回;不然抛异常
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
// 设置 result
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// cas 操做
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
private synchronized boolean checkNotifyWaiters() {
/** * 有些线程不是经过增长 listener 的方式获取结果,而是经过 promise.get() 方法获取, * 那么这些线程为阻塞状态;当设置了 result 后,须要唤醒这些线程 */
if (waiters > 0) {
notifyAll();
}
return listeners != null; // 只要存在 listener,就返回 true
}
复制代码
继续查看 notifyListeners:.net
// class: DefaultPromise
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
// TODO 嵌套监听
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
// 1. 若是是 promise 绑定的线程,直接执行
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 2. 不然,加入任务调度, 所以 listener 方法最终仍是由 promise 绑定的线程执行的
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
// 依次通知全部 listener
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
notifyingListeners = false;
return;
}
// 通知原先的 listeners 时,有可能有新的 listener 在此期间注册, 也须要通知到
listeners = this.listeners;
this.listeners = null;
}
}
}
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future); // 执行 listener 中的方法
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
复制代码