(17)Reactor的调试——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》
前情提要 Reactor 3快速上手 | 响应式流规范
本文测试源码java

2.7 调试

在响应式编程中,调试是块难啃的骨头,这也是从命令式编程到响应式编程的切换过程当中,学习曲线最陡峭的地方。react

在命令式编程中,方法的调用关系摆在面上,咱们一般能够经过stack trace追踪的问题出现的位置。可是在异步的响应式编程中,一方面有诸多的调用是在水面如下的,做为响应式开发库的使用者是不须要了解的;另外一方面,基于事件的异步响应机制致使stack trace并不是很容易在代码中按图索骥的。git

好比下边的例子:github

@Test
    public void testBug() {
        getMonoWithException()
                .subscribe();
    }
  1. single()方法只能接收一个元素,多了的话就会致使异常。

上边的代码会报出以下的异常stack trace:编程

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item

Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.tryOnNext(FluxFilterFuseable.java:129)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.tryOnNext(FluxMapFuseable.java:284)
    at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.fastPath(FluxRange.java:273)
    at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.request(FluxRange.java:251)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:316)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:170)
    at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
    at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:87)
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:79)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:236)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:65)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:60)
    at reactor.core.publisher.FluxFilterFuseable.subscribe(FluxFilterFuseable.java:51)
    at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3077)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:3185)
    at reactor.core.publisher.Mono.subscribe(Mono.java:2962)
    at com.getset.Test_2_7.testBug(Test_2_7.java:19)
    ...

比较明显的信息大概就是那句“Source emitted more than one item”。下边的内容基本都是在Reactor库内部的调用,并且上边的stack trace的问题是出自.subscribe()那一行的。app

若是对响应式流内部的Publisher、Subscriber和Subscription的机制比较熟悉,大概能够根据subscribe()request()的顺序大概猜想出来getMonoWithException()方法内大约通过了.map.filter.range的操做链,可是除此以外,确实获取不到太多信息。异步

另外一方面,命令式编程的方式比较容易使用IDE的调试工具进行单步或断点调试,而在异步编程方式下,一般也不太好使。ide

以上这些都是在异步的响应式编程中可能会遇到的窘境。解铃还须系铃人,对于响应式编程的调试还须要响应式编程库自己提供调试工具。异步编程

2.7.1 开启调试模式

Reactor提供了开启调试模式的方法。工具

Hooks.onOperatorDebug();

这个方法可以开启调试模式,从而在抛出异常时打印出一些有用的信息。把这一行加上:

@Test
    public void testBug() {
        Hooks.onOperatorDebug();
        getMonoWithException()
                .subscribe();
    }

这时候,除了上边的那一套stack trace以外,增长了如下内容:

Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
    reactor.core.publisher.Flux.single(Flux.java:6473)
    com.getset.Test_2_7.getMonoWithException(Test_2_7.java:13)
    com.getset.Test_2_7.testBug(Test_2_7.java:19)
Error has been observed by the following operator(s):
    |_  Flux.single(Test_2_7.java:13)

这里就能够明确找出问题根源了。

Hooks.onOperatorDebug()的实现原理在于在组装期包装各个操做符的构造方法,加入一些监测功能,因此这个 hook 应该在早于声明的时候被激活,最保险的方式就是在你程序的最开始就激活它。以map操做符为例:

public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
        if (this instanceof Fuseable) {
            return onAssembly(new FluxMapFuseable<>(this, mapper));
        }
        return onAssembly(new FluxMap<>(this, mapper));
    }

能够看到,每次在返回新的Flux对象的时候,都会调用onAssembly方法,这里就是Reactor能够在组装期插手“搞事情”的地方。

Hooks.onOperatorDebug()是一种全局性的Hook,会影响到应用中全部的操做符,因此其带来的性能成本也是比较大的。若是咱们大概知道可能的问题在哪,而对整个应用开启调试模式,也容易被茫茫多的调试信息淹没。这时候,咱们须要一种更加精准且廉价的定位方式。

2.7.2 使用 checkpoint() 来定位

若是你知道问题出在哪一个链上,可是因为这个链的上游或下游来自其余的调用,就能够针对这个链使用checkpoint()进行问题定位。

checkpoint()操做符就像一个Hook,不过它的做用范围仅限于这个链上。

@Test
    public void checkBugWithCheckPoint() {
        getMonoWithException()
                .checkpoint()
                .subscribe();
    }

经过增长checkpoint()操做符,仍然能够打印出调试信息:

Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
    reactor.core.publisher.Mono.checkpoint(Mono.java:1367)
    reactor.core.publisher.Mono.checkpoint(Mono.java:1317)
    com.getset.Test_2_7.checkBugWithCheckPoint(Test_2_7.java:25)
Error has been observed by the following operator(s):
    |_  Mono.checkpoint(Test_2_7.java:25)

checkpoint()方法还有变体checkpoint(String description),你能够传入一个独特的字符串以方便在 assembly traceback 中进行识别。 这样会省略掉stack trace,不过你能够依赖这个字符串来定位到出问题的组装点。checkpoint(String) 比 checkpoint 有更低的执行成本。以下:

@Test
    public void checkBugWithCheckPoint2() {
        getMonoWithException()
                .checkpoint("checkBugWithCheckPoint2")
                .subscribe();
    }

加入用于标识的字符串(方法名),输出以下:

Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly site of producer [reactor.core.publisher.MonoSingle] is identified by light checkpoint [I_HATE_BUGS]."description" : "checkBugWithCheckPoint2"

能够看到这里确实省略了调试的assembly traceback,可是咱们经过上边的信息也能够定位到是single的问题。

上边的例子比较简单,当有许多的调试信息打印出来的时候,这个标识字符串可以方便咱们在许多的控制台输出中定位到问题。

若是既但愿有调试信息assembly traceback,也但愿用上标识字符串,还能够checkpoint(description, true)来实现,第二个参数true标识要打印assembly traceback。

2.7.3 使用log()操做符了解执行过程

最后一个方便调试的工具就是咱们前边屡次用到的log()操做符了,它可以记录其上游的Flux或 Mono的事件(包括onNextonErroronComplete, 以及onSubscribecancel、和request)。

log操做符能够经过SLF4J使用相似Log4J和Logback这样的公共的日志工具来记录日志,若是SLF4J不存在的话,则直接将日志输出到控制台。

控制台使用 System.err 记录WARNERROR级别的日志,使用 System.out 记录其余级别的日志。

相关文章
相关标签/搜索