RxJava之Scheduler (三) ——线程调度

默认状况下,不作任何线程处理,Observable和Observer处于同一线程中。若是想要切换线程,则能够使用subscribeOn()和observeOn()。bash

1.subscribeOn

subscribeOn经过接受一个Scheduler参数,来指定对数据的处理运行在特定的线程调度器Scheduler上。 若屡次执行subscribeOn,则只有一次起做用app

单击subscribeOn()的源码能够看到,每次调用subscribeOn()都会建立一个ObservableSubscribeOn对象。async

/**
     * Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
     *
     * @param scheduler
     *            the {@link Scheduler} to perform subscription actions on
     * @return the source ObservableSource modified so that its subscriptions happen on the
     *         specified {@link Scheduler}
     * @see #observeOn
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
复制代码

ObservableSubscribeOn真正发生订阅的方法是subscribeActual(Observer<? super T> observer)。ide

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
复制代码

其是,SubscribeOnObserver是下游Observer的OnSubscribe(Disposable disposable)方法ui

s.onSubscribe(parent);
复制代码

而后,将子线程的操做加入Disposable管理中,加入Disposable后能够方便上下游的统一管理。在这里,已经调用了对应Scheduler的scheduleDirect()方法。scheduleDirect()传入的是一个Runnable。this

parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
复制代码

此时,已经再对应的Scheduler线程中运行了:spa

source.subscribe(parent);
复制代码

在RxJava的链式操做中,数据的处理是自下而上的,这点与数据发射正好相反。若是屡次调用subscribeOn,则最上面的线程切换最晚执行,因此就变成了只有第一次切换线程有效线程

2.observeOn

observeOn一样接受一个Scheduler参数,用来指定下游操做运行在特定的线程调度器Scheduler上。code

若屡次执行observeOn,则每次都起做用,线程会一直切换。orm

单击observeOn的源码能够看到,每次调用observeOn()都会建立一个ObservableObserveOn对象。

/**
     * Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
     * asynchronously with an unbounded buffer with {@link Flowable#bufferSize()} "island size".
     *
     * <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
     * asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
     *
     * @param scheduler
     *            the {@link Scheduler} to notify {@link Observer}s on
     * @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
     *         {@link Scheduler}
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
复制代码

ObservableObserveOn真正发生订阅的方法是subscribeActual(Observer<? super T> observer)。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
    
        //若是scheduler是TrampolineScheduler,则上游事件和下游事件会当即产生订阅。
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //不然scheduler会建立本身的Worker,而后上游事件和下游事件产生订阅,生成一个ObserveOnObserver对象,封装了下游真正的Observer。
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
复制代码

若是scheduler是TrampolineScheduler,则上游事件和下游事件会当即产生订阅。

若是不是TrampolineScheduler,则scheduler会建立本身的Worker,而后上游事件和下游事件产生订阅,生成一个ObserveOnObserver对象,封装了下游真正的Observer。

ObserveOnObserverObservableObserveOn的内部类,实现了Observer、Runnable接口。与SubscribeOnObserver不一样的是,SubscribeOnObserver实现了Observer、DIsposable接口

在ObserveOnObserver的onNext()中,schedule()执行了具体调度的方法。

@Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
复制代码

先写到这儿,后续补充

相关文章
相关标签/搜索