经过前一篇的RxJava2 是如何实现线程切换的 (上)咱们已经知道了在RxJava中,subscribeOn 将上游线程切换到指定的子线程是如何实现的。这里就接着来看,observeOn 是如何将下游线程切换到指定线程的。java
这里能够经过UML图简单回顾一下subscribeOn的原理。git
经过 subscribeOn 咱们完成了如下操做:github
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
复制代码
将真正的 subscribe 操做安置在了SubscribeTask这样个一个Runnable当中,这个 Runnable 将由scheduler 这个调度器负责启动,所以就把上游操做放到了 scheduler 所在的线程中。app
简单回顾完 subscribeOn 以后,咱们就来看看 observeOn 是如何工做的。ide
其实,了解 subscribeOn 的原理以后,再来看 observeOn 就简单多了,类的命名及实现思路都有不少类似之处,能够对照着理解。函数
RxJava的代码写的很是巧妙,能够说是百读不厌,能够学习的地方特别多。为了不陷入只见树木不见森林的噩梦,咱们就带着如下问题去探索 observeOn 的奥秘。oop
private void multiThread() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("This msg from work thread :" + Thread.currentThread().getName());
sb.append("\nsubscribe: currentThreadName==" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: s= " + s);
}
});
}
复制代码
咱们仍是以这段代码为例,来看看 observeOn 的工做原理。这里经过observeOn(AndroidSchedulers.mainThread())将下游线程切换到了咱们很是熟悉的 Android UI 线程。这样就能够确保咱们在下游全部的操做都是在 UI 线程中完成。这里和讨论 subscribeOn 同样,咱们就从这句代码出发,看看这背后到底发生了什么。post
有了上一篇的经验,咱们知道 AndroidSchedulers.mainThread() 必定去建立了某种类型的调度器,为了方便后面的叙述,这一次咱们先从调度器的建立提及,后面再看 observeOn() 的具体实现。学习
须要注意的是 AndroidSchedulers 并非 RxJava 的一部分,是为了在 Android 中方便的使用 RxJava 而专门设计的一个调度器实现,源码RxAndroid 设计很是巧妙;使用前记得在gradle文件中配置依赖。gradle
下面就来看看 AndroidSchedulers.mainThread() 这个咱们很是熟悉的 Scheduler 是如何建立的。
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
}
复制代码
这里咱们能够认为,当调用AndroidSchedulers.mainThread() 时,返回了一个HandlerScheduler 的实例,而这个实例使用到了咱们很是熟悉的 Handler。那么重点就来到HandlerScheduler 了。
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
复制代码
这个类虽然很简单,可是设计很是巧妙。
首先 HandlerScheduler 是一个 Scheduler ,经过构造函数他获取到了主线程所在的 Handler实例。而在他的 createWorker() 方法中,他又经过这个 Handler 实例建立了一个HandlerWorker 的实例,这个HandlerWorker 本质上就是一个 Worker。在他的 schedule 方法中,建立了一个 ScheduleRunnable 对象,并会把这个Runnable对象经过 handler 的 sendMessageDelayed 方法发送出去,而咱们知道这个 Handler 是主线程,这样在下游中,就把任务从某个子线程转移到了UI线程。
ScheduleRunnable 不但实现了 Runnable ,并且实现了咱们看到过无数次的 Disposable 。
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
复制代码
这样,正确状况下 run 方法会正常执行线程中的任务,而一旦 disposable 对象执行了dispose()方法,那么 handler.removeCallbacks(this),就可确保在 handler 的 dispatchMessage 方法中,不会在执行任何操做,从而达到了 dispose 的效果。
下面就来看看 Observable 中的 observeOn 方法
Observable.java --- observeOn
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
这个方法的实现和 subscribeOn 的实现很是类似,多了两个参数 delayError 和 buffersize 。 buffersize 能够认为是RxJava内部的一个静态变量,默认状况下他的值是128。经过咱们以前的经验,这里能够把 observeOn 的过程简化以下:
new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)
复制代码
也就是说 observeOn 这个操做符给咱们返回了一个 ObservableObserveOn 对象。很容易想到他也是一个 Observeable。那么咱们就去看看这个 ObservableObserveOn 究竟是什么?咱们最关心的 subscribeActual 方法他又是怎样实现的。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
复制代码
和 ObservableSubscribeOn 同样,他也继承了 AbstractObservableWithUpstream ,这样他也是一个拥有上游的 Observeable,他的构造函数很简单,没什么能够说。这里咱们重点关注一下 subscribeActual 方法的实现。这里咱们的使用的Scheduler 实例是 AndroidSchedulers.mainThread(),所以就按 else的逻辑分析。
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
复制代码
经过 scheduler.createWorker() 建立了 Worker 这个对象。这里结合以前对 AndroidSchedulers.mainThread() 的分析,此处的 worker 对象是就是一个持有主线程 handler 引用的 Worker。
接着用这个worker又建立了一个ObserveOnObserver对象。看看这个类的实现。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { ....}
复制代码
这个类功能很是强大,首先是一个 Observer ,同时也是一个Runnable,而且还继承了 BasicIntQueueDisposable(保证原子性、拥有操做队列功能和 Disposable功能)。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
复制代码
咱们关注一下这行代码,根据以前的说法这里的 source 是其父类(AbstractObservableWithUpstream)中的成员变量,也就是说是上游,那么当前ObservableObserveOn 的上游是谁呢? 就是咱们上一篇所说的 ObservableSubscribeOn 。
所以,当这里开始执行订阅方法 subscribe() 后,将以以下顺序响应:
Observable.subscribe--->Observable.subscribeActual---> ObservableObserveOn.subscribeActual---> ObservableSubscribeOn.subscribeActual--->ObservableCreate.subscribeActual
这些方法的参数均为 observer,经过层层回调,最后的 subscribeActual(Observer<? super T> observer) 执行时,这个 observer 持有以前几个 observer 的引用。
咱们再看一下 ObservableCreate.subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
能够看到,这里首先会触发 observer.onSubscribe ,咱们再看一下 ObservableSubscribeOn.subscribeActual
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
复制代码
好了,这样咱们又回到了原点:
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
复制代码
回到了最初的 Observer:ObserveOnObserver
这个 ObserveOnObserver 持有咱们一开始建立的observer,也就是一个Consumer对象。
下面就来看看这个 ObserveOnObserver
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
复制代码
这里指的注意的一点 ,actual 其实就是observer
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
// 现阶段,咱们用到的Disposable 都是单个的,暂时不讨论其
//为QueueDisposable的状况
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
复制代码
在ObservableCreate.subscribeActual 中咱们知道,当执行subscribe 方法后,首先会执行 observer的 onSubscribe 方法。这里的实现很是简单,就是建立了一个queue,并触发了这个 observer 本身的 onSubscribe 方法。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
复制代码
在 onNext 中会执行 scheule() 方法。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
复制代码
这个地方就有意思了,前面说过这里的 worker 是一个持有主线程handler 的Worker对象,当他的 schedule 执行时,就会把特定的线程任务经过Handler.postDelay 方法转移到主线中去执行 。
那么这里的this 又是什么呢?前面咱们说过,ObserveOnObserver 这个类功能很是强大,他是一个Runnable,那么这里就是执行他本身的run方法喽,咱们赶忙看看。
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
复制代码
这里有一个参数 outputFused 他默认是false,至于他何时为true,不做为这里讨论的重点。
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
复制代码
这里大概就是经过一个死循环,不断从 onSubscribe 方法中建立的队列中取出事件,执行observer 的 onNext方法。而当为例为空时,就会执行worker.dispose 取消整个事件流,同时从Handler中移除全部消息。
最后在看一眼 onComplete ,onError 和整个相似
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
复制代码
能够看到这里的处理也很简单,done 设置为 true .这样最后便完成了下游事件的执行。
好了,因为一些无以诉说的缘由,经历了好久终于把 RxJava 线程切换的下篇给完成了。