如今网上已经有大量的源码分析文章,各类技术的都有。但我以为不少文章对初学者并不友好,让人读起来云里雾里的,比源码还源码。究其缘由,是根本没有从学习者的角度去分析。在本身完成了源码阅读以后,却忘记了本身是如何一步步提出问题,进而走到这里的。php
因此,我想在本篇及之后的文章中,花更多的精力去进行源码的分析,争取用浅显易懂的语言,用适合的逻辑去组织内容。这样不至于陷入源码里,致使文章难懂。尽可能让更多的人愿意去读源码。css
阅读本文,你须要对 RxJava2 的一些基本使用有所了解,不过不用太深。这里推荐下Season_zlc
的给初学者的RxJava2.0教程(一) ,比较浅显易懂。 java
提到 RxJava,你第一个想到的词是什么?react
“异步”。android
RxJava 在 GitHub 上的官网主页也说了,“RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.”(RxJava是一个使用可观测序列来组建异步、基于事件的程序的库,它是 Reactive Extensions 在Java虚拟机上的一个实现)。它的优势嘛,用扔物线凯哥的话讲,就是“简洁”,而且“随着程序逻辑变得愈来愈复杂,它依然可以保持简洁”。nginx
这里要注意一点,虽然对大多数人来说,更多的是使用 RxJava 来配合 Retrofit、OkHttp 进行网络请求框架的封装及数据的异步处理,可是,RxJava和网络请求本质上没有半毛钱的关系。它的本质,官网已经说的很明白了,就是“异步”。git
RxJava 基于观察者模式实现,基于事件流进行链式调用。github
首先,咱们须要添加必要的依赖,这里以最新的2.2.8
版本为例:安全
implementation "io.reactivex.rxjava2:rxjava:2.2.8"
复制代码
固然,对于 Android 项目来说,咱们通常还须要添加一个补充库:网络
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
复制代码
这个库其实就是提供了 Android 相关的主线程的支持。
而后写个简单的代码,就能够开始咱们的源码分析啦。
// 上游 observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
});
// 下游 observer
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// onSubscribe 方法会最早被执行
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: ");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
// 将上游和下游进行关联
observable.subscribe(observer);
复制代码
为便于理解,我故意将能够链式调用的代码,拆成了三部分。你彻底能够写成下面的链式风格:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// onSubscribe 方法会最早被执行
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: ");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
复制代码
一样,为了便于理解,我会借用i/o
流里面常常用到的水流
进行类比。将被观察者 observable
称为上游(upstream)
,将观察者 observer
称为下游(downstream)
。读源码其实也能看出,做者自己也正是这么类比的。
经过将整个过程拆分红三个步骤,能更清晰的理清逻辑。咱们须要作的,本质上就是建立一个上游和一个下游,最终经过上游对象的subscribe
方法将两者关联起来:
明白了这三点,之后咱们就不会被各类实现类搞的眼花缭乱。
这三个步骤,里面的核心是第三部,也就是订阅过程,毕竟,这属于一个动做,而咱们进行源码分析的时候,每每就是从动做开始的。这时候,咱们Ctrl/Command + 鼠标左键
,进入该方法看看,里面作了下什么。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// RxJavaPlugins是个钩子函数,用来在代码的执行先后插入进行一些操做
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// 关键点是这行代码
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
复制代码
这里将this(上游Observable类型)的和下游observer做为参数传给了 RxJavaPlugins 的 onSubscribe(…)方法,并返回一个Observer,同时,将原来的observer指向这个返回值,那么咱们看看这个函数中到底进行了什么操做:
// RxJavaPlugins.java
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
复制代码
这里判断onObservableSubscribe
是否为 null,不为 null 则调用其 apply(…) 方法。若为 null ,则直接返回原来的observer。而该变量须要经过RxJavaPlugin的setOnSingleSubscribe(...)
方法来指定的,显然,咱们并无指定,因此忽略无论(后面遇到相似问题,基本也均可以忽略)。
回到以前的订阅流程,就能够简化为下面这样:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
...
// 调用到具体实现子类的 subscribeActual(observer) 方法
subscribeActual(observer);
} catch (
...
}
}
复制代码
从上面代码能够看出,订阅过程,即调用Observable的subscribe(...)
的过程,其实就是直接调用了其实现类的subscribeActual(observer)
方法(该方法在 Observable 中是个抽象方法)。之后咱们遇到这个方法,就直接去 Observable 的实现类中找便可,就不会乱了。
一些熟悉RxJava的朋友可能会说,有时候咱们经过subscribe(...)
订阅的并非Observer对象,而是consumer对象,有各类重载。以下:
当你传入的是Consumer的时候,无论你传递了几个参数,最终都会代用到如下方法,那些你没传递的 onError或者 onComplete 回调等等,会自动使用默认建立的值。
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
// 最终都会封装成一个 LambdaObserver,并做为参数传入subscribe(...)方法中
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
复制代码
能够看出,这里最终仍是将这些 Consumer
对象包装在了一个 LambdaObserver 类型的变量中,而后又调用了subscribe(...)
方法,将其做为变量传入,以后的分析,就跟上面是同样的了。
订阅方法讲完了,咱们也知道最终调用到了 Observable 的实现类的subscribeActual(...)
方法。那接下来确定就是要弄懂在这个方中作了什么事。咱们例子中是使用Observable.create(...)
方法建立的 observable:
// 上游 observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
});
复制代码
其中,Observable.create(...)
方法的实现是这样的:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码
咱们传进去了一个实现了ObservableOnSubscribe
接口的匿名内部类,该接口类也很简单,就定义了一个void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception
抽象方法。
而后咱们将传进来的source(刚刚提到的匿名内部类ObservableOnSubscribe)封装进一个ObservableCreate
对象中,又传进了RxJavaPlugins.onAssembly(...)
中,这个RxJavaPlugins类刚才咱们说过,其实就是一个hook类,暂时直接忽略,通常就是直接把传进来的参数返回了(不放心的话能够本身点进去,之后遇到该方法再也不赘述)。
也就是说Observable.create(...)
方法最终建立了一个ObservableCreate
对象。注意,该对象是Observable
抽象类的具体实现类。
特别注意!
特别注意!
特别注意!
重要事情说三遍。咱们这里经过create(...)
方法建立的Observable
的具体实现子类是ObservableCreate
。该子类的命名是有规律可言的。我在分析源码的时候有时候就想,这么多看起来名字都同样的类,RxJava的开发者本人不会懵逼吗?做为一个用户量这么大的库,确定各类都有讲究,确定有贵了。嗯。规律就是生成的子类的命名方法为“Observable+建立该类的方法名”
,即:在建立该类的方法名称前面加上个Observable,以此来做为新的类的名称。
不信?
咱们还能够经过Observable.just(...)
这种方式来建立Observable,点进去看看具体子类名字是啥:
其余的本身就去验证吧。
因此,咱们之后遇到Observable
开头的类名,就能够猜想它是一个Observable
类型的变量,类名后面的部分,就是建立该变量的方法(确保严谨,倒推可能不成立,要仔细确认)。
一样的,各类Observer
的实现类也是相似,只不过各类它们是把建立的方法放在了前面,而后以Observer
结尾而已,这点以后遇到的时候会再说起。
回到刚才讲的。咱们经过create(…)方法,建立出来的是ObservableCreate
,它是个Observable,那咱们就直接看它的subscribeActual(...)
方法究竟作了什么:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 首先调用下游 observer 的 onSubscribe方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
首先,将observer
包装进CreateEmitter
对象中。
而后当即调用 Observer 的onSubscribe(parent)
方法,表示订阅过程完成。(当咱们经过subscribe(...)
进行订阅的时候,会当即调用下游Observer 的onSubscribe(...)
方法。经过查看其它实现类,能够总结出该结论)。
这里,会将咱们的封装类CreateEmitter
做为参数传进onSubscribe(...)
方法中。
以后,又在代码source.subscribe(parent)
中将其做为参数传递。这里的source,是源的意思,其实也就是上游。此例子中具体指咱们传入Observable.create(...)
中的ObservableOnSubscribe
类型的匿名内部类。
而咱们已经实现了该抽象方法:
// 上游 observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
});
复制代码
咱们以后就是调用的传进来的ObservableEmitter
的onNext/onError/OnComplete
来发送事件的。等等,咱们建立的时候不是传进来的是CreateEmitter
吗,怎么又变成了ObservableEmitter
?其实,CreateEmitter
是ObservableCreate的一个 static final 类型的内部类,而且实现了ObservableEmitter
接口。由于是由create
方法建立的,因此这样命名咯,同时,又做为内部类定义在 ObservableCreate
中,这样,用到的时候是否是就不那么凌乱啦?
到这里,咱们知道了会经过回调emitter
的各类方法来发送事件,这些事件又是怎么被observer 正确接收并处理的呢?
咱们继续回到 ObservableCreate 的subscribeActual(...)
方法:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
咱们发送事件,最终调用的实际上是 parent(即 CreateEmitter) 中的相应方法,而 CreateEmitter 里又封装了 observer。咱们到 CreateEmitter 这个类的源码中,看看发送事件的时候,都干吗了:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
// 这里返回了一个 SerializedEmitter,并传入 this,也就是 CreateEmitter 对象
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
// 这里判断,是否已经处于 disposed 状态,
// 注意 get() 是定义在 AtomicReference 中的方法
return DisposableHelper.isDisposed(get());
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
}
复制代码
这里的代码也是比较简单的,就是将发送的事件中的参数直接传递给 observer 中的相应方法。只不过中间多了背压的判断(该类实现了Disposable 接口)。同时注意,该类仍是 AtomicReference 的子类,能够实现原子操做。而且在覆写的 ObservableEmitter 的serialize()
接口中建立并返回了一个SerializedEmitter
,这些都是跟线程安全以及背压相关的,不是本文的重点。
还有一点,须要你们注意,从RxJava2.x
开始,已经不容许向onNext/onError
中传null
值,不然会报空指针,这点在上面的源码中也能看到。这就会对封装网络请求的时候产生影响,好比请求验证验证码
接口成功,可是后台返回的 result 字段为 null,咱们此时可能仍然想要它调用 onNext 方法去执行成功的回调。那这就须要额外的处理了。网上也有一些解决方案,可是总以为不够优雅,有大佬有比较好的建议,也能够指点下。
好啦,本篇文章就写到这里,带你们完成了订阅、事件的发送及处理的整个流程。
关于线程切换的内容,放在下一篇文章RxJava2.x 源码解析(二): 线程切换中讲。毕竟,不谈线程切换,谈什么 RxJava源码 分析,哈哈。
欢迎关注公众号来获取其余最新消息,有趣的灵魂在等你。