给初学者的RxJava2.0教程(六)

Outlinejava

[TOC]app

前言

在上一节中, 咱们找到了上下游流速不均衡从而致使BackPressureException出现的源头 , 在这一节里咱们将学习如何去治理它 . 可能不少看过其余人写的文章的朋友都会以为只有Flowable才能解决 , 因此你们对这个Flowable都抱有很大的期许 , 其实呐 , 大家毕竟图样图森破 , 今天咱们先抛开Flowable, 仅仅依靠咱们本身的双手和智慧 , 来看看咱们如何去治理 , 经过本节的学习以后咱们再来看Flowable, 你会发现它其实并无想象中那么牛叉, 它只是被其余人过分神化了. ide

正题

咱们接着来看上一节的这个例子:工具

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {  //无限循环发送事件
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });复制代码

上一节中咱们看到了它的运行结果是直接爆掉了内存, 也明白它为何就爆掉了内存, 那么咱们能作些什么, 才能不让这种状况发生呢. 学习

以前咱们说了, 上游发送的全部事件都放到水缸里了, 因此瞬间水缸就满了, 那咱们能够只放咱们须要的事件到水缸里呀, 只放一部分数据到水缸里, 这样不就不会溢出来了吗, 所以, 咱们把上面的代码修改一下:优化

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer % 10 == 0;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });复制代码

在这段代码中咱们增长了一个filter, 只容许能被10整除的事件经过, 再来看看运行结果:spa

filter.gif

能够看到, 虽然内存依然在增加, 可是增加速度相比以前, 已经减小了太多了, 至少在我录完GIF以前尚未爆掉内存, 你们能够试着改为能被100整除试试.3d

能够看到, 经过减小进入水缸的事件数量的确能够缓解上下游流速不均衡的问题, 可是力度还不够, 咱们再来看一段代码:code

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .sample(2, TimeUnit.SECONDS)  //sample取样
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });复制代码

这里用了一个sample操做符, 简单作个介绍, 这个操做符每隔指定的时间就从上游中取出一个事件发送给下游. 这里咱们让它每隔2秒取一个事件给下游, 来看看此次的运行结果吧:cdn

sample.gif

此次咱们能够看到, 虽然上游仍然一直在不停的发事件, 可是咱们只是每隔必定时间取一个放进水缸里, 并无所有放进水缸里, 所以此次内存仅仅只占用了5M.

你们之后能够出去吹牛逼了: 我曾经经过技术手段去优化一个程序, 最终使得内存占用从300多M变成不到5M. ~(≧▽≦)/~

前面这两种方法归根到底其实就是减小放进水缸的事件的数量, 是以数量取胜, 可是这个方法有个缺点, 就是丢失了大部分的事件.

那么咱们换一个角度来思考, 既然上游发送事件的速度太快, 那咱们就适当减慢发送事件的速度, 从速度上取胜, 听上去不错, 咱们来试试:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                    Thread.sleep(2000);  //每次发送完事件延时2秒
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });复制代码

此次咱们让上游每次发送完事件后都延时了2秒, 来看看运行结果:

sleep.gif

完美 ! 一切都是那么完美 !

能够看到, 咱们给上游加上延时了以后, 瞬间一头发情的公牛就变得跟只小绵羊同样, 如此温顺, 如此平静, 如此平稳的内存线, 美妙极了. 并且事件也没有丢失, 上游经过适当的延时, 不但减缓了事件进入水缸的速度, 也可让下游充足的时间从水缸里取出事件来处理 , 这样一来, 就不至于致使大量的事件涌进水缸, 也就不会OOM啦.

到目前为止, 咱们没有依靠任何其余的工具, 就轻易解决了上下游流速不均衡的问题.

所以咱们总结一下, 本节中的治理的办法就两种:

  • 一是从数量上进行治理, 减小发送进水缸里的事件
  • 二是从速度上进行治理, 减缓事件发送进水缸的速度

你们必定没忘记, 在上一节还有个Zip的例子, 这个例子也爆了咱们的内存, 现学现用, 咱们用刚学到的办法来试试能不能惩奸除恶, 先来看看第一种办法.

先来减小进入水缸的事件的数量:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).sample(2, TimeUnit.SECONDS); //进行sample采样

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.w(TAG, throwable);
            }
        });复制代码

来试试运行结果吧:

zip_sample.gif

哈哈, 成功了吧, 再来用第二种办法试试.

此次咱们来减缓速度:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                    Thread.sleep(2000);  //发送事件以后延时2秒
                }
            }
        }).subscribeOn(Schedulers.io());

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.w(TAG, throwable);
            }
        });复制代码

来看看运行结果吧:

zip_sleep.gif

果真也成功了, 这里只打印出了下游收到的事件, 因此只有一个. 若是你对这个结果看不懂, 请自觉掉头看前面几篇文章.

经过本节的学习, 你们应该对如何处理上下游流速不均衡已经有了基本的认识了, 你们也能够看到, 咱们并无使用Flowable, 因此不少时候仔细去分析问题, 找到问题的缘由, 从源头去解决才是最根本的办法. 后面咱们讲到Flowable的时候, 你们就会发现它其实没什么神秘的, 它用到的办法和咱们本节所讲的基本上是同样的, 只是它稍微作了点封装.

好了, 今天的教程就到这里吧, 下一节中咱们就会来学习大家喜闻乐见的Flowable.

相关文章
相关标签/搜索