关于RxJava最友好的文章——背压(Backpressure)

前言

背压(Backpressure)多是全部想要深刻运用RxJava的朋友必须理解的一个概念java

关于它的介绍,我本意是想写在RxJava2.0更新介绍的文章里的,但是写着写着发现,要完整介绍这个概念须要花费的篇幅太长,刚好目前对于背压的介绍文章比较少,因此决定单独拿出来,自成一篇。而关于RxJava2.0的文章修改以后就会发出来和你们探讨。react

若是对于RxJava不是很熟悉,那么在这篇文章以前,我但愿你们先看看那篇关于Rxjava最友好的文章,能够帮助你们很顺畅的了解RxJava。缓存


从场景出发

让咱们先忘掉背压(Backpressure)这个概念,从RxJava一个比较常见的工做场景提及。bash

RxJava是一个观察者模式的架构,当这个架构中被观察者(Observable)和观察者(Subscriber)处在不一样的线程环境中时,因为者各自的工做量不同,致使它们产生事件和处理事件的速度不同,这就会出现两种状况:架构

  • 被观察者产生事件慢一些,观察者处理事件很快。那么观察者就会等着被观察者发送事件,(比如观察者在等米下锅,程序等待,这没有问题)
  • 被观察者产生事件的速度很快,而观察者处理很慢。那就出问题了,若是不做处理的话,事件会堆积起来,最终挤爆你的内存,致使程序崩溃。(比如被观察者生产的大米没人吃,堆积最后就会烂掉)

下面咱们用代码演示一下这种崩溃的场景:异步

//被观察者在主线程中,每1ms发送一个事件
Observable.interval(1, TimeUnit.MILLISECONDS)
                //.subscribeOn(Schedulers.newThread())
                //将观察者的工做放在新线程环境中
                .observeOn(Schedulers.newThread())
                //观察者处理每1000ms才处理一个事件
                .subscribe(new Action1<Long>() {
                      @Override
                      public void call(Long aLong) {
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          Log.w("TAG","---->"+aLong);
                      }
                  });
复制代码

在上面的代码中,被观察者发送事件的速度是观察者处理速度的1000倍ide

这段代码运行以后:post

...
    Caused by: rx.exceptions.MissingBackpressureException
    ...
    ...
复制代码

抛出MissingBackpressureException每每就是由于,被观察者发送事件的速度太快,而观察者处理太慢,并且你尚未作相应措施,因此报异常。学习

而这个MissingBackpressureException异常里面就包含了Backpressure这个单词,看来背压确定和这种异常状况有关系。spa

那么背压(Backpressure)究竟是什么呢?


关于背压(Backpressure)

我这两天翻阅了大量的中文和英文资料,我发现中文资料中,不少人对于背压(Backpressure)的理解是有很大问题的,有的人把它看做一个须要避免的问题,或者程序的异常,有的人则干脆避而不谈,模棱两可,着实让人尴尬。

经过参考和对比大量的相关资料,我在这里先对背压(Backpressure)作一个明确的定义:背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的状况下,一种告诉上游的被观察者下降发送速度的策略

简而言之,背压是流速控制的一种策略

须要强调两点:

  • 背压策略的一个前提是异步环境,也就是说,被观察者和观察者处在不一样的线程环境中。
  • 背压(Backpressure)并非一个像flatMap同样能够在程序中直接使用的操做符,他只是一种控制事件流速的策略。

那么咱们再回看上面的程序异常就很好理解了,就是当被观察者发送事件速度过快的状况下,咱们没有作流速控制,致使了异常。

那么背压(Backpressure)策略具体是哪如何实现流速控制的呢?


响应式拉取(reactive pull)

首先咱们回忆以前那篇关于Rxjava最友好的文章,里面其实提到,在RxJava的观察者模型中,被观察者是主动的推送数据给观察者,观察者是被动接收的。而响应式拉取则反过来,观察者主动从被观察者那里去拉取数据,而被观察者变成被动的等待通知再发送数据

结构示意图以下:

观察者能够根据自身实际状况按需拉取数据,而不是被动接收(也就至关于告诉上游观察者把速度慢下来),最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。

代码实例以下:

//被观察者将产生100000个事件
Observable observable=Observable.range(1,100000);
class MySubscriber extends Subscriber<T> {
    @Override
    public void onStart() {
    //必定要在onStart中通知被观察者先发送一个事件
      request(1);
    }
 
    @Override
    public void onCompleted() {
        ...
    }
 
    @Override
    public void onError(Throwable e) {
        ...
    }
 
    @Override
    public void onNext(T n) {
        ...
        ...
        //处理完毕以后,在通知被观察者发送下一个事件
        request(1);
    }
}

observable.observeOn(Schedulers.newThread())
            .subscribe(MySubscriber);
复制代码

在代码中,传递事件开始前的onstart()中,调用了request(1),通知被观察者先发送一个事件,而后在onNext()中处理完事件,再次调用request(1),通知被观察者发送下一个事件....

注意在onNext()方法中,最好最后再调用request()方法.

若是你想取消这种backpressure 策略,调用quest(Long.MAX_VALUE)便可。

实际上,在上面的代码中,你也能够不须要调用request(n)方法去拉取数据,程序依然能完美运行,这是由于range --> observeOn,这一段中间过程自己就是响应式拉取数据,observeOn这个操做符内部有一个缓冲区,Android环境下长度是16,它会告诉range最多发送16个事件,充满缓冲区便可。不过话说回来,在观察者中使用request(n)这个方法可使背压的策略表现得更加直观,更便于理解

若是你足够细心,会发现,在开头展现异常状况的代码中,使用的是interval这个操做符,可是在这里使用了range操做符,为何呢?

这是由于interval操做符自己并不支持背压策略,它并不响应request(n),也就是说,它发送事件的速度是不受控制的,而range这类操做符是支持背压的,它发送事件的速度能够被控制。

那么到底什么样的Observable是支持背压的呢?


Hot and Cold Observables

须要说明的时,Hot Observables 和cold Observables并非严格的概念区分,它只是对于两类Observable形象的描述

  • Cold Observables:指的是那些在订阅以后才开始发送事件的Observable(每一个Subscriber都能接收到完整的事件)。
  • Hot Observables:指的是那些在建立了Observable以后,(不论是否订阅)就开始发送事件的Observable

其实也有建立了Observable以后调用诸如publish()方法就能够开始发送事件的,这里我们暂且忽略。

咱们通常使用的都是Cold Observable,除非特殊需求,才会使用Hot Observable,在这里,Hot Observable这一类是不支持背压的,而是Cold Observable这一类中也有一部分并不支持背压(好比interval,timer等操做符建立的Observable)。

懵逼了吧?

Tips: 都是Observable,结果有的支持背压,有的不支持,这就是RxJava1.X的一个问题。在2.0中,这种问题已经解决了,之后谈到2.0时再细说。

在那些不支持背压策略的操做符中使用响应式拉取数据的话,仍是会抛出MissingBackpressureException。

那么,不支持背压的Observevable如何作流速控制呢?


流速控制相关的操做符

过滤(抛弃)

就是虽然生产者产生事件的速度很快,可是把大部分的事件都直接过滤(浪费)掉,从而间接的下降事件发送的速度。

相关相似的操做符:Sample,ThrottleFirst.... 以sample为例,

Observable.interval(1, TimeUnit.MILLISECONDS)

                .observeOn(Schedulers.newThread())
                //这个操做符简单理解就是每隔200ms发送里时间点最近那个事件,
                //其余的事件浪费掉
                  .sample(200,TimeUnit.MILLISECONDS)
                  .subscribe(new Action1<Long>() {
                      @Override
                      public void call(Long aLong) {
                          try {
                              Thread.sleep(200);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          Log.w("TAG","---->"+aLong);
                      }
                  });
复制代码

这是以杀敌一千,自损八百的方式解决这个问题,由于抛弃了绝大部分的事件,而在咱们使用RxJava 时候,咱们本身定义的Observable产生的事件可能都是咱们须要的,通常来讲不会抛弃,因此这种方案有它的缺陷。

缓存

就是虽然被观察者发送事件速度很快,观察者处理不过来,可是能够选择先缓存一部分,而后慢慢读。

相关相似的操做符:buffer,window... 以buffer为例,

Observable.interval(1, TimeUnit.MILLISECONDS)

                .observeOn(Schedulers.newThread())
                //这个操做符简单理解就是把100毫秒内的事件打包成list发送
                .buffer(100,TimeUnit.MILLISECONDS)
                  .subscribe(new Action1<List<Long>>() {
                      @Override
                      public void call(List<Long> aLong) {
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          Log.w("TAG","---->"+aLong.size());
                      }
                  });
                  
复制代码

两个特殊操做符

对于不支持背压的Observable除了使用上述两类生硬的操做符以外,还有更好的选择:onBackpressurebuffer,onBackpressureDrop

  • onBackpressurebuffer:把observable发送出来的事件作缓存,当request方法被调用的时候,给下层流发送一个item(若是给这个缓存区设置了大小,那么超过了这个大小就会抛出异常)。
  • onBackpressureDrop:将observable发送的事件抛弃掉,直到subscriber再次调用request(n)方法的时候,就发送给它这以后的n个事件。

下面,咱们以onBackpressureDrop为例说说用法:

Observable.interval(1, TimeUnit.MILLISECONDS)
                .onBackpressureDrop()
                .observeOn(Schedulers.newThread())
               .subscribe(new Subscriber<Long>() {

                    @Override
                    public void onStart() {
                        Log.w("TAG","start");
//                        request(1);
                    }

                    @Override
                      public void onCompleted() {

                      }
                      @Override
                      public void onError(Throwable e) {
                            Log.e("ERROR",e.toString());
                      }

                      @Override
                      public void onNext(Long aLong) {
                          Log.w("TAG","---->"+aLong);
                          try {
                              Thread.sleep(100);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  });
复制代码

这段代码的输出:

W/TAG: start
W/TAG: ---->0
W/TAG: ---->1
W/TAG: ---->2
W/TAG: ---->3
W/TAG: ---->4
W/TAG: ---->5
W/TAG: ---->6
W/TAG: ---->7
W/TAG: ---->8
W/TAG: ---->9
W/TAG: ---->10
W/TAG: ---->11
W/TAG: ---->12
W/TAG: ---->13
W/TAG: ---->14
W/TAG: ---->15
W/TAG: ---->1218
W/TAG: ---->1219
W/TAG: ---->1220
...
复制代码

之因此出现0-15这样连贯的数据,就是是由于observeOn操做符内部有一个长度为16的缓存区,它会首先请求16个事件缓存起来....

你可能会以为这两个操做符和上面讲的过滤和缓存很相似,确实,功能上是有些相似,可是这两个操做符提供了更多的特性,那就是能够响应下游观察者的request(n)方法了,也就是说,使用了这两种操做符,可让本来不支持背压的Observable“支持”背压了


勘误

1, 本文以前对于Hot Observables和Cold observables的描述写反了,是我太大意,目前已改正,大家如今看到的是正确的,感谢@jaychang0917的提醒


后记

讲了这么多终于要到尾声了。

下面咱们总结一下:

  • 背压是一种策略,具体措施是下游观察者通知上游的被观察者发送事件
  • 背压策略很好的解决了异步环境下被观察者和观察者速度不一致的问题
  • 在RxJava1.X中,一样是Observable,有的不支持背压策略,致使某些状况下,显得特别麻烦,出了问题也很难排查,使得RxJava的学习曲线变得十份陡峭。

这篇文章并非为了让你学习在RxJava1.0中使用背压(若是你以前不了解背压的话),由于在1.0中,背压的设计并不十分完美。而是但愿你对背压有一个全面清晰的认识,对于它在RxJava1.0中的设计缺陷有所了解便可。由于这篇文章自己是为了2.0作一个铺垫,后续的文章中我会继续谈到背压和使用背压的正确姿式。

相关文章
相关标签/搜索