Android Rxjava :最简单&全面背压讲解 (Flowable)

1.前言

阅读本文须要对Rxjava了解,若是尚未了解或者使用过Rxjava的兄die们,能够观看我另一篇 Android Rxjava:不同的诠释进行学习。java

Rxjava背压被观察者发送事件的速度大于观察者接收事件的速度时,观察者内会建立一个无限制大少的缓冲池存储未接收的事件,所以当存储的事件愈来愈多时就会致使OOM的出现。(注:当subscribeOn与observeOn不为同一个线程时,被观察者与观察者内存在不一样时长耗时任务,就会使发送与接收速度存在差别。)git

背压例子github

public void backpressureSample(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                int i = 0;
                while(true){
                    Thread.sleep(500);
                    i++;
                    e.onNext(i);
                    Log.i(TAG,"每500ms发送一次数据:"+i);
                }
            }
        }).subscribeOn(Schedulers.newThread())//使被观察者存在独立的线程执行
          .observeOn(Schedulers.newThread())//使观察者存在独立的线程执行
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  Thread.sleep(5000);
                  Log.e(TAG,"每5000m接收一次数据:"+integer);
              }
          });
    }
复制代码

例子执行效果缓存

经过上述例子能够大概了解背压是如何产生,所以Rxjava2.0版本提供了 Flowable 解决背压问题。
本文章就是使用与分析 Flowable 是如何解决背压问题。
文章中实例 linhaojian的Githubide

2.目录


3.简介


4.使用与原理详解

4.1 Flowable 与 Observable 的区别

flowable与observable对比

上图能够很清楚看出两者的区别,其实Flowable 出来以上的区别以外,它其余全部使用与Observable彻底同样。函数

Flowable 的create例子post

public void flowable(){
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for(int j = 0;j<=150;j++){
                    e.onNext(j);
                    Log.i(TAG," 发送数据:"+j);
                    try{
                        Thread.sleep(50);
                    }catch (Exception ex){
                    }
                }
            }
        },BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE); //观察者设置接收事件的数量,若是不设置接收不到事件
            }
            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.e(TAG,"onNext : "+(integer));
            }
            @Override
            public void onError(Throwable t) {
                Log.e(TAG,"onError : "+t.toString());
            }
            @Override
            public void onComplete() {
                Log.e(TAG,"onComplete");
            }
        });
    }
复制代码

4.2 BackpressureStrategy媒体类

从Flowable源码查看,缓存池默认大少为:128性能

public abstract class Flowable<T> implements Publisher<T> {
    /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    .....
}
复制代码

经过上面的例子,咱们能够看到create方法中的包含了一个BackpressureStrategy媒体类,其包含5种类型:学习

4.2.1. ERROR

把上面例子改成ERROR类型,执行结果以下:spa


总结 :当被观察者发送事件大于128时,观察者抛出异常并终止接收事件,但不会影响被观察者继续发送事件。

4.2.2. BUFFER

把上面例子改成BUFFER类型,执行结果以下:


总结 :与Observable同样存在背压问题,可是接收性能比Observable低,由于BUFFER类型经过BufferAsyncEmitter添加了额外的逻辑处理,再发送至观察者。

4.2.3. DROP

把上面例子改成DROP类型,执行结果以下:


总结 :每当观察者接收128事件以后,就会丢弃部分事件

4.2.4. LATEST

把上面例子改成LATEST类型,执行结果以下:


总结 :LATEST与DROP使用效果同样,但LATEST会保证能接收最后一个事件,而DROP则不会保证。

4.2.5. MISSING

把上面例子改成MISSING类型,执行结果以下:


总结 :MISSING就是没有采起背压策略的类型,效果跟Obserable同样。

在设置MISSING类型时,能够配合onBackPressure相关操做符使用,也能够到达上述其余类型的处理效果。

4.3 onBackPressure相关操做符

使用例子:

Flowable.interval(50,TimeUnit.MILLISECONDS)
        .onBackpressureDrop()//效果与Drop类型同样
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.e(TAG,"onNext : "+(aLong));
            }
        });
复制代码

onBackpressureBuffer :与BUFFER类型同样效果。
onBackpressureDrop :与DROP类型同样效果。
onBackpressureLaster :与LASTER类型同样效果。

4.4 request()

4.4.1 request(int count):设置接收事件的数量.

例子:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for(int j = 0;j<50;j++){
                    e.onNext(j);
                    Log.i(TAG," 发送数据:"+j);
                    try{
                        Thread.sleep(50);
                    }catch (Exception ex){
                    }
                }
            }
        },BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(10); //观察者设置接收事件的数量,若是不设置接收不到事件
            }
            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.e(TAG,"onNext : "+(integer));
            }
            @Override
            public void onError(Throwable t) {
                Log.e(TAG,"onError : "+t.toString());
            }
            @Override
            public void onComplete() {
                Log.e(TAG,"onComplete");
            }
        });
复制代码

4.4.2 request扩展使用

request还可进行扩展使用,当遇到在接收事件时想追加接收数量(如:通讯数据经过几回接收,验证准确性的应用场景),能够经过如下方式进行扩展:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for(int j = 0;j<50;j++){
                    e.onNext(j);
                    Log.i(TAG," 发送数据:"+j);
                    try{
                        Thread.sleep(50);
                    }catch (Exception ex){
                    }
                }
            }
        },BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                s.request(10); //观察者设置接收事件的数量,若是不设置接收不到事件
            }
            @Override
            public void onNext(Integer integer) {
                if(integer==5){
                    subscription.request(3);
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.e(TAG,"onNext : "+(integer));
            }
            @Override
            public void onError(Throwable t) {
                Log.e(TAG,"onError : "+t.toString());
            }
            @Override
            public void onComplete() {
                Log.e(TAG,"onComplete");
            }
        });
复制代码

总结:能够动态设置观察者接收事件的数量,但不影响被观察者继续发送事件。

4.5 requested

requestedrequest不是同一的函数,但它们都是属于FlowableEmitter类里的方法,那么requested()是有什么做用呢,看看如下例子:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for(int j = 0;j<15;j++){
                    e.onNext(j);
                    Log.i(TAG,e.requested()+" 发送数据:"+j);
                    try{
                        Thread.sleep(50);
                    }catch (Exception ex){
                    }
                }
            }
        },BackpressureStrategy.BUFFER)
// .subscribeOn(Schedulers.newThread())
// .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                s.request(10); //观察者设置接收事件的数量,若是不设置接收不到事件
            }
            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.e(TAG,"onNext : "+(integer));
            }
            @Override
            public void onError(Throwable t) {
                Log.e(TAG,"onError : "+t.toString());
            }
            @Override
            public void onComplete() {
                Log.e(TAG,"onComplete");
            }
        });
复制代码

从图中咱们能够发现,requested打印的结果就是 剩余可接收的数量 ,它的做用就是能够检测剩余可接收的事件数量。

5.总结

到此,Flowable讲解完毕。
若是喜欢个人分享,能够点击 关注 或者 ,大家支持是我分享的最大动力 。



+qq群457848807:。获取以上高清技术思惟图,以及相关技术的免费视频学习资料

相关文章
相关标签/搜索