Outlinejavascript
[TOC]java
在上一节中, 咱们学习了FLowable的一些基本知识, 同时也挖了许多坑, 这一节就让咱们来填坑吧.react
在上一节中最后咱们有个例子, 当上游一次性发送128个事件的时候是没有任何问题的, 一旦超过128就会抛出MissingBackpressureException
异常, 提示你上游发太多事件了, 下游处理不过来, 那么怎么去解决呢? 并发
咱们先来思考一下, 发送128个事件没有问题是由于FLowable
内部有一个大小为128的水缸, 超过128就会装满溢出来, 那既然你水缸这么小, 那我给你换一个大水缸
如何, 听上去颇有道理的样子, 来试试: ide
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});复制代码
此次咱们直接让上游发送了1000个事件,下游仍然不调用request去请求, 与以前不一样的是, 此次咱们用的策略是BackpressureStrategy.BUFFER
, 这就是咱们的新水缸
啦, 这个水缸就比原来的水缸牛逼多了,若是说原来的水缸是95式步枪, 那这个新的水缸就比如黄金AK , 它没有大小限制, 所以能够存放许许多多的事件. 性能
因此此次的运行结果就是:学习
zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo D/TAG: emit 2
...
zlc.season.rxjava2demo D/TAG: emit 997
zlc.season.rxjava2demo D/TAG: emit 998
zlc.season.rxjava2demo D/TAG: emit 999复制代码
不知道你们有没有发现, 换了水缸的FLowable和Observable好像是同样的嘛...测试
不错, 这时的FLowable表现出来的特性的确和Observable如出一辙, 所以, 若是你像这样单纯的使用FLowable, 一样须要注意OOM的问题, 例以下面这个例子:spa
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});复制代码
按照咱们之前学习Observable同样, 让上游无限循环发送事件, 下游一个也不去处理, 来看看运行结果吧:3d
一样能够看到, 内存迅速增加, 直到最后抛出OOM. 因此说不要迷恋FLowable, 它只是个传说.
可能有朋友也注意到了, 以前使用Observable测试的时候内存增加很是迅速, 几秒钟就OOM, 但这里增加速度却比较缓慢, 能够翻回去看以前的文章中的GIF图进行对比, 这也看出FLowable相比Observable, 在性能方面有些不足, 毕竟FLowable内部为了实现响应式拉取作了更多的操做, 性能有所丢失也是在所不免, 所以单单只是说由于FLowable是新兴产物就盲目的使用也是不对的, 也要具体分场景,
那除了给FLowable换一个大水缸还有没有其余的办法呢, 由于更大的水缸也只是缓兵之计啊, 动不动就OOM给你看.
想一想看咱们以前学习Observable的时候说到的如何解决上游发送事件太快的, 有一招叫从数量
上取胜, 一样的FLowable中也有这种方法, 对应的就是BackpressureStrategy.DROP
和BackpressureStrategy.LATEST
这两种策略.
从名字上就能猜到它俩是干啥的, Drop就是直接把存不下的事件丢弃,Latest就是只保留最新的事件, 来看看它们的实际效果吧.
先来看看Drop:
public static void request() {
mSubscription.request(128);
}
public static void demo3() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}复制代码
咱们仍然让上游无限循环发送事件, 此次的策略选择了Drop, 同时把Subscription保存起来, 待会咱们在外部调用request(128)时, 即可以看到运行的结果.
咱们先来猜一下运行结果, 这里为何request(128)呢, 由于以前不是已经说了吗, FLowable内部的默认的水缸大小为128, 所以, 它刚开始确定会把0-127这128个事件保存起来, 而后丢弃掉其他的事件, 当咱们request(128)的时候,下游便会处理掉这128个事件, 那么上游水缸中又会从新装进新的128个事件, 以此类推, 来看看运行结果吧:
从运行结果中咱们看到的确是如此, 第一次request的时候, 下游的确收到的是0-127这128个事件, 但第二次request的时候就不肯定了, 由于上游一直在发送事件. 内存占用也很正常, drop的做用相信你们也很清楚了.
再来看看Latest吧:
public static void request() {
mSubscription.request(128);
}
public static void demo4() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}复制代码
一样的, 上游无限循环发送事件, 策略选择Latest, 同时把Subscription保存起来, 方便在外部调用request(128).来看看此次的运行结果:
诶, 看上去好像和Drop差很少啊, Latest也首先保存了0-127这128个事件, 等下游把这128个事件处理了以后才进行以后的处理, 光从这里没有看出有任何区别啊...
古人云,师者,因此传道受业解惑也。人非生而知之者,孰能无惑?惑而不从师,其为惑也,终不解矣.复制代码
做为初学者的入门导师
, 是不能给你们留下一点点疑惑的, 来让咱们继续揭开这个疑问.
咱们把上面两段代码改良一下, 先来看看DROP的改良版:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10000; i++) { //只发1w个事件
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(128); //一开始就处理掉128个事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});复制代码
这段代码和以前有两点不一样, 一是上游只发送了10000个事件, 二是下游在一开始就立马处理掉了128个事件, 而后咱们在外部再调用request(128)试试, 来看看运行结果:
此次能够看到, 一开始下游就处理掉了128个事件, 当咱们再次request的时候, 只获得了第3317的事件, 后面的事件直接被抛弃了.
再来看看Latest的运行结果吧:
从运行结果中能够看到, 除去前面128个事件, 与Drop不一样, Latest老是能获取到最后最新的事件, 例如这里咱们老是能得到最后一个事件9999.
好了, 关于FLowable的策略咱们也讲完了, 有些朋友要问了, 这些FLowable是我本身建立的, 因此我能够选择策略, 那面对有些FLowable并非我本身建立的, 该怎么办呢? 好比RxJava中的interval操做符, 这个操做符并非咱们本身建立的, 来看下面这个例子吧:
Flowable.interval(1, TimeUnit.MICROSECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000); //延时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});复制代码
interval操做符发送Long型的事件, 从0开始, 每隔指定的时间就把数字加1并发送出来, 在这个例子里, 咱们让它每隔1毫秒就发送一次事件, 在下游延时1秒去接收处理, 不用猜也知道结果是什么:
zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo W/TAG: onError:
io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:278)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:273)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)复制代码
一运行就抛出了MissingBackpressureException
异常, 提醒咱们发太多了, 那么怎么办呢, 这个又不是咱们本身建立的FLowable啊...
别慌, 虽然不是咱们本身建立的, 可是RxJava给咱们提供了其余的方法:
熟悉吗? 这跟咱们上面学的策略是同样的, 用法也简单, 拿刚才的例子现学现用:
Flowable.interval(1, TimeUnit.MICROSECONDS)
.onBackpressureDrop() //加上背压策略
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});复制代码
其他的我就不一一列举了.
好了, 今天的教程就到这里吧, 这一节咱们学习了如何使用内置的BackpressureStrategy来解决上下游事件速率不均衡的问题. 这些策略其实以前咱们将Observable的时候也提到过, 其实大差不差, 只要理解了为何会上游发事件太快, 下游处理太慢这一点, 你就好处理了, FLowable无非就是给你封装好了, 确实对初学者友好一点, 可是不少初学者每每只知道How, 殊不知道Why, 最重要的实际上是知道why, 而不是How.
(其他的教程大多数到这里就结束了, 可是, 你觉得FLowable就这么点东西吗, 骚年, Too young too simple, sometimes naive! 这仅仅是开始, 真正牛逼的还没来呢. 敬请关注下一节, 下节见 ! )