本系列文章索引《响应式Spring的道法术器》
前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范 | 自定义数据流
本节测试源码html
许多地方也叫作“背压”、“负压”,我在《Reactor参考文档》中是翻译为“背压”的,后来在看到有“回压”的翻译,突然感受从文字上彷佛更加符合。java
这一节讨论回压的问题,有两个前提:react
回压的处理有如下几种策略:git
这几种策略定义在枚举类型OverflowStrategy
中,不过还有一个IGNORE类型,即彻底忽略下游背压请求,这可能会在下游队列积满的时候致使 IllegalStateException。github
上一节中,用于生成数据流的方法create
和push
能够用于异步的场景,并且它们也支持回压,咱们能够经过提供一个 OverflowStrategy 来定义背压行为。方法签名:编程
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure)
默认(没有第二个参数的方法)是缓存策略的,咱们来试一下别的策略,好比DROP的策略。缓存
咱们继续使用2.2节的那个测试例子,下边是用create
建立的“快的发布者”,不过方便起见拆放到两个私有方法里供调用:并发
public class Test_2_3 { /** * 使用create方法生成“快的发布者”。 * @param strategy 回压策略 * @return Flux */ private Flux<MyEventSource.MyEvent> createFlux(FluxSink.OverflowStrategy strategy) { return Flux.create(sink -> eventSource.register(new MyEventListener() { @Override public void onNewEvent(MyEventSource.MyEvent event) { System.out.println("publish >>> " + event.getMessage()); sink.next(event); } @Override public void onEventStopped() { sink.complete(); } }), strategy); // 1 } /** * 生成MyEvent。 * @param count 生成MyEvent的个数。 * @param millis 每一个MyEvent之间的时间间隔。 */ private void generateEvent(int times, int millis) { // 循环生成MyEvent,每一个MyEvent间隔millis毫秒 for (int i = 0; i < times; i++) { try { TimeUnit.MILLISECONDS.sleep(millis); } catch (InterruptedException e) { } eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i)); } eventSource.eventStopped(); } }
有了“快的发布者”,下面是“慢的订阅者”,以及一些测试准备工做:异步
public class Test_2_3 { private final int EVENT_DURATION = 10; // 生成的事件间隔时间,单位毫秒 private final int EVENT_COUNT = 20; // 生成的事件个数 private final int PROCESS_DURATION = 30; // 订阅者处理每一个元素的时间,单位毫秒 private Flux<MyEventSource.MyEvent> fastPublisher; private SlowSubscriber slowSubscriber; private MyEventSource eventSource; private CountDownLatch countDownLatch; /** * 准备工做。 */ @Before public void setup() { countDownLatch = new CountDownLatch(1); slowSubscriber = new SlowSubscriber(); eventSource = new MyEventSource(); } /** * 触发订阅,使用CountDownLatch等待订阅者处理完成。 */ @After public void subscribe() throws InterruptedException { fastPublisher.subscribe(slowSubscriber); generateEvent(EVENT_COUNT, EVENT_DURATION); countDownLatch.await(1, TimeUnit.MINUTES); } /** * 内部类,“慢的订阅者”。 */ class SlowSubscriber extends BaseSubscriber<MyEventSource.MyEvent> { @Override protected void hookOnSubscribe(Subscription subscription) { request(1); // 订阅时请求1个数据 } @Override protected void hookOnNext(MyEventSource.MyEvent event) { System.out.println(" receive <<< " + event.getMessage()); try { TimeUnit.MILLISECONDS.sleep(PROCESS_DURATION); } catch (InterruptedException e) { } request(1); // 每处理完1个数据,就再请求1个 } @Override protected void hookOnError(Throwable throwable) { System.err.println(" receive <<< " + throwable); } @Override protected void hookOnComplete() { countDownLatch.countDown(); } } }
下面是测试方法:ide
/** * 测试create方法的不一样OverflowStrategy的效果。 */ @Test public void testCreateBackPressureStratety() { fastPublisher = createFlux(FluxSink.OverflowStrategy.BUFFER) // 1 .doOnRequest(n -> System.out.println(" === request: " + n + " ===")) // 2 .publishOn(Schedulers.newSingle("newSingle"), 1); // 3 }
.publishOn
的请求);publishOn
让后续的操做符和订阅者运行在一个单独的名为newSingle
的线程上,第二个参数1是预取个数,也就是.publishOn
做为订阅者每次向上游request的个数,默认为256,因此必定程度上也起到了缓存的效果,为了测试,设置为1。一般状况下,发布者于订阅者并不在同一个线程上,这里使用
publishOn
来模拟这种状况。
BUFFER
策略的输出以下(来不及处理的数据会缓存下来,这是一般状况下的默认策略):
=== request: 1 === publish >>> Event-0 receive <<< Event-0 publish >>> Event-1 publish >>> Event-2 === request: 1 === publish >>> Event-3 receive <<< Event-1 publish >>> Event-4 publish >>> Event-5 publish >>> Event-6 === request: 1 === receive <<< Event-2 publish >>> Event-7 publish >>> Event-8 ...
DROP
策略的输出以下(有新数据就绪的时候,看是否有request,有的话就发出,没有就丢弃):
=== request: 1 === publish >>> Event-0 receive <<< Event-0 publish >>> Event-1 publish >>> Event-2 publish >>> Event-3 === request: 1 === publish >>> Event-4 receive <<< Event-4 publish >>> Event-5 publish >>> Event-6 publish >>> Event-7 === request: 1 === publish >>> Event-8 receive <<< Event-8 ...
能够看到,第1/2/3/5/6/7/...的数据被丢弃了,当有request以后的数据会被发出。调整一下publishOn
方法的第二个参数(预取个数)为2,输出以下:
=== request: 2 === publish >>> Event-0 receive <<< Event-0 publish >>> Event-1 publish >>> Event-2 publish >>> Event-3 receive <<< Event-1 publish >>> Event-4 publish >>> Event-5 publish >>> Event-6 === request: 2 === publish >>> Event-7 receive <<< Event-7 publish >>> Event-8 publish >>> Event-9 publish >>> Event-10 receive <<< Event-8 publish >>> Event-11 publish >>> Event-12
可见,每次request(请求2个数据)以后的2个数据发出,更多就绪的数据因为没有request就丢弃了。
LATEST
的输出以下(request到来的时候,将最新的数据发出):
=== request: 1 === publish >>> Event-0 receive <<< Event-0 publish >>> Event-1 publish >>> Event-2 publish >>> Event-3 === request: 1 === receive <<< Event-3 publish >>> Event-4 publish >>> Event-5 === request: 1 === receive <<< Event-5 publish >>> Event-6 publish >>> Event-7 publish >>> Event-8 === request: 1 === receive <<< Event-8
ERROR
的输出以下(当订阅者来不及处理时候发出一个错误信号):
=== request: 1 === publish >>> Event-0 receive <<< Event-0 publish >>> Event-1 publish >>> Event-2 === request: 1 === receive <<< reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
IGNORE
的输出以下:
... === request: 2 === receive <<< Event-10 receive <<< Event-11 === request: 2 === receive <<< Event-12 receive <<< reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
Reactor提供了响应的onBackpressureXxx
操做符,调整回压策略。测试方法以下:
/** * 测试不一样的onBackpressureXxx方法的效果。 */ @Test public void testOnBackPressureXxx() { fastPublisher = createFlux(FluxSink.OverflowStrategy.BUFFER) .onBackpressureBuffer() // BUFFER // .onBackpressureDrop() // DROP // .onBackpressureLatest() // LATEST // .onBackpressureError() // ERROR .doOnRequest(n -> System.out.println(" === request: " + n + " ===")) .publishOn(Schedulers.newSingle("newSingle"), 1); }
经过打开某一个操做符的注释能够观察输出。这里就不贴输出内容了,Reactor文档的示意图更加直观:
onBackpressureBuffer,对于来自其下游的request采起“缓存”策略。
onBackpressureDrop,元素就绪时,根据下游是否有未知足的request来判断是否发出当前元素。
onBackpressureLatest,当有新的request到来的时候,将最新的元素发出。
onBackpressureError,当有多余元素就绪时,发出错误信号。
真是一图胜千言啊,上边的这些图片都是来自Reactor官方文档。
当进行异步编程时,一般会面临相互协做的各个组件不在同一个线程的状况,好比一个生产者不断生成消息,而一个消费者不断处理这些产生的消息,两者一般不在一个线程甚至是两个不一样的组件。当有人不当心采用了×××资源(好比无上限的弹性线程池、×××队列等),那么在高并发或任务繁重时就有可能形成线程数爆炸增加,或队列堆积,所以backpressure这种协调机制对于维持系统稳定具备重要做用。