这篇文章的主要目的就是弄清楚链式调用与线程切换。
如果你读懂了第一篇文章,这篇文章阅读起来还是非常简单的。因为链式调用和线程切换无非就是多了些 “数据源-观察者” 对。
线程切换还涉及到线程池相关的东西,由于篇幅问题,线程池相关的东西会放到下一篇。
// 观察者 -- 蝙蝠侠 val observerBatMan = object : Observer<Int> { override fun onComplete() { } override fun onNext(t: Int) { System.out.println(Thread.currentThread().name + "--observer") } override fun onError(e: Throwable) { } override fun onSubscribe(d: Disposable) { } } // 数据源 -- 小丑 val sourceClown = ObservableOnSubscribe<Int> { it.onNext(1) it.onComplete() System.out.println(Thread.currentThread().name + "--source") } // 开始观察 Observable // ① .create<Int>(sourceClown) // ② 指定数据源执行的线程 .subscribeOn(Schedulers.computation()) // ③ 指定观察者执行的线程 .observeOn(Schedulers.io()) // ④ .subscribe(observerBatMan) }
嗯,和上一篇文章的例子差不多,就多了几行线程切换而已,我们先给出数据的日志。有一个大致的印象,再来深入分析。
RxComputationThreadPool-1--source RxCachedThreadScheduler-1--observer
有了第一篇的基础,那么我们直接从 demo 里面的 ② 处入手:
Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
你使用了套路一,效果拔群,得到结果如下:
Observable.java
简化之后的 subscribeOn 代码,实际上就是创建并返回了一个 ObservableSubscribeOn 对象
public final Observable<T> subscribeOn(Scheduler scheduler) { return new ObservableSubscribeOn<T>(this, scheduler); }
我们继续看 ObservableSubscribeOn。
ObservableSubscribeOn.java
AbstractObservableWithUpstream.java
// ObservableSubscribeOn 继承了 AbstractObservableWithUpstream public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {...} // AbstractObservableWithUpstream 继承了 Observable abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {...}
所以,ObservableSubscribeOn 最终还是继承至 Observable的。
在第一篇文章中,我们分析的对象是 ObservableCreate,而现在我们分析的对象是 ObservableSubscribeOn。为了能够更加清晰的列出不同点,还是搞一个表格吧:
ObservableSubscribeOn | ObservableCreate | |
---|---|---|
直接父类 | AbstractObservableWithUpstream(最终还是继承 Observable) | Observable |
构造方法参数个数 | 两个:(ObservableSource<T> source, Scheduler scheduler) | 一个:(ObservableOnSubscribe<T> source) |
主要的不同点,还是在构造参数这里。
看看 source 类有哪些不同之处:
ObservableOnSubscribe.java
ObservableSource.java
public interface ObservableOnSubscribe<T> { void subscribe(@NonNull ObservableEmitter<T> e) throws Exception; } public interface ObservableSource<T> { void subscribe(@NonNull Observer<? super T> observer); }
这两个类都是接口,都只有一个 subscribe 方法,看起来比较类似。
除了类名不同之外,就只有方法的参数不同了。
ObservableOnSubscribe 接收的参数 ObservableEmitter,上一篇文章说过,是将 Observer 包装了一层。
ObservableSource 直接接收了 Observer,嗯,很牛逼,后面我们来分析看看,它为啥不要包装,你也可以思考一下,嘿嘿嘿。
经过上面的分析,总的来说,区别不大,所以我们仍然可以套第一篇的套路来分析。
到这里先上个图,为后面做好心理准备。
接下来,我们看 demo 里面的 ③ 处:
Observable.java
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); }
调用了同名方法:
Observable.java
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
使用套路一,简化代码:
Observable.java
observeOn 简化后的代码
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize); }
可以看出,实际上就是创建了一个 ObservableObserveOn 对象。
看看这个对象吧,不出意外,和 ObservableOnSubscribe 应该很像。
先看类的结构:
ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {...}
与 ObservableOnSubscribe 一样,继承同一个类。
再看类的构造方法:
ObservableObserveOn.java
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; }
这个方法的参数就更多了,前面两个我们应该熟悉了,看看后面的两个参数是什么意思。虽然这里没有注释,但是这个参数的值是从别处传来的,所以只要找到源头,还是可以找到相关注释的。
delayError
indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream
额,我只能明白一个大概:若存在Error事件,则如常执行,执行完后再抛出错误异常
bufferSize 缓存大小,暂时还不知道缓存在哪里,所以先放着。
到了这里,我们再上一个图吧,后面的流程要起飞了。
接下来,就到了一个转折点,就是 demo 中的 ④ 处,它调用了 subscribe 方法。
在第一篇中,我们分析过,subscribe 方法会调用 subscribeActual 方法,所以这里,我们直接进入到 ObservableObserveOn 的 subscribeActual 方法中,看看它做了什么。
ObservableObserveOn .java
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
由于,我们传递的 scheduler 肯定不是 TrampolineScheduler,所以会直接进入到 else 分支。
else 分支里面的代码也很简单,我们先跳过线程池相关的东西,所以需要分析的就只有一行代码:
ObservableObserveOn.java
subscribeActual 方法的 else 分支
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
按照套路二,ObserveOnObserver 其实就是对 observer 做了一个包装。
这个 observer 就是我们自己创建的 observerBatMan,上个图:
现在比较绕的是,ObservableObserveOn 类中的 subscribeActual 方法中的 source 变量是谁。由于 source 是从构造函数传递进来的,我们再回到 Observerable 的 observeOn 方法:
Observable.java
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
可以看到,在创建 ObservableObserveOn 对象的时候,第一个参数传递的是 this。
这里有一个稍微绕的点,因为 observerOn 方法是 ObservableSubscribeOn 对象调用的,所以 this,指向的是 ObservableSubscribeOn。
所以,这里相当于 ObservableObserveOn 是一个桥梁,让 ObservableSubscribeOn 与 ObservableOnObserve 搭上了关系。
上一个图:
同样的,我们再来分析 ObservableSubscribeOn
类的 subscribeActual
方法:
ObservableSubscribeOn.java
@Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
这个 subscribeActual 内部的代码风格与想象的完全不一样啊,虽然前2行很熟悉,但是第3行完全没见过啊。
现在,我们来分析第3行代码,先看 parent.setDisposable(xxx):
SubscribeOnObserver.java
void setDisposable(Disposable d) { DisposableHelper.setOnce(this, d); }
DisposableHelper.java
该方法,用来将 field 的值,设置为 d。
如果,设置的时候,field 已经有值了,返回false。
如果 field 有值, 并且值不是 DISPOSED,抛出异常。
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) { ObjectHelper.requireNonNull(d, "d is null"); if (!field.compareAndSet(null, d)) { d.dispose(); if (field.get() != DISPOSED) { // 抛出异常 reportDisposableSet(); } return false; } return true; }
这样来看,其实 parent.setDisposable(xxx)
这行代码也没做什么,正常情况下,就是将 scheduler.scheduleDirect(new SubscribeTask(parent))
的值设置给了 parent。
下面,继续看 scheduler.scheduleDirect()
做了什么,由于篇幅问题,这里不分析线程池的东西,只说一下这个方法的作用,其实就是将一个 runnable,放到线程池中去执行,这里可以知道,线程切换了。
再继续,看看 SubscribeTask 类:
SubscribeTask.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { ... // 内部类 final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { // source 变量是 ObservableSubscribeOn 的变量 source.subscribe(parent); } } }
继承了 Runnable,在 run 方法中,我们终于看到了 subscribe 方法。
继续深入,看看 source 对象是谁:
Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
source 参数传递的是this,所以 source 对象是 ObservableCreate 对象。
到这里,还可以解释上面的问题:为啥 ObservableSource 的 subscribe 方法的参数是一个 Observer?
ObservableSource.java
为了方便,我又把这个接口代码贴了一遍
public interface ObservableSource<T> { void subscribe(@NonNull Observer<? super T> observer); }
在我们的例子中,observer 参数就是 ObservableCreate 对象。
其实,从这个类名都可以看出 ObservableSource 这个接口表示的是数据源,因为 Observerable 类就实现了这个接口,所以这个接口的 subscribe 方法的实现就是 Observable 的内部实现。
当我们调用 subscribeOn 与 observeOn 这两个方法的时候,其实是将调者这做为数据源的意思。
另外,可以推出 ObservableOnSubscribe 这个接口,应该是专门用来处理数据源的源头的(方便我们调用 emitter.onNext 等方法),两个接口的意义不一样。
所以,我们可以得到这样的一个图:
把这几个类,整合到调用图里面,得到如下图:
上面的图,看起来还算清晰,但是没有线程切换的内容,下面会说到。
与上一篇一样,我们再来整理一下这个demo的执行流程图,你可以先自行想一下这个图应该是什么样子的。
图中灰色的长方体,表示的是调用 subscribeOn 与 observerOn 产生的 “数据源-观察者”对。
图中 subscribeOn 蓝色方块表示的是 subscribeOn 指定的线程切换的地方,可以看到,它指定的线程会影响到后续的所有流程。
图中 observerOn 黄色的方块表示的是 observerOn 指定的线程切换的地方,它会影响后续流程。
在回想一下,其实每次 observerOn 或者 subscribeOn 都创建了一个链条一样的节点,然后在 subscribeActual 将这些节点连接起来:
好了,这篇文章的东西就差不多了,关于线程剩下的东西,本章中都只是一笔带过,详细的内容,留到下一篇。