(18)Hot vs Cold——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》
前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范
本文测试源码java

2.8 Hot vs Cold

到目前为止,咱们讨论的发布者,不管是Flux仍是Mono,都有一个特色:订阅前什么都不会发生。当咱们“建立”了一个Flux的时候,咱们只是“声明”/“组装”了它,可是若是不调用.subscribe来订阅它,它就不会开始发出元素。react

可是咱们对“数据流”(尤为是乍听到这个词的时候)会有种自然的感受,就是不管有没有订阅者,它始终在按照本身的步伐发出数据。就像假设一我的没有一个粉丝,他也能够发微博同样。git

以上这两种数据流分别称为“冷”序列和“热”序列。因此咱们一直在介绍的Reactor3的发布者就属于“冷”的发布者。不过有少数的例外,好比just生成的就是一个“热”序列,它直接在组装期就拿到数据,若是以后有谁订阅它,就从新发送数据给订阅者。Reactor 中多数其余的“热”发布者是扩展自Processor 的(下节会介绍到)。github

下面咱们经过对比了解一下两种不一样的发布者的效果,首先是咱们熟悉的“冷”发布者:缓存

@Test
    public void testCodeSequence() {
        Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                .map(String::toUpperCase);

        source.subscribe(d -> System.out.println("Subscriber 1: "+d));
        System.out.println();
        source.subscribe(d -> System.out.println("Subscriber 2: "+d));
    }

咱们对发布者source进行了两次订阅,每次订阅都致使它把数据流重新发一遍:ide

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE

Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE

而后再看一个“热”发布者的例子:测试

@Test
    public void testHotSequence() {
        UnicastProcessor<String> hotSource = UnicastProcessor.create();

        Flux<String> hotFlux = hotSource.publish()
                .autoConnect()
                .map(String::toUpperCase);

        hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

        hotSource.onNext("blue");
        hotSource.onNext("green");

        hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

        hotSource.onNext("orange");
        hotSource.onNext("purple");
        hotSource.onComplete();
    }

这个热发布者是一个UnicastProcessor,咱们可使用它的onNext等方法手动发出元素。上边的例子中,hotSource发出两个元素后第二个订阅者才开始订阅,因此第二个订阅者只能收到以后的元素:code

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE

因而可知,UnicastProcessor是一个热发布者。blog

有时候,你不只想要在某一个订阅者订阅以后才开始发出数据,可能还但愿在多个订阅者“到齐”以后 才开始。ConnectableFlux的用意便在于此。Flux API 中有两种经常使用的返回ConnectableFlux 的方式:publishreplay索引

  1. publish会尝试知足各个不一样订阅者的需求(也就是回压),并综合这些请求反馈给源。假设有某个订阅者的需求为 0,发布者会暂停向全部订阅者发出元素。
  2. replay将对第一个订阅后产生的数据进行缓存,最多缓存数量取决于配置(时间/缓存大小)。 它会对后续接入的订阅者从新发送数据。

ConnectableFlux提供了多种对订阅的管理方式。包括:

  • connect,当有足够的订阅接入后,能够对 flux 手动执行一次。它会触发对上游源的订阅。
  • autoConnect(n)connect相似,不过是在有 n 个订阅的时候自动触发。
  • refCount(n)不只可以在订阅者接入的时候自动触发,还会检测订阅者的取消动做。若是订阅者所有取消订阅,则会将源“断开链接”,再有新的订阅者接入的时候才会继续“连上”发布者。refCount(int, Duration)增长了一个倒计时:一旦订阅者数量过低了,它会等待 Duration 参数指定的时间,若是没有新的订阅者接入才会与源断开链接。

1)connect的例子

@Test
    public void testConnectableFlux1() throws InterruptedException {
        Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("上游收到订阅"));

        ConnectableFlux<Integer> co = source.publish();

        co.subscribe(System.out::println, e -> {}, () -> {});
        co.subscribe(System.out::println, e -> {}, () -> {});

        System.out.println("订阅者完成订阅操做");
        Thread.sleep(500);
        System.out.println("尚未链接上");

        co.connect();
    }

输出以下:

订阅者完成订阅操做
尚未链接上
上游收到订阅
1
1
2
2
3
3

可见当connect的时候,上游才真正收到订阅请求。

2)autoConnect的例子

@Test
    public void testConnectableFluxAutoConnect() throws InterruptedException {
        Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("上游收到订阅"));

        // 须要两个订阅者才自动链接
        Flux<Integer> autoCo = source.publish().autoConnect(2);

        autoCo.subscribe(System.out::println, e -> {}, () -> {});
        System.out.println("第一个订阅者完成订阅操做");
        Thread.sleep(500);
        System.out.println("第二个订阅者完成订阅操做");
        autoCo.subscribe(System.out::println, e -> {}, () -> {});
    }

输出以下:

第一个订阅者完成订阅操做
第二个订阅者完成订阅操做
上游收到订阅
1
1
2
2
3
3

可见,只有两个订阅者都完成订阅以后,上游才收到订阅请求,并开始发出数据。

3)refCononect的例子

@Test
    public void testConnectableFluxRefConnect() throws InterruptedException {

        Flux<Long> source = Flux.interval(Duration.ofMillis(500))
                .doOnSubscribe(s -> System.out.println("上游收到订阅"))
                .doOnCancel(() -> System.out.println("上游发布者断开链接"));

        Flux<Long> refCounted = source.publish().refCount(2, Duration.ofSeconds(2));

        System.out.println("第一个订阅者订阅");
        Disposable sub1 = refCounted.subscribe(l -> System.out.println("sub1: " + l));

        TimeUnit.SECONDS.sleep(1);
        System.out.println("第二个订阅者订阅");
        Disposable sub2 = refCounted.subscribe(l -> System.out.println("sub2: " + l));

        TimeUnit.SECONDS.sleep(1);
        System.out.println("第一个订阅者取消订阅");
        sub1.dispose();

        TimeUnit.SECONDS.sleep(1);
        System.out.println("第二个订阅者取消订阅");
        sub2.dispose();

        TimeUnit.SECONDS.sleep(1);
        System.out.println("第三个订阅者订阅");
        Disposable sub3 = refCounted.subscribe(l -> System.out.println("sub3: " + l));

        TimeUnit.SECONDS.sleep(1);
        System.out.println("第三个订阅者取消订阅");
        sub3.dispose();

        TimeUnit.SECONDS.sleep(3);
        System.out.println("第四个订阅者订阅");
        Disposable sub4 = refCounted.subscribe(l -> System.out.println("sub4: " + l));
        TimeUnit.SECONDS.sleep(1);
        System.out.println("第五个订阅者订阅");
        Disposable sub5 = refCounted.subscribe(l -> System.out.println("sub5: " + l));
        TimeUnit.SECONDS.sleep(2);
    }

输出以下:

第一个订阅者订阅
第二个订阅者订阅
上游收到订阅
sub1: 0
sub2: 0
第一个订阅者取消订阅
sub1: 1
sub2: 1
sub2: 2
第二个订阅者取消订阅
sub2: 3
第三个订阅者订阅
sub3: 6
sub3: 7
第三个订阅者取消订阅
上游发布者断开链接
第四个订阅者订阅
第五个订阅者订阅
上游收到订阅
sub4: 0
sub5: 0
sub4: 1
sub5: 1
sub4: 2
sub5: 2
sub4: 3
sub5: 3

本例中,refCount设置为最少两个订阅者接入是才开始发出数据,当全部订阅者都取消时,若是不能在两秒内接入新的订阅者,则上游会断开链接。

上边的例子中,随着前两个订阅者相继取消订阅,第三个订阅者及时(在2秒内)开始订阅,因此上游会继续发出数据,并且根据输出能够看出是“热序列”。

当第三个订阅者取消后,第四个订阅者没能及时开始订阅,因此上游发布者断开链接。当第五个订阅者订阅以后,第四和第五个订阅者至关于开始了新一轮的订阅。

相关文章
相关标签/搜索