系列文章:java
本文 csdn 地址:友好 RxJava2.x 源码解析(二)线程切换git
本文基于 RxJava 2.1.3github
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e("TAG", "subscribe(): 所在线程为 " + Thread.currentThread().getName());
emitter.onNext("1");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG", "onSubscribe(): 所在线程为 " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.e("TAG", "onNext(): 所在线程为 " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("TAG", "onComplete(): 所在线程为 " + Thread.currentThread().getName());
}
});
复制代码
输出结果:ide
E/TAG: onSubscribe(): 所在线程为 main
E/TAG: subscribe(): 所在线程为 RxCachedThreadScheduler-1
E/TAG: onNext(): 所在线程为 main
E/TAG: onComplete(): 所在线程为 main
复制代码
咱们能够发现,除了 Observable 的 subscribe(ObservableEmitter)
方法执行在 io 线程,Observer 的方法都是执行在 main 线程的,接下来就请各位读者跟着笔者来分析了。函数
看到标题部分读者就疑惑了,明明是说线程切换,跟 Observer#onSubscribe()
方法有什么关系呢?前方的 log 中展现 Observer#onSubscribe()
方法在主线程执行的,可是这个主线程是由 .observeOn(AndroidSchedulers.mainThread())
所致使的吗?为了解决这个疑惑,咱们能够在外面套一个子线程,而后去执行该逻辑,代码以下:源码分析
new Thread() {
@Override
public void run() {
Log.e("TAG", "run: 所在线程为 " + Thread.currentThread().getName());
// 添加示例代码
}
}.start();
复制代码
打印结果:post
run: 所在线程为 Thread-554
onSubscribe(): 所在线程为 Thread-554
subscribe(): 所在线程为 RxCachedThreadScheduler-1
onNext(): 所在线程为 main
onComplete(): 所在线程为 main
复制代码
因此实际上 Observer#onSubscribe()
的执行线程是当前线程,它并不受 subscribe(Scheduler)
或 observeOn(Scheduler)
所影响(由于笔者这段代码写在了 Android 主线程当中,因此当前线程是主线程)。本文不在此扩展缘由,具体源码追溯和查看前一篇文章,简而言之—— subscribe(Observer)
-> subscribeActual(Observer)
-> Observer#onSubscribe()
,咱们能够看到 subscribe(Observer)
的执行线程是当前线程,而在上面所述的数据流中也不存在数据切换的过程,因此 onSubscribe()
执行的线程也是当前线程。学习
此小节针对 Observable#observeOn(Scheduler)
讲解,因此将示例代码更改以下:this
new Thread() {
@Override
public void run() {
Log.e("TAG", "run: 当前默认执行环境为 " + Thread.currentThread().getName());
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
}
})
// 仅保留 observeOn(Scheduler)
.observeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("TAG", "onNext(): 所在线程为 " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
} .start();
复制代码
输出结果:spa
E/TAG: run: 当前默认执行线程为 Thread-610
E/TAG: onNext(): 所在线程为 RxCachedThreadScheduler-1
复制代码
一样的,直接先进入 Observable#observeOn(Scheduler)
源码查看一下,发现其最终会调用 Observable#的observeOn(Scheduler, boolean, int)
方法,该方法将会返回一个 Observable 对象。那么老问题来了,是哪一个 Observable 对象调用的 observeOn()
方法,又返回了一个怎样的 Observable 对象?
第一个问题很简单,是 Observable.create(ObservableOnSubscribe)
对象返回的一个 Observable,并且这个 Observable 是一个 ObservableCreate 对象(这里不理解的能够查看第一篇文章)。可是 Observable#observeOn(Scheduler, boolean, int)
是没有被任何子类重写的,这意味着它的子类都是调用它的该方法。
第二个问题来了,返回了一个怎样的 Observable 对象呢?实际上这里的分析流程和第一篇文章中所阐述的流程是如出一辙的,咱们戳进 Observable#observeOn(Scheduler, boolean, int)
源码,发现它最终会返回一个 new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)
对象,这里咱们只关注前两个对象,第一个参数 this
是指上游的 Observable 对象,也就是咱们第一个问题中所涉及到的 Observable 对象,第二个参数 scheduler
毋庸置疑就是咱们所传入的 Scheduler 对象了,在此也就是咱们的 AndroidSchedulers.mainThread()
。
经过第一篇的学习,咱们应该会轻车熟路地打开 ObservableObserveOn 类并查看它的核心 subscribeActual()
方法以及构造函数——
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 若是传入的 scheduler 是 Scheduler.trampoline() 的状况
// 该线程的意义是传入当前线程,也就是不作任何线程切换操做
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
复制代码
直接进入第二个 case,首先先略去第19行代码,看到第20行代码,source
(上游 Observable) 和 Observable#subscribe()
操做都没有任何变化,惟一改变的地方就是将 Observer 进行了封装,因此咱们能够所以得出结论, Observable#observeOn(Scheduler)
并不会对上游线程执行环境有任何影响。(若是看到这里不可以理解的话,后文中会有通俗易懂的伪代码辅助理解)
通过上文友好 RxJava2.x 源码解析(一)基本订阅流程一文的分析咱们知道 ObservableEmitter 的 onNext(T)
方法会触发「下游」 Observer 的 onNext(T)
方法,而此时的「下游」 Observer 对象是通过 Observable#observeOn(Scheduler)
封装的 ObserveOnObserver 对象,因此咱们不妨打开 ObserveOnObserver 的 onNext(T)
方法——
@Override
public void onNext(T t) {
// 删除无关源码
queue.offer(t);
schedule();
}
复制代码
能够看到 onNext(T)
方法作了两件事——一是将当前方法传入的对象添加进队列;另外一是执行 schedule()
方法,打开 schedule()
方法源码——
void schedule() {
// 删除无关源码
worker.schedule(this);
}
复制代码
因此将会执行 worker.schedule(Runnable)
,可向下继续追溯到 schedule(Runnable, long, TimeUnit )
,该方法是一个抽象方法,因此咱们能够想到,调度器们就是经过实现该方法来建立各色各样的线程的。因此咱们继续追溯到 IoScheduler 的 schedule(Runnable, long, TimeUnit)
中,源码以下:
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
// 删除无关源码
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
复制代码
继续追溯下去——
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Future<?> f;
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
return sr;
}
复制代码
executor 是一个 ScheduledExecutorService 对象,而 ScheduledExecutorService 的父接口是咱们所熟悉的 ExecutorService 接口,因此很清晰 ScheduledExecutorService 具备建立和调度线程的能力,而其具体的实如今此就不讨论了。
最后,咱们不妨将上述所提到的几段源代码总体抽象结合一下:
@Override
public void onNext(T t) {
// 删除无关源码
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)this);
} else {
f = executor.schedule((Callable<Object>)this, delayTime, unit);
}
}
复制代码
总结一下:onNext(T)
方法会触发 Scheduler 对象的 schedule(Runnable, long, TimeUnit)
,该方法是一个抽象方法,由子类实现,因此才有了多元多样的 Schedulers.io()/Schedulers.computation()/Schedulers.trampoline()
等调度器,具体调度器的内部会使用相关的线程来 submit()
或者 schedule()
任务。解决完调度器的问题,那么接下来就是看看 Runnable#run()
里面的逻辑是什么样的,回到 ObserveOnObserver 中——
@Override
public void run() {
drainNormal();
}
复制代码
drainNormal()
源码以下:
void drainNormal() {
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
}
boolean empty = v == null;
if (empty) {
break;
}
a.onNext(v);
}
}
复制代码
能够看到实际上最后一行执行了 Observer#onNext(T)
方法,也就是意味着「ObserveOnObserver 中触发下一层 Observer 的 onNext(T)
操做」在指定线程执行,也就达到了切换线程的目的了。
来个复杂的例子——
通过友好 RxJava2.x 源码解析(一)基本订阅流程一文咱们知道,Observer 的传递是由下往上的,从源头开始,咱们自定义的 Observer 向上传递的时候到达第六个 Observable 的时候被线程封装了一层,咱们不妨使用伪代码演示一下——
public class Observer {
Observer oldObserver;
public Observer(Observer observer) {
oldObserver = observer;
}
public void onNext(T t) {
// 一些其余操做
new Thread("Android mainThread") {
@Override
public void run() {
oldObserver.onNext(t);
}
} .start();
}
public void onError(Throwable e) {
// 一些其余操做
new Thread("Android mainThread") {
@Override
public void run() {
oldObserver.onError(e);
}
} .start();
}
public void onComplete() {
// 一些其余操做
new Thread("Android mainThread") {
@Override
public void run() {
oldObserver.onComplete();
}
} .start();
}
}
复制代码
Observer 继续向上被传递,Observable#map()
中并未对 Observer 进行线程切换;再向上走,到达第四个 observeOn(Scheduler)
的时候,被 computation 线程嵌套了一层——
public class Observer {
Observer oldObserver;
public Observer(Observer observer) {
oldObserver = observer;
}
public void onNext(T t) {
// 一些其余操做
new Thread("computation") {
@Override
public void run() {
oldObserver.onNext(t);
}
} .start();
}
public void onError(Throwable e) {
// 一些其余操做
new Thread("computation") {
@Override
public void run() {
oldObserver.onError(e);
}
} .start();
}
public void onComplete() {
// 一些其余操做
new Thread("computation") {
@Override
public void run() {
oldObserver.onComplete();
}
} .start();
}
}
复制代码
固然,继续向上直到顶端 Observable——
public class Observer {
Observer oldObserver;
public Observer(Observer observer) {
oldObserver = observer;
}
public void onNext(T t) {
// 一些其余操做
new Thread("io") {
@Override
public void run() {
oldObserver.onNext(t);
}
} .start();
}
public void onError(Throwable e) {
// 一些其余操做
new Thread("io") {
@Override
public void run() {
oldObserver.onError(e);
}
} .start();
}
public void onComplete() {
// 一些其余操做
new Thread("io") {
@Override
public void run() {
oldObserver.onComplete();
}
} .start();
}
}
复制代码
甚至更精简的操做以下:
new Thread("Scheduler io") {
@Override
public void run() {
// flatMap() 操做
flatMap();
System.out.println("flatMap 操做符执行线程:" + Thread.currentThread().getName());
System.out.println("第二个 observeOn() 执行线程:" + Thread.currentThread().getName());
// 第二个 observeOn() 操做
new Thread("Scheduler computation") {
@Override
public void run() {
// map() 操做
map();
System.out.println("map 操做符执行线程:" + Thread.currentThread().getName());
System.out.println("第三个 observeOn() 执行线程:" + Thread.currentThread().getName());
// 第三个 observeOn() 操做
new Thread("Android mainThread") {
@Override
public void run() {
// Observer#onNext(T)/onComplete()/onError() 执行线程
System.out.println("Observer#onNext(T)/onComplete()/onError() 执行线程:" +
Thread.currentThread().getName());
}
} .start();
}
} .start();
}
} .start();
复制代码
输出结果:
flatMap 操做符执行线程:Scheduler io
第二个 observeOn() 执行线程:Scheduler io
map 操做符执行线程:Scheduler computation
第三个 observeOn() 执行线程:Scheduler computation
Observer#onNext(T)/onComplete()/onError() 执行线程:Android mainThread
复制代码
由此便将 Observable#observeOn(Scheduler)
是如何将下游 Observer 置于指定线程执行的流程分析完了。简而言之 Observable#observeOn(Scheduler)
的实现原理在于将目标 Observer 的 onNext(T)/onError(Throwable)/onComplete()
置于指定线程中运行。
这里特别要注意的一点是——【线程操做符切换的是其余的流,自身这条流是不会受到影响的。】看过知乎前一段时间的 rx 分享视频的小伙伴应该有注意到杨凡前辈的 PPT 中有这么一图:
想要提出两点—— ![]()
observeOn(Schedulers.io())
所对应的 Observable 应该是受到了subscribeOn(AndroidSchedulers.mainThread())
影响,因此它建立的这条流应该执行于主线程;而subscribeOn(AndroidSchedulers.mainThread())
所对应的 Observable 则受到了subscribeOn(Schedulers.computation)
影响,因此它建立的这条流应该执行于 computation 线程。
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
Log.e("TAG", "被观察者所在的线程 " + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG", "onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.e("TAG", "观察者所在线程为 " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
复制代码
输出结果:
E/TAG: onSubscribe: main
E/TAG: 观察者所在线程为 RxCachedThreadScheduler-1
E/TAG: 被观察者所在的线程 RxCachedThreadScheduler-1
复制代码
一样地,戳进 Observable#subscirbeOn(Scheduler)
源码,点进 ObservableSubscribeOn 查看 subscribeActual(Observer)
的具体实现,相信这对于各位读者来讲已经轻车熟路了——
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
Disposeable disposable = scheduler.scheduleDirect(new SubscribeTask(parent));
parent.setDisposable(disposable);
}
复制代码
第一行老套路,对下游 Observer 进行了一层封装;第二行由于它不涉及线程切换因此此处也不作扩展;第三行就是咱们的关键了 Scheduler#scheduleDirect(Runnable)
方法能够追溯到 Scheduler#schedule(Runnable, long, TimeUnit)
,这部分在前面已经阐述过了,就不作扩展了。SubscribeTask 是一个 Runnable,它的 run()
核心方法——
@Override
public void run() {
source.subscribe(parent);
}
复制代码
至此谜团解开了,Observable#subscribeOn(Scheduler)
将 Observable#subscribe(Observer)
的执行过程移到了指定线程(在上述中也就是 io 线程),同时 Observable 和 Observer 中并未作新的线程切换处理,因此它们的订阅、发射等操做就执行在了 io 线程。
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
Log.e("TAG", "被观察者所在的线程 " + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG", "onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.e("TAG", "观察者所在线程为 " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
复制代码
打印结果:
onSubscribe: main
观察者所在线程为 RxCachedThreadScheduler-1
被观察者所在的线程 RxCachedThreadScheduler-1
复制代码
咱们知道,只有第一个 Observable#subscribeOn(Scheduler)
操做才有用,然后续的 Observable#subscribeOn(Scheduler)
并不会影响整个流程中 Observerable 。一样的,来张图——
前面咱们分析到,Observable#subscribeOn(Scheduler)
其实是将 Observable#subscribe(Observer)
的操做放在了指定线程,而经过友好 RxJava2.x 源码解析(一)基本订阅流程一文咱们知道了 subscribe
的过程是由下往上的。因此首先是第三个 Observable 调用 Observable#subscribe(Observer)
启动订阅,在其内部会激活第二个 Observable 的 Observable#subscribe(Observer)
方法,可是此时该方法外部被套入了一个 Schedulers.computation()
线程,因而这个订阅的过程就被运行在了该线程中。一样的,咱们不妨用伪代码演示一下——
public class Observable {
// 第「二」个 Observable
Observable source;
Observer observer;
public Observable(Observable source, Observer observer) {
this.source = source;
this.observer = observer;
}
public void subscribe(Observer Observer) {
new Thread("computation") {
@Override
public void run() {
// 第「二」个 Observable 订阅
source.subscribe(observer);
}
}
}
}
复制代码
再往上走,第二个 Observable 订阅内部会激活第一个 Observable 的 Observable#subscribe(Observer)
方法,一样的,该方法被套在了 Schedulers.io()
线程中,以下——
public class Observable {
// 第「一」个 Observable
Observable source;
Observer observer;
public Observable(Observable source, Observer observer) {
this.source = source;
this.observer = observer;
}
public void subscribe(Observer Observer) {
new Thread("io") {
@Override
public void run() {
// 第「一」个 Observable 订阅
source.subscribe(observer);
}
}
}
}
复制代码
此时到达第一个 Observable 了以后就要开始发射事件了,此时的执行线程很明显是 io 线程。还能够换成 Thread 伪代码来表示 ——
new Thread("computation") {
@Override
public void run() {
// 第二个 Observable.subscribe(Observer) 的实质
// 就是切换线程,效果相似以下
new Thread("io") {
@Override
public void run() {
// 第一个 Observable.subscribe(Observer) 的实质
// 就是发射事件
System.out.println("onNext(T)/onError(Throwable)/onComplete() 的执行线程是: " + Thread
.currentThread().getName());
}
} .start();
}
} .start();
复制代码
输出结果:
onNext(T)/onError(Throwable)/onComplete() 的执行线程是: io
复制代码
通过友好 RxJava2.x 源码解析(一)基本订阅流程一文咱们知道,Observable#subscribe(Observer)
的顺序是由下往上的,本游会将 Observer 进行「封装」,而后「激活上游Observable 订阅这个 Observer」。
咱们不妨抽象一个 Observer,以下:
public class Observer<T> {
public void onNext(T t){}
public void onCompelete(){}
public void onError(Throwable t){}
}
复制代码
对于 Observable#observeOn(Schedulers.computation())
操做来讲,它对 Observer 进行了怎样的封装呢?
public class NewObserver<T> {
// 下游 Observer
Observer downStreamObserver;
public NewObserver(Observer observer) {
downStreamObserver = observer;
}
public void onNext(T t) {
new Thread("computation") {
downStreamObserver.onNext(t);
}
}
public void onError(Throwable e) {
new Thread("computation") {
downStreamObserver.onError(e);
}
}
public void onComplete() {
new Thread("computation") {
downStreamObserver.onComplete();
}
}
}
复制代码
在 Observable#observeOn(Scheduler)
内部,其对下游的 Observer 进行了相似如上的封装,这就致使了其「下游」 Observer 在指定线程内执行。因此 Observable#observeOn(Scheduler)
是能够屡次调用并有效的。
而对于 Observable#subscribe(Scheduler)
来讲,它并未对下游 Observer 进行封装,可是对于「激活上游 Observable 订阅这个 Observer」这个操做它作了一点小小的手脚,也就是切换线程,咱们抽象以下——
public class ComputationObservable {
public void subscribe(observer) {
new Thread("computation") {
// upstreamObservable 是上游 Observable,咱们不妨假设是下文中所提到的 IOObservable
upstreamObservable.subscribe(observer);
}
}
}
复制代码
而当它在往上遇到了一个新的 Observable#subscribe(Scheduler)
操做的时候——
public class IOObservable {
public void subscribe(observer) {
new Thread("io") {
// upstreamObservable 是上游 Observable,咱们不妨下文中所提到的 TopObservable
upstreamObservable.subscribe(observer);
}
}
}
复制代码
咱们不妨假设此时已经到达了最顶端开始发射事件了——
public class TopObservable {
public void subscribe(observer) {
observer.onNext(t);
}
}
复制代码
此时的 Observer#onNext(t)
的执行环境固然就是由最后一个 subscribeOn(Scheduler)
操做符(此处的最后一个是指订阅流程中的最后一个,它与实际写代码的顺序相反,也就是咱们代码中的第一个 subscribeOn(Scheduler)
操做符)所决定的了,在上述伪代码中也就是 io 线程,伪代码对应的源码以下——
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
}
})
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});复制代码