(13)Reactor的backpressure策略——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》
前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范 | 自定义数据流
本节测试源码html

2.3 不一样的回压策略

许多地方也叫作“背压”、“负压”,我在《Reactor参考文档》中是翻译为“背压”的,后来在看到有“回压”的翻译,突然感受从文字上彷佛更加符合。java

这一节讨论回压的问题,有两个前提:react

  1. 发布者与订阅者不在同一个线程中,由于在同一个线程中的话,一般使用传统的逻辑就能够,不须要进行回压处理;
  2. 发布者发出数据的速度高于订阅者处理数据的速度,也就是处于“PUSH”状态下,若是相反,那就是“PUll”状态,不须要处理回压。

2.3.1 回压策略

回压的处理有如下几种策略:git

  1. ERROR: 当下游跟不上节奏的时候发出一个错误信号。
  2. DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
  3. LATEST:让下游只获得上游最新的元素。
  4. BUFFER:缓存下游没有来得及处理的元素(若是缓存不限大小的可能致使OutOfMemoryError)。

这几种策略定义在枚举类型OverflowStrategy中,不过还有一个IGNORE类型,即彻底忽略下游背压请求,这可能会在下游队列积满的时候致使 IllegalStateException。github

2.3.2 使用create声明回压策略

上一节中,用于生成数据流的方法createpush能够用于异步的场景,并且它们也支持回压,咱们能够经过提供一个 OverflowStrategy 来定义背压行为。方法签名:编程

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure)

默认(没有第二个参数的方法)是缓存策略的,咱们来试一下别的策略,好比DROP的策略。缓存

咱们继续使用2.2节的那个测试例子,下边是用create建立的“快的发布者”,不过方便起见拆放到两个私有方法里供调用:并发

public class Test_2_3 {
        /**
         * 使用create方法生成“快的发布者”。
         * @param strategy 回压策略
         * @return  Flux
         */
        private Flux<MyEventSource.MyEvent> createFlux(FluxSink.OverflowStrategy strategy) {
            return Flux.create(sink -> eventSource.register(new MyEventListener() {
                @Override
                public void onNewEvent(MyEventSource.MyEvent event) {
                    System.out.println("publish >>> " + event.getMessage());
                    sink.next(event);
                }

                @Override
                public void onEventStopped() {
                    sink.complete();
                }
            }), strategy);  // 1
        }
        /**
         * 生成MyEvent。
         * @param count 生成MyEvent的个数。
         * @param millis 每一个MyEvent之间的时间间隔。
         */
        private void generateEvent(int times, int millis) {
            // 循环生成MyEvent,每一个MyEvent间隔millis毫秒
            for (int i = 0; i < times; i++) {
                try {
                    TimeUnit.MILLISECONDS.sleep(millis);
                } catch (InterruptedException e) {
                }
                eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i));
            }
            eventSource.eventStopped();
        }
    }

有了“快的发布者”,下面是“慢的订阅者”,以及一些测试准备工做:异步

public class Test_2_3 {
        private final int EVENT_DURATION   = 10;    // 生成的事件间隔时间,单位毫秒
        private final int EVENT_COUNT      = 20;    // 生成的事件个数
        private final int PROCESS_DURATION = 30;    // 订阅者处理每一个元素的时间,单位毫秒

        private Flux<MyEventSource.MyEvent> fastPublisher;
        private SlowSubscriber slowSubscriber;
        private MyEventSource eventSource;
        private CountDownLatch countDownLatch;

        /**
         * 准备工做。
         */
        @Before
        public void setup() {
            countDownLatch = new CountDownLatch(1);
            slowSubscriber = new SlowSubscriber();
            eventSource = new MyEventSource();
        }

        /**
         * 触发订阅,使用CountDownLatch等待订阅者处理完成。
         */
        @After
        public void subscribe() throws InterruptedException {
            fastPublisher.subscribe(slowSubscriber);
            generateEvent(EVENT_COUNT, EVENT_DURATION);
            countDownLatch.await(1, TimeUnit.MINUTES);
        }

        /**
         * 内部类,“慢的订阅者”。
         */
        class SlowSubscriber extends BaseSubscriber<MyEventSource.MyEvent> {

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1);     // 订阅时请求1个数据
            }

            @Override
            protected void hookOnNext(MyEventSource.MyEvent event) {
                System.out.println("                      receive <<< " + event.getMessage());
                try {
                    TimeUnit.MILLISECONDS.sleep(PROCESS_DURATION);
                } catch (InterruptedException e) {
                }
                request(1);     // 每处理完1个数据,就再请求1个
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                System.err.println("                      receive <<< " + throwable);
            }

            @Override
            protected void hookOnComplete() {
                countDownLatch.countDown();
            }
        }
    }

下面是测试方法:ide

/**
     * 测试create方法的不一样OverflowStrategy的效果。
     */
    @Test
    public void testCreateBackPressureStratety() {
        fastPublisher =
                createFlux(FluxSink.OverflowStrategy.BUFFER)    // 1
                        .doOnRequest(n -> System.out.println("         ===  request: " + n + " ==="))    // 2
                        .publishOn(Schedulers.newSingle("newSingle"), 1);   // 3
    }
  1. 调整不一样的策略(BUFFER/DROP/LATEST/ERROR/IGNORE)观察效果,create方法默认为BUFFER;
  2. 打印出每次的请求(也就是后边.publishOn的请求);
  3. 使用publishOn让后续的操做符和订阅者运行在一个单独的名为newSingle的线程上,第二个参数1是预取个数,也就是.publishOn做为订阅者每次向上游request的个数,默认为256,因此必定程度上也起到了缓存的效果,为了测试,设置为1。

一般状况下,发布者于订阅者并不在同一个线程上,这里使用publishOn来模拟这种状况。

BUFFER策略的输出以下(来不及处理的数据会缓存下来,这是一般状况下的默认策略):

===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
         ===  request: 1 ===
publish >>> Event-3
                      receive <<< Event-1
publish >>> Event-4
publish >>> Event-5
publish >>> Event-6
         ===  request: 1 ===
                      receive <<< Event-2
publish >>> Event-7
publish >>> Event-8
...

DROP策略的输出以下(有新数据就绪的时候,看是否有request,有的话就发出,没有就丢弃):

===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
         ===  request: 1 ===
publish >>> Event-4
                      receive <<< Event-4
publish >>> Event-5
publish >>> Event-6
publish >>> Event-7
         ===  request: 1 ===
publish >>> Event-8
                      receive <<< Event-8
...

能够看到,第1/2/3/5/6/7/...的数据被丢弃了,当有request以后的数据会被发出。调整一下publishOn方法的第二个参数(预取个数)为2,输出以下:

===  request: 2 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
                      receive <<< Event-1
publish >>> Event-4
publish >>> Event-5
publish >>> Event-6
         ===  request: 2 ===
publish >>> Event-7
                      receive <<< Event-7
publish >>> Event-8
publish >>> Event-9
publish >>> Event-10
                      receive <<< Event-8
publish >>> Event-11
publish >>> Event-12

可见,每次request(请求2个数据)以后的2个数据发出,更多就绪的数据因为没有request就丢弃了。

LATEST的输出以下(request到来的时候,将最新的数据发出):

===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
         ===  request: 1 ===
                      receive <<< Event-3
publish >>> Event-4
publish >>> Event-5
         ===  request: 1 ===
                      receive <<< Event-5
publish >>> Event-6
publish >>> Event-7
publish >>> Event-8
         ===  request: 1 ===
                      receive <<< Event-8

ERROR的输出以下(当订阅者来不及处理时候发出一个错误信号):

===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
         ===  request: 1 ===
                      receive <<< reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)

IGNORE的输出以下:

...
         ===  request: 2 ===
                      receive <<< Event-10
                      receive <<< Event-11
         ===  request: 2 ===
                      receive <<< Event-12
                      receive <<< reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure

2.3.3 调整回压策略的操做符

Reactor提供了响应的onBackpressureXxx操做符,调整回压策略。测试方法以下:

/**
     * 测试不一样的onBackpressureXxx方法的效果。
     */
    @Test
    public void testOnBackPressureXxx() {
        fastPublisher = createFlux(FluxSink.OverflowStrategy.BUFFER)
                .onBackpressureBuffer()     // BUFFER
//                .onBackpressureDrop()     // DROP
//                .onBackpressureLatest()   // LATEST
//                .onBackpressureError()    // ERROR
                .doOnRequest(n -> System.out.println("         ===  request: " + n + " ==="))
                .publishOn(Schedulers.newSingle("newSingle"), 1);
    }

经过打开某一个操做符的注释能够观察输出。这里就不贴输出内容了,Reactor文档的示意图更加直观:

onBackpressureBuffer,对于来自其下游的request采起“缓存”策略。

onBackpressureBuffer

onBackpressureDrop,元素就绪时,根据下游是否有未知足的request来判断是否发出当前元素。

onBackpressureDrop

onBackpressureLatest,当有新的request到来的时候,将最新的元素发出。

onBackpressureLatest

onBackpressureError,当有多余元素就绪时,发出错误信号。

onBackpressureError

真是一图胜千言啊,上边的这些图片都是来自Reactor官方文档。

当进行异步编程时,一般会面临相互协做的各个组件不在同一个线程的状况,好比一个生产者不断生成消息,而一个消费者不断处理这些产生的消息,两者一般不在一个线程甚至是两个不一样的组件。当有人不当心采用了×××资源(好比无上限的弹性线程池、×××队列等),那么在高并发或任务繁重时就有可能形成线程数爆炸增加,或队列堆积,所以backpressure这种协调机制对于维持系统稳定具备重要做用。

相关文章
相关标签/搜索