Netty源码分析第7章(编码器和写数据)---->第5节: Future和Promies

 

Netty源码分析第七章: 编码器和写数据html

 

第五节: Future和Promisejava

 

Netty中的Future, 其实相似于jdk的Future, 用于异步获取执行结果设计模式

Promise则至关于一个被观察者, 其中promise对象会一直跟随着channel的读写事件, 并跟踪着事件状态, 而后执行相应的回调数组

这种设计思路也就是java设计模式的观察者模式promise

首先咱们看一段写在handler中的业务代码:并发

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture future = ctx.writeAndFlush("test data"); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()){ System.out.println("写出成功"); }else{ System.out.println("写出失败"); } } }); }

熟悉netty的小伙伴估计对这段代码并不陌生, 首先调用writeAndFlush方法将数据写出, 而后返回的future进行添加Listener, 而且重写回调函数异步

这里举一个最简单的示例, 在回调函数中判断future的状态成功与否, 成功的话就打印"写出成功", 不然节打印"写出失败"ide

这里若是写在handler中一般是NioEventLoop线程执行的, 在future返回以后才会执行添加listener的操做, 若是在用户线程中writeAndFlush是异步执行的, 在添加监听的时候有可能写出操做没有执行完毕, 等写出操做执行完毕以后才会执行回调函数

以上逻辑在代码中如何体现的呢?咱们首先跟到writeAndFlush的方法中去oop

这里会走到AbstractChannelHandlerContext中的writeAndFlush方法中:

public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); }

这里的逻辑以前剖析过, 想必你们并不陌生

这里关注newPromise()方法, 跟进去:

public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); }

这里直接建立了DefaultChannelPromise这个对象并传入了当前channel和当前channel绑定NioEventLoop对象

在DefaultChannelPromise构造方法中, 也会将channel和NioEventLoop对象绑定在自身成员变量中

回到writeAndFlush方法继续跟:

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); return promise; } write(msg, true, promise); return promise; }

这里的逻辑也不陌生, 注意这里最后返回了promise, 其实就是咱们上一步建立DefaultChannelPromise对象

DefaultChannelPromise实现了ChannelFuture接口, 因此方法若是返回该对象能够被ChannelFuture类型接收

咱们继续跟write方法:

private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }

这里的逻辑咱们一样不陌生, 若是nioEventLoop线程, 咱们继续调invokeWriteAndFlush方法, 若是不是nioEventLoop线程则将writeAndFlush事件封装成task, 交给eventLoop线程异步

这里若是是异步执行, 则到这一步以后, 咱们的业务代码中, writeAndFlush就会返回并添加监听, 有关添加监听的逻辑稍后分析

走到这里, 不管同步异步, 都会执行到invokeWriteAndFlush方法:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }

这里也是咱们熟悉的逻辑, 咱们看到在invokeWrite0方法中传入了咱们刚才建立的DefaultChannelPromise

后续逻辑想必你们都比较熟悉, 经过事件传播, 最终会调用head节点的write方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }

这里最终调用unsafe的write方法, 并传入了promise对象

跟到AbstractUnsafe的write方法中:

public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); //负责缓冲写进来的byteBuf
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //插入写队列
 outboundBuffer.addMessage(msg, size, promise); }

这里的逻辑以前小节也剖析过, 这里咱们首先关注两个部分, 首先看在catch中safeSetFailure这步

由于是catch块, 说明发生了异常, 写到缓冲区不成功, safeSetFailure就是设置写出失败的状态

咱们跟到safeSetFailure方法中:

protected final void safeSetFailure(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } }

这里看if判断, 首先咱们的promise是DefaultChannelPromise, 因此!(promise instanceof VoidChannelPromise)为true

重点分析promise.tryFailure(cause), 这里是设置失败状态, 这里会调用DefaultPromise的tryFailure方法

跟进tryFailure方法:

public boolean tryFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return true; } return false; }

再跟到setFailure0(cause)中:

private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }

这里在if块中的cas操做, 会将参数objResult的值设置到DefaultPromise的成员变量result中, 表示当前操做为异常状态

回到tryFailure方法:

这里关注notifyListeners()这个方法, 这个方法是执行添加监听的回调函数, 当writeAndFlush和addListener是异步执行的时候, 这里有可能添加已经添加, 因此经过这个方法能够调用添加监听后的回调

若是writeAndFlush和addListener是同步执行的时候, 也就是都在NioEventLoop线程中执行的时候, 那么走到这里addListener还没执行, 因此这里不能回调添加监听的回调函数, 那么回调是何时执行的呢?咱们在剖析addListener步骤的时候会给你们分析

 

具体执行回调咱们再讲解添加监听的时候进行剖析

以上就是记录异常状态的大概逻辑

回到AbstractUnsafe的write方法:

咱们再关注这一步:

outboundBuffer.addMessage(msg, size, promise);

跟到addMessage方法中:

public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); //代码省略
}

咱们只须要关注包装Entry的newInstance方法, 该方法传入promise对象

跟到newInstance中:

static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size; entry.total = total; entry.promise = promise; return entry; }

这里将promise设置到Entry的成员变量中了, 也就是说, 每一个Entry都关联了惟一的一个promise

咱们回到AbstractChannelHandlerContext的invokeWriteAndFlush方法中:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }

咱们刚才分析了write操做中promise的传递以及状态设置的大概过程, 咱们继续看在flush中promise的操做过程

这里invokeFlush0()并无传入promise对象, 是由于咱们刚才分析过, promise对象会绑定在缓冲区中entry的成员变量中, 能够经过其成员变量拿到promise对象

invokeFlush0()咱们以前也分析过, 经过事件传递, 最终会调用HeadContext的flush方法:

public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }

最后跟到AbstractUnsafe的flush方法:

public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }

这块逻辑以前已分析过, 继续看flush0方法:

protected void flush0() { //代码省略
    try { doWrite(outboundBuffer); } catch (Throwable t) { //代码省略
    } finally { inFlush0 = false; } }

篇幅缘由咱们省略大段代码

咱们继续跟进doWrite方法:

protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { Object msg = in.current(); if (msg == null) { clearOpWrite(); return; } if (msg instanceof ByteBuf) { //代码省略
            boolean done = false; //代码省略
            if (done) { //移除当前对象
 in.remove(); } else { break; } } else if (msg instanceof FileRegion) { //代码省略
        } else { throw new Error(); } } incompleteWrite(setOpWrite); }

这里也省略了大段代码, 咱们重点关注in.remove()这里, 以前介绍过, 若是done为true, 说明刷新事件已完成, 则移除当前entry节点

咱们跟到remove()方法中:

public boolean remove() { Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) { ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } e.recycle(); return true; }

这里咱们看这一步:

ChannelPromise promise = e.promise;

以前咱们剖析promise对象会绑定在entry中, 而这步就是从entry中获取promise对象

等remove操做完成, 会执行到这一步:

safeSuccess(promise);

这一步正好和咱们刚才分析的safeSetFailure相反, 这里是设置成功状态

跟到safeSuccess方法中:

private static void safeSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise)) { PromiseNotificationUtil.trySuccess(promise, null, logger); } }

再跟到trySuccess方法中:

public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) { if (!p.trySuccess(result) && logger != null) { //代码省略
 } }

这里再继续跟if中的trySuccess方法, 最后会走到DefaultPromise的trySuccess方法:

public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; }

这里跟到setSuccess0方法中:

private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); }

这里的逻辑咱们刚才剖析过了, 这里参数传入一个信号SUCCESS, 表示设置成功状

再继续跟setValue方法:

private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }

一样, 在if判断中, 经过cas操做将参数传入的SUCCESS对象赋值到DefaultPromise的属性result中, 咱们看这个属性:

 private volatile Object result; 

这里是Object类型, 也就是能够赋值成任何类型

SUCCESS是一个Signal类型的对象, 这里咱们能够简单理解成一种状态, SUCCESS表示一种成功的状态

经过上述cas操做, result的值将赋值成SUCCESS

咱们回到trySuccess方法:

public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; }

设置完成功状态以后, 则会经过notifyListeners()执行监听中的回调

咱们看用户代码:

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture future = ctx.writeAndFlush("test data"); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()){ System.out.println("写出成功"); }else{ System.out.println("写出失败"); } } }); }

在回调中会判断future.isSuccess(), promise设置为成功状态这里会返回true, 从而打印写出成功"

跟到isSuccess方法中, 这里会调用DefaultPromise的isSuccess方法:

public boolean isSuccess() { Object result = this.result; return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); }

咱们看到首先会拿到result对象, 而后判断result不为空, 而且不是UNCANCELLABLE, 而且不属于CauseHolder对象

咱们刚才分析若是promise设置为成功装载, 则result为SUCCESS, 因此这里条件成立, 能够执行 if (future.isSuccess()) 中if块的逻辑

 

和设置错误状态的逻辑同样, 这里也有一样的问题, 若是writeAndFlush是和addListener是异步操做, 那么执行到回调的时候, 可能addListener已经添加完成, 因此能够正常的执行回调

那么若是writeAndFlush是和addListener是同步操做, writeAndFlush在执行回调的时候, addListener并无执行, 因此没法执行回调方法, 那么回调方法是如何执行的呢, 咱们看addListener这个方法:

addListener传入ChannelFutureListener对象, 并重写了operationComplete方法, 也就是执行回调的方法

这里会执行到DefaultChannelPromise的addListener方法, 跟进去

public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.addListener(listener); return this; }

跟到父类的addListener中:

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }

这里经过addListener0方法添加listener, 由于添加listener有可能会在不一样的线程中操做, 好比用户线程和NioEventLoop线程, 为了防止并发问题, 这里简单粗暴的加了个synchronized关键字

跟到addListener0方法中:

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener); } }

若是是第一次添加listener, 则成员变量listeners为null, 这样就把参数传入的GenericFutureListener赋值到成员变量listeners

若是是第二次添加listener, listeners不为空, 会走到else if判断, 由于第一次添加的listener是GenericFutureListener类型, 并非DefaultFutureListeners类型, 因此else if判断返回false, 进入到else块中

else块中, 经过new的方式建立一个DefaultFutureListeners对象并赋值到成员变量listeners中

DefaultFutureListeners的构造方法中, 第一个参数传入DefaultPromise中的成员变量listeners, 也就是第一次添加的GenericFutureListener对象, 第二个参数为第二次添加的GenericFutureListener对象, 这里经过两个GenericFutureListener对象包装成一个DefaultFutureListeners对象

咱们看listeners的定义:

private Object listeners;

这里是个Object类型, 因此能够保存任何类型的对象

再看DefaultFutureListeners的构造方法:

DefaultFutureListeners( GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) { listeners = new GenericFutureListener[2]; //第0个
    listeners[0] = first; //第1个
    listeners[1] = second; size = 2; //代码省略
}

在DefaultFutureListeners类中也定义了一个成员变量listeners, 类型为GenericFutureListener数组

构造方法中初始化listeners这个数组, 而且数组中第一个值赋值为咱们第一次添加的GenericFutureListener, 第二个赋值为咱们第二次添加的GenericFutureListener

回到addListener0方法中:

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener); } }

通过两次添加listener, 属性listeners的值就变成了DefaultFutureListeners类型的对象, 若是第三次添加listener, 则会走到else if块中, DefaultFutureListeners对象经过调用add方法继续添加listener

跟到add方法中:

public void add(GenericFutureListener<? extends Future<?>> l) { GenericFutureListener<? extends Future<?>>[] listeners = this.listeners; final int size = this.size; if (size == listeners.length) { this.listeners = listeners = Arrays.copyOf(listeners, size << 1); } listeners[size] = l; this.size = size + 1; //代码省略
}

这里的逻辑也比较简单, 就是为当前的数组对象listeners中追加新的GenericFutureListener对象, 若是listeners容量不足则进行扩容操做

根据以上逻辑, 就完成了listener的添加逻辑

那么再看咱们刚才遗留的问题, 若是writeAndFlush和addListener是同步进行的, writeAndFlush执行回调时尚未addListener尚未执行回调, 那么回调是如何执行的呢?

回到DefaultPromise的addListener中:

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }

咱们分析完了addListener0方法, 再往下看

这个会有if判断isDone(), isDone方法, 就是程序执行到这一步的时候, 判断刷新事件是否执行完成

跟到isDone方法中:

public boolean isDone() { return isDone0(result); }

继续跟isDone0, 这里传入了成员变量result

private static boolean isDone0(Object result) { return result != null && result != UNCANCELLABLE; }

这里判断result不为null而且不为UNCANCELLABLE, 则就表示完成

由于成功的状态是SUCCESS, 因此flush成功这里会返回true

回到 addListener中:

若是执行完成, 就经过notifyListeners()方法执行回调, 这也解释刚才的问题, 在同步操做中, writeAndFlush在执行回调时并无添加listener, 因此添加listener的时候会判断writeAndFlush的执行状态, 若是状态时完成, 则会这里执行回调

一样, 在异步操做中, 走到这里writeAndFlush可能还没完成, 因此这里不会执行回调, 由writeAndFlush执行回调

因此, 不管writeAndFlush和addListener谁先完成, 均可以执行到回调方法

跟到notifyListeners()方法中:

private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }

这里首先判断是不是eventLoop线程, 若是是eventLoop线程则执行if块中的逻辑, 若是不是eventLoop线程, 则把执行回调的逻辑封装成task丢到EventLoop的任务队列中异步执行

咱们重点关注notifyListenersNow()方法, 跟进去:

private void notifyListenersNow() { Object listeners; synchronized (this) { if (notifyingListeners || this.listeners == null) { return; } notifyingListeners = true; listeners = this.listeners; this.listeners = null; } for (;;) { if (listeners instanceof DefaultFutureListeners) { notifyListeners0((DefaultFutureListeners) listeners); } else { notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners); } //代码省略
 } }

在无限for循环中, 首先首先判断listeners是否是DefaultFutureListeners类型, 根据咱们以前的逻辑, 若是只添加了一个listener, 则listeners是GenericFutureListener类型

一般在添加的时候只会添加一个listener, 因此咱们跟到else块中的notifyListener0方法:

private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } }

咱们看到, 这里执行了GenericFutureListener的中咱们重写的回调函数operationComplete

 

以上就是执行回调的相关逻辑

 

 

第七章总结

 

        这一章讲解了有关writeflush的相关逻辑, 并分析了有关添加监听和异步写数据的相关步骤

        通过学习, 同窗们应该掌握以下知识:

        write操做是如何将ByteBuf添加到发送缓冲区的

        flush操做是如何将ByteBuf写出到chanel中的

        抽象编码器MessageToByteEncoder中如何定义了编码器的骨架逻辑

        writeAndFlushaddListener在同步和异步操做中是如何执行回调的

 

上一节: 刷新buffer队列

下一节: FastThreadLocal的使用和建立

相关文章
相关标签/搜索