关于 RxJava 最友好的文章—— RxJava 2.0 全新来袭

前言

以前写RxJava相关文章的时候,就有人想让我谈谈RxJava2.0的新特性,说实话,一开始我是拒绝的。由于在我看来,RxJava2.0虽然是版本的重大升级,但总归仍是RxJava,升级一个版本还能上天是咋的?了解一下它的更新文档不就行了么?真的有必要单出一篇文章来谈这个么?javascript

可是详细的了解了RxJava2.0以及部分源码以后,我以为仍是有必要对RxJava2.0作一个说明,帮助你们对于RxJava有更好的认识。java


铺垫

假如你还不是很熟悉RxJava,或者对于背压这个概念(2.0更新中会涉及到背压的概念)很模糊,但愿你也能读一读下面两篇铺垫的文章:react

关于背压的那篇文章原本是本文的一部分,由于篇幅过大,被剥离出去了,因此建议你们有时间也一并阅读。android


正文

RxJava2.0有不少的更新,一些改动甚至冲击了我以前的文章里的内容,这也是我想写这篇文章的缘由之一。不过想要写这篇文章其实也是有难度的,由于相关的资料去实际上是不多的,还得本身硬着头皮上....不过俗话说得好,有困难要上,没有困难创造困难也要上。git

在这里,我会按照咱们以前关于RxJava的文章的讲述顺序:观察者模式,操做符,线程调度,这三个方面依次看有哪些更新。github


添加依赖

这个估计得放在最前面。app

Android端使用RxJava须要依赖新的包名:异步

//RxJava的依赖包(我使用的最新版本)
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    //RxAndroid的依赖包
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'复制代码

观察者模式

首先声明,RxJava以观察者模式为骨架,在2.0中依然如此ide

不过这次更新中,出现了两种观察者模式:post

  • Observable(被观察者)/Observer(观察者)
  • Flowable(被观察者)/Subscriber(观察者)

RxJava2.X中,Observeable用于订阅Observer,是不支持背压的,而Flowable用于订阅Subscriber,是支持背压(Backpressure)的。

关于背压这个概念以及它在1.0版本中的缺憾在上一篇文章中我已经介绍到了,若是你不是很清楚,我在这里在作一个介绍:背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的状况下,一种告诉上游的被观察者下降发送速度的策略,在1.0中,关于背压最大的遗憾,就是集中在Observable这个类中,致使有的Observable支持背压,有的不支持。为了解决这种缺憾,新版本把支持背压和不支持背压的Observable区分开来。

Observable/Observer

Observable正经常使用法:

Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        });

Observer mObserver=new Observer<Integer>() {
            //这是新加入的方法,在订阅后发送数据以前,
            //回首先调用这个方法,而Disposable可用于取消订阅
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

mObservable.subscribe(mObserver);复制代码

这种观察者模型是不支持背压的。

啥叫不支持背压呢?

当被观察者快速发送大量数据时,下游不会作其余处理,即便数据大量堆积,调用链也不会报MissingBackpressureException,消耗内存过大只会OOM

我在测试的时候,快速发送了100000个整形数据,下游延迟接收,结果被观察者的数据所有发送出去了,内存确实明显增长了,遗憾的是没有OOM。

因此,当咱们使用Observable/Observer的时候,咱们须要考虑的是,数据量是否是很大(官方给出以1000个事件为分界线,仅供各位参考)

Flowable/Subscriber

Flowable.range(0,10)
        .subscribe(new Subscriber<Integer>() {
            Subscription sub;
            //当订阅后,会首先调用这个方法,其实就至关于onStart(),
            //传入的Subscription s参数能够用于请求数据或者取消订阅
            @Override
            public void onSubscribe(Subscription s) {
                Log.w("TAG","onsubscribe start");
                sub=s;
                sub.request(1);
                Log.w("TAG","onsubscribe end");
            }

            @Override
            public void onNext(Integer o) {
                Log.w("TAG","onNext--->"+o);
                sub.request(1);
            }
            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override
            public void onComplete() {
                Log.w("TAG","onComplete");
            }
        });复制代码

输出以下:

onsubscribe start
onNext--->0
onNext--->1
onNext--->2
...
onNext--->9
onComplete
onsubscribe end复制代码

Flowable是支持背压的,也就是说,通常而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。

固然,Flowable也能够经过creat()来建立:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        }
        //须要指定背压策略
        , BackpressureStrategy.BUFFER);复制代码

Flowable虽然能够经过create()来建立,可是你必须指定背压的策略,以保证你建立的Flowable是支持背压的(这个在1.0的时候就很难保证,能够说RxJava2.0收紧了create()的权限)。

根据上面的代码的结果输出中能够看到,当咱们调用subscription.request(n)方法的时候,不等onSubscribe()中后面的代码执行,就会马上执行到onNext方法,所以,若是你在onNext方法中使用到须要初始化的类时,应当尽可能在subscription.request(n)这个方法调用以前作好初始化的工做;

固然,这也不是绝对的,我在测试的时候发现,经过create()自定义Flowable的时候,即便调用了subscription.request(n)方法,也会等onSubscribe()方法中后面的代码都执行完以后,才开始调用onNext。

TIPS: 尽量确保在request()以前已经完成了全部的初始化工做,不然就有空指针的风险。

其余观察者模式

固然,除了上面这两种观察者,还有一类观察者

  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

其实这三者都差很少,Maybe/MaybeObserver能够说是前二者的复合体,所以以Maybe/MaybeObserver为例简单介绍一下这种观察者模式的用法

//判断是否登录
Maybe.just(isLogin())
    //可能涉及到IO操做,放在子线程
    .subscribeOn(Schedulers.newThread())
    //取回结果传到主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new MaybeObserver<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Boolean value) {
                if(value){
                    ...
                }else{
                    ...
                }
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });复制代码

上面就是Maybe/MaybeObserver的普通用法,你能够看到,实际上,这种观察者模式并不用于发送大量数据,而是发送单个数据,也就是说,当你只想要某个事件的结果(true or false)的时候,你能够用这种观察者模式


这是上面那些被观察者的上层接口:

//Observable接口
interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}
//Single接口
interface SingleSource<T> {
    void subscribe(SingleObserver<? super T> observer);
}
//Completable接口
interface CompletableSource {
    void subscribe(CompletableObserver observer);
}
//Maybe接口
interface MaybeSource<T> {
    void subscribe(MaybeObserver<? super T> observer);
}
//Flowable接口
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}复制代码

其实咱们能够看到,每一种观察者都继承自各自的接口,这也就把他们能彻底的区分开,各自独立(特别是Observable和Flowable),保证了他们各自的拓展或者配套的操做符不会相互影响。

例如flatMap操做符实现:

//Flowable中flatMap的定义
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

//Observable中flatMap的定义
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);复制代码

假如你想为Flowable写一个自定义的操做符,那么只要保证Function< Publisher >中的类型实现了Publisher接口便可。这么说可能很抽象,你们不理解其实也不要紧,由于并不推荐你们自定义操做符,RxJava中的操纵符的组合已经能够知足你们的需求了。

固然,你也会注意到上面那些接口中的subscribe()方法的返回类型为void了,在1.X中,这个方法通常会返回一个Subscription对象,用于取消订阅。如今,这个功能的对象已经被放到观察者Observer或者subscriber的内部实现方法中了,

Flowable/Subscriber

public interface Subscriber<T> {  
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}复制代码

上面的实例中,onSubscribe(Subscription s)传入的参数s就肩负着取消订阅的功能,固然,他也能够用于请求上游的数据。

在Observable/observer中,传入的参数是另外一个对象

Observable/Observer

public interface Observer<T> {
   void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

public interface Disposable {
    /** * Dispose the resource, the operation should be idempotent. */
    void dispose();
    /** * Returns true if this resource has been disposed. * @return true if this resource has been disposed */
    boolean isDisposed();
}复制代码

在Observer接口中,onSubscribe(Disposable d)方法传入的Disposable也是用于取消订阅,基本功能是差很少的,只不过命名不一致,你们知道就好。

其实这种设计能够说仍是符合逻辑的,由于取消订阅这个动做就只有观察者(Observer等)才能作的,如今把它并入到观察者内部,也算瓜熟蒂落吧。

最后再提一点更新,就是被观察者再也不接收null做为数据源了。


操做符相关

这一块其实能够说没什么改动,大部分以前你用过的操做符都没变,即便有所变更,也只是包名或类名的改动。你们可能常常用到的就是Action和Function。

Action相关

以前我在文章里介绍过关于Action这类接口,在1.0中,这类接口是从Action0,Action1...日后排(数字表明可接受的参数),如今作出了改动

Rx1.0-----------Rx2.0

Action0--------Action
Action1--------Consumer
Action2--------BiConsumer
后面的Action都去掉了,只保留了ActionN

Function相关

同上,也是命名方式的改变

上面那两个类,和RxJava1.0相比,他们都增长了throws Exception,也就是说,在这些方法作某些操做就不须要try-catch

例如:

Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);复制代码

Files.readLines(name)这类io方法原本是须要try-catch的,如今直接抛出异常,就能够放心的使用lambda ,保证代码的简洁优美。

doOnCancel/doOnDispose/unsubscribeOn

以doOnCancel为例,大概就是当取消订阅时,会调用这个方法,例如:

Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);复制代码

take新操符会取消后面那些还未被发送的事件,于是会触发doOnCancel

其余的一些操做符基本没变,或者只是改变了名字,在这里就不一一介绍了,须要提一下的是,不少操做符都有两套,一套用于Observable,一套用于Flowable。


线程调度

能够说这一起基本也没有改动,若是必定要说的话。

  • 那就是去掉了Schedulers.immediate()这个线程环境
  • 移除的还有Schedulers.test()(我好像历来没用过这个方法)
  • io.reactivex.Scheduler这个抽象类支持直接调度自定义线程任务(这个我也没怎么用)

补充

若是你想把本身的RxJava1.0的迁移到2.0的版本,可使用这个库RxJava2Interop,它能够在Rxjava1.0和2.0之间相互转换,也就是说,不只能够把1.0的代码迁移到2.0,你还能够把2.0的代码迁移到1.0,哈哈。

补充2

在RxJava1.0中,有的人会使用CompositeSubscription来收集Subscription,来统一取消订阅,如今在RxJava2.0中,因为subscribe()方法如今返回void,那怎么办呢?

其实在RxJava2.0中,Flowable提供了subscribeWith这个方法能够返回当前订阅的观察者,而且经过ResourceSubscriber DisposableSubscriber等观察者来提供 Disposable的接口

因此,若是想要达成RxJava1.0的效果,如今应该是这样作:

CompositeDisposable composite = new CompositeDisposable();

composite.add(Flowable.range(1, 8).subscribeWith(subscriber));

这个subscriber 应该是 ResourceSubscriber 或者 DisposableSubscriber 的实例。


结尾

其实从整篇文章的分析来看,改动最大的仍是观察者模式的实现,被拆分和细化了,主要分红了Observable和Flowable两大类,固然还有与之相关联的其余变更,整体来看这一版本能够说是对于观察者和被观察者的重构

RxJava2.0的范例代码我没精力去写了,也正巧有位外国朋友已经写了RxJava2.0的demo,下面是他的项目地址:

RxJava2-Android-Samples

固然,学习2.0 的过程当中有什么问题也能够在这里留言讨论。


后记

这篇文章半个月前就开始写了,可是一直不太满意,因此在草稿箱里躺了好久,可是本着不放弃任何一篇落后文章的信念,仍是振做起来,完成关于RxJava2.0的介绍。你可能不信,写完顿时有了一种解(xu)脱的感受。

身体被掏空...


附录

下面我截图展现一下2.0相对于1.0的一些改动的细节,仅作参考。





其实这些都是官方给出的列表,截图在这里只是方便你们观摩。

相关文章
相关标签/搜索