首先要感谢 Season_zlc 的一系列RxJava2
的教程,关于上游、下游、水缸的类比,让我对于整个RxJava2
的基本思想有了更加清晰的认识。你们有兴趣的话必定要多看看,写的通俗易懂,传送门:给初学者的 RxJava 2.0 教程 (一) ,本文的思想都来源于它的一系列文章。java
文章比较长,为了不耽误你们的时间,先列出须要介绍的知识点: react
在开始学习以前,咱们先看一下最简单的例子:android
dependencies {
//在build.gradle中,导入依赖。
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
复制代码
Observable
+ Observer
的最简单示例,这里咱们在上游发送了四个onNext(String s)
事件以后,最后发送了一个onComplete()
事件。public static void classicalSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("1");
observableEmitter.onNext("2");
observableEmitter.onNext("3");
observableEmitter.onNext("4");
observableEmitter.onComplete();
}
}).subscribe(new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "onSubscribe");
mDisposable = disposable;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext=" + s);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
复制代码
onSubscribe
;onNext
;onComplete
。
在上面的例子中,涉及到了如下五个类:缓存
Observable
:上游。ObservableOnSubscribe
:上游的create
方法所接收的参数。ObservableEmitter
:上游事件的发送者。Observer
:下游的接收者。Disposable
:用于维系上游、下游之间的联系。对于整个模型,能够总结为如下几点:bash
RxJava2
简单的来讲,就是一个发送事件、接收事件的过程,咱们能够将发送事件方类比做上游,而接收事件方类比做下游。Observable
,而下游对应Observer
。subscribe
方法。各关键元素的UML
图以下: 网络
用于 发出事件,它能够分别发出onNext/onComplete/onError
事件:app
onNext
,下游也能够接收无限个onNext
。onComplete/onError
后,上游onComplete/onError
后的事件将会继续发送,可是下游在收到onComplete/onError
事件后再也不继续接收事件。onComplete
或者onError
事件。onError
或者onComplete
切断了上游和下游的联系,在联系切断后上游再发送onError
事件就会报错,onComplete
和onError
的调用状况有如下几种: (1) onComplete
能够发送屡次,可是只会收到一次回调。 (2) onError
只能够发送一次,发送屡次会报错。 (3) onComplete
以后不能够发送onError
,不然会报错。 (4) onError
以后能够发送onComplete
,可是只会收到onError
事件。onError
的参数不容许为空。其继承关系以下图所示: ide
理解成为 水管的机关,当调用它的dispose
方法时,将会将上游和下游之间的管道切断,从而致使 下游接收不到事件。函数
Observer
的onSubscribe
回调中,会传入一个Disposable
对象,下游能够经过该对象的dispose()
方法主动切断和上游的联系,在这以后上游的observableEmitter.isDisposed()
方法将返回true
。onComplete/onError
在内的任何事件,若此时上游再调用onError
方法发送事件,那么将会报错。咱们来模拟一下,在下游收到2
以后,经过Disposable
来切断上游和下游之间的联系:工具
public static void classicalSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("1");
observableEmitter.onNext("2");
observableEmitter.onNext("3");
observableEmitter.onNext("4");
observableEmitter.onComplete();
}
}).subscribe(new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "onSubscribe");
mDisposable = disposable;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext=" + s);
if ("2".equals(s)) {
mDisposable.dispose();
}
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
复制代码
最终的运行结果为:
经过subscribe
肯定上游和下游的联系有如下几种方法:
Consumer<T>
类Observer
类Action
类对于不使用Observer
类做为形参的subscribe
函数,其实实现的功能和使用Observer
类做为参数的方法相同,只不过它们是将Observer
的四个回调分解成形参,有参数的回调用Consumer<T>
代替,而没有参数的则用Action
代替。
Observable
来发送事件,那么这个上游就默认在主线程发送事件;而当咱们在下游建立一个Observer
来接收事件,那么这个下游就默认在主线程中接收事件。subscribeOn
指定的是 上游发送事件 的线程,而observeOn
指定的是 下游接收事件 的线程。subscribeOn
只有第一次有效,而每调用一次observeOn
,那么下游接收消息的线程就会切换一次。CompositeDisposable
能够用来容纳Disposable
对象,每当咱们获得一个Disposable
对象时,就经过add
方法将它添加进入容器,在退出的时候,调用clear
方法,便可切断全部的水管。Schedulers.io()
:表明IO
操做,一般用于网络请求、文件读写等IO
密集型的操做。Schedulers.computation()
:表明CPU
密集型的操做,适用于大量计算。Schedulers.newThread()
:建立新的常规线程。AndroidSchedulers.mainThread()
:表明Android
的主线程。在链式调用当中,咱们能够经过observeOn
方法屡次切换管道下游处理消息的线程,例以下面的代码,咱们对下游进行了两次线程的切换:
static void mapSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=true");
observableEmitter.onNext("true");
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=false");
observableEmitter.onNext("false");
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",onComplete");
observableEmitter.onComplete();
}
//1.指定了subscribe方法执行的线程,并进行第一次下游线程的切换,将其切换到新的子线程。
}).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(new Function<String, Boolean>() {
@Override
public Boolean apply(String s) throws Exception {
Log.d(TAG, "apply's thread=" + Thread.currentThread().getId() + ",s=" + s);
return "true".equals(s);
}
//2.进行第二次下游线程的切换,将其切换到主线程。
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Boolean>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(Boolean aBoolean) {
Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",boolean=" + aBoolean);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",onComplete");
}
});
}
复制代码
以上代码的运行的结果为:
Map
操做符的做用是对上游发送的每个事件应用一个函数,使得每一个事件按照函数的逻辑进行变换,经过Map
就能够把上游发送的每个事件,转换成Object
或者集合,其英文注释为:
map
的代码为例,能够看到map
接收一个Function
类,它有两个泛型变量,分别为调用map
方法的Observable<T>
的<T>
泛型,和返回的Obervable<R>
的<R>
泛型。public static void mapVerify() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
});
Observable<String> convertObservable = sourceObservable.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
Log.d(TAG, "sourceObservable=" + sourceObservable + "\n convertObservable=" + convertObservable);
}
复制代码
Function
为一个接口:
map
函数调用完毕以后,将返回一个新的
Observable
,它的类型为
ObservableMap
:
FlatMap
用于将一个发送事件的上游Observable
变换成多个发送事件的Observable
,而后将它们发送的事件合并,放进一个单独的Observable
中,其注释为:
FlatMap
不保证不一样水管之间事件的顺序,若是须要保证顺序,则须要使用contactMap
。static void flatMapSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.fromArray("a value of " + integer + ",b value of " + integer);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
复制代码
和map
操做符相似,它也接收一个类型为Function
的接口,只不过它的? extends R
参数类型换成了? extends Observable<? extends R>
。
前面咱们说到,flatMap
操做符不会保证下游接收事件的顺序,下面,咱们就以一个例子来讲明,在flatMap
的apply
函数中,咱们将一个事件转换成两个Observable
,而且加上了延时:
static void flatMapOrderSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "flatMapOrderSample emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "flatMapOrderSample emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "flatMapOrderSample emit 3");
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
Log.d(TAG, "flatMapOrderSample apply=" + integer);
long delay = (3 - integer) * 100;
return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
复制代码
能够看到,最终的输出结果和flatMap
收到事件的顺序并不相同:
flatMap
换成
contactMap
:
static void contactMapOrderSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(2);
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
Log.d(TAG, "contactMapOrderSample apply=" + integer);
long delay = (3 - integer) * 100;
return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
复制代码
最终的运行结果为:
Zip
经过一个函数从多个Observable
每次各取出一个事件,合并成一个新的事件发送给下游。static void zipSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "sourceObservable emit 1");
observableEmitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "sourceObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "sourceObservable emit 3");
observableEmitter.onNext(3);
Log.d(TAG, "sourceObservable emit 4");
observableEmitter.onNext(4);
}
});
Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "otherObservable emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "otherObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "otherObservable emit 3");
observableEmitter.onNext(3);
}
});
Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "resultObservable onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "resultObservable onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "resultObservable onError");
}
@Override
public void onComplete() {
Log.d(TAG, "resultObservable onComplete");
}
});
}
复制代码
此时的运行结果为:
static void zipSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "sourceObservable emit 1");
observableEmitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "sourceObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "sourceObservable emit 3");
observableEmitter.onNext(3);
Log.d(TAG, "sourceObservable emit 4");
observableEmitter.onNext(4);
}
});
Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "otherObservable emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "otherObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "otherObservable emit 3");
observableEmitter.onNext(3);
}
}).subscribeOn(Schedulers.io());
Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "resultObservable onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "resultObservable onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "resultObservable onError");
}
@Override
public void onComplete() {
Log.d(TAG, "resultObservable onComplete");
}
});
}
复制代码
运行结果为:
“背压”其实就是一种用于解决问题的工具,那么咱们的问题又是什么呢?
想必你们在不少文章中都听过这个一句话:在RxJava2
中,Observable
不支持“背压”,而Flowable
支持背压。
关于Observable
不支持背压,咱们应当从两种状况去考虑,即上游、下游是否位于相同的线程。
首先,咱们不调用observeOn
和subscribeOn
方法来改变上游、下游的工做线程,这样,上游和下游就位于同一线程,同时,咱们在下游的处理函数中,每收到一个消息就休眠2000ms
,以模拟上游处理速度大于下游的场景。
static void oomSample() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
for (int i = 0; i < 1000; i++) {
Log.d(TAG, "observableEmitter=" + i);
observableEmitter.onNext(i);
}
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "accept=" + integer);
}
});
}
复制代码
从下面的打印结果能够看到,当“使用 Observable
,而且上游、下游位于相同线程”时,并不会出现消息堆积的状况,由于上游发射完一条消息后,必需要等到下游处理完该消息,才会发射一条新的消息。
接着,咱们采用subscribeOn
和observeOn
来使得上游和下游位于不一样的工做线程,其它均和2.2
中相同。
static void oomSample() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
for (int i = 0; i < 1000; i++) {
Log.d(TAG, "observableEmitter=" + i);
observableEmitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "accept=" + integer);
}
});
}
复制代码
和2.2
中不一样,当上游和下游位于不一样的工做线程,那么上游发送消息时,不会考虑下游是否已经处理了以前的消息,它会直接发送,而这些发送的消息被存放在水缸当中,下游每处理完一条消息,就去水缸中取下一条数据,那么随着水缸中数据愈来愈多,那么系统中的无用资源就会急剧增长。
咱们之因此说Observable
不支持“背压”,就是在2.1
介绍的整个族谱中,没有一个类,一种方法能让下游通知上游说:不要再发消息到水缸里了,我已经处理不过来了!
那是否是说Flowable
支持“背压”,而Observable
不支持,那么Observable
就要被取代了呢,其实否则,Flowable
对于“背压”的支持是以性能为代价的,咱们应当只在有可能出现2.3
中上游下游速率不匹配的问题时,才去使用Flowable
,不然就应当使用Observable
,也就是知足两点条件:
Flowable
和Subscriber
分别对应于以前讨论的Observable
和Observer
,它们直接的链接仍然是经过subscribe
方法。Flowable
在设计的时候采用了 响应式拉取 的思想,当下游调用了Subscription
的request
方法时,就代表了下游处理事件的能力,这样上游就能够根据这个值来控制事件发送的频率,避免出现前面谈到的上游发送太快,而下游处理太慢从而致使OOM
的发生。static void flowSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
sourceFlow.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
复制代码
其类结构图和Observable
几乎彻底一致:
从上面的类图能够看出,Flowable
和Observable
最大的不一样,就是在create
方法中,须要传入额外的参数,它表示的是“背压”的策略,这里可选的值包括:
ERROR
BUFFER
DROP
LATEST
request(n)
的值,那么会抛出MissingBackpressureException
异常。128
,只有当下游调用request
时,才从水缸中取出事件发送给下游,若是水缸中事件的个数超过了128
,那么也会抛出MissingBackpressureException
异常。下面这段代码,咱们先将三个事件放入到水缸当中,以后每次调用request
方法就会从水缸当中取出一个事件发送给下游。
static void flowSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
static void clickSubscription() {
if (sSubscription != null) {
sSubscription.request(1);
}
}
复制代码
当上游和下游位于不一样的线程,每次经过Subscription
调用request
就会从水缸中取出一个事件,发送给下游:
BUFFER
策略时,至关于在上游放置了一个容量无限大的水缸,全部下游暂时没法处理的消息都放在水缸当中,这里再也不像ERROR
策略同样,区分上游和下游是否位于同一线程。OOM
。static void clickSubscription() {
if (sSubscription != null) {
sSubscription.request(10);
}
}
static void flowBufferSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10000;i ++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
复制代码
在上面的例子中,咱们先把10000
条消息放入到水缸当中,以后经过Subscription
每次从水缸中取出10
条消息发送给下游,演示结果为:
DROP
策略时,会把水缸没法存放的事件丢弃掉,这里一样不会受到下游和下游是否处于同一个线程的限制。static void flowDropSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 130; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
复制代码
咱们先往水缸中放入130
条消息,以后每次经过Subscription
取出60
条消息发送给下游,能够看到,最后最多只取到了第128
条消息,第129/130
条消息被丢弃了。
DROP
相似,当水缸没法容纳下消息时,会将它丢弃,可是除此以外,上游还会缓存最新的一条消息,实例以下:static void flowLatestSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 130; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
复制代码
从下面的运行结果能够看出,当取出最后一批数据的时候,上游除了收到存储在水缸当中的数据,还额外收到了最后一条消息,也就是第130
条数据,这就是DROP
策略和LATEST
策略的区别: