RxJava 知识梳理(3) RxJava2 基础知识小结

前言

首先要感谢 Season_zlc 的一系列RxJava2的教程,关于上游、下游、水缸的类比,让我对于整个RxJava2的基本思想有了更加清晰的认识。你们有兴趣的话必定要多看看,写的通俗易懂,传送门:给初学者的 RxJava 2.0 教程 (一) ,本文的思想都来源于它的一系列文章。java

文章比较长,为了不耽误你们的时间,先列出须要介绍的知识点: react

1、RxJava2 的基本模型

1.1 使用实例

在开始学习以前,咱们先看一下最简单的例子:android

  • 第一步:导入依赖包:
dependencies {
    //在build.gradle中,导入依赖。
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
复制代码
  • 第二步:使用最基本的Observable + Observer的最简单示例,这里咱们在上游发送了四个onNext(String s)事件以后,最后发送了一个onComplete()事件。
public static void classicalSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("1");
                observableEmitter.onNext("2");
                observableEmitter.onNext("3");
                observableEmitter.onNext("4");
                observableEmitter.onComplete();

            }
        }).subscribe(new Observer<String>() {

            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "onSubscribe");
                mDisposable = disposable;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext=" + s);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");

            }
        });
    }
复制代码
  • 第三步:运行结果,订阅成功以后,会依次回调如下三步操做:onSubscribeonNextonComplete

1.2 基本元素

在上面的例子中,涉及到了如下五个类:缓存

  • Observable:上游。
  • ObservableOnSubscribe:上游的create方法所接收的参数。
  • ObservableEmitter:上游事件的发送者。
  • Observer:下游的接收者。
  • Disposable:用于维系上游、下游之间的联系。

对于整个模型,能够总结为如下几点:bash

  • RxJava2简单的来讲,就是一个发送事件、接收事件的过程,咱们能够将发送事件方类比做上游,而接收事件方类比做下游。
  • 上游每产生一个事件,下游就能收到事件,上游对应Observable,而下游对应Observer
  • 只有当上游和下游创建链接以后,上游才会开始发送事件,这一关系的创建是经过subscribe方法。

各关键元素的UML图以下: 网络

1.3 ObservableEmitter

用于 发出事件,它能够分别发出onNext/onComplete/onError事件:app

  • 上游能够发送无限个onNext,下游也能够接收无限个onNext
  • 当上游发送了一个onComplete/onError后,上游onComplete/onError后的事件将会继续发送,可是下游在收到onComplete/onError事件后再也不继续接收事件。
  • 上游能够不发送onComplete或者onError事件。
  • 调用onError或者onComplete切断了上游和下游的联系,在联系切断后上游再发送onError事件就会报错,onCompleteonError的调用状况有如下几种: (1) onComplete能够发送屡次,可是只会收到一次回调。 (2) onError只能够发送一次,发送屡次会报错。 (3) onComplete以后不能够发送onError,不然会报错。 (4) onError以后能够发送onComplete,可是只会收到onError事件。
  • onError的参数不容许为空。

其继承关系以下图所示: ide

1.4 Disposable

理解成为 水管的机关,当调用它的dispose方法时,将会将上游和下游之间的管道切断,从而致使 下游接收不到事件函数

  • ObserveronSubscribe回调中,会传入一个Disposable对象,下游能够经过该对象的dispose()方法主动切断和上游的联系,在这以后上游的observableEmitter.isDisposed()方法将返回true
  • 当上游和下游的联系切断以后,下游收不到包括onComplete/onError在内的任何事件,若此时上游再调用onError方法发送事件,那么将会报错。

咱们来模拟一下,在下游收到2以后,经过Disposable来切断上游和下游之间的联系:工具

public static void classicalSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("1");
                observableEmitter.onNext("2");
                observableEmitter.onNext("3");
                observableEmitter.onNext("4");
                observableEmitter.onComplete();

            }
        }).subscribe(new Observer<String>() {

            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "onSubscribe");
                mDisposable = disposable;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext=" + s);
                if ("2".equals(s)) {
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");

            }
        });
    }
复制代码

最终的运行结果为:

1.5 Subscribe 的重载方法

经过subscribe肯定上游和下游的联系有如下几种方法:

能够看到,这里能够分为三类:

  • 不带参数
  • Consumer<T>
  • Observer
  • Action

对于不使用Observer类做为形参的subscribe函数,其实实现的功能和使用Observer类做为参数的方法相同,只不过它们是将Observer的四个回调分解成形参,有参数的回调用Consumer<T>代替,而没有参数的则用Action代替。

2、线程切换

2.1 基本概念

  • 当咱们在上游建立一个Observable来发送事件,那么这个上游就默认在主线程发送事件;而当咱们在下游建立一个Observer来接收事件,那么这个下游就默认在主线程中接收事件。
  • subscribeOn指定的是 上游发送事件 的线程,而observeOn指定的是 下游接收事件 的线程。
  • 屡次调用subscribeOn只有第一次有效,而每调用一次observeOn,那么下游接收消息的线程就会切换一次。
  • CompositeDisposable能够用来容纳Disposable对象,每当咱们获得一个Disposable对象时,就经过add方法将它添加进入容器,在退出的时候,调用clear方法,便可切断全部的水管。

2.2 线程类型

  • Schedulers.io():表明IO操做,一般用于网络请求、文件读写等IO密集型的操做。
  • Schedulers.computation():表明CPU密集型的操做,适用于大量计算。
  • Schedulers.newThread():建立新的常规线程。
  • AndroidSchedulers.mainThread():表明Android的主线程。

2.3 示例

在链式调用当中,咱们能够经过observeOn方法屡次切换管道下游处理消息的线程,例以下面的代码,咱们对下游进行了两次线程的切换:

static void mapSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=true");
                observableEmitter.onNext("true");
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=false");
                observableEmitter.onNext("false");
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",onComplete");
                observableEmitter.onComplete();
            }
        //1.指定了subscribe方法执行的线程,并进行第一次下游线程的切换,将其切换到新的子线程。 
        }).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(new Function<String, Boolean>() {

            @Override
            public Boolean apply(String s) throws Exception {
                Log.d(TAG, "apply's thread=" + Thread.currentThread().getId() + ",s=" + s);
                return "true".equals(s);
            }
        //2.进行第二次下游线程的切换,将其切换到主线程。 
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Boolean>() {

            @Override
            public void onSubscribe(Disposable disposable) {

            }

            @Override
            public void onNext(Boolean aBoolean) {
                Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",boolean=" + aBoolean);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",onComplete");
            }
        });
    }
复制代码

以上代码的运行的结果为:

3、Map 和 FlatMap 操做符

3.1 Map

  • Map操做符的做用是对上游发送的每个事件应用一个函数,使得每一个事件按照函数的逻辑进行变换,经过Map就能够把上游发送的每个事件,转换成Object或者集合,其英文注释为:
  • 如下面使用map的代码为例,能够看到map接收一个Function类,它有两个泛型变量,分别为调用map方法的Observable<T><T>泛型,和返回的Obervable<R><R>泛型。
public static void mapVerify() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
            }
        });
        Observable<String> convertObservable = sourceObservable.map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer.toString();
            }
        });
        Log.d(TAG, "sourceObservable=" + sourceObservable + "\n convertObservable=" + convertObservable);
    }
复制代码

Function为一个接口:

而且在 map函数调用完毕以后,将返回一个新的 Observable,它的类型为 ObservableMap

3.2 FlatMap

  • FlatMap用于将一个发送事件的上游Observable变换成多个发送事件的Observable,而后将它们发送的事件合并,放进一个单独的Observable中,其注释为:
  • 上游每发送一个事件,就会针对该事件建立一个单独的水管,而后发送转换后的新的事件,下游接收到的就是这些新的水管发送的事件。
  • FlatMap不保证不一样水管之间事件的顺序,若是须要保证顺序,则须要使用contactMap

3.2.1 示例

static void flatMapSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                observableEmitter.onNext(1);
                observableEmitter.onNext(2);
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                return Observable.fromArray("a value of " + integer + ",b value of " + integer);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }
复制代码

map操做符相似,它也接收一个类型为Function的接口,只不过它的? extends R参数类型换成了? extends Observable<? extends R>

3.2.2 FlatMap 不保证下游接收事件的顺序

前面咱们说到,flatMap操做符不会保证下游接收事件的顺序,下面,咱们就以一个例子来讲明,在flatMapapply函数中,咱们将一个事件转换成两个Observable,而且加上了延时:

static void flatMapOrderSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "flatMapOrderSample emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "flatMapOrderSample emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "flatMapOrderSample emit 3");
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                Log.d(TAG, "flatMapOrderSample apply=" + integer);
                long delay = (3 - integer) * 100;
                return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }
复制代码

能够看到,最终的输出结果和flatMap收到事件的顺序并不相同:

下面,仍是一样的场景,将 flatMap换成 contactMap

static void contactMapOrderSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(2);
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).concatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                Log.d(TAG, "contactMapOrderSample apply=" + integer);
                long delay = (3 - integer) * 100;
                return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }
复制代码

最终的运行结果为:

4、Zip 操做符

4.1 基本概念

  • Zip经过一个函数从多个Observable每次各取出一个事件,合并成一个新的事件发送给下游。
  • 组合的顺序是严格按照事件发送的顺序来的。
  • 最终下游收到的事件数量和上游中发送事件最少的那一根水管的事件数量相同。

4.1.1 两个 Observable 运行在同一线程当中

static void zipSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "sourceObservable emit 1");
                observableEmitter.onNext(1);
                Thread.sleep(1000);
                Log.d(TAG, "sourceObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "sourceObservable emit 3");
                observableEmitter.onNext(3);
                Log.d(TAG, "sourceObservable emit 4");
                observableEmitter.onNext(4);
            }
        });
        Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "otherObservable emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "otherObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "otherObservable emit 3");
                observableEmitter.onNext(3);
            }
        });
        Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {

            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }

        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "resultObservable onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "resultObservable onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "resultObservable onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "resultObservable onComplete");

            }
        });
    }
复制代码

此时的运行结果为:

4.1.2 两个 Observable 运行在不一样的线程

static void zipSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "sourceObservable emit 1");
                observableEmitter.onNext(1);
                Thread.sleep(1000);
                Log.d(TAG, "sourceObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "sourceObservable emit 3");
                observableEmitter.onNext(3);
                Log.d(TAG, "sourceObservable emit 4");
                observableEmitter.onNext(4);
            }
        });
        Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "otherObservable emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "otherObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "otherObservable emit 3");
                observableEmitter.onNext(3);
            }
        }).subscribeOn(Schedulers.io());
        Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {

            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }

        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "resultObservable onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "resultObservable onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "resultObservable onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "resultObservable onComplete");

            }
        });
    }
复制代码

运行结果为:

5、背压

“背压”其实就是一种用于解决问题的工具,那么咱们的问题又是什么呢?

  • 问题:当上游发送事件的速度很快,下游消费事件的速度又很慢,而系统又必须缓存这些上游发送的消息以便下游处理,那么就会致使系统中堆积了不少的资源。
  • 工具:下游告知上游目前本身的处理能力,上游根据下游的处理能力,进行适当的调整。

想必你们在不少文章中都听过这个一句话:在RxJava2中,Observable不支持“背压”,而Flowable支持背压。

5.1 不支持背压的 Observable

关于Observable不支持背压,咱们应当从两种状况去考虑,即上游、下游是否位于相同的线程。

5.1.1 Observable 之上游、下游位于相同线程

首先,咱们不调用observeOnsubscribeOn方法来改变上游、下游的工做线程,这样,上游和下游就位于同一线程,同时,咱们在下游的处理函数中,每收到一个消息就休眠2000ms,以模拟上游处理速度大于下游的场景。

static void oomSample() {
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "observableEmitter=" + i);
                    observableEmitter.onNext(i);
                }
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(2000);
                Log.d(TAG, "accept=" + integer);
            }

        });
    }
复制代码

从下面的打印结果能够看到,当“使用 Observable,而且上游、下游位于相同线程”时,并不会出现消息堆积的状况,由于上游发射完一条消息后,必需要等到下游处理完该消息,才会发射一条新的消息。

5.1.2 Observable 之上游、下游位于不一样线程

接着,咱们采用subscribeOnobserveOn来使得上游和下游位于不一样的工做线程,其它均和2.2中相同。

static void oomSample() {
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "observableEmitter=" + i);
                    observableEmitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(2000);
                Log.d(TAG, "accept=" + integer);
            }

        });
    }
复制代码

2.2中不一样,当上游和下游位于不一样的工做线程,那么上游发送消息时,不会考虑下游是否已经处理了以前的消息,它会直接发送,而这些发送的消息被存放在水缸当中,下游每处理完一条消息,就去水缸中取下一条数据,那么随着水缸中数据愈来愈多,那么系统中的无用资源就会急剧增长。

5.1.3 关于 Observable 不支持背压的小结

咱们之因此说Observable不支持“背压”,就是在2.1介绍的整个族谱中,没有一个类,一种方法能让下游通知上游说:不要再发消息到水缸里了,我已经处理不过来了!

那是否是说Flowable支持“背压”,而Observable不支持,那么Observable就要被取代了呢,其实否则,Flowable对于“背压”的支持是以性能为代价的,咱们应当只在有可能出现2.3中上游下游速率不匹配的问题时,才去使用Flowable,不然就应当使用Observable,也就是知足两点条件:

  • 上游和下游位于不一样的工做线程
  • 上游发送消息的速度,要远远大于下游处理消息的速度,有可能形成消息的堆积。

5.2 支持背压的 Flowable

5.2.1 基本概念

  • FlowableSubscriber分别对应于以前讨论的ObservableObserver,它们直接的链接仍然是经过subscribe方法。
  • Flowable在设计的时候采用了 响应式拉取 的思想,当下游调用了Subscriptionrequest方法时,就代表了下游处理事件的能力,这样上游就能够根据这个值来控制事件发送的频率,避免出现前面谈到的上游发送太快,而下游处理太慢从而致使OOM的发生。
  • 只有上游根据下游的处理能力来发送事件,才能达到理想的效果。

5.2.2 基本使用

static void flowSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }

        }, BackpressureStrategy.ERROR);

        sourceFlow.subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
复制代码

其类结构图和Observable几乎彻底一致:

5.3 Flowable 支持背压的策略

从上面的类图能够看出,FlowableObservable最大的不一样,就是在create方法中,须要传入额外的参数,它表示的是“背压”的策略,这里可选的值包括:

  • ERROR
  • BUFFER
  • DROP
  • LATEST

5.3.1 使用 ERROR 的策略

  • 当上游和下游位于同一个线程时,若是上游发送的事件超过了下游声明的request(n)的值,那么会抛出MissingBackpressureException异常。
  • 当上游和下游位于不一样线程时,若是上游发送的事件超过了下游的声明,事件会被放在水缸当中,这个水缸默认的大小是128,只有当下游调用request时,才从水缸中取出事件发送给下游,若是水缸中事件的个数超过了128,那么也会抛出MissingBackpressureException异常。

下面这段代码,咱们先将三个事件放入到水缸当中,以后每次调用request方法就会从水缸当中取出一个事件发送给下游。

static void flowSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }

        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

    static void clickSubscription() {
        if (sSubscription != null) {
            sSubscription.request(1);
        }
    }
复制代码

当上游和下游位于不一样的线程,每次经过Subscription调用request就会从水缸中取出一个事件,发送给下游:

5.3.2 BUFFER 策略

  • 使用BUFFER策略时,至关于在上游放置了一个容量无限大的水缸,全部下游暂时没法处理的消息都放在水缸当中,这里再也不像ERROR策略同样,区分上游和下游是否位于同一线程。
  • 所以,若是下游一直没有处理消息,那么将会致使内存一直增加,从而引发OOM
static void clickSubscription() {
        if (sSubscription != null) {
            sSubscription.request(10);
        }
    }

    static void flowBufferSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 10000;i ++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }

        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
复制代码

在上面的例子中,咱们先把10000条消息放入到水缸当中,以后经过Subscription每次从水缸中取出10条消息发送给下游,演示结果为:

5.3.3 DROP 策略

  • 使用DROP策略时,会把水缸没法存放的事件丢弃掉,这里一样不会受到下游和下游是否处于同一个线程的限制。
static void flowDropSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 130; i++) {
                    emitter.onNext(i);
                }
            }

        }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
复制代码

咱们先往水缸中放入130条消息,以后每次经过Subscription取出60条消息发送给下游,能够看到,最后最多只取到了第128条消息,第129/130条消息被丢弃了。

5.3.4 LATEST 策略

  • DROP相似,当水缸没法容纳下消息时,会将它丢弃,可是除此以外,上游还会缓存最新的一条消息,实例以下:
static void flowLatestSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 130; i++) {
                    emitter.onNext(i);
                }
            }

        }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
复制代码

从下面的运行结果能够看出,当取出最后一批数据的时候,上游除了收到存储在水缸当中的数据,还额外收到了最后一条消息,也就是第130条数据,这就是DROP策略和LATEST策略的区别:


更多文章,欢迎访问个人 Android 知识梳理系列:

相关文章
相关标签/搜索