默认状况下,不作任何线程处理,Observable和Observer处于同一线程中。若是想要切换线程,则能够使用subscribeOn()和observeOn()。bash
subscribeOn经过接受一个Scheduler参数,来指定对数据的处理运行在特定的线程调度器Scheduler上。 若屡次执行subscribeOn
,则只有一次起做用。app
单击subscribeOn()的源码能够看到,每次调用subscribeOn()都会建立一个ObservableSubscribeOn对象。async
/**
* Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @return the source ObservableSource modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see #observeOn
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
复制代码
ObservableSubscribeOn真正发生订阅的方法是subscribeActual(Observer<? super T> observer)。ide
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
复制代码
其是,SubscribeOnObserver是下游Observer的OnSubscribe(Disposable disposable)方法ui
s.onSubscribe(parent);
复制代码
而后,将子线程的操做加入Disposable管理中,加入Disposable后能够方便上下游的统一管理。在这里,已经调用了对应Scheduler的scheduleDirect()方法。scheduleDirect()传入的是一个Runnable。this
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
复制代码
此时,已经再对应的Scheduler线程中运行了:spa
source.subscribe(parent);
复制代码
在RxJava的链式操做中,数据的处理是自下而上的,这点与数据发射正好相反。若是屡次调用subscribeOn,则最上面的线程切换最晚执行,因此就变成了只有第一次切换线程才有效。线程
observeOn一样接受一个Scheduler参数,用来指定下游操做运行在特定的线程调度器Scheduler上。code
若屡次执行observeOn,则每次都起做用,线程会一直切换。orm
单击observeOn的源码能够看到,每次调用observeOn()都会建立一个ObservableObserveOn对象。
/**
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with an unbounded buffer with {@link Flowable#bufferSize()} "island size".
*
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
ObservableObserveOn真正发生订阅的方法是subscribeActual(Observer<? super T> observer)。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//若是scheduler是TrampolineScheduler,则上游事件和下游事件会当即产生订阅。
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//不然scheduler会建立本身的Worker,而后上游事件和下游事件产生订阅,生成一个ObserveOnObserver对象,封装了下游真正的Observer。
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
复制代码
若是scheduler是TrampolineScheduler,则上游事件和下游事件会当即产生订阅。
若是不是TrampolineScheduler,则scheduler会建立本身的Worker,而后上游事件和下游事件产生订阅,生成一个ObserveOnObserver对象,封装了下游真正的Observer。
ObserveOnObserver
是ObservableObserveOn
的内部类,实现了Observer、Runnable接口。与SubscribeOnObserver不一样的是,SubscribeOnObserver
实现了Observer、DIsposable接口。
在ObserveOnObserver的onNext()中,schedule()执行了具体调度的方法。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
复制代码
先写到这儿,后续补充