RxJava2.0实用操做符总结及原理简析

欢迎关注微信公众号「随手记技术团队」,查看更多随手记团队的技术文章。
本文做者:周浩源
原文连接:mp.weixin.qq.com/s/OJCEyH1gJ…html

大概从2015年开始,RxJava1.0开始快速流行起来,短短两年时间,RxJava在Android开发中已经算是无人不知无人不晓了,加之它与Retrofit等流行框架的完美结合,已经成为Android项目开发的必备利器。随手记做为一个大型项目,引入三方框架一直比较慎重,但也从今年初开始,正式引入了RxJava2.0,并配合Retrofit对项目的网络框架和繁琐的异步逻辑进行重构。RxJava虽然好用,但伴随而来的是不可避免的学习成本,为了让你们快速的了解RxJava的前因后果以及快速上手使用,特意总结该篇文章。本文将详细讲解如何快速理解RxJava的操做符,并从源码角度来分析RxJava操做符的原理。java

RxJava的优势

简单来说RxJava是一个简化异步调用的库,但其实它更是一种优雅的编程方式和编程思想,当你熟悉RxJava的使用方式以后,会很容易爱上它。
我总结它的优势主要有两个方面:react

  • 简洁,免除传统异步代码逻辑中的callback hell
  • 增长业务逻辑代码的可读性

关于第一点你们应该都会认同,关于第二点可能有人会有疑惑,由于不少人以为RxJava大量不明因此的操做符会让代码的可读性变得更差,其实产生这种印象偏偏就是由于没有掌握RxJava操做符的使用和原理所致使的。
好比随手记项目中绑定用户QQ帐号的业务逻辑,这段逻辑的代码涉及三个异步接口,两个是QQ登陆SDK的,一个是随手记后台的,在使用RxJava重构前,这段代码使用了3个AsyncTask,也就是三个嵌套的回调,代码复杂,可读性很是差。而改造以后,它变成了下面这样子git

若是你对这里面的几个RxJava操做符比较熟悉的话,你会迅速了解我这段代码作了什么事情,并且不用再去梳理一堆嵌套回调了,这就是RxJava带来的可读性。
因此,学习RxJava,理解和掌握操做符是不可避免的第一步。github

RxJava2.0与RxJava1.0的关系

从RxJava1.0到RxJava2.0,基本思想没有变化,但RxJava2.0按照Reactive-Streams规范对整个架构进行了从新设计,并变动了Maven仓库依赖地址和包名。因此如今RxJava的github网站中,RxJava1.0和RxJava2.0是两个独立的分支,不相互兼容 ,也不能同时使用,并且RxJava1.0再过一段时间也将再也不维护。因此,目前还使用RxJava1.0的,建议尽早切换到RxJava2.0,而若是没有接触过RxJava1.0,直接使用和学习RxJava2.0就能够了。若是想了解RxJava1.0和RxJava2.0的详细区别,请参考官方文档
为行文方便,今后处开始,本文使用Rx来表示RxJava2.x。编程

Rx的操做符有哪些

刚接触Rx的人面对一堆各式各样的操做符会以为不知如何去学习记忆,其实你只须要从总体上了解Rx操做符的类别和掌握一些使用频率较高的操做符就足够了,至于其余的操做符,你只须要知道它的使用场景和掌握如何快速理解一个操做符的方法,就能够在须要的时候快速拿来用了。
下图是我根据官方文档总结的Rx操做符的分类及每一个类别下的表明性操做符
数组

Rx主要操做符
Rx主要操做符

从上图能够看出,Rx的操做符主要十个大类别,每一个类别下经常使用的操做符也就三五个左右,因此只要掌握这些,你就能够应付大部分的业务场景了。

如何快速理解一个Rx操做符

提到Rx操做符,相信不少人都会对描述Rx操做符的花花绿绿的宝石图有很大印象。
bash

Rx宝石图
Rx宝石图

要快速理解Rx操做符,看懂宝石图是个快捷有效的方式,如今咱们就来详细分析一下构成宝石图的各个主要元素。
首先,咱们有必要回顾一下Rx中的几个主要的基类

  • io.reactivex.Flowable : 事件源(0..N个元素), 支持 Reactive-Streams and 背压
    • io.reactivex.Observable:事件源(0..N个元素), 不支持背压
    • io.reactivex.Single: 仅发射一个元素或产生error的事件源,
    • io.reactivex.Completable: 不发射任何元素,只产生completion或error的事件源
    • io.reactivex.Maybe: 不发射任何元素,或只发射一个元素,或产生error的事件源
    • Subject: 既是事件源,也是事件接受者
      能够看到Rx中最重要的概念就是事件源了,基本上全部的操做符都是针对事件源来进行一些转换、组合等操做,而咱们最经常使用的事件源就是Observable了。

本文中咱们就以Observable事件源为例来说解Rx的操做符,Observable发射的事件咱们统一称之为item。首先咱们须要详细了解一下宝石图中各个图像元素的含义:微信

  • —>Observable的时间线,从左至右流动
  • :星星、圆、方块等表示Observable发射的item
  • |:时间线最后的小竖线表示Observable的事件流已经成功发射完毕了
  • X:时间线最后的X符合表示因为某种缘由Observable非正常终止发射,产生了error

上面几种元素组合在一块儿表明一个完整的Observable,也能够称为源Observable网络

-->方向朝下的虚线箭头表示以及中间的长方框表示正在对上面的源Observable进行某种转换。长方框里的文字展现了转换的性质。下面的Observable是对上面的源Observable转换后的结果。

掌握了宝石图的含义,咱们就能够根据某个操做符的宝石图快速理解这个操做符了。举几个例子:

1. map

map
map

能够看到,这幅图表达的意思是一个源 Observable前后发射了一、二、3的三个item,而通过 map操做符一转换,就变成了一个发射了十、20、30三个item的新的 Observable。描述操做符的长方框中也清楚的说明了该 map操做符进行了何种具体的转换操做(图中的10*x只是一个例子,这个具体的转换函数是能够自定义的)。
因而,咱们就很快速地理解了 map操做符的含义和用法,简单来说,它就是经过一个函数将一个 Observable发射的item逐个进行某种转换。
示例代码:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(@NonNull Integer integer) throws Exception {
        return integer * 10;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer result) throws Exception {
        Log.i(TAG, "accept : " + result +"\n" );
    }
});复制代码

输出结果:

2. zip


根据 zip的宝石图,能够知道zip操做符的做用是把多个源 Observable发射的item经过特定函数组合在一块儿,而后发射组合后的item。从图中还能够看到一个重要的信息是,最终发射的item是对上面的两个源 Observable发射的item按照发射顺序逐个组合的结果,并且最终发射的 1A等item的发射时间是由组合它的 1A等item中发射时间较晚的那个item决定的,也正是如此, zip操做符常常能够用在须要同时组合处理多个网络请求的结果的业务场景中。
示例代码:

Observable.zip(Observable.just(1, 2, 3),
        Observable.just("A", "B", "C"),
        new BiFunction<Integer, String, String>() {
            @Override
            public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
                return integer + s;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.i(TAG, "zip : accept : " + s + "\n");
            }
        });复制代码

输出结果:

3. concat


从宝石图能够看出, concat操做符的做用就是将两个源 Observable发射的item链接在一块儿发射出来。这里的链接指的是总体链接,被 concat操做后产生的 Observable会先发射第一个源 Observable的全部item,而后紧接着再发射第二个源 Observable的全部的item。
示例代码:

Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.i(TAG, "concat : " + integer + "\n");
            }
        });复制代码

输出结果:

大部分操做符都配有这样的宝石图,经过官方文档或者直接在Rx源码中查看JavaDoc就能够找到,再也不过多举例。你也能够在rxmarbles这样的网站上查看更多能够动态交互的宝石图。

Rx操做符的原理

要了解操做符的原理,确定要从源码入手喽。因此咱们先来简单撸一遍Rx的最基本的Create操做符的源码。
Rx的源码目录结构是比较清晰的,咱们先从Observable.create方法来分析

Observable.create(new ObservableOnSubscribe<String>() {
  @Override
  public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
      e.onNext("s");
  }
}).subscribe(new Observer<String>() {
  @Override
  public void onSubscribe(@NonNull Disposable d) {
    // 建立的Observer中多了一个回调方法onSubscribe,传递参数为Disposable ,Disposable至关于RxJava1.x中的Subscription,用于解除订阅。
  }

  @Override
  public void onNext(@NonNull String s) {

  }

  @Override
  public void onError(@NonNull Throwable e) {

  }

  @Override
  public void onComplete() {

  }
});复制代码

create方法以下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
   ObjectHelper.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}复制代码

代码很简单,第一行判空不用管,第二行调用RxJavaPlugins的方法是为了实现Rx的hook功能,咱们暂时也无需关注,在通常状况下,第二行代码会直接返回它的入参即ObservableCreate对象,ObservableCreateObservable的子类,实现了Observable的一些抽象方法好比subscribeActual。事实上Rx的每一个操做符都对应Observable的一个子类。
这里create方法接受的是一个ObservableOnSubscribe的接口实现类:

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}复制代码

经过注释能够知道这个接口的做用是经过一个subscribe方法接受一个ObservableEmitter类型的实例,俗称发射器。
Observable.create方法执行时,咱们传入的就是一个ObservableOnSubscribe类型的匿名内部类,并实现了它的subscribe方法,而后它又被传入create方法的返回对象ObservableCreate,最终成为ObservableCreate的成员source

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ...复制代码

接着咱们来看Observablesubscribe方法,它的入参是一个Observer(即观察者,也就是事件接收者)

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

       subscribeActual(observer);
   } catch (NullPointerException e) { // NOPMD
       throw e;
   } catch (Throwable e) {
       Exceptions.throwIfFatal(e);
       // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
       RxJavaPlugins.onError(e);

       NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
       npe.initCause(e);
       throw npe;
   }
}复制代码

最终它会调用它的子类ObservableCreatesubscribeActual方法:

@Override
protected void subscribeActual(Observer<? super T> observer) {
   CreateEmitter<T> parent = new CreateEmitter<T>(observer);
   observer.onSubscribe(parent);

   try {
       source.subscribe(parent);
   } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
   }
}复制代码

subscribeActual里首先建立了用于发射事件的CreateEmitter对象parentCreateEmitter实现了接口EmitterDisposable,并持有observer
这段代码的关键语句是source.subscribe(parent),这行代码执行后,就会触发事件源进行发射事件,即e.onNext("s")会被调用。细心的同窗也会注意到这行代码以前,parent先被传入了observeronSubscribe()方法,而在上面咱们说过,observeronSubscribe()方法接受一个Disposable类型的参数,能够用于解除订阅,之因此可以解除订阅,正是由于在触发事件发射以前调用了observeronSubscribe(),给了咱们调用CreateEmitter的解除订阅的方法dispose()的机会。
继续来看CreateEmitteronNext()方法,它最终是经过调用observeronNext()方法将事件发射出去的

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


   private static final long serialVersionUID = -3434801548987643227L;

   final Observer<? super T> observer;

   CreateEmitter(Observer<? super T> observer) {
       this.observer = observer;
   }

   @Override
   public void onNext(T t) {
       if (t == null) {
           onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
           return;
       }
       // 在真正发射以前,会先判断该CreateEmitter是否已经解除订阅
       if (!isDisposed()) {
           observer.onNext(t);
       }
   }
   ...
}复制代码

至此,Rx事件源的建立和订阅的流程就走通了。

下面咱们从map操做符来入手看一下Rx操做符的原理,map方法以下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}复制代码

map方法接受一个Function类型的参数mapper,返回了一个ObservableMap对象,它也是继承自Observable,而mapper被传给了ObservableMap的成员function,同时当前的源Observable被传给ObservableMap的成员source,进入ObservableMap

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}复制代码

能够看到这里用到了装饰者模式,ObservableMap持有来自它上游的事件源sourceMapObserver持有来自它下游的事件接收者和咱们实现的转换方法function,在subscribeActual()方法中完成ObservableMapsource的订阅,触发MapObserveronNext()方法,继而未来自source的原始数据通过函数mapper转换后再发射给下游的事件接收者,从而实现map这一功能。

如今咱们终于可以来总结一下包含多个操做符时的订阅流程了,如下面这段代码为例

Observable.
        create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("holen");
            }
        })
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(@NonNull String s) throws Exception {
                return s.length();
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });复制代码

执行代码时,自上而下每一步操做符都会建立一个新的Observable(均为Observable的子类,对应不一样的操做符),当执行create时,建立并返回了ObservableCreate,当执行map时,建立并返回了ObservableMap,而且每个新的Observable都持有它上游的源Observable(即source)及当前涉及到的操做函数function。当最后一步执行订阅方法subscribe时会触发ObservableMapsubscribeActual()方法,并将最下游的Observer包装成MapObserver,同时该方法又会继续调用它所持有ObservableCreate的订阅方法(即执行source.subscribe),由此也会触发ObservableCreatesubscribeActual()方法,此时咱们的发射器CreateEmitter才会调用它的onNext()方法发射事件,再依次调用MapObserver的操做函数mapperonNext()方法,最终将事件传递给了最下游的ObserveronNext()方法。

我简单的将这段逻辑用下面这幅图来表示

操做符liftcompose

liftcompose在Rx中是两个比较特殊的操做符。
lift让咱们能够对Observer进行封装,在RxJava1.0中大部分变换都基于lift这个神奇的操做符。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
   ObjectHelper.requireNonNull(lifter, "onLift is null");
   return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
}复制代码

lift操做符接受一个ObservableOperator对象

/**
 * Interface to map/wrap a downstream observer to an upstream observer.
 *
 * @param <Downstream> the value type of the downstream
 * @param <Upstream> the value type of the upstream
 */
public interface ObservableOperator<Downstream, Upstream> {
    /**
     * Applies a function to the child Observer and returns a new parent Observer.
     * @param observer the child Observer instance
     * @return the parent Observer instance
     * @throws Exception on failure
     */
    @NonNull
    Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}复制代码

看注释能够知道,这是一个将下游订阅者包装成一个上游订阅者的接口。相似Map操做符中的MapObserver。

compose操做符让咱们能够对Observable进行封装

@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
   return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}复制代码

wrap方法以下,仅仅是走了RxJavaPlugins的流程

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> wrap(ObservableSource<T> source) {
   ObjectHelper.requireNonNull(source, "source is null");
   if (source instanceof Observable) {
       return RxJavaPlugins.onAssembly((Observable<T>)source);
   }
   return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}复制代码

compose方法接受一个ObservableTransformer对象

/**
 * Interface to compose Observables.
 *
 * @param <Upstream> the upstream value type
 * @param <Downstream> the downstream value type
 */
public interface ObservableTransformer<Upstream, Downstream> {
    /**
     * Applies a function to the upstream Observable and returns an ObservableSource with
     * optionally different element type.
     * @param upstream the upstream Observable instance
     * @return the transformed ObservableSource instance
     */
    @NonNull
    ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}复制代码

ObservableSource即为咱们的基类Observable继承的惟一接口。看注释能够知道,ObservableTransformer是一个组合多个Observable的接口,它经过一个apply()方法接收上游的Observable,进行一些操做后,返回新的Observable
这里组合多个Observable的意思其实就是组合多个操做符,好比咱们常常会须要在使用Rx进行网络异步请求时进行线程变化,这个操做通常都是差很少的,每次都写会比较烦,这时咱们就可使用compose把经常使用的线程变换的几个操做符组合起来

private final ObservableTransformer schedulersObservable = new ObservableTransformer() {
   @Override
   public ObservableSource apply(Observable upstream) {
       return upstream.subscribeOn(Schedulers.io())
               .unsubscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread());
   }
};  

protected void testCompose() {
   getNetObservable()
           .compose(schedulersObservable)
           .subscribe(new Consumer<String>() {
               @Override
               public void accept(@NonNull String s) throws Exception {
                   mRxOperatorsText.append(s);
               }
           });
}复制代码

关于compose的典型应用,你们有兴趣还能够去看一下开源项目RxLifecycle,它就是巧妙地利用compose操做符来解决了使用Rx可能会出现的内存泄露问题。

Rx操做符的应用场景

说了这么多,其实咱们最关心的仍是Rx操做符的应用场景。其实只要存在异步的地方,均可以优雅地使用Rx操做符。好比不少流行的Rx周边开源项目

而针对本身想要实现的功能情景,如何去选择特定的操做符,官网的文档中也列出了一些指导——Rx操做符决策树

固然除了这些,咱们在开发项目时,还会有各类具体的业务场景须要选择合适的操做符,这里我总结了一些常常遇到的场景以及适合它们的操做符

只要咱们理解了Rx操做符的原理,熟练掌握了一些使用频率较高的操做符,就可以在以上场景中轻松地使用,再也不让本身的代码被复杂的业务逻辑搞得混乱。


以上就是本文的所有内容,关于Rx还有不少东西值得深刻地学习研究,后续有机会再跟你们分享更多Rx的使用心得。

参考

相关文章
相关标签/搜索