系列文章:app
本文 csdn 地址:友好 RxJava2.x 源码解析(一)基本订阅流程ide
本文基于 RxJava 2.1.3源码分析
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG", "onSubscribe(): ");
}
@Override
public void onNext(String s) {
Log.e("TAG", "onNext(): " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("TAG", "onComplete(): ");
}
});
复制代码
输出结果:post
E/TAG: onSubscribe():
E/TAG: onNext(): 1
E/TAG: onNext(): 2
E/TAG: onNext(): 3
E/TAG: onComplete():
复制代码
咱们知道 subscribe()
方法是 Observable 和 Observer 的链接点,因此首先戳进 subscribe(Observer observer)
中,能够发现该方法是 Observable 类的方法,传入了一个 Observer 对象,那首先咱们须要弄明白这里的 Observable 和 Observer 分别是什么,观察上方示例代码咱们能够知道 Observer 是 new 出来的,因此咱们只须要知道 Observable 是什么,固然,这里也很清晰,Observable 就是咱们调用 Observable.create(ObservableOnSubscribe)
所建立出来的 Observable,来一张图 ——spa
Observable 和 Observer 咱们都弄清楚了,接下来就是查看 subscribe(Observer)
具体的实现了,以下 ——.net
@Override
public final void subscribe(Observer<? super T> observer) {
// 略去其余源码
subscribeActual(observer);
// 略去其余源码
}
复制代码
略去非关键源码后咱们发现它只作了一件事,就是调用 Observable#subscribeActual(observer)
,而在 Observable 中该方法是一个抽象方法:线程
protected abstract void subscribeActual(Observer<? super T> observer);
复制代码
这意味着咱们须要去找它的子类,咱们要看看它的 subscribeActual(Observer)
方法,那咱们就得从 create(ObservableOnSubscribe)
着手,看它是如何将一个 ObservableOnSubscribe 对象转换成一个 Observable 对象的——3d
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 略去其余源码
return new ObservableCreate<T>(source);
}
复制代码
一样的,删除非关键源码以后,咱们就剩下这么一行代码,这也就是意味着咱们须要从 ObservableCreate
这个类中去寻找 subscribeActual(Observer)
的实现了,这里笔者须要说起两点——code
从上述方法能够看出 ObservableCreate 是 Observable 的一个子类cdn
咱们自定义的 ObservableOnSubscribe 做为一个名为 source
字段被传入了。事实上在 Observable 的子类实现中,它们都有一个名为 source
的字段,指代上游 Observable(其实是 ObservableOnSubscribe,可是咱们不妨理解成就是 Observable)。
ObservableCreate#subscribeActual()
实现以下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 触发 Observer#onSubscribe(Disposable)
observer.onSubscribe(parent);
try {
// 发射事件
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
第5行调用了 Observer#onSubscribe(Disposable)
,因此咱们能够知道 Observer#onSubscribe(Disposable)
是先被调用的,而此时 Observable 甚至尚未开始发射事件!接下来就是调用了 source.subscribe(ObservableEmitter)
,这个方法是交由开发者去实现的,在示例代码是以下所写 ——
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
复制代码
在代码中咱们调用了 CreateEmitter 对象的 onNext()
方法,因此咱们须要戳入 CreateEmitter 类中看一下 onNext(T)
的具体实现(固然 onComplete()
方法等同,此处就不作扩展了),源码以下:
@Override
public void onNext(T t) {
// 略去其余源码
if (!isDisposed()) {
observer.onNext(t);
}
}
复制代码
一目了然,当当前对象并不处于 DISPOSED
状态时,那么就将会调用下游 Observer 的 onNext(T)
方法,而下游 Observer 的 onNext(T)
方法也就是咱们上面示例代码中所写的——
public void onNext(String s) {
Log.e("TAG", "onNext(): ");
}
复制代码
至此,基本订阅流程咱们就理清楚了。咱们从 Observable#subscribe(Observer)
开始,将 Observer 传给 Observable,而 Observable 又会在 onNext(T)
方法中激活 Observer 的 onNext(T)
方法。咱们在示例只涉及了少许的 Observable/Observer,事实上,咱们在 RxJava 中运用的操做符都会在内部建立一个 Observable 和 Observer,虽然在 Observable#subscribeActual(Observer)
中都有本身特定的实现,可是它们大部分都是作两个操做,一是将「下游」传来的 Observer 根据需求进行封装;二就是让「上游」的 Observable subscribe()
该 Observer。
通过了如上的分析后,笔者但愿读者可以理解 RxJava2.x 的基本订阅流程是从 Observable#subscribe(Observer)
开始的,而该方法会触发「上游」 Observable 的 Observable#subscribeActual(Observer)
方法,而在该「上游」 Observable 中又会触发「上游的上游」Observable 的 Observable#subscribeActual(Observer)
方法。咱们不妨用如下述源码举例:
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
}
})
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
return Observable.just(s);
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
复制代码
另附一张图,图中标明了后面讲到的「第一个 Observable」、「第二个 Observable」等名词:
接下来用图展现整个订阅的流程——
在 Observable#subscribe(Observer)
以前:
准备触发 Observable#subscribe(Observer)
:
Observable#subscribe(Observer)
将会致使其上游 Observable 的 subscribe(Observer)
方法被调用:
上游 Observable 的 subscribe(Observer)
方法内部又会调用上游的上游 Observable 的 subscribe(Observer)
:
Observable#subscribe(Observer)
会调用Observable#subscribeActual(Observer)
,该方法是一个抽象方法,由子类覆写,因此展示了 Observable 的多态性,并且如何激活上游 Observable 的subscribe(Observer)
/subscribeActual(Observer)
方法的关键点也在此。实现方式就在于Observable#subscribeActual(Observer)
方法虽然是一个抽象方法,可是它的子类实现中都包含有一句source.subscribe(Observer)
,其中 source 就是上游 Observable(其实是 ObservableSource,可是咱们此处不妨就理解成 Observable,毕竟咱们对这个对象更熟悉一些,Observable 是 ObservableSource 接口的实现),因此就能够理解在每个 Observable 的subscribeActual(Observer)
方法中它都会调用上游的subscribe(Observer)/subscribeActual(Observer)
方法,直至到达第一个 Observable 的subscribe(Observer)/subscribeActual(Observer)
中。
订阅的关系链理清了,可是尚未发射事件的流程还没出来啊,咱们继续往下走——
到达顶部 Observable 的时候,已经不能再往上走了,就要准备搞事情(准备发射事件了),此处咱们就以示例代码中的 Observable 为例,它的 subscribeActual(Observer)
中——
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
它首先封装了一个 Disposable,接下来将调用 Observer#onSubscribe(Disposable)
将 Disposable 做为参数传给下一层 Observer。
到了下一层的 Observer 的 onSubscribe(Disposable)
中,该方法中针对上一层 Disposable 作一些操做(判断、封装等),而后再封装一个 Disposable 做为参数传递给 Observer#onSubscribe(Disposable)
。
而此时的 Observer 就是咱们所自定义的 Observer——
在 Observer#onSubscribe(Disposable)
流程结束后,就执行到第7行代码 Observeable.subscribe(Observer)
,实质上也就是——
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
}
})
复制代码
ps:为了方便起见,此处只分析
onNext()
执行流程。
在 ObservableEmitter#onNext(T)
的内部实际上会触发 Observer 的 onNext(T)
方法——
再向下触发就是咱们所自定义的最底层的 Observer 了——
以示例代码来讲,顶游 Observable 会触发 ObservableEmitter#onNext(T)
方法,在该方法的内部又触发了「下游」 Observer 的 onNext(T)
方法,而在该方法内部又会触发「下游的下游」 Observer 的 onNext(T)
方法,直至最底层的 Observer —— 咱们所自定义的 Observer ——
到此,一套订阅流程就执行完毕了。