subscribeOn和observeOn负责线程切换,同时某些操做符也默认指定了线程.java
咱们这里不分析在线程中怎么执行的.只看如何切换到某个指定线程.react
Observable.subscribeOn()在方法内部生成了一个ObservableSubscribeOn
对象.
主要看一下ObservableSubscribeOn
的subscribeActual方法.ide
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); //调用下游的Observer的onSubscribe方法 observer.onSubscribe(parent); //经过SubscribeTask执行了上游Observable的subscribeActual方法 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
scheduler.scheduleDirect(Runnable)用于执行SubscribeTask这个任务.SubscribeTask自己是Runnable的实现类.看一下其run方法.this
@Override public void run() { //上游的Observable.subscribe方法被切换到了新的线程 source.subscribe(parent); }
首先能够得出结论:subscribeOn将上游的Observable的subscribe方法切换到了新的线程.线程
若是屡次调用subscribeOn切换线程,会有什么效果?code
由下往上,每次调用subscribeOn,都会致使上游的Observable的subscribeActual切换到指定的线程.那么最后一次调用的切换最上游的建立型操做符的subscribeActual的执行线程.若是操做符有默认执行线程怎么办?orm
若是是建立型操做符,处于最上游,那么subscribeOn的线程切换对它不起做用.天高皇帝远,县官不如现管.就是这个道理.
若是是其它操做符,会是怎样的?
以操做符timeout为例:它对应ObservableTimeoutTimed和TimeoutObserverserver
@Override public void onNext(T t) { downstream.onNext(t); //超时计时 startTimeout(idx + 1); } void startTimeout(long nextIndex) { //交给操做符默认的线程执行 task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit)); } @Override public void onError(Throwable t) { downstream.onError(t); } @Override public void onComplete() { downstream.onComplete(); } } @Override public void onTimeout(long idx) { downstream.onError(new TimeoutException(timeoutMessage(timeout, unit))); }
//TimeoutTask.java static final class TimeoutTask implements Runnable { @Override public void run() { parent.onTimeout(idx); } }
能够看到操做符默认的执行线程只用来作超时计时任务,若是超时了,会在操做符的默认线程执行onError方法..操做符默认线程对下游的observer形成什么影响要作具体对待.对象
observeOn对应ObservableObserveOn
和ObserveOnObserver
.接口
//ObservableObserveOn.java @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
//ObserveOnObserver.java @Override public void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.upstream, d)) { if (d instanceof QueueDisposable) { if (m == QueueDisposable.SYNC) { //执行下游Observer的onSubscribe方法 downstream.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { //执行下游Observer的onSubscribe方法 downstream.onSubscribe(this); return; } } //执行下游Observer的onSubscribe方法 downstream.onSubscribe(this); } } @Override public void onNext(T t) { //省略 schedule(); } @Override public void onError(Throwable t) { //省略 schedule(); } void schedule() { if (getAndIncrement() == 0) { /* ObserveOnObserver是Runnable的实现类.交给线程池执行 */ worker.schedule(this); } } void drainNormal() { final Observer<? super T> a = downstream; for (;;) { for (;;) { T v; try { v = q.poll(); } catch (Throwable ex) { a.onError(ex); return; } //执行下游Observer的onNext方法 a.onNext(v); } } } void drainFused() { for (;;) { if (!delayError && d && ex != null) { //执行下游Observer的onError方法 downstream.onError(error); return; } downstream.onNext(null); if (d) { ex = error; if (ex != null) { //执行下游Observer的onError方法 downstream.onError(ex); } else { //执行下游Observer的onComplete方法 downstream.onComplete(); } return; } } } //执行线程任务 @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }
从上面能够看出ObservableObserveOn在其subscribeActual方法中并无切换上游Observable的subscribe方法的执行线程.可是ObserveOnObserver在其onNext,onError和onComplete中经过schedule()方法将下游Observer的各个方法切换到了新的线程.
得出结论: observeOn负责切换的是下游Observer的各个方法的执行线程
若是下游屡次经过observeOn切换线程,会有什么效果?
每次切换都会对其下游形成影响,直到遇到下一个observeOn为止.
onNext,onError,onComplete与上游最近的observeOn所切换的线程保持一致.onSubscribe则不一样.
遇到线程切换的时候,会首先在对应的Observable的subscribeActual方法内,先调用observer.onSubscribe方法.而observer.onSubscribe会逐级向上传递直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法内调用,这是在主线程执行的.因此onSubscribe方法不管如何都是在主线程执行.
.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { } })
咱们要看的是方法accept的执行线程.
经过源码找到对应的DisposableLambdaObserver
.
@Override public void onSubscribe(Disposable d) { //在这里调用了accept方法. onSubscribe.accept(d); }
这就要看上游在哪一个线程执行了Observer.onSubscribe(disposable)方法.
在建立型操做符的subscribeActual方法和subscribeOn对应的Observable的subscribeActual方法内调用了Observer.onSubscribe(disposable)方法.那么这两处的执行线程就决定了onSubscribe.accept(d);的执行线程.
对应ObservableDoFinally
和DoFinallyObserver
//DoFinallyObserver.java @Override public void onError(Throwable t) { runFinally(); } @Override public void onComplete() { runFinally(); } @Override public void dispose() { runFinally(); } void runFinally() { onFinally.run(); }
能够看到与它所对应的DoFinallyObserver的onError,onComplete,dispose方法的执行线程有关,这三个方法的执行线程又受到上游的observeOn的影响.若是没有observeOn,则会受到最上游的observable.subscribeActual方法影响.
对应ObservableDoOnEach
和DoOnEachObserver
//DoOnEachObserver.java @Override public void onError(Throwable t) { onError.accept(t); }
和自身对应的observer.onError所在线程保持一致.
对应ObservableDoOnEach
和DoOnEachObserver
//DoOnEachObserver.java @Override public void onNext(T t) { onNext.accept(t); }
和自身对应的observer.onNext所在线程保持一致.
包io.reactivex.functions
下的接口类通常用于处理上游数据而后往下传递.这些接口类的方法通常在对应的observer.onNext中调用.因此他们的线程保持一致.
subscribeOn由下往上逐级切换Observable.subscribe的执行线程,不受observeOn影响,也不受具备默认指定线程的非建立型操做符影响,可是会被更上游的subscribeOn夺取线程切换的权利,直到最上游.若是最上游的建立型操做符也有默认执行线程,那么任何一个subscribeOn的线程切换不起做用.subscribeOn由下向上到达最上游后,而后由上往下影响下游的observer的执行线程.遇到observeOn会被夺取线程切换的权利.observeOn影响的是下游的observer的执行线程,由上往下,遇到另外一个observeOn会移交线程控制权力,遇到指定默认线程非建立型的操做符,要视具体状况对待.