给初学者的RxJava2.0教程(七)

Outlinejava

[TOC]react

前言

上一节里咱们学习了只使用Observable如何去解决上下游流速不均衡的问题, 之因此学习这个是由于Observable仍是有不少它使用的场景, 有些朋友自从据说了Flowable以后就以为Flowable能解决任何问题, 甚至有抛弃Observable这种想法, 这是万万不可的, 它们都有各自的优点和不足. android

在这一节里咱们先来学习如何使用Flowable, 它东西比较多, 也比较繁琐, 解释起来也比较麻烦, 但我仍是尽可能用通俗易懂的话来讲清楚, 毕竟, 这是一个通俗易懂的教程.git

正题

咱们仍是以两根水管举例子:github

prepare.png

以前咱们所的上游和下游分别是ObservableObserver, 此次不同的是上游变成了Flowable, 下游变成了Subscriber, 可是水管之间的链接仍是经过subscribe(), 咱们来看看最基本的用法吧: 多线程

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR); //增长了一个参数

        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);  //注意这句代码
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }

            @Override
            public void onError(Throwable t) {
                 Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        upstream.subscribe(downstream);复制代码

这段代码中,分别建立了一个上游Flowable和下游Subscriber, 上下游工做在同一个线程中, 和以前的Observable的使用方式只有一点点的区别, 先来看看运行结果吧: app

D/TAG: onSubscribe   
D/TAG: emit 1        
D/TAG: onNext: 1     
D/TAG: emit 2        
D/TAG: onNext: 2     
D/TAG: emit 3        
D/TAG: onNext: 3     
D/TAG: emit complete 
D/TAG: onComplete复制代码

结果也和咱们预期的是同样的. 异步

咱们注意到此次和Observable有些不一样. 首先是建立Flowable的时候增长了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里咱们直接用BackpressureStrategy.ERROR这种方式, 这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException. 其他的策略后面再来说解.ide

另外的一个区别是在下游的onSubscribe方法中传给咱们的再也不是Disposable了, 而是Subscription, 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 以前咱们说调用Disposable.dispose()方法能够切断水管, 一样的调用Subscription.cancel()也能够切断水管, 不一样的地方在于Subscription增长了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:工具

s.request(Long.MAX_VALUE);复制代码

这句代码有什么用呢, 不要它能够吗? 咱们来试试:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });复制代码

此次咱们取消掉了request这句代码, 来看看运行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo W/TAG: onError: 
                              io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
                                  at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
                                  at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
                                  at zlc.season.rxjava2demo.demo.ChapterSeven$3.subscribe(ChapterSeven.java:77)
                                  at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
                                  at io.reactivex.Flowable.subscribe(Flowable.java:12218)
                                  at zlc.season.rxjava2demo.demo.ChapterSeven.demo2(ChapterSeven.java:111)
                                  at zlc.season.rxjava2demo.MainActivity$2.onClick(MainActivity.java:36)
                                  at android.view.View.performClick(View.java:5637)
                                  at android.view.View$PerformClick.run(View.java:22429)
                                  at android.os.Handler.handleCallback(Handler.java:751)
                                  at android.os.Handler.dispatchMessage(Handler.java:95)
                                  at android.os.Looper.loop(Looper.java:154)
                                  at android.app.ActivityThread.main(ActivityThread.java:6119)
                                  at java.lang.reflect.Method.invoke(Native Method)
                                  at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
                                  at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
zlc.season.rxjava2demo D/TAG: emit 2
zlc.season.rxjava2demo D/TAG: emit 3
zlc.season.rxjava2demo D/TAG: emit complete复制代码

哎哎哎, 大兄弟, 怎么一言不合就抛异常?

从运行结果中能够看到, 在上游发送第一个事件以后, 下游就抛出了一个著名的MissingBackpressureException异常, 而且下游没有收到任何其他的事件. 但是这是一个同步的订阅呀, 上下游工做在同一个线程, 上游每发送一个事件应该会等待下游处理完了才会继续发事件啊, 不可能出现上下游流速不均衡的问题呀.

带着这个疑问, 咱们再来看看异步的状况:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });复制代码

此次咱们一样去掉了request这句代码, 可是让上下游工做在不一样的线程, 来看看运行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo D/TAG: emit 2
zlc.season.rxjava2demo D/TAG: emit 3
zlc.season.rxjava2demo D/TAG: emit complete复制代码

哎, 此次上游正确的发送了全部的事件, 可是下游一个事件也没有收到. 这是由于什么呢?

这是由于Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题, 与咱们以前所讲的控制数量控制速度不太同样, 这种方式用通俗易懂的话来讲就比如是叶问打鬼子, 咱们把上游当作小日本, 把下游看成叶问, 当调用Subscription.request(1)时, 叶问就说我要打一个! 而后小日本就拿出一个鬼子给叶问, 让他打, 等叶问打死这个鬼子以后, 再次调用request(10), 叶问就又说我要打十个! 而后小日本又派出十个鬼子给叶问, 而后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打...

因此咱们把request当作是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会形成一窝蜂的发出一堆事件来, 从而致使OOM. 这也就完美的解决以前咱们所学到的两种方式的缺陷, 过滤事件会致使事件丢失, 减速又可能致使性能损失. 而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !

可是太完美的东西也就意味着陷阱也会不少, 你可能只是被它的外表所迷惑, 失去了理智, 若是你滥用或者不遵照规则, 同样会吃到苦头.

好比这里须要注意的是, 只有当上游正确的实现了如何根据下游的处理能力来发送事件的时候, 才能达到这种效果, 若是上游根本无论下游的处理能力, 一股脑的瞎他妈发事件, 仍然会产生上下游流速不均衡的问题, 这就比如小日本管他叶问要打几个, 老子直接拿出1万个鬼子, 这尼玛有种打死给我看看? 那么如何正确的去实现上游呢, 这里先卖个关子, 以后咱们再来说解.

学习了request, 咱们就能够解释上面的两段代码了.

首先第一个同步的代码, 为何上游发送第一个事件后下游就抛出了MissingBackpressureException异常, 这是由于下游没有调用request, 上游就认为下游没有处理事件的能力, 而这又是一个同步的订阅, 既然下游处理不了, 那上游不可能一直等待吧, 若是是这样, 万一这两根水管工做在主线程里, 界面不就卡死了吗, 所以只能抛个异常来提醒咱们. 那如何解决这种状况呢, 很简单啦, 下游直接调用request(Long.MAX_VALUE)就好了, 或者根据上游发送事件的数量来request就好了, 好比这里request(3)就能够了.

而后咱们再来看看第二段代码, 为何上下游没有工做在同一个线程时, 上游却正确的发送了全部的事件呢? 这是由于在Flowable里默认有一个大小为128的水缸, 当上下游工做在不一样的线程中时, 上游就会先把事件发送到这个水缸中, 所以, 下游虽然没有调用request, 可是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游.

是否是这样呢, 咱们来验证一下:

public static void request(long n) {
        mSubscription.request(n); //在外部调用request请求上游
    }

    public static void demo3() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;  //把Subscription保存起来
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }复制代码

这里咱们把Subscription保存起来, 在界面上增长了一个按钮, 点击一次就调用Subscription.request(1), 来看看运行结果:

request.gif

结果彷佛像那么回事, 上游发送了四个事件保存到了水缸里, 下游每request一个, 就接收一个进行处理.

刚刚咱们有说到水缸的大小为128, 有朋友就问了, 你说128就128吗, 又不是惟品会周年庆, 我不信. 那就来验证一下:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 128; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });复制代码

这里咱们让上游一次性发送了128个事件, 下游一个也不接收, 来看看运行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0
  ...
zlc.season.rxjava2demo D/TAG: emit 126
zlc.season.rxjava2demo D/TAG: emit 127复制代码

这段代码的运行结果很正常, 没有任何错误和异常, 上游仅仅是发送了128个事件.

那来试试129个呢, 把上面代码中的128改为129试试:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0
  ...
zlc.season.rxjava2demo D/TAG: emit 126
zlc.season.rxjava2demo D/TAG: emit 127
zlc.season.rxjava2demo D/TAG: emit 128  //这是第129个事件
zlc.season.rxjava2demo W/TAG: onError: 
                              io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
                                  at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
                                  at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
                                  at zlc.season.rxjava2demo.demo.ChapterSeven$7.subscribe(ChapterSeven.java:169)
                                  at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
                                  at io.reactivex.Flowable.subscribe(Flowable.java:12218)
                                  at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
                                  at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
                                  at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
                                  at java.util.concurrent.FutureTask.run(FutureTask.java:237)
                                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
                                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
                                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
                                  at java.lang.Thread.run(Thread.java:761)复制代码

此次能够看到, 在上游发送了第129个事件的时候, 就抛出了MissingBackpressureException异常, 提醒咱们发洪水啦. 固然了, 这个128也不是我凭空捏造出来的, Flowable的源码中就有这个buffersize的大小定义, 能够自行查看.

注意这里咱们是把上游发送的事件所有都存进了水缸里, 下游一个也没有消费, 因此就溢出了, 若是下游去消费了事件, 可能就不会致使水缸溢出来了. 这里咱们说的是可能不会, 这也很好理解, 好比刚才这个例子上游发了129个事件, 下游只要快速的消费了一个事件, 就不会溢出了, 但若是下游过了十秒钟再来消费一个, 那确定早就溢出了.

好了, 今天的教程就到这里了, 下一节咱们将会更加深刻的去学习FLowable, 敬请期待.

(哈哈, 给个人RxDownload打个广告: RxDownload是一个基于RxJava的多线程+断点续传的下载工具, 感兴趣的来GitHub点个star吧☺. 电梯直达->戳这里 )

相关文章
相关标签/搜索