Android进阶系列之第三方库知识点整理。java
知识点总结,整理也是学习的过程,若有错误,欢迎批评指出。web
上一篇:Rxjava2(一)、基础概念及使用缓存
直接开整,上一篇基础概念里面说了,rxjava2
扩展于观察者模式,咱们上篇的只是简单的介绍了用Observable
来建立使用,其实rxjava2
给咱们提供了五种观察者模式的建立方式。app
可以发射0或n个数据,并以成功或错误事件终止,在第一篇中已经举例说明了,这里就再也不详细说明。异步
可以发射0或n个数据,并以成功或错误事件终止。 支持背压,能够控制数据源发射的速度。ide
咱们看到Observable
和Flowable
这两个的区别就是后者支持背压,那么何为背压?post
背压是一种现象,简单来讲就是在异步操做中,上游发送数据速度快于下游处理数据的速度,下游来不及处理,Buffer 溢出,致使事件阻塞,从而引发的各类问题,好比事件丢失,OOM等。学习
在rxjava1
中并不支持背压,当出现事件阻塞时候,会直接抛出 MissingBackpressureException
异常,可是在rxjava2
中,提供了 Flowable
来建立被观察者,经过Flowable
来处理背压问题,咱们能够简单经过demo分析。spa
A:咱们上游模拟循环发送数据。线程
B:线程切换,异步操做。
C:下游每隔一秒获取数据。
咱们Observable
建立,来模拟了背压这个现象,咱们在上游模拟无限循环的发送数据,下游每次都休眠一秒再获取数据,这样确定会形成咱们前面提的问题,就是上游发送太他丫的快了,下游根本处理不过来,咱们先看结果。
看日志,打印结果停留在了13就没有继续打印了?同时能够看到程序已经崩了,是由于在rxjava2
中,Observable
并不支持背压操做,遇到背压问题,它并不会报错,也不会抛MissingBackpressureException
异常,可是内存会一直飙高,最后致使内存不足程序直接挂掉。
能够看到内存一直在往上飙,针对背压这种现象,rxjava2
中提出用 Flowable
来处理。
下面由浅入深,慢慢揭开Flowable
的神秘面纱。
咱们先用Flowable
建立一个基本的demo:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
emitter.onNext("事件一");
LogUtil.d(TAG + "--subscribe 发送事件一");
emitter.onNext("事件二");
LogUtil.d(TAG + "--subscribe 发送事件二");
emitter.onNext("事件三");
LogUtil.d(TAG + "--subscribe 发送事件三");
emitter.onNext("事件四");
LogUtil.d(TAG + "--subscribe 发送事件四");
emitter.onComplete();
LogUtil.d(TAG + "--subscribe 发送完成");
}
}, BackpressureStrategy.ERROR) // 这里须要传入背压策略,跟线程池里面饱和策略相似,当缓存区存满时候采起的处理策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // 线程切换,异步操做
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
// 决定观察者能接收多少个事件,多余事件放入缓存区
// Flowable 默认缓存区大小为128,即最大能存放128个事件
s.request(3);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t.getLocalizedMessage());
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
能够看到
Flowable
建立和Observable
基本差很少,只是在create
方法中多传入BackpressureStrategy.ERROR
这么一个背压策略,这个后面会详讲。在
onSubscribe
的回调中,参数变成了Subscription
,咱们能够经过这个参数,让观察者本身设置要接收多少个事件,若是发送的事件大于观察者设置接收的事件,多余事件将会存入Flowable
缓存区中。
Flowable
缓存区队列大小只能存放128个事件,若是超过,就会报异常。
结果:
发送四个事件,观察者经过
Subscription.request(3)
设置只接收三个事件,因此下游只接收三个,剩下一个放入Flowable
缓存区中。
若是咱们观察者不设置Subscription.request(x)
,即不接收事件,被观察者仍然会发送事件,并存入缓存区中,观察者能够动态调用Subscription.request(x)
方法来获取事件。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int x = 0; x <= 10; x++) {
LogUtil.d(TAG + "--subscribe 发送了" + x + "个事件");
emitter.onNext(x + "事件");
}
}
}, BackpressureStrategy.ERROR)
// 线程切换,异步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
subscription = s;
// s.request(3); 这里不指定观察者接收事件个数
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t.getLocalizedMessage());
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
动态获取
findViewById(R.id.bt_get_event).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
if (subscription != null) {
LogUtil.d(TAG + "--onClick");
subscription.request(4);
}
}
});
复制代码
能够看到咱们观察者一开始并无指定接收多少个事件,而是经过外接点击事件,来动态设置接收事件个数,咱们看结果,当点击触发后,咱们收到了最早存入队列的四个事件。
结果:
咱们前面提到,Flowable
默认的缓存区队列大小为128,即只能存放上游发送的128个事件,若是上游发送的事件超过128,就须要咱们指定相应的背压策略来作不一样的处理,BackpressureStrategy
为咱们提供了五种背压策略。
整理以下:
策略 | 做用 |
---|---|
MISSING | 当缓存区大小存满(128),被观察者仍然继续发送下一个事件时,抛出异常MissingBackpressureException , 提示缓存区满了 |
ERROR | 当缓存区大小存满(128)(默认缓存区大小128),被观察者仍然继续发送下一个事件时,直接抛出异常MissingBackpressureException |
BUFFER | 当缓存区大小存满(128),被观察者仍然继续发送下一个事件时,缓存区大小设置无限大, 即被观察者可无限发送事件,但其实是存放在缓存区 |
DROP | 当缓存区大小存满,被观察者仍然继续发送下一个事件时, 超过缓存区大小(128)的事件会被所有丢弃 |
LATEST | 当缓存区大小存满,被观察者仍然继续发送下一个事件时,只保存最新/最后发送的事件, 其余超过缓存区大小(128)的事件会被所有丢弃 |
当缓存区大小存满(128),被观察者仍然继续发送下一个事件时,抛出异常MissingBackpressureException
, 提示缓存区满了
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 发送129个事件,模拟超出缓存区
for (int x = 0; x < 129; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 发送了" + x + "个事件");
}
}
}, BackpressureStrategy.MISSING) // 使用BackpressureStrategy.MISSING背压策略
// 线程切换,异步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
咱们使用BackpressureStrategy.MISSING背压策略,观察者接收request(Integer.MAX_VALUE),此值也为推荐值。
结果:
咱们看到,当发送了128个事件后,再发送第129个事件时候,抛了MissingBackpressureException
异常,并且咱们设置了观察者接收也未接收到数据,说明是先存入缓存区队列,再发送,当缓存区中抛异常后,就中止了onNext()
事件,咱们能够验证一下,当设置被观察者发送128
事件。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// ******* 发送128个事件 ********
for (int x = 0; x < 128; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 发送了" + x + "个事件");
}
}
}, BackpressureStrategy.MISSING)
// 线程切换,异步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
就是在上面demo的基础上,改了发送的事件个数,上游发送128个事件,恰好为缓存区大小,并不抛异常。
结果:
咱们看到程序没有抛异常,而且正常打印了缓存区中的128个数据(从0开始),能够印证两点
一、缓存区大小确实为128
二、先存入缓存区后再获取(若是异常,
onNext
直接不调用)
当缓存区大小存满(128)(默认缓存区大小128),被观察者仍然继续发送下一个事件时,直接抛出异常MissingBackpressureException
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 发送129个事件,模拟超出缓存区
for (int x = 0; x < 129; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 发送了" + x + "个事件");
}
}
}, BackpressureStrategy.ERROR)
// 线程切换,异步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
使用 BackpressureStrategy.ERROR 背压策略
结果:
跟Missing同样,直接抛了MissingBackpressureException
异常且下游未接收到数据,同理,若是上游发送数据小于等于128,正常发送和接收。
当缓存区大小存满(128),被观察者仍然继续发送下一个事件时,缓存区大小设置无限大, 即被观察者可无限发送事件,但其实是存放在缓存区。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 发送129个事件,模拟超出缓存区
for (int x = 0; x < 129; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 发送了" + x + "个事件");
}
}
}, BackpressureStrategy.BUFFER)
// 线程切换,异步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
使用 BackpressureStrategy.BUFFER 背压策略
更改缓存区大小,不作限制。
结果:
能够看到,咱们发送的129个事件所有发送且接收到了。
当缓存区大小存满,被观察者仍然继续发送下一个事件时, 超过缓存区大小(128)的事件会被所有丢弃
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 发送129个事件,模拟超出缓存区
for (int x = 0; x < 129; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 发送了" + x + "个事件");
}
}
}, BackpressureStrategy.DROP)
// 线程切换,异步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
使用 BackpressureStrategy.DROP 背压策略
丢掉大于缓存区的事件。
结果:
结果很明了,并无抛异常同时也正常打印了,可是超过缓存区的那个事件被抛弃,并无获取到。
当缓存区大小存满,被观察者仍然继续发送下一个事件时,只保存最新/最后发送的事件, 其余超过缓存区大小(128)的事件会被所有丢弃
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 发送150个事件
for (int x = 0; x < 150; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 发送了" + x + "个事件");
}
}
}, BackpressureStrategy.LATEST)
// 线程切换,异步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
使用 BackpressureStrategy.LATEST 背压策略
发送了150个事件
当超出128时,会保存最新的一个事件,即会接收129个事件。
结果:
咱们能够看到,观察者端接收到129个数据,分别为缓存区内数据,加上最新/最后一条数据,中间数据均被丢弃。
前面说过,背压前提是异步操做下,在同步下,咱们并不会有背压一说,由于在同一个线程,发送数据后老是要等下游处理了才会发送第二条数据,不会存在缓冲区,以下:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
LogUtil.d(TAG + "--subscribe 发送事件一");
emitter.onNext("事件一");
LogUtil.d(TAG + "--subscribe 发送事件二");
emitter.onNext("事件二");
LogUtil.d(TAG + "--subscribe 发送事件三");
emitter.onNext("事件三");
LogUtil.d(TAG + "--subscribe 发送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(3);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
结果:
能够看到,事件都是顺序执行,发送一条接收一条,而后再执行下一条。
可是,咱们可能会遇到这个一个状况,当上游发送了四条数据,可是下游只接收三条?咱们改一下demo以下:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
LogUtil.d(TAG + "--subscribe 发送事件一");
emitter.onNext("事件一");
LogUtil.d(TAG + "--subscribe 发送事件二");
emitter.onNext("事件二");
LogUtil.d(TAG + "--subscribe 发送事件三");
emitter.onNext("事件三");
LogUtil.d(TAG + "--subscribe 发送事件四");
emitter.onNext("事件四");
LogUtil.d(TAG + "--subscribe 发送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(3);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
能够看到,被观察者发送了四个事件,可是观察者只接收了三条。
结果:
能够看到,一样抛了MissingBackpressureException
异常
这里可使用BUFFER的背压策略来处理,可是咱们为了说明观察者反向控制被观察者,咱们采用以下方案:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 经过emitter.requested()获取观察者设置的接收的事件数目
long requested = emitter.requested();
LogUtil.d(TAG + "--subscribe 观察者设置接收的事件数目:" + requested);
for (int x = 0; x < requested; x++) {
LogUtil.d(TAG + "--subscribe 发送事件" + x);
emitter.onNext("发送事件" + x);
}
LogUtil.d(TAG + "--subscribe 发送完成");
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
// 设置观察者接收事件数目为3
s.request(3);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.e(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
复制代码
咱们在
subscribe
中经过emitter.requested()
获取观察者中设置的接收事件数目,来动态的发送数据,这样就避免了上下游数据不一样步问题。
结果:
咱们前面都是经过create来建立Flowable
,能够在Create
第二个参数中传入相应的背压策略,Flowable
全部的操做符都支持背压,可是经过操做符建立的背压策略默认为BackpressureStrategy.ERROR,咱们能够经过
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
三种方式来指定相应的背压策略。
Flowable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
subscription = s;
s.request(Long.MAX_VALUE); //默承认以接收Long.MAX_VALUE个事件
}
@Override
public void onNext(Long aLong) {
LogUtil.i(TAG + "--onNext aLong=" + aLong);
try {
// 延时一秒接收
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
LogUtil.e(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.i(TAG + "--onComplete");
}
});
复制代码
这里咱们经过 interval
来建立Flowable
,能够看到下游每一毫秒发送一条数据,下游一秒处理一条,上游明显快于下游,处理不过来数据放入缓存池中,当缓存池中队列满时,就会抛异常,由于其默认的背压策略为BackpressureStrategy.ERROR
结果:
咱们能够经过onBackpressureXXX
其指定相应的背压策略。
结果:
当咱们指定背压策略为BUFFER后,能够看到并无异常抛出,程序一直在打印输出。
只发射单个数据或错误事件。
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
// 只能发送onSuccess或者onError,发射多条数据,只接受第一条
emitter.onSuccess("Success");
emitter.onError(new NullPointerException(""));
}
}).subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
LogUtil.d(TAG + "--onSubscribe");
}
@Override
public void onSuccess(String s) {
LogUtil.d(TAG + "--onSuccess s=" + s);
}
@Override
public void onError(Throwable e) {
LogUtil.e(TAG + "--onError error=" + e.getMessage());
}
});
复制代码
SingleEmitter
发射器只能发送一条onSuccess
或者onError
数据,若是发射器发射多条数据,观察者只能接收到第一条数据。
结果:
不发射数据,只处理 onComplete 和 onError 事件。
方法
onComplete
与onError
只可调用一个,同时调用,第一个生效。
可以发射0或者1个数据,要么成功,要么失败。有点相似于Optional。
onSuccess
方法一次订阅只能发送一次。方法
onComplete
与onError
只可调用一个,同时调用,第一个生效。