本系列文章索引《响应式Spring的道法术器》
前情提要 Reactor 3快速上手 | 响应式流规范
本文测试源码java
在响应式编程中,调试是块难啃的骨头,这也是从命令式编程到响应式编程的切换过程当中,学习曲线最陡峭的地方。react
在命令式编程中,方法的调用关系摆在面上,咱们一般能够经过stack trace追踪的问题出现的位置。可是在异步的响应式编程中,一方面有诸多的调用是在水面如下的,做为响应式开发库的使用者是不须要了解的;另外一方面,基于事件的异步响应机制致使stack trace并不是很容易在代码中按图索骥的。git
好比下边的例子:github
@Test public void testBug() { getMonoWithException() .subscribe(); }
single()
方法只能接收一个元素,多了的话就会致使异常。上边的代码会报出以下的异常stack trace:编程
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.tryOnNext(FluxFilterFuseable.java:129) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.tryOnNext(FluxMapFuseable.java:284) at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.fastPath(FluxRange.java:273) at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.request(FluxRange.java:251) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:316) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:170) at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94) at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:87) at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:79) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:236) at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:65) at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:60) at reactor.core.publisher.FluxFilterFuseable.subscribe(FluxFilterFuseable.java:51) at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58) at reactor.core.publisher.Mono.subscribe(Mono.java:3077) at reactor.core.publisher.Mono.subscribeWith(Mono.java:3185) at reactor.core.publisher.Mono.subscribe(Mono.java:2962) at com.getset.Test_2_7.testBug(Test_2_7.java:19) ...
比较明显的信息大概就是那句“Source emitted more than one item”。下边的内容基本都是在Reactor库内部的调用,并且上边的stack trace的问题是出自.subscribe()
那一行的。app
若是对响应式流内部的Publisher、Subscriber和Subscription的机制比较熟悉,大概能够根据subscribe()
或request()
的顺序大概猜想出来getMonoWithException()
方法内大约通过了.map.filter.range
的操做链,可是除此以外,确实获取不到太多信息。异步
另外一方面,命令式编程的方式比较容易使用IDE的调试工具进行单步或断点调试,而在异步编程方式下,一般也不太好使。ide
以上这些都是在异步的响应式编程中可能会遇到的窘境。解铃还须系铃人,对于响应式编程的调试还须要响应式编程库自己提供调试工具。异步编程
Reactor提供了开启调试模式的方法。工具
Hooks.onOperatorDebug();
这个方法可以开启调试模式,从而在抛出异常时打印出一些有用的信息。把这一行加上:
@Test public void testBug() { Hooks.onOperatorDebug(); getMonoWithException() .subscribe(); }
这时候,除了上边的那一套stack trace以外,增长了如下内容:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly trace from producer [reactor.core.publisher.MonoSingle] : reactor.core.publisher.Flux.single(Flux.java:6473) com.getset.Test_2_7.getMonoWithException(Test_2_7.java:13) com.getset.Test_2_7.testBug(Test_2_7.java:19) Error has been observed by the following operator(s): |_ Flux.single(Test_2_7.java:13)
这里就能够明确找出问题根源了。
Hooks.onOperatorDebug()
的实现原理在于在组装期包装各个操做符的构造方法,加入一些监测功能,因此这个 hook 应该在早于声明的时候被激活,最保险的方式就是在你程序的最开始就激活它。以map
操做符为例:
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) { if (this instanceof Fuseable) { return onAssembly(new FluxMapFuseable<>(this, mapper)); } return onAssembly(new FluxMap<>(this, mapper)); }
能够看到,每次在返回新的Flux对象的时候,都会调用onAssembly
方法,这里就是Reactor能够在组装期插手“搞事情”的地方。
Hooks.onOperatorDebug()
是一种全局性的Hook,会影响到应用中全部的操做符,因此其带来的性能成本也是比较大的。若是咱们大概知道可能的问题在哪,而对整个应用开启调试模式,也容易被茫茫多的调试信息淹没。这时候,咱们须要一种更加精准且廉价的定位方式。
若是你知道问题出在哪一个链上,可是因为这个链的上游或下游来自其余的调用,就能够针对这个链使用checkpoint()进行问题定位。
checkpoint()
操做符就像一个Hook,不过它的做用范围仅限于这个链上。
@Test public void checkBugWithCheckPoint() { getMonoWithException() .checkpoint() .subscribe(); }
经过增长checkpoint()
操做符,仍然能够打印出调试信息:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly trace from producer [reactor.core.publisher.MonoSingle] : reactor.core.publisher.Mono.checkpoint(Mono.java:1367) reactor.core.publisher.Mono.checkpoint(Mono.java:1317) com.getset.Test_2_7.checkBugWithCheckPoint(Test_2_7.java:25) Error has been observed by the following operator(s): |_ Mono.checkpoint(Test_2_7.java:25)
checkpoint()
方法还有变体checkpoint(String description)
,你能够传入一个独特的字符串以方便在 assembly traceback 中进行识别。 这样会省略掉stack trace,不过你能够依赖这个字符串来定位到出问题的组装点。checkpoint(String) 比 checkpoint 有更低的执行成本。以下:
@Test public void checkBugWithCheckPoint2() { getMonoWithException() .checkpoint("checkBugWithCheckPoint2") .subscribe(); }
加入用于标识的字符串(方法名),输出以下:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly site of producer [reactor.core.publisher.MonoSingle] is identified by light checkpoint [I_HATE_BUGS]."description" : "checkBugWithCheckPoint2"
能够看到这里确实省略了调试的assembly traceback,可是咱们经过上边的信息也能够定位到是single
的问题。
上边的例子比较简单,当有许多的调试信息打印出来的时候,这个标识字符串可以方便咱们在许多的控制台输出中定位到问题。
若是既但愿有调试信息assembly traceback,也但愿用上标识字符串,还能够checkpoint(description, true)
来实现,第二个参数true
标识要打印assembly traceback。
最后一个方便调试的工具就是咱们前边屡次用到的log()
操做符了,它可以记录其上游的Flux或 Mono的事件(包括onNext
、onError
、onComplete
, 以及onSubscribe
、cancel
、和request
)。
log
操做符能够经过SLF4J使用相似Log4J和Logback这样的公共的日志工具来记录日志,若是SLF4J不存在的话,则直接将日志输出到控制台。
控制台使用 System.err 记录WARN
和ERROR
级别的日志,使用 System.out 记录其余级别的日志。