欢迎关注微信公众号「随手记技术团队」,查看更多随手记团队的技术文章。
本文做者:周浩源
原文连接:mp.weixin.qq.com/s/OJCEyH1gJ…html
大概从2015年开始,RxJava1.0开始快速流行起来,短短两年时间,RxJava在Android开发中已经算是无人不知无人不晓了,加之它与Retrofit等流行框架的完美结合,已经成为Android项目开发的必备利器。随手记做为一个大型项目,引入三方框架一直比较慎重,但也从今年初开始,正式引入了RxJava2.0,并配合Retrofit对项目的网络框架和繁琐的异步逻辑进行重构。RxJava虽然好用,但伴随而来的是不可避免的学习成本,为了让你们快速的了解RxJava的前因后果以及快速上手使用,特意总结该篇文章。本文将详细讲解如何快速理解RxJava的操做符,并从源码角度来分析RxJava操做符的原理。java
简单来说RxJava是一个简化异步调用的库,但其实它更是一种优雅的编程方式和编程思想,当你熟悉RxJava的使用方式以后,会很容易爱上它。
我总结它的优势主要有两个方面:react
关于第一点你们应该都会认同,关于第二点可能有人会有疑惑,由于不少人以为RxJava大量不明因此的操做符会让代码的可读性变得更差,其实产生这种印象偏偏就是由于没有掌握RxJava操做符的使用和原理所致使的。
好比随手记项目中绑定用户QQ帐号的业务逻辑,这段逻辑的代码涉及三个异步接口,两个是QQ登陆SDK的,一个是随手记后台的,在使用RxJava重构前,这段代码使用了3个AsyncTask,也就是三个嵌套的回调,代码复杂,可读性很是差。而改造以后,它变成了下面这样子git
若是你对这里面的几个RxJava操做符比较熟悉的话,你会迅速了解我这段代码作了什么事情,并且不用再去梳理一堆嵌套回调了,这就是RxJava带来的可读性。
因此,学习RxJava,理解和掌握操做符是不可避免的第一步。github
从RxJava1.0到RxJava2.0,基本思想没有变化,但RxJava2.0按照Reactive-Streams规范对整个架构进行了从新设计,并变动了Maven仓库依赖地址和包名。因此如今RxJava的github网站中,RxJava1.0和RxJava2.0是两个独立的分支,不相互兼容 ,也不能同时使用,并且RxJava1.0再过一段时间也将再也不维护。因此,目前还使用RxJava1.0的,建议尽早切换到RxJava2.0,而若是没有接触过RxJava1.0,直接使用和学习RxJava2.0就能够了。若是想了解RxJava1.0和RxJava2.0的详细区别,请参考官方文档。
为行文方便,今后处开始,本文使用Rx来表示RxJava2.x。编程
刚接触Rx的人面对一堆各式各样的操做符会以为不知如何去学习记忆,其实你只须要从总体上了解Rx操做符的类别和掌握一些使用频率较高的操做符就足够了,至于其余的操做符,你只须要知道它的使用场景和掌握如何快速理解一个操做符的方法,就能够在须要的时候快速拿来用了。
下图是我根据官方文档总结的Rx操做符的分类及每一个类别下的表明性操做符
数组
提到Rx操做符,相信不少人都会对描述Rx操做符的花花绿绿的宝石图有很大印象。
bash
io.reactivex.Flowable
: 事件源(0..N个元素), 支持 Reactive-Streams and 背压
io.reactivex.Observable
:事件源(0..N个元素), 不支持背压io.reactivex.Single
: 仅发射一个元素或产生error的事件源,io.reactivex.Completable
: 不发射任何元素,只产生completion或error的事件源io.reactivex.Maybe
: 不发射任何元素,或只发射一个元素,或产生error的事件源Subject
: 既是事件源,也是事件接受者事件源
了,基本上全部的操做符都是针对事件源来进行一些转换、组合等操做,而咱们最经常使用的事件源就是Observable
了。本文中咱们就以Observable
事件源为例来说解Rx的操做符,Observable
发射的事件咱们统一称之为item。首先咱们须要详细了解一下宝石图中各个图像元素的含义:微信
—>
:Observable
的时间线,从左至右流动★
:星星、圆、方块等表示Observable
发射的item|
:时间线最后的小竖线表示Observable
的事件流已经成功发射完毕了X
:时间线最后的X符合表示因为某种缘由Observable
非正常终止发射,产生了error上面几种元素组合在一块儿表明一个完整的Observable
,也能够称为源Observable
网络
-->
方向朝下的虚线箭头表示以及中间的长方框表示正在对上面的源Observable
进行某种转换。长方框里的文字展现了转换的性质。下面的Observable
是对上面的源Observable
转换后的结果。
掌握了宝石图的含义,咱们就能够根据某个操做符的宝石图快速理解这个操做符了。举几个例子:
1. map
Observable
前后发射了一、二、3的三个item,而通过
map
操做符一转换,就变成了一个发射了十、20、30三个item的新的
Observable
。描述操做符的长方框中也清楚的说明了该
map
操做符进行了何种具体的转换操做(图中的10*x只是一个例子,这个具体的转换函数是能够自定义的)。
map
操做符的含义和用法,简单来说,它就是经过一个函数将一个
Observable
发射的item逐个进行某种转换。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return integer * 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer result) throws Exception {
Log.i(TAG, "accept : " + result +"\n" );
}
});复制代码
输出结果:
2. zip
zip
的宝石图,能够知道zip操做符的做用是把多个源
Observable
发射的item经过特定函数组合在一块儿,而后发射组合后的item。从图中还能够看到一个重要的信息是,最终发射的item是对上面的两个源
Observable
发射的item按照发射顺序逐个组合的结果,并且最终发射的
1A
等item的发射时间是由组合它的
1
和
A
等item中发射时间较晚的那个item决定的,也正是如此,
zip
操做符常常能够用在须要同时组合处理多个网络请求的结果的业务场景中。
Observable.zip(Observable.just(1, 2, 3),
Observable.just("A", "B", "C"),
new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG, "zip : accept : " + s + "\n");
}
});复制代码
输出结果:
3. concat
concat
操做符的做用就是将两个源
Observable
发射的item链接在一块儿发射出来。这里的链接指的是总体链接,被
concat
操做后产生的
Observable
会先发射第一个源
Observable
的全部item,而后紧接着再发射第二个源
Observable
的全部的item。
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "concat : " + integer + "\n");
}
});复制代码
输出结果:
大部分操做符都配有这样的宝石图,经过官方文档或者直接在Rx源码中查看JavaDoc就能够找到,再也不过多举例。你也能够在rxmarbles这样的网站上查看更多能够动态交互的宝石图。
要了解操做符的原理,确定要从源码入手喽。因此咱们先来简单撸一遍Rx的最基本的Create操做符的源码。
Rx的源码目录结构是比较清晰的,咱们先从Observable.create
方法来分析
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("s");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 建立的Observer中多了一个回调方法onSubscribe,传递参数为Disposable ,Disposable至关于RxJava1.x中的Subscription,用于解除订阅。
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});复制代码
create
方法以下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}复制代码
代码很简单,第一行判空不用管,第二行调用RxJavaPlugins
的方法是为了实现Rx的hook功能,咱们暂时也无需关注,在通常状况下,第二行代码会直接返回它的入参即ObservableCreate
对象,ObservableCreate
是Observable
的子类,实现了Observable
的一些抽象方法好比subscribeActual
。事实上Rx的每一个操做符都对应Observable
的一个子类。
这里create
方法接受的是一个ObservableOnSubscribe
的接口实现类:
/**
* A functional interface that has a {@code subscribe()} method that receives
* an instance of an {@link ObservableEmitter} instance that allows pushing
* events in a cancellation-safe manner.
*
* @param <T> the value type pushed
*/
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}复制代码
经过注释能够知道这个接口的做用是经过一个subscribe
方法接受一个ObservableEmitter
类型的实例,俗称发射器。Observable.create
方法执行时,咱们传入的就是一个ObservableOnSubscribe
类型的匿名内部类,并实现了它的subscribe
方法,而后它又被传入create
方法的返回对象ObservableCreate
,最终成为ObservableCreate
的成员source
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...复制代码
接着咱们来看Observable
的subscribe
方法,它的入参是一个Observer
(即观察者,也就是事件接收者)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}复制代码
最终它会调用它的子类ObservableCreate
的subscribeActual
方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}复制代码
在subscribeActual
里首先建立了用于发射事件的CreateEmitter
对象parent
,CreateEmitter
实现了接口Emitter
和Disposable
,并持有observer
。
这段代码的关键语句是source.subscribe(parent)
,这行代码执行后,就会触发事件源进行发射事件,即e.onNext("s")
会被调用。细心的同窗也会注意到这行代码以前,parent
先被传入了observer
的onSubscribe()
方法,而在上面咱们说过,observer
的onSubscribe()
方法接受一个Disposable
类型的参数,能够用于解除订阅,之因此可以解除订阅,正是由于在触发事件发射以前调用了observer
的onSubscribe()
,给了咱们调用CreateEmitter
的解除订阅的方法dispose()
的机会。
继续来看CreateEmitter
的onNext()
方法,它最终是经过调用observer
的onNext()
方法将事件发射出去的
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 在真正发射以前,会先判断该CreateEmitter是否已经解除订阅
if (!isDisposed()) {
observer.onNext(t);
}
}
...
}复制代码
至此,Rx事件源的建立和订阅的流程就走通了。
下面咱们从map
操做符来入手看一下Rx操做符的原理,map
方法以下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}复制代码
map
方法接受一个Function类型的参数mapper
,返回了一个ObservableMap
对象,它也是继承自Observable
,而mapper
被传给了ObservableMap
的成员function
,同时当前的源Observable
被传给ObservableMap
的成员source
,进入ObservableMap
类
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}复制代码
能够看到这里用到了装饰者模式,ObservableMap
持有来自它上游的事件源source
,MapObserver
持有来自它下游的事件接收者和咱们实现的转换方法function
,在subscribeActual()
方法中完成ObservableMap
对source
的订阅,触发MapObserver
的onNext()
方法,继而未来自source
的原始数据通过函数mapper
转换后再发射给下游的事件接收者,从而实现map这一功能。
如今咱们终于可以来总结一下包含多个操做符时的订阅流程了,如下面这段代码为例
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("holen");
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return s.length();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});复制代码
执行代码时,自上而下每一步操做符都会建立一个新的Observable
(均为Observable
的子类,对应不一样的操做符),当执行create
时,建立并返回了ObservableCreate
,当执行map
时,建立并返回了ObservableMap
,而且每个新的Observable
都持有它上游的源Observable
(即source
)及当前涉及到的操做函数function
。当最后一步执行订阅方法subscribe
时会触发ObservableMap
的subscribeActual()
方法,并将最下游的Observer
包装成MapObserver
,同时该方法又会继续调用它所持有ObservableCreate
的订阅方法(即执行source.subscribe
),由此也会触发ObservableCreate
的subscribeActual()
方法,此时咱们的发射器CreateEmitter
才会调用它的onNext()
方法发射事件,再依次调用MapObserver
的操做函数mapper
和onNext()
方法,最终将事件传递给了最下游的Observer
的onNext()
方法。
我简单的将这段逻辑用下面这幅图来表示
lift
和compose
lift
和compose
在Rx中是两个比较特殊的操做符。lift
让咱们能够对Observer
进行封装,在RxJava1.0中大部分变换都基于lift
这个神奇的操做符。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
ObjectHelper.requireNonNull(lifter, "onLift is null");
return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
}复制代码
lift
操做符接受一个ObservableOperator
对象
/**
* Interface to map/wrap a downstream observer to an upstream observer.
*
* @param <Downstream> the value type of the downstream
* @param <Upstream> the value type of the upstream
*/
public interface ObservableOperator<Downstream, Upstream> {
/**
* Applies a function to the child Observer and returns a new parent Observer.
* @param observer the child Observer instance
* @return the parent Observer instance
* @throws Exception on failure
*/
@NonNull
Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}复制代码
看注释能够知道,这是一个将下游订阅者包装成一个上游订阅者的接口。相似Map操做符中的MapObserver。
而compose
操做符让咱们能够对Observable
进行封装
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}复制代码
wrap
方法以下,仅仅是走了RxJavaPlugins
的流程
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> wrap(ObservableSource<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Observable) {
return RxJavaPlugins.onAssembly((Observable<T>)source);
}
return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}复制代码
compose
方法接受一个ObservableTransformer
对象
/**
* Interface to compose Observables.
*
* @param <Upstream> the upstream value type
* @param <Downstream> the downstream value type
*/
public interface ObservableTransformer<Upstream, Downstream> {
/**
* Applies a function to the upstream Observable and returns an ObservableSource with
* optionally different element type.
* @param upstream the upstream Observable instance
* @return the transformed ObservableSource instance
*/
@NonNull
ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}复制代码
ObservableSource
即为咱们的基类Observable
继承的惟一接口。看注释能够知道,ObservableTransformer
是一个组合多个Observable
的接口,它经过一个apply()
方法接收上游的Observable
,进行一些操做后,返回新的Observable
。
这里组合多个Observable
的意思其实就是组合多个操做符,好比咱们常常会须要在使用Rx进行网络异步请求时进行线程变化,这个操做通常都是差很少的,每次都写会比较烦,这时咱们就可使用compose
把经常使用的线程变换的几个操做符组合起来
private final ObservableTransformer schedulersObservable = new ObservableTransformer() {
@Override
public ObservableSource apply(Observable upstream) {
return upstream.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
protected void testCompose() {
getNetObservable()
.compose(schedulersObservable)
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append(s);
}
});
}复制代码
关于compose
的典型应用,你们有兴趣还能够去看一下开源项目RxLifecycle,它就是巧妙地利用compose
操做符来解决了使用Rx可能会出现的内存泄露问题。
说了这么多,其实咱们最关心的仍是Rx操做符的应用场景。其实只要存在异步的地方,均可以优雅地使用Rx操做符。好比不少流行的Rx周边开源项目
而针对本身想要实现的功能情景,如何去选择特定的操做符,官网的文档中也列出了一些指导——Rx操做符决策树。
固然除了这些,咱们在开发项目时,还会有各类具体的业务场景须要选择合适的操做符,这里我总结了一些常常遇到的场景以及适合它们的操做符
只要咱们理解了Rx操做符的原理,熟练掌握了一些使用频率较高的操做符,就可以在以上场景中轻松地使用,再也不让本身的代码被复杂的业务逻辑搞得混乱。
以上就是本文的所有内容,关于Rx还有不少东西值得深刻地学习研究,后续有机会再跟你们分享更多Rx的使用心得。