RxJava2.0(四)线程之间切换的内部原理

基本代码

来看一下基本代码:java

Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);
            e.onComplete();
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(i -> System.out.println("onNext : i= " + i));

复制代码

很简单,即订阅时将task交给子线程去作,而数据的回调则在Android主线程中执行。ide

1、subscribeOn()

点击查看源码:学习

public final Observable<T> subscribeOn(Scheduler scheduler) {
        //非空判断和hook
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

复制代码

实际上这个方法返回了一个ObservableSubscribeOn对象。咱们有理由猜想这个ObservableSubscribeOn应该和上文的ObservableMap及ObservableDoOnEach类似,都是Observable的一个包装类(装饰器):ui

//1.ObservableSubscribeOn也是Observable的一个装饰器
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
       //2.存储上游的ObservableSource和调度器
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //3.new 一个SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        //4.回调方法,这说明下游的onSubscribe回调方法所在线程和线程调度无关
        // 是订阅时所在的线程
        s.onSubscribe(parent);

        //5.当即执行线程调度
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}

复制代码

前两步咱们不须要 再多解释,直接看第三点,咱们看看SubscribeOnObserver这个类:this

SubscribeOnObserver

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        //下游的Observer
        final Observer<? super T> actual;
        //保存上游的Disposable,自身dispose时,连同上游一块儿dispose
        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }


复制代码

相似Observable和ObservableMap,SubscribeOnObserver一样是Disposable和Observer的一个装饰器,提供了对下游数据的传递,以及将task dispose的接口。spa

第4步咱们以前就讲过了,直接看第5步:线程

//5.当即执行线程调度
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
复制代码

咱们看看SubscribeTask这个类:code

SubscribeTask

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

复制代码

难以置信的简单,SubscribeTask 仅仅是一个Runnable 接口的实现类而已,经过将SubscribeOnObserver做为参数存起来,在run()方法中添加了上游Observable的被订阅事件,就没有了别的操做,cdn

接下来咱们看一下scheduler.scheduleDirect(SubscribeTask)中的代码:server

public abstract class Scheduler {
    //...
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // Worker 自己就是Disposable 的实现类
        // 请注意, createWorker()所建立的worker,
        // 实际就是Schdulers.io()所提供的IoScheduler所建立的worker
        final Worker w = createWorker();

        //hook相关
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        //即 worker.schedule(task, 0, TimeUnit.NANOSECONDS): 当即执行task
        w.schedule(task, delay, unit);

        return task;
    }
    //...
}

复制代码

咱们不要追究过深,咱们看一下这个createWorker方法的注释说明:

/** * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions. * 检索或建立一个新的{@link Scheduler.Worker}表示一系列的action * * When work is completed it should be unsubscribed using {@link Scheduler.Worker#dispose()}. * 当work完成后,应使用{@link Scheduler.Worker#dispose()}取消订阅。 * * Work on a {@link Scheduler.Worker} is guaranteed to be sequential. * {@link Scheduler.Worker} 上面的work保证是顺序执行的 */

复制代码

如今咱们知道了:咱们经过调用subscribeOn()传入Scheduler,当下游ObservableSource被订阅时(请注意,订阅顺序是由下到上的),距离最近的线程调度subscribeOn()方法中,保存的Scheduler会建立一个worker(对应相应的线程,本文中为IoScheduler),在其对应的线程中,当即执行task

屡次subscribeOn()

如今考虑一个问题,假如在咱们的代码中,屡次使用了subscribeOn()代码,到线程会怎么处理呢?

上文已经讲到了,无论咱们怎么经过subscribeOn()方法切换线程,因为订阅执行顺序是由下到上,所以当最上游的ObservableSource被订阅时,所在线程固然是距离上游最近的subscribeOn()所提供的线程,即最终Observable老是在第一个subscribeOn()所在的线程中执行。

2、observeOn()

先看observeOn()内部,果真是hook+Observable的包装类:

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");

        //实例化ObservableObserveOn对象并返回
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

复制代码

再看ObservableObserveOn:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        //1.相关依赖注入
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //2.建立主线程的worker
            Scheduler.Worker w = scheduler.createWorker();
            //3.上游数据源被订阅
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

复制代码

和subscribeOn()不一样的是,咱们并非当即在对应的线程执行task,而是将对应的线程(其实是worker)做为参数,实例化ObserveOnObserver并存储起来。

当上游的数据传递过来时,ObserveOnObserver执行对应的方法,好比onNext(T),再切换到对应线程中,并交由下游的Observer去接收:

ObserveOnObserver

ObserveOnObserver中代码极多,咱们简单了解原理后,以onNext(T)为例:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {

        //...省略其余代码
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        //队列
        SimpleQueue<T> queue;

       @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            //将数据存入队列
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            } 
            //对应线程取出数据并交由下游的Observer
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
         //...省略其余代码
}

复制代码

屡次observerOn()

由上文得知,与subscribeOn()相反,observerOn()操做会将切换到对应的线程,而后交由下游的Observer处理,所以observerOn()仅对下游的Observer生效,而且,若是屡次调用,observerOn()的线程调度会持续到下一个observerOn()操做以前。

总结

subscribeOn()

  • 订阅顺序当从下到上,上游的ObservableSource被订阅时,先切换线程,而后当即执行task;

  • 当存在多个subscribeOn()方法时,仅第一个subscribeOn()有效。

observerOn()

  • 订阅顺序当从下到上,上游的ObservableSource被订阅时,会将对应的worker建立并做为构造参数存储在Observer的装饰器中,并不会当即切换线程;

  • 当数据由上游发送过来时,先将数据存储到队列中,而后切换线程,而后在新的线程中将数据发送给下游的Observer;

  • 当存在多个observerOn()方法时,仅对距下游下一个observerOn()以前的observer有效

有兴趣能够关注个人小专栏,学习更多知识:小专栏

相关文章
相关标签/搜索