(19)Reactor Processors——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》
前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范java

2.9 Processor

Processor既是一种特别的发布者(Publisher)又是一种订阅者(Subscriber)。 因此你可以订阅一个Processor,也能够调用它们提供的方法来手动插入数据到序列,或终止序列。缓存

前面一直在聊响应式流的四个接口中的三个:PublisherSubscriberSubscription,惟独Processor迟迟没有说起。缘由在于想用好它们不太容易,多数状况下,咱们应该进行避免使用Processor,一般来讲仅用于一些特殊场景。安全

2.9.1 使用 Sink 来线程安全地生成流

比起直接使用Processor,更好的方式是经过调用sink()来获得它的Sink。这个Sink是线程安全的,能够用于在应用程序中多线程并发地生成数据。例如,经过UnicastProcessor获得一个线程安全的 sink:数据结构

UnicastProcessor<Integer> processor = UnicastProcessor.create();
    FluxSink<Integer> sink = processor.sink(overflowStrategy);

多个线程能够并发地经过下边的方法生成数据到sink。多线程

sink.next(n);

看到这里是否是感受跟generate生成数据流的方式很像?因此Reactor官方建议,当你想要使用Processor的时候,首先看看可否用generate实现一样的功能,或者看看是否有相应的操做符能够达到你想要的效果并发

2.9.2 Reactor 内置的 Processor

Reactor Core 内置多种 Processor。这些 processor 具备不一样的语法,大概分为三类。 异步

  • 直接的(direct)(DirectProcessor 和 UnicastProcessor):这些 processors 只能经过直接 调用 Sink 的方法来推送数据。
  • 同步的(synchronous)(EmitterProcessor 和 ReplayProcessor):这些 processors 既能够直接调用 Sink 方法来推送数据,也能够经过订阅到一个上游的发布者来同步地产生数据。
  • 异步的(asynchronous)(WorkQueueProcessor 和 TopicProcessor):这些 processors 能够将从多个上游发布者获得的数据推送下去。因为使用了 RingBuffer 的数据结构来缓存多个来自上游的数据,所以更加有健壮性。

异步的 processor 在实例化的时候最复杂,由于有许多不一样的选项。所以它们暴露出一个 Builder 接口。 而简单的 processors 有静态的工厂方法。async

1)DirectProcessoride

DirectProcessor能够将信号分发给零到多个订阅者(Subscriber)。它是最容易实例化的,使用静态方法 create() 便可。另外一方面,它的不足是没法处理背压。因此,当DirectProcessor推送的是 N 个元素,而至少有一个订阅者的请求个数少于 N 的时候,就会发出一个IllegalStateExceptionui

一旦 Processor 结束(一般经过调用它的 Sink 的 error(Throwable) 或 complete() 方法), 虽然它容许更多的订阅者订阅它,可是会当即向它们从新发送终止信号。

2)UnicastProcessor

UnicastProcessor可使用一个内置的缓存来处理背压。代价就是它最多只能有一个订阅者(上一节的例子经过publish转换成了ConnectableFlux,因此能够接入两个订阅者)。

UnicastProcessor有多种选项,所以提供多种不一样的create静态方法。例如,它默认是 无限的(unbounded) :若是你在在订阅者尚未请求数据的状况下让它推送数据,它会缓存全部数据。

能够经过提供一个自定义的 Queue 的具体实现传递给 create 工厂方法来改变默认行为。若是给出的队列是有限的(bounded), 而且缓存已满,并且未收到下游的请求,processor 会拒绝推送数据。

在上边“有限的”例子中,还能够在构造 processor 的时候提供一个回调方法,这个回调方法能够在每个 被拒绝推送的元素上调用,从而让开发者有机会清理这些元素。

3)EmitterProcessor

EmitterProcessor可以向多个订阅者发送数据,而且能够对每个订阅者进行背压处理。它自己也能够订阅一个发布者并同步得到数据。

最初若是没有订阅者,它仍然容许推送一些数据到缓存,缓存大小由bufferSize定义。 以后若是仍然没有订阅者订阅它并消费数据,对onNext的调用会阻塞,直到有订阅者接入 (这时只能并发地订阅了)。

所以第一个订阅者会收到最多bufferSize个元素。然而以后,后续接入的订阅者只能获取到它们开始订阅以后推送的数据。这个内部的缓存会继续用于背压的目的。

默认状况下,若是全部的订阅者都取消了订阅,它会清空内部缓存,而且再也不接受更多的订阅者。这一点能够经过 create 静态工厂方法的 autoCancel 参数来配置。

4)ReplayProcessor

ReplayProcessor会缓存直接经过自身的 Sink 推送的元素,以及来自上游发布者的元素, 而且后来的订阅者也会收到重发(replay)的这些元素。

能够经过多种配置方式建立它:

  • 缓存一个元素(cacheLast)。
  • 缓存必定个数的历史元素(create(int)),全部的历史元素(create())。
  • 缓存基于时间窗期间内的元素(createTimeout(Duration))。
  • 缓存基于历史个数和时间窗的元素(createSizeOrTimeout(int, Duration))。

5)TopicProcessor

TopicProcessor是一个异步的 processor,它可以重发来自多个上游发布者的元素, 这须要在建立它的时候配置shared(build() 的 share(boolean) 配置)。

若是你企图在并发环境下经过并发的上游发布者调用TopicProcessoronNextonComplete,或onError方法,就必须配置shared。不然,并发调用就是非法的,从而 processor 是彻底兼容响应式流规范的。

TopicProcessor可以对多个订阅者发送数据。它经过对每个订阅者关联一个线程来实现这一点, 这个线程会一直执行直到 processor 发出onErroronComplete信号,或关联的订阅者被取消。 最多能够接受的订阅者个数由构造者方法executor指定,经过提供一个有限线程数的 ExecutorService来限制这一个数。

这个 processor 基于一个RingBuffer数据结构来存储已发送的数据。每个订阅者线程 自行管理其相关的数据在RingBuffer中的索引。

这个 processor 也有一个autoCancel构造器方法:若是设置为true(默认的),那么当 全部的订阅者取消以后,上游发布者也就被取消了。

6)WorkQueueProcessor

WorkQueueProcessor也是一个异步的 processor,也可以重发来自多个上游发布者的元素, 一样在建立时须要配置shared(它多数构造器配置与TopicProcessor相同)。

它放松了对响应式流规范的兼容,可是好处就在于相对于TopicProcessor来讲须要更少的资源。 它仍然基于RingBuffer,可是再也不要求每个订阅者都关联一个线程,所以相对于TopicProcessor来讲更具扩展性。

代价在于分发模式有些区别:来自订阅者的请求会汇总在一块儿,而且这个 processor 每次只对一个 订阅者发送数据,所以须要循环(round-robin)对订阅者发送数据,而不是一次所有发出的模式(没法保证彻底公平的循环分发)。

WorkQueueProcessor多数构造器方法与TopicProcessor相同,好比autoCancelshare, 以及waitStrategy。下游订阅者的最大数目一样由构造器executor配置的ExecutorService 决定。

注意:最好不要有太多订阅者订阅WorkQueueProcessor,由于这会锁住 processor。若是你须要限制订阅者数量,最好使用一个ThreadPoolExecutorForkJoinPool。这个 processor 可以检测到(线程池)容量并在订阅者过多时抛出异常。


本文的介绍并未给出示例,在下一章咱们编写“响应式Netty”的时候会介绍到Processor的使用。

相关文章
相关标签/搜索