Android进阶知识:RxJava相关

1. 前言

RxJava核心功能是一个用来完成异步操做的库,相对于其它异步操做的方法,RxJavaAPI使用更加的简洁。而且RxJava中还提供了不少功能强大的操做符,帮助咱们解决不少本来复杂繁琐的代码逻辑,提升了代码质量。RxJava的实现是基于观察者模式,观察者模式中如下有三个比较重要的概念:git

  1. 被观察者(Observable)
  2. 观察者(Observer)
  3. 订阅(subscribe)

被观察者是事件的发起者,被观察者与观察者创建订阅关系后,被观察者发送事件,观察者才能接收到事件。github

2. 基础使用

RxJava的基础使用也很简单,分为三个步骤,分别是建立被观察者,建立观察者和创建订阅关系,具体代码以下。bash

// 1. 建立被观察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("string1");
                emitter.onNext("string2");
                emitter.onNext("string3");
                emitter.onComplete();
            }
        });
        // 2. 建立观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s);

            }

            @Override
            public void onError(Throwable e) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");

            }

            @Override
            public void onComplete() {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
            }
        };
        Log.d(getClass().getName(), Thread.currentThread().getName() + " observable:"+observable.getClass().getName());
        // 3. 创建订阅关系
        observable.subscribe(observer);
复制代码

运行日志: app

3. 订阅源码流程

本文中全部源码基于RxJava22.2.11版本。首先来看看这个基本的订阅流程源码是怎么实现的。异步

3.1 建立被观察者

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码

使用RxJava能够经过Observablecreate方法建立一个被观察者对象。create方法从参数中传入一个ObservableOnSubscribe类型的source,而后方法中先校验了source是否为空,接着将传入的source封装成一个ObservableCreate对象,而后调用了RxJavaPlugins.onAssembly方法返回建立的好的Observable。接着进入onAssembly方法查看。async

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
复制代码

onAssembly方法中首先是一个Hook实现,这里能够理解为一个代理。能够看到这里先判断onObservableAssembly是否为空,为空就直接返回传入的source,不然再调用apply方法。这里能够继续跟踪一下onObservableAssemblyide

@SuppressWarnings("rawtypes")
@Nullable
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;

/**
  * Sets the specific hook function.
  * @param onObservableAssembly the hook function to set, null allowed
  */
@SuppressWarnings("rawtypes")
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
复制代码

它是RxJavaPlugins中的成员变量,默认为空,而且提供了一个set方法来设置它。由于默认为空,因此默认返回的就是传入的source。这里的代理默认是不会对Observable作什么操做,若是须要有特殊的需求能够调用set方法实现本身的代理。而默认返回的source类型为ObservableCreate对象也实现了Observable接口。函数

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ......
}
复制代码

3.2 建立观察者

public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();
}
复制代码

观察者Observer是一个接口,其中提供了一些方法,使用时建立接口的实现,并根据需求在方法中作本身的实现。oop

3.3 创建订阅关系

创建订阅关系调用了Observablesubscribe方法。ui

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
           ......
        } catch (Throwable e) {
           ......
        }
    }
复制代码

方法中仍是先判断了传入参数observer是否为空,接着仍是一个Hook实现,这里就不细究了,得到Hook返回的observer后再次判断是否为空,以后调用了subscribeActual方法。

protected abstract void subscribeActual(Observer<? super T> observer);
复制代码

ObservablesubscribeActual方法是个抽象方法,以前看过这里的Observable实际实现是个ObservableCreate对象,因此再进入ObservableCreate类查看对应方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
复制代码

ObservableCreate中的subscribeActual方法中先建立了一个CreateEmitter发射器对象,并将observer对象传入。接着调用了observeronSubscribe方法,此时观察者的onSubscribe方法执行。最后调用了sourcesubscribe方法。

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("string1");
                emitter.onNext("string2");
                emitter.onNext("string3");
                emitter.onComplete();
            }
        });
复制代码

这个source就是在create方法中传入的ObservableOnSubscribe。它的subscribe方法中经过调用ObservableEmitter的方法发送事件,这里的ObservableEmitter就是以前建立的CreateEmitter对象,因此再来进一步看看它其中的方法。

CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
复制代码

CreateEmitter的构造函数接收了观察者对象,而后在调用onNext方法时先作了空判断,再对isDisposed进行取消订阅的判断,以后调用了observeronNext方法,也就是观察者的onNext方法。一样的onComplete中最终也是调用了observeronComplete方法。至此RxJava中的基本订阅流程的源码就梳理完了。

4. 线程切换

RxJava中有个很重要的功能,就是能方便的切换线程,来看下它的使用,仍是以前基础使用中的例子进行修改。

Observable<String> observable0 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("string1");
                emitter.onNext("string2");
                emitter.onNext("string3");
                emitter.onComplete();
            }
        });
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
            }
            @Override
            public void onNext(String s) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");
            }
            @Override
            public void onComplete() {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
            }
        };
        Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread());
        Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName());
        Observable<String> observable2 = observable1.observeOn(AndroidSchedulers.mainThread());
        Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName());
        observable2.subscribe(observer);
复制代码

被观察者和观察者的建立和以前同样,在创建订阅关系时调用subscribeOnobserveOn方法进行线程的切换。这里每一个方法返回的都是Observable类型,因此能够采用链式调用,这也是RxJava的一个特色,可是这里没有采用这种写法,而是将其拆分开来写而且日志打印出每一个Observable的具体类型,这是为了方便以后源码理解。 运行结果日志:

4.1 subscribeOn

Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread());
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName());
observable1.subscribe(observer);
复制代码

运行结果:

先只调用 subscribeOn方法运行查看结果,发现不只被观察者发射事件运行在了子线程,观察者接收事件也运行在子线程,那么进入 subscribeOn方法查看它的实现。

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
复制代码

能够看到subscribeOn方法和subscribe方法有些相似。首先是判断传入的scheduler是否为空,而后一样调用RxJavaPlugins.onAssembly方法,此次构建了一个ObservableSubscribeOn对象返回。而subscribeOn方法以后仍是调用了subscribe方法,根据以前的分析,subscribe方法最终会调用到subscribeActual方法,不过此时的subscribeActual方法再也不是ObservableCreate中的而是ObservableSubscribeOn中的subscribeActual方法。

@Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
复制代码

ObservableSubscribeOnsubscribeActual方法中流程和以前的也很相似,此次是先建立了一个SubscribeOnObserver对象,将观察者对象传入,接着一样先调用了observer.onSubscribe方法,而后将传入的SubscribeOnObserver封装入了一个SubscribeTask对象中,接着调用了scheduler.scheduleDirect方法再将返回结果获得的Disposable设置到SubscribeOnObserver中。下面一个方法一个方法看。首先是建立SubscribeTask

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
复制代码

SubscribeTaskObservableSubscribeOn的内部类,其实现很简单就是实现了一个Runnable接口,构造方法中传入了SubscribeOnObserver对象,在其run方法中调用了ObservableSubscribeOn中的成员变量sourcesubscribe方法。这个source是在建立ObservableSubscribeOn时传入的,根据前面的代码能够找到是在subscribeOn方法中建立的对象而且这个source对应传入的是当前这个Observable对象即经过Observable.create得到的被观察者对象,其实现以前看过是一个ObservableCreate因此这里就和以前同样又会走到了其父类Observablesubscribe方法中,继而调用ObservableCreatesubscribeActual方法,以后最终会调用到观察者的对应onNext等方法,不过此时的观察者不直接是在使用时建立传入的Observer,而是以前看到的SubscribeOnObserver类型,不过其中的onNext等方法仍是调用了在使用时建立传入的Observer的对应方法。

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;
        final AtomicReference<Disposable> upstream;
        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }
        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }
        @Override
        public void onComplete() {
            downstream.onComplete();
        }
        ......
    }
复制代码

下面接着看到scheduleDirect这个方法,在建立好SubscribeTask以后调用了scheduleDirect方法。这里的scheduler就是subscribeOn中传入的,对应开始例子中的Schedulers.newThread

public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
// 静态成员变量NEW_THREAD
static final Scheduler NEW_THREAD;

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
复制代码

进入Schedulers.newThread一步步跟踪,看到newThread方法返回静态成员变量中的NEW_THREAD,而NEW_THREAD又是经过NewThreadTask建立。

static final class NewThreadTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return NewThreadHolder.DEFAULT;
    }
}
static final Scheduler DEFAULT = new NewThreadScheduler();
复制代码

继续跟踪查看发现NewThreadTask实际是实现了Callable接口,其call方法中返回了静态内部类中的NewThreadHolder.DEFAULT。这个DEFAULT的实现类型为NewThreadScheduler。至此终于找到了咱们传入的Scheduler的真正实现类。因而继续看其scheduleDirect方法。

public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}
复制代码

scheduleDirect方法是在其父类中实现的,看到其中进而调用了同名重载方法,方法中首先是调用createWorker方法建立一个Worker。这个方法的实现就是在NewThreadScheduler中了。

public Worker createWorker() {
    return new NewThreadWorker(threadFactory);
}
复制代码

createWorker方法中只作了一件事就是建立返回了一个NewThreadWorker

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    ......
}
复制代码

NewThreadWorker中看到建立了一个线程池,再回到scheduleDirect方法,建立完Worker后将传入的RunnableSubscribeTask进行一个装饰获得新的Runnable对象。接着将Worker和新的Runnable封装到一个DisposeTask对象中。

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
        @NonNull
        final Runnable decoratedRun;
        @NonNull
        final Worker w;
        @Nullable
        Thread runner;
        DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }
        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }
    ......
}
复制代码

DisposeTask一样实现了Runnable接口,在run方法中调用了从构造传入的decoratedRunrun方法执行任务。回到最后一步,调用Workerschedule方法,这里就对应的NewThreadWorkerschedule方法。

public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }
复制代码

schedule方法中又进一步调用了其scheduleActual方法。

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
复制代码

scheduleActual方法里看到又将decoratedRunDisposableContainer封装成ScheduledRunnable最后将这个ScheduledRunnable 交给构造函数中建立的线程池去运行,最终就会执行到前面看过的SubscribeTask中的run方法完成订阅逻辑,调用观察者的onNext等方法。到这里就看出最终的source.subscribe是会经过线程池切换到子线程中去执行了。

经过查看subscribeOn方法源码能够发现,方法里其实是在前一个建立的ObservableCreate外面包了一层,把它包成一个ObservableSubscribeOn对象,一样的原先的Observer也被包了一层包成一个SubscribeOnObserver对象,而线程切换的工做是由Scheduler完成的。

4.2 observeOn

接着再来看看切换回主线程的方法observeOn,仍是先修改使用代码,查看运行日志。

Observable<String> observable2 = observable0.observeOn(AndroidSchedulers.mainThread());
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName());
observable2.subscribe(observer);
复制代码

运行日志:

接着仍是进入来看源码。

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

 public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码

这里看到observeOn方法里调用了重载方法,方法中仍是同一个套路,不过这里建立的又是另外一个对象ObservableObserveOn了。根据前面的经验这里就又是将前一个Observable传递到ObservableObserveOn中的成员变量source上,这里看到就是构造函数中的第一个参数。接着仍是会调用subscribe与观察者创建订阅关系进而会执行到ObservableObserveOn对象的subscribeActual方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
复制代码

subscribeActual方法中判断了scheduler的类型,这里的scheduler就是由AndroidSchedulers.mainThread()传入的,因而先来看一下这个方法。

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
    @Override public Scheduler call() throws Exception {
        return MainHolder.DEFAULT;
    }
});

private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
复制代码

mainThread开始看,发现代码调用逻辑和以前的Schedulers.newThread方法相似,最终会返回一个HandlerScheduler而这个Scheduler中的Handler则是主线程的Handler,看到这里就能猜测到了,后面观察者的对应方法必定是由这个Handler来切换到主线程执行的。回到subscribeActual方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
复制代码

这里判断完类型会走else中的方法首先仍是会调用HandlerSchedulercreateWorker方法建立一个Worker

@Override
public Worker createWorker() {
    return new HandlerWorker(handler, async);
}
复制代码

这里是个HandlerWorker其中具体方法后面再看。接着上面建立完Worker后一样仍是同样调用source.subscribe建立了一个ObserveOnObserver对象传入。这里的source就仍是以前的ObservableCreate,因此这里仍是会调用ObservableCreate中的subscribeActual方法。

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

ObservableCreate中的subscribeActual方法中的逻辑以前看过,不过此时传入的observer仍然再也不是在使用时建立的观察者对象了,而是传过来的ObserveOnObserver对象,此时建立的CreateEmitter中的observer也就是这个ObserveOnObserver对象。和以前逻辑同样,接着就会调用observeronNext等方法,此时调用的便是ObserveOnObserver中的onNext等方法。因此进入ObserveOnObserver查看。

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
        schedule();
}

@Override
public void onComplete() {
    if (done) {
        return;
    }
    done = true;
    schedule();
}

void schedule() {
     if (getAndIncrement() == 0) {
         worker.schedule(this);
     }
}
复制代码

查看ObserveOnObserver中的代码会发现onNext方法中先将传入的参数放入了一个队列,而后不管是onNext仍是onComplete方法最后都调用了schedule方法,进而再进入查看,发现schedule方法中又调用了worker.schedule方法。这里的worker就是以前建立的HandlerWorker,这时再来看它的schedule方法。

public Disposable schedule(@NonNull Runnable run) {
   return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
复制代码

单个参数schedule方法是在其父类中的,而这个方法中又调用另外一个三个参数的schedule方法,这个方法父类中是抽象方法因此实现就在子类HandlerWorker里了。

@Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
	    // 判断是否取消订阅
            if (disposed) {
                return Disposables.disposed();
            }
            run = RxJavaPlugins.onSchedule(run);
	    // 建立ScheduledRunnable
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
	    // 建立消息,并将主线程Handler和ScheduledRunnable
            Message message = Message.obtain(handler, scheduled);
            message.obj = this;
	    // 判断设置异步消息
            if (async) {
                message.setAsynchronous(true);
            }
	    // 发送消息执行callback
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            // 检查是否取消订阅
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }
复制代码

在子类的这个方法里在作了取消订阅的判断后将方法传入的RunnableHandler又封装到一个ScheduledRunnable对象中。接着建立了一个Message并将ScheduledRunnable放入Message,最后调用handler.sendMessageDelayed方法经过这个主线程的Handler执行这个ScheduledRunnable

最后来追溯下ScheduledRunnable到底执行了什么,不过猜也知道最后必定调用到观察者中的对应方法。

private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed; // Tracked solely for isDisposed().

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }
    ......
    }
复制代码

ScheduledRunnable中的run方法很简单就是调用了构造中传入的Runnablerun方法。而根据以前看过得建立ScheduledRunnable时传入的Runnable又是从scheduleDirect方法中传入的,而scheduleDirect方法中的Runnable又是从worker.schedule(this)方法时传入的,根据上下文代码发现这个this指代的是ObserveOnObserver对象,因而进一步进入它的run方法查看。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        ......
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        ......
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
    ......    
    }
复制代码

能够看到run方法中判断了outputFused的真假,而后分别调用了drainFuseddrainNormal方法。这里的outputFused是与RxJava2中的背压处理相关暂时先无论,根据方法名也能知道正常调用会执行drainNormal方法,因而直接来看drainNormal方法。

void drainNormal() {
            int missed = 1;
            // 存放onNext传入的事件对象队列
            final SimpleQueue<T> q = queue;
            // 传入的观察者对象
            final Observer<? super T> a = downstream;
            // 循环check事件是否完成或者发生错误
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    boolean d = done;
                    T v;
                    try {
                        // 从队列中取出发送事件传入的对象
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
                    // 再次判断是否完成或者发生错误
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    // 判断队列中取出的发送事件传入的对象v是否为空
                    if (empty) {
                        break;
                    }
                    // 执行观察者对象的onNext方法
                    a.onNext(v);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
复制代码

drainNormal方法中先经过checkTerminated方法校验发送事件是否完成或者发生异常,接着从队列中取出事件对象,再次判断是否完成或者发生错误和取出的对象是否为空,没有问题的话就会执行观察者的onNext方法。而发送完成和出现异常的方法则是在checkTerminated方法处理。

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (disposed) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (delayError) {
                    if (empty) {
                        disposed = true;
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        disposed = true;
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        disposed = true;
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
复制代码

checkTerminated方法里根据delayError判断是否设置了超时的错误,接着再根据得到的错误e是否为空再决定调用的是观察者的onError()方法仍是onComplete方法。至此observeOn切换线程的流程也梳理结束。

5. map操做符

RxJava中有不少功能强大的操做符,经过使用这些操做符,能够很容易的解决代码编写时遇到的一些复杂繁琐的问题。这里就用map操做符来做为一个例子,来看看操做符是怎样工做的。首先仍是来了解map操做符的使用方法和做用。

final Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("5");
                emitter.onComplete();
            }
        });
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
            }

            @Override
            public void onNext(Integer i) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+i);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");
            }
            @Override
            public void onComplete() {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
            }
        };
        Observable<Integer> mapObservable = observable.map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s);
            }
        });
        Log.d(getClass().getName(), Thread.currentThread().getName() + " mapObservable:"+mapObservable.getClass().getName());
        mapObservable.subscribe(observer);
复制代码

运行日志:

map操做符做用是能够将被观察者发送事件的数据类型转换成其余的数据类型。它的使用方法很简单,例如上面这个例子就将一开始发送的String类型转换成观察者接收到的Integer类型。下面开始看map方法的源码。

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
复制代码

看到map方法中依旧仍是一样的套路,经过RxJavaPlugins.onAssembly方法返回一个被观察者对象,只不过此次构建传入的类型又是另外一个ObservableMap类型的对象。订阅的流程前面已经看过了,这里和以前的同样最终会走到ObservableMapsubscribeActual方法,因此直接来看这个方法。

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}
复制代码

ObservableMapsubscribeActual方法里看到很熟悉仍是会调用source.subscribe方法,只是这里传入的Observer对象是一个MapObserver对象。接下来的逻辑又和以前同样,根据以前的经验source.subscribe方法最终会调用ObserveronNext方法,因此接下来直接来看MapObserveronNext方法。

public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
复制代码

MapObserveronNext方法里的逻辑很简单,在作了一些的判断后调用mapper.apply(t)方法得到类型转换后的事件传递对象,最后就会调用观察者的downstream.onNext方法,这里的downstream就是订阅方法传入的观察者对象。跟踪mapper能够找到,它是从MapObserver构造时传入的一个Function类型,便是在使用map操做符时传入的那个Function对象,又由于在使用时实现了Functionapply方法完成了数据的类型转换逻辑,因此这里调用mapper.apply(t)方法就能够得到到转换后的数据。

6. 总结

以上就是关于RxJava源码工做流程的相关总结,总而言之,观察者模式仍是其核心设计思想。除此以外,经过源码阅读还发现,不管在线程切换方面仍是其它功能的操做符的实现,根本上来讲都是在其原有的被观察者或观察者基础上包装成一个新的对象,功能逻辑由新对象中的方法来实现完成。

相关文章
相关标签/搜索