Rxjava Backpressure 32

原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part 4 - Concurrency/4. Backpressure.mdjava

Rx将事件从管道的一端引导到另外一端,在每一端发生的行动可能很是不一样。当生产者和消费者须要不一样的时间来处理值时会发生什么?在同步模型中,这个问题不是问题。请考虑如下示例:react

输出:ios

在这里,生产者已经准备好了它的值,而且能够绝不拖延地发出它们。相比之下,消费者很是缓慢,但这不会引发问题,由于上述代码的同步性质会自动调节生产者和消费者的比率。当o.onNext(1)被调用,生产者的执行被阻止,直到整个Rx链完成。只有当该表达式返回时,执行才能进入o.onNext(2)。git

这相似于仅用于同步执行。生产者和消费者一般是异步的。那么,当生产者和消费者以不一样的速度异步操做时会发生什么?github

咱们首先考虑传统的基于拉的模型,例如迭代器iterator。在基于拉取的模型中,消费者请求值。若是生产者较慢,消费者将根据请求阻止并在下一个值到达时恢复。若是生产者更快,那么生产者将有空闲时间等待消费者请求下一个值。缓存

Rx基于推送,而不是基于拉。在Rx中,生产者在值准备好时将值推送给消费者。若是生产者较慢,那么消费者将有空闲时间等待下一个值到达。若是生产者更快,没有任何规定,它将保持强制喂养数据给消费者,而不知道消费者的困难。安全

输出:服务器

在这里,MissingBackpressureException让咱们知道生产者太快了,咱们连接在一块儿的运算符没法处理它。异步

Remedies for the consumer

 咱们在前几章中看到的一些operators能够帮助消费者减轻因输入过多而带来的压力。测试

Thin out the data

(/Part 3 - Taming the sequence/5. Time-shifted sequences.md#sample) 运算符天然容许您指定最大输入速率,而不会留下任何多余的数据。

demo:

输出:

相似的操做能够服务于同一目的。

  • [throttle](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#throttling运算符族也能够对速率进行过滤,但容许您以不一样的方式指定在受到压力时容许哪一个元素经过。
  • [Debounce](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#debouncing) 没有将速率下降到固定的最大值。相反,它将彻底删除每一个信息突发并用单个值替换它。

Collect

您可使用buffer和window在消费者忙碌时收集溢出的数据,而不是对数据进行采样。若是批量处理项目更快,这将很是有用。或者,您能够检查缓冲区以手动肯定要处理的缓冲项的数量。

在咱们以前看到的示例中,消费者以几乎相同的速度处理单个项目和批量。在这里,咱们放慢了生产者的速度,使批次符合一条线,但原则保持不变。

输出:

Reactive pull

上述补救措施是解决问题的合法方法。然而,它们并不老是处理过分生产的可观察事件的最佳方式。有时,问题能够在生产者方面获得更好的处理。背压是管道抵抗值排放的一种方式。

背压是指与诸如管道的受限位置中的所需流体流相对的压力。它一般是因为其移动的限制容器中的障碍物或弯曲弯曲引发的,例如管道或通风口。

- Wikipedia

RxJava已经为subscriber实施了一种调节可观察率的方法。订阅者有一个request(n)方法,经过它能够通知观察者它已准备好接受更多的值。经过在订阅服务器的onStart方法上调用请求,能够创建reactive pull backpressure。这不是pull模型意义上的拉:它不会返回任何值,而且若是值未准备就不会阻止。相反,它只是通知可观察订阅者准备接受多少值并保留其他值。对请求的后续调用将容许更多值。

这是一个一次获取一个值的订阅者:

onStart中的request(1)创建背压并通知可观察到它应该只发出第一个值。在处理onNext中的值以后,咱们请求发送下一个项目,若是它可用的话。呼叫请求(Long.MAX_VALUE)禁用背压。

doOnRequested

回到咱们讨论反作用的doOn_ 运算符时,咱们省略了doOnRequested。

当subscriber请求更多项目时,会发生doOnRequested元事件。提供给操做的值是请求的项目数。

咱们在这里作了一个例外,由于它使咱们可以窥视稳定的背压功能,不然这些功能是隐藏的。让咱们看看在一个简单的observable中发生了什么:

输出:

咱们看到subscribe从头开始请求最大数量的items。这意味着订阅根本不抵制values。若是咱们提供实施背压的订户,订阅将仅使用背压。如下是订户的完整示例,它容许咱们从外部控制背压。

除非咱们使用requestMore手动执行此操做,不然此简单实现不会请求值。

输出:

在内部使用队列和缓冲区的Rx运算符应使用背压来避免存储无限量的值。大规模缓冲应留给明确用于此目的的运算符,例如缓存,缓冲区等。须要缓冲项的运算符的示例是zip:第一个observable可能在第二个observable发出以前发出两个或更多值下一个价值。即便假设两个序列具备相同的频率,也指望这种小的不对称性。须要缓冲一些项目不该该致使操做员失败。出于这个缘由,zip有一个128项的小缓冲区。

输出:

zip操做符首先请求足够的项来填充其缓冲区,并在消耗它们时请求更多。zip请求的项目数量的详细信息并不有趣。读者应该带走的是,不管开发人员是否请求,Rx中都存在一些缓冲和背压。这为Rx管道提供了一些灵活性,在这种状况下你可能没有。它可能会让你认为你的代码是可靠的,经过默默地保存小测试失败,可是在你明确声明关于背压的行为以前你就不安全了。

Backpressure policies

许多Rx操做在内部使用背压来避免过分填充其内部队列。这样,慢速消费者的问题在运算符链中向后传播:若是运算符中止接受值,则前一个运算符将填充其缓冲区,直到它也中止接受值,依此类推。背压不会使问题消失。它只是将它移动到能够更好地处理的地方。咱们仍然须要决定如何处理过分生产的可观察量的值。

有些Rx运算符声明了如何处理订阅者没法接受正在发出的值的状况。

onBackpressureBuffer

onBackpressureBuffer运算符会将没法使用的每一个值都被存储,直到观察者可使用它。

您能够拥有无​​限大小的缓冲区或具备最大容量的缓冲区。若是缓冲区溢出,则observable将失败。

输出:

这里发生的是生产者比消费者快100倍。咱们尝试经过缓冲多达1000个项目来解决这个问题。很容易计算出,当消费者消费第11个项目时,生产者已经生产了1100个项目,远远超过了咱们的缓冲区容量。而后序列失败,由于它没法处理背压。

onBackpressureDrop

若是没法接收项目,onBackpressureDrop运算符将丢弃这些项目。

咱们在这里看到的是前128个项目正常消耗,但后来咱们跳了起来。介于二者之间的项目被onBackPressureDrop删除。即便咱们没有请求它,前128个项目仍然是缓冲的,由于observeOn在切换线程之间使用一个小缓冲区。

原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part 4 - Concurrency/4. Backpressure.md

未完待续!

git:https://github.com/woshiyexinjie/rxjava-leaner

相关文章
相关标签/搜索