本系列文章索引《响应式Spring的道法术器》
前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范
本文测试源码java
到目前为止,咱们讨论的发布者,不管是Flux仍是Mono,都有一个特色:订阅前什么都不会发生。当咱们“建立”了一个Flux的时候,咱们只是“声明”/“组装”了它,可是若是不调用.subscribe
来订阅它,它就不会开始发出元素。react
可是咱们对“数据流”(尤为是乍听到这个词的时候)会有种自然的感受,就是不管有没有订阅者,它始终在按照本身的步伐发出数据。就像假设一我的没有一个粉丝,他也能够发微博同样。git
以上这两种数据流分别称为“冷”序列和“热”序列。因此咱们一直在介绍的Reactor3的发布者就属于“冷”的发布者。不过有少数的例外,好比just
生成的就是一个“热”序列,它直接在组装期就拿到数据,若是以后有谁订阅它,就从新发送数据给订阅者。Reactor 中多数其余的“热”发布者是扩展自Processor
的(下节会介绍到)。github
下面咱们经过对比了解一下两种不一样的发布者的效果,首先是咱们熟悉的“冷”发布者:缓存
@Test public void testCodeSequence() { Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")) .map(String::toUpperCase); source.subscribe(d -> System.out.println("Subscriber 1: "+d)); System.out.println(); source.subscribe(d -> System.out.println("Subscriber 2: "+d)); }
咱们对发布者source
进行了两次订阅,每次订阅都致使它把数据流重新发一遍:ide
Subscriber 1: BLUE Subscriber 1: GREEN Subscriber 1: ORANGE Subscriber 1: PURPLE Subscriber 2: BLUE Subscriber 2: GREEN Subscriber 2: ORANGE Subscriber 2: PURPLE
而后再看一个“热”发布者的例子:测试
@Test public void testHotSequence() { UnicastProcessor<String> hotSource = UnicastProcessor.create(); Flux<String> hotFlux = hotSource.publish() .autoConnect() .map(String::toUpperCase); hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d)); hotSource.onNext("blue"); hotSource.onNext("green"); hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d)); hotSource.onNext("orange"); hotSource.onNext("purple"); hotSource.onComplete(); }
这个热发布者是一个UnicastProcessor
,咱们可使用它的onNext
等方法手动发出元素。上边的例子中,hotSource
发出两个元素后第二个订阅者才开始订阅,因此第二个订阅者只能收到以后的元素:code
Subscriber 1 to Hot Source: BLUE Subscriber 1 to Hot Source: GREEN Subscriber 1 to Hot Source: ORANGE Subscriber 2 to Hot Source: ORANGE Subscriber 1 to Hot Source: PURPLE Subscriber 2 to Hot Source: PURPLE
因而可知,UnicastProcessor
是一个热发布者。blog
有时候,你不只想要在某一个订阅者订阅以后才开始发出数据,可能还但愿在多个订阅者“到齐”以后 才开始。ConnectableFlux
的用意便在于此。Flux API 中有两种经常使用的返回ConnectableFlux
的方式:publish
和replay
。索引
publish
会尝试知足各个不一样订阅者的需求(也就是回压),并综合这些请求反馈给源。假设有某个订阅者的需求为 0,发布者会暂停向全部订阅者发出元素。replay
将对第一个订阅后产生的数据进行缓存,最多缓存数量取决于配置(时间/缓存大小)。 它会对后续接入的订阅者从新发送数据。ConnectableFlux
提供了多种对订阅的管理方式。包括:
connect
,当有足够的订阅接入后,能够对 flux 手动执行一次。它会触发对上游源的订阅。autoConnect(n)
与connect
相似,不过是在有 n 个订阅的时候自动触发。refCount(n)
不只可以在订阅者接入的时候自动触发,还会检测订阅者的取消动做。若是订阅者所有取消订阅,则会将源“断开链接”,再有新的订阅者接入的时候才会继续“连上”发布者。refCount(int, Duration)
增长了一个倒计时:一旦订阅者数量过低了,它会等待 Duration 参数指定的时间,若是没有新的订阅者接入才会与源断开链接。1)connect的例子
@Test public void testConnectableFlux1() throws InterruptedException { Flux<Integer> source = Flux.range(1, 3) .doOnSubscribe(s -> System.out.println("上游收到订阅")); ConnectableFlux<Integer> co = source.publish(); co.subscribe(System.out::println, e -> {}, () -> {}); co.subscribe(System.out::println, e -> {}, () -> {}); System.out.println("订阅者完成订阅操做"); Thread.sleep(500); System.out.println("尚未链接上"); co.connect(); }
输出以下:
订阅者完成订阅操做 尚未链接上 上游收到订阅 1 1 2 2 3 3
可见当connect
的时候,上游才真正收到订阅请求。
2)autoConnect的例子
@Test public void testConnectableFluxAutoConnect() throws InterruptedException { Flux<Integer> source = Flux.range(1, 3) .doOnSubscribe(s -> System.out.println("上游收到订阅")); // 须要两个订阅者才自动链接 Flux<Integer> autoCo = source.publish().autoConnect(2); autoCo.subscribe(System.out::println, e -> {}, () -> {}); System.out.println("第一个订阅者完成订阅操做"); Thread.sleep(500); System.out.println("第二个订阅者完成订阅操做"); autoCo.subscribe(System.out::println, e -> {}, () -> {}); }
输出以下:
第一个订阅者完成订阅操做 第二个订阅者完成订阅操做 上游收到订阅 1 1 2 2 3 3
可见,只有两个订阅者都完成订阅以后,上游才收到订阅请求,并开始发出数据。
3)refCononect的例子
@Test public void testConnectableFluxRefConnect() throws InterruptedException { Flux<Long> source = Flux.interval(Duration.ofMillis(500)) .doOnSubscribe(s -> System.out.println("上游收到订阅")) .doOnCancel(() -> System.out.println("上游发布者断开链接")); Flux<Long> refCounted = source.publish().refCount(2, Duration.ofSeconds(2)); System.out.println("第一个订阅者订阅"); Disposable sub1 = refCounted.subscribe(l -> System.out.println("sub1: " + l)); TimeUnit.SECONDS.sleep(1); System.out.println("第二个订阅者订阅"); Disposable sub2 = refCounted.subscribe(l -> System.out.println("sub2: " + l)); TimeUnit.SECONDS.sleep(1); System.out.println("第一个订阅者取消订阅"); sub1.dispose(); TimeUnit.SECONDS.sleep(1); System.out.println("第二个订阅者取消订阅"); sub2.dispose(); TimeUnit.SECONDS.sleep(1); System.out.println("第三个订阅者订阅"); Disposable sub3 = refCounted.subscribe(l -> System.out.println("sub3: " + l)); TimeUnit.SECONDS.sleep(1); System.out.println("第三个订阅者取消订阅"); sub3.dispose(); TimeUnit.SECONDS.sleep(3); System.out.println("第四个订阅者订阅"); Disposable sub4 = refCounted.subscribe(l -> System.out.println("sub4: " + l)); TimeUnit.SECONDS.sleep(1); System.out.println("第五个订阅者订阅"); Disposable sub5 = refCounted.subscribe(l -> System.out.println("sub5: " + l)); TimeUnit.SECONDS.sleep(2); }
输出以下:
第一个订阅者订阅 第二个订阅者订阅 上游收到订阅 sub1: 0 sub2: 0 第一个订阅者取消订阅 sub1: 1 sub2: 1 sub2: 2 第二个订阅者取消订阅 sub2: 3 第三个订阅者订阅 sub3: 6 sub3: 7 第三个订阅者取消订阅 上游发布者断开链接 第四个订阅者订阅 第五个订阅者订阅 上游收到订阅 sub4: 0 sub5: 0 sub4: 1 sub5: 1 sub4: 2 sub5: 2 sub4: 3 sub5: 3
本例中,refCount设置为最少两个订阅者接入是才开始发出数据,当全部订阅者都取消时,若是不能在两秒内接入新的订阅者,则上游会断开链接。
上边的例子中,随着前两个订阅者相继取消订阅,第三个订阅者及时(在2秒内)开始订阅,因此上游会继续发出数据,并且根据输出能够看出是“热序列”。
当第三个订阅者取消后,第四个订阅者没能及时开始订阅,因此上游发布者断开链接。当第五个订阅者订阅以后,第四和第五个订阅者至关于开始了新一轮的订阅。