本篇文章已受权微信公众号 YYGeeker
独家发布转载请标明出处react
CSDN学院课程地址缓存
- RxJava2从入门到精通-初级篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-中级篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-进阶篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-源码分析篇:edu.csdn.net/course/deta…
背压的概念是在平时业务开发时较为常见,大多数是针对高并发的业务,背压是必须考虑的因素之一。在异步场景中,因为数据流的发射速度高于数据流的接收速度,就会致使数据不能及时处理,从而致使数据流的阻塞。背压所要作的事情就是主动控制数据流发射的速度bash
在RxJava2.0中,推出了Flowable用来支持背压,去除了Observable对背压的支持,下面在背压策略的讲解中,咱们都使用Flowable做为咱们的响应类型。在使用背压时,只须要在create()
方法中第二个参数添加背压策略便可微信
FlowableSubscriber
,那么须要经过s.request(Long.MAX_VALUE)
去主动请求上游的数据项。若是遇到背压报错的时候,FlowableSubscriber
默认已经将错误try-catch,并经过onError()
进行回调,程序并不会崩溃Consumer
,那么不须要主动去请求上游数据,默认已经调用了s.request(Long.MAX_VALUE)
。若是遇到背压报错、且对Throwable的Consumer
没有new出来,则程序直接崩溃public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
}
复制代码
MISSING表示OnNext事件没有任何缓存和丢弃,下游要处理任何溢出,能够理解为至关于没有指定背压策略。Flowable至关于没有指定背压策略能够将下游要处理任何溢出理解为,上游发射的数据未获得处理,就会缓存起来,当缓存容量达到128时,再增长一个未处理的数据项,就会抛出MissingBackpressureException,且带有队列已经满了的友好提示。这里就比如一个大水缸,当水注满的时候,它就会把盖子盖上,不让你再继续注水了
并发
这里咱们模拟上游发送速度高于下游数据流的处理速度,在数据处理的时候加上
Thread.sleep(1000)
异步
public void missing() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
复制代码
输出ide
io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
复制代码
ERROR表示在下游没法跟上时,会抛出MissingBackpressureException。能够将下游没法跟上理解为,上游发射的数据未获得处理,就会缓存起来,当缓存容量达到128时,再增长一个未处理的数据项,就会抛出MissingBackpressureException。这里比如一个大水缸,当水注满的时候,它会把水缸撑破了,直接破裂
高并发
public void error() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
复制代码
输出源码分析
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
复制代码
上游不断的发出onNext请求,直到下游处理完,上游发射的数据项的缓存池是无限大的,程序也不会抛出错误,可是要注意程序OOM的现象,由于缓存越大,占用的内存就越多。例子中发射129个数据项,然而程序并无崩溃,只会一直读取缓存池的数据项,直到数据项被处理完。这里就是一个无限大的水缸
ui
背压策略除了BUFFER策略的缓存池是无限大以外,其余默认的缓存池都是128
public void buffer() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
复制代码
输出
onNext=0
onNext=1
onNext=2
......
onNext=998
onNext=999
复制代码
会在下游跟不上速度时,把onNext的值丢弃,简单的说就是,超过缓存区大小(128)的数据项都会被丢弃。例子中经过发射800个数据项,那么咱们只会收到0-127的数据项。若是咱们再次调用request()
,这时候取到的数据就是上一次request()后的128个数据。这里比如一个大水缸,当水注满的时候,水仍是在继续的流,一旦有request调用的时候,它就会去取出水缸里的全部水,这时候水缸就是空的,但水一直在流,因此水缸立刻又会被注满,这个时候就要等request再次取出水缸里的水
public void drop() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
复制代码
输出
onNext=0
onNext=1
onNext=2
......
onNext=127
复制代码
LATEST与Drop策略同样,若是超过缓存池容量大小的数据项都会被丢弃。不一样的是,无论缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中。这里的水缸容纳下了最后一滴水
public void latest() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
复制代码
输出
onNext=0
onNext=1
......
onNext=126
onNext=127
onNext=999
复制代码