RxJava
核心功能是一个用来完成异步操做的库,相对于其它异步操做的方法,RxJava
的API
使用更加的简洁。而且RxJava
中还提供了不少功能强大的操做符,帮助咱们解决不少本来复杂繁琐的代码逻辑,提升了代码质量。RxJava
的实现是基于观察者模式,观察者模式中如下有三个比较重要的概念:git
被观察者是事件的发起者,被观察者与观察者创建订阅关系后,被观察者发送事件,观察者才能接收到事件。github
RxJava
的基础使用也很简单,分为三个步骤,分别是建立被观察者,建立观察者和创建订阅关系,具体代码以下。bash
// 1. 建立被观察者 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe"); emitter.onNext("string1"); emitter.onNext("string2"); emitter.onNext("string3"); emitter.onComplete(); } }); // 2. 建立观察者 Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe"); } @Override public void onNext(String s) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s); } @Override public void onError(Throwable e) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onError"); } @Override public void onComplete() { Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete"); } }; Log.d(getClass().getName(), Thread.currentThread().getName() + " observable:"+observable.getClass().getName()); // 3. 创建订阅关系 observable.subscribe(observer); 复制代码
运行日志: markdown
本文中全部源码基于RxJava2
的2.2.11
版本。首先来看看这个基本的订阅流程源码是怎么实现的。app
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码
使用RxJava
能够经过Observable
的create
方法建立一个被观察者对象。create
方法从参数中传入一个ObservableOnSubscribe
类型的source
,而后方法中先校验了source
是否为空,接着将传入的source
封装成一个ObservableCreate
对象,而后调用了RxJavaPlugins.onAssembly
方法返回建立的好的Observable
。接着进入onAssembly
方法查看。异步
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } 复制代码
onAssembly
方法中首先是一个Hook
实现,这里能够理解为一个代理。能够看到这里先判断onObservableAssembly
是否为空,为空就直接返回传入的source
,不然再调用apply
方法。这里能够继续跟踪一下onObservableAssembly
。async
@SuppressWarnings("rawtypes") @Nullable static volatile Function<? super Observable, ? extends Observable> onObservableAssembly; /** * Sets the specific hook function. * @param onObservableAssembly the hook function to set, null allowed */ @SuppressWarnings("rawtypes") public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } RxJavaPlugins.onObservableAssembly = onObservableAssembly; } 复制代码
它是RxJavaPlugins
中的成员变量,默认为空,而且提供了一个set
方法来设置它。由于默认为空,因此默认返回的就是传入的source
。这里的代理默认是不会对Observable
作什么操做,若是须要有特殊的需求能够调用set
方法实现本身的代理。而默认返回的source
类型为ObservableCreate
对象也实现了Observable
接口。ide
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } ...... } 复制代码
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
复制代码
观察者Observer
是一个接口,其中提供了一些方法,使用时建立接口的实现,并根据需求在方法中作本身的实现。函数
创建订阅关系调用了Observable
的subscribe
方法。oop
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD ...... } catch (Throwable e) { ...... } } 复制代码
方法中仍是先判断了传入参数observer
是否为空,接着仍是一个Hook
实现,这里就不细究了,得到Hook
返回的observer
后再次判断是否为空,以后调用了subscribeActual
方法。
protected abstract void subscribeActual(Observer<? super T> observer);
复制代码
Observable
的subscribeActual
方法是个抽象方法,以前看过这里的Observable
实际实现是个ObservableCreate
对象,因此再进入ObservableCreate
类查看对应方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
ObservableCreate
中的subscribeActual
方法中先建立了一个CreateEmitter
发射器对象,并将observer
对象传入。接着调用了observer
的onSubscribe
方法,此时观察者的onSubscribe
方法执行。最后调用了source
的subscribe
方法。
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe"); emitter.onNext("string1"); emitter.onNext("string2"); emitter.onNext("string3"); emitter.onComplete(); } }); 复制代码
这个source
就是在create
方法中传入的ObservableOnSubscribe
。它的subscribe
方法中经过调用ObservableEmitter
的方法发送事件,这里的ObservableEmitter
就是以前建立的CreateEmitter
对象,因此再来进一步看看它其中的方法。
CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } 复制代码
CreateEmitter
的构造函数接收了观察者对象,而后在调用onNext
方法时先作了空判断,再对isDisposed
进行取消订阅的判断,以后调用了observer
的onNext
方法,也就是观察者的onNext
方法。一样的onComplete
中最终也是调用了observer
的onComplete
方法。至此RxJava
中的基本订阅流程的源码就梳理完了。
RxJava
中有个很重要的功能,就是能方便的切换线程,来看下它的使用,仍是以前基础使用中的例子进行修改。
Observable<String> observable0 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe"); emitter.onNext("string1"); emitter.onNext("string2"); emitter.onNext("string3"); emitter.onComplete(); } }); Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe"); } @Override public void onNext(String s) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s); } @Override public void onError(Throwable e) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onError"); } @Override public void onComplete() { Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete"); } }; Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread()); Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName()); Observable<String> observable2 = observable1.observeOn(AndroidSchedulers.mainThread()); Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName()); observable2.subscribe(observer); 复制代码
被观察者和观察者的建立和以前同样,在创建订阅关系时调用subscribeOn
和observeOn
方法进行线程的切换。这里每一个方法返回的都是Observable
类型,因此能够采用链式调用,这也是RxJava
的一个特色,可是这里没有采用这种写法,而是将其拆分开来写而且日志打印出每一个Observable
的具体类型,这是为了方便以后源码理解。 运行结果日志:
Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread()); Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName()); observable1.subscribe(observer); 复制代码
运行结果:
subscribeOn
方法运行查看结果,发现不只被观察者发射事件运行在了子线程,观察者接收事件也运行在子线程,那么进入
subscribeOn
方法查看它的实现。
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } 复制代码
能够看到subscribeOn
方法和subscribe
方法有些相似。首先是判断传入的scheduler
是否为空,而后一样调用RxJavaPlugins.onAssembly
方法,此次构建了一个ObservableSubscribeOn
对象返回。而subscribeOn
方法以后仍是调用了subscribe
方法,根据以前的分析,subscribe
方法最终会调用到subscribeActual
方法,不过此时的subscribeActual
方法再也不是ObservableCreate
中的而是ObservableSubscribeOn
中的subscribeActual
方法。
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
复制代码
ObservableSubscribeOn
的subscribeActual
方法中流程和以前的也很相似,此次是先建立了一个SubscribeOnObserver
对象,将观察者对象传入,接着一样先调用了observer.onSubscribe
方法,而后将传入的SubscribeOnObserver
封装入了一个SubscribeTask
对象中,接着调用了scheduler.scheduleDirect
方法再将返回结果获得的Disposable
设置到SubscribeOnObserver
中。下面一个方法一个方法看。首先是建立SubscribeTask
。
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } } 复制代码
SubscribeTask
是ObservableSubscribeOn
的内部类,其实现很简单就是实现了一个Runnable
接口,构造方法中传入了SubscribeOnObserver
对象,在其run
方法中调用了ObservableSubscribeOn
中的成员变量source
的subscribe
方法。这个source
是在建立ObservableSubscribeOn
时传入的,根据前面的代码能够找到是在subscribeOn
方法中建立的对象而且这个source
对应传入的是当前这个Observable
对象即经过Observable.create
得到的被观察者对象,其实现以前看过是一个ObservableCreate
因此这里就和以前同样又会走到了其父类Observable
的subscribe
方法中,继而调用ObservableCreate
的subscribeActual
方法,以后最终会调用到观察者的对应onNext
等方法,不过此时的观察者不直接是在使用时建立传入的Observer
,而是以前看到的SubscribeOnObserver
类型,不过其中的onNext
等方法仍是调用了在使用时建立传入的Observer
的对应方法。
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer<? super T> downstream; final AtomicReference<Disposable> upstream; SubscribeOnObserver(Observer<? super T> downstream) { this.downstream = downstream; this.upstream = new AtomicReference<Disposable>(); } @Override public void onNext(T t) { downstream.onNext(t); } @Override public void onError(Throwable t) { downstream.onError(t); } @Override public void onComplete() { downstream.onComplete(); } ...... } 复制代码
下面接着看到scheduleDirect
这个方法,在建立好SubscribeTask
以后调用了scheduleDirect
方法。这里的scheduler
就是subscribeOn
中传入的,对应开始例子中的Schedulers.newThread
。
public static Scheduler newThread() { return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD); } // 静态成员变量NEW_THREAD static final Scheduler NEW_THREAD; NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); 复制代码
进入Schedulers.newThread
一步步跟踪,看到newThread
方法返回静态成员变量中的NEW_THREAD
,而NEW_THREAD
又是经过NewThreadTask
建立。
static final class NewThreadTask implements Callable<Scheduler> { @Override public Scheduler call() throws Exception { return NewThreadHolder.DEFAULT; } } static final Scheduler DEFAULT = new NewThreadScheduler(); 复制代码
继续跟踪查看发现NewThreadTask
实际是实现了Callable
接口,其call
方法中返回了静态内部类中的NewThreadHolder.DEFAULT
。这个DEFAULT
的实现类型为NewThreadScheduler
。至此终于找到了咱们传入的Scheduler
的真正实现类。因而继续看其scheduleDirect
方法。
public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; } 复制代码
scheduleDirect
方法是在其父类中实现的,看到其中进而调用了同名重载方法,方法中首先是调用createWorker
方法建立一个Worker
。这个方法的实现就是在NewThreadScheduler
中了。
public Worker createWorker() { return new NewThreadWorker(threadFactory); } 复制代码
createWorker
方法中只作了一件事就是建立返回了一个NewThreadWorker
。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
......
}
复制代码
NewThreadWorker
中看到建立了一个线程池,再回到scheduleDirect
方法,建立完Worker
后将传入的Runnable
即SubscribeTask
进行一个装饰获得新的Runnable
对象。接着将Worker
和新的Runnable
封装到一个DisposeTask
对象中。
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection { @NonNull final Runnable decoratedRun; @NonNull final Worker w; @Nullable Thread runner; DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) { this.decoratedRun = decoratedRun; this.w = w; } @Override public void run() { runner = Thread.currentThread(); try { decoratedRun.run(); } finally { dispose(); runner = null; } } ...... } 复制代码
DisposeTask
一样实现了Runnable
接口,在run
方法中调用了从构造传入的decoratedRun
的run
方法执行任务。回到最后一步,调用Worker
的schedule
方法,这里就对应的NewThreadWorker
的schedule
方法。
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } return scheduleActual(action, delayTime, unit, null); } 复制代码
schedule
方法中又进一步调用了其scheduleActual
方法。
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; } 复制代码
scheduleActual
方法里看到又将decoratedRun
和DisposableContainer
封装成ScheduledRunnable
最后将这个ScheduledRunnable
交给构造函数中建立的线程池去运行,最终就会执行到前面看过的SubscribeTask
中的run
方法完成订阅逻辑,调用观察者的onNext
等方法。到这里就看出最终的source.subscribe
是会经过线程池切换到子线程中去执行了。
经过查看subscribeOn
方法源码能够发现,方法里其实是在前一个建立的ObservableCreate
外面包了一层,把它包成一个ObservableSubscribeOn
对象,一样的原先的Observer
也被包了一层包成一个SubscribeOnObserver
对象,而线程切换的工做是由Scheduler
完成的。
接着再来看看切换回主线程的方法observeOn
,仍是先修改使用代码,查看运行日志。
Observable<String> observable2 = observable0.observeOn(AndroidSchedulers.mainThread()); Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName()); observable2.subscribe(observer); 复制代码
运行日志:
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); } 复制代码
这里看到observeOn
方法里调用了重载方法,方法中仍是同一个套路,不过这里建立的又是另外一个对象ObservableObserveOn
了。根据前面的经验这里就又是将前一个Observable
传递到ObservableObserveOn
中的成员变量source
上,这里看到就是构造函数中的第一个参数。接着仍是会调用subscribe
与观察者创建订阅关系进而会执行到ObservableObserveOn
对象的subscribeActual
方法。
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } 复制代码
subscribeActual
方法中判断了scheduler
的类型,这里的scheduler
就是由AndroidSchedulers.mainThread()
传入的,因而先来看一下这个方法。
public static Scheduler mainThread() { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { return MainHolder.DEFAULT; } }); private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false); } 复制代码
从mainThread
开始看,发现代码调用逻辑和以前的Schedulers.newThread
方法相似,最终会返回一个HandlerScheduler
而这个Scheduler
中的Handler
则是主线程的Handler
,看到这里就能猜测到了,后面观察者的对应方法必定是由这个Handler
来切换到主线程执行的。回到subscribeActual
方法。
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } 复制代码
这里判断完类型会走else
中的方法首先仍是会调用HandlerScheduler
的createWorker
方法建立一个Worker
。
@Override public Worker createWorker() { return new HandlerWorker(handler, async); } 复制代码
这里是个HandlerWorker
其中具体方法后面再看。接着上面建立完Worker
后一样仍是同样调用source.subscribe
建立了一个ObserveOnObserver
对象传入。这里的source
就仍是以前的ObservableCreate
,因此这里仍是会调用ObservableCreate
中的subscribeActual
方法。
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
ObservableCreate
中的subscribeActual
方法中的逻辑以前看过,不过此时传入的observer
仍然再也不是在使用时建立的观察者对象了,而是传过来的ObserveOnObserver
对象,此时建立的CreateEmitter
中的observer
也就是这个ObserveOnObserver
对象。和以前逻辑同样,接着就会调用observer
的onNext
等方法,此时调用的便是ObserveOnObserver
中的onNext
等方法。因此进入ObserveOnObserver
查看。
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } @Override public void onComplete() { if (done) { return; } done = true; schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } 复制代码
查看ObserveOnObserver
中的代码会发现onNext
方法中先将传入的参数放入了一个队列,而后不管是onNext
仍是onComplete
方法最后都调用了schedule
方法,进而再进入查看,发现schedule
方法中又调用了worker.schedule
方法。这里的worker
就是以前建立的HandlerWorker
,这时再来看它的schedule
方法。
public Disposable schedule(@NonNull Runnable run) { return schedule(run, 0L, TimeUnit.NANOSECONDS); } 复制代码
单个参数schedule
方法是在其父类中的,而这个方法中又调用另外一个三个参数的schedule
方法,这个方法父类中是抽象方法因此实现就在子类HandlerWorker
里了。
@Override @SuppressLint("NewApi") // Async will only be true when the API is available to call. public Disposable schedule(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); // 判断是否取消订阅 if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); // 建立ScheduledRunnable ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); // 建立消息,并将主线程Handler和ScheduledRunnable Message message = Message.obtain(handler, scheduled); message.obj = this; // 判断设置异步消息 if (async) { message.setAsynchronous(true); } // 发送消息执行callback handler.sendMessageDelayed(message, unit.toMillis(delay)); // 检查是否取消订阅 if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } 复制代码
在子类的这个方法里在作了取消订阅的判断后将方法传入的Runnable
和Handler
又封装到一个ScheduledRunnable
对象中。接着建立了一个Message
并将ScheduledRunnable
放入Message
,最后调用handler.sendMessageDelayed
方法经过这个主线程的Handler
执行这个ScheduledRunnable
。
最后来追溯下ScheduledRunnable
到底执行了什么,不过猜也知道最后必定调用到观察者中的对应方法。
private static final class ScheduledRunnable implements Runnable, Disposable { private final Handler handler; private final Runnable delegate; private volatile boolean disposed; // Tracked solely for isDisposed(). ScheduledRunnable(Handler handler, Runnable delegate) { this.handler = handler; this.delegate = delegate; } @Override public void run() { try { delegate.run(); } catch (Throwable t) { RxJavaPlugins.onError(t); } } ...... } 复制代码
ScheduledRunnable
中的run
方法很简单就是调用了构造中传入的Runnable
的run
方法。而根据以前看过得建立ScheduledRunnable
时传入的Runnable
又是从scheduleDirect
方法中传入的,而scheduleDirect
方法中的Runnable
又是从worker.schedule(this)
方法时传入的,根据上下文代码发现这个this
指代的是ObserveOnObserver
对象,因而进一步进入它的run
方法查看。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { ...... ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.downstream = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } ...... @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } ...... } 复制代码
能够看到run
方法中判断了outputFused
的真假,而后分别调用了drainFused
和drainNormal
方法。这里的outputFused
是与RxJava2
中的背压处理相关暂时先无论,根据方法名也能知道正常调用会执行drainNormal
方法,因而直接来看drainNormal
方法。
void drainNormal() { int missed = 1; // 存放onNext传入的事件对象队列 final SimpleQueue<T> q = queue; // 传入的观察者对象 final Observer<? super T> a = downstream; // 循环check事件是否完成或者发生错误 for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { // 从队列中取出发送事件传入的对象 v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); disposed = true; upstream.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; // 再次判断是否完成或者发生错误 if (checkTerminated(d, empty, a)) { return; } // 判断队列中取出的发送事件传入的对象v是否为空 if (empty) { break; } // 执行观察者对象的onNext方法 a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } } 复制代码
drainNormal
方法中先经过checkTerminated
方法校验发送事件是否完成或者发生异常,接着从队列中取出事件对象,再次判断是否完成或者发生错误和取出的对象是否为空,没有问题的话就会执行观察者的onNext
方法。而发送完成和出现异常的方法则是在checkTerminated
方法处理。
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) { if (disposed) { queue.clear(); return true; } if (d) { Throwable e = error; if (delayError) { if (empty) { disposed = true; if (e != null) { a.onError(e); } else { a.onComplete(); } worker.dispose(); return true; } } else { if (e != null) { disposed = true; queue.clear(); a.onError(e); worker.dispose(); return true; } else if (empty) { disposed = true; a.onComplete(); worker.dispose(); return true; } } } return false; } 复制代码
在checkTerminated
方法里根据delayError
判断是否设置了超时的错误,接着再根据得到的错误e
是否为空再决定调用的是观察者的onError()
方法仍是onComplete
方法。至此observeOn
切换线程的流程也梳理结束。
RxJava
中有不少功能强大的操做符,经过使用这些操做符,能够很容易的解决代码编写时遇到的一些复杂繁琐的问题。这里就用map
操做符来做为一个例子,来看看操做符是怎样工做的。首先仍是来了解map
操做符的使用方法和做用。
final Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe"); emitter.onNext("5"); emitter.onComplete(); } }); Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe"); } @Override public void onNext(Integer i) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+i); } @Override public void onError(Throwable e) { Log.d(getClass().getName(), Thread.currentThread().getName() + " onError"); } @Override public void onComplete() { Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete"); } }; Observable<Integer> mapObservable = observable.map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return Integer.parseInt(s); } }); Log.d(getClass().getName(), Thread.currentThread().getName() + " mapObservable:"+mapObservable.getClass().getName()); mapObservable.subscribe(observer); 复制代码
运行日志:
map
操做符做用是能够将被观察者发送事件的数据类型转换成其余的数据类型。它的使用方法很简单,例如上面这个例子就将一开始发送的String
类型转换成观察者接收到的Integer
类型。下面开始看map
方法的源码。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } 复制代码
看到map
方法中依旧仍是一样的套路,经过RxJavaPlugins.onAssembly
方法返回一个被观察者对象,只不过此次构建传入的类型又是另外一个ObservableMap
类型的对象。订阅的流程前面已经看过了,这里和以前的同样最终会走到ObservableMap
的subscribeActual
方法,因此直接来看这个方法。
@Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } 复制代码
ObservableMap
的subscribeActual
方法里看到很熟悉仍是会调用source.subscribe
方法,只是这里传入的Observer
对象是一个MapObserver
对象。接下来的逻辑又和以前同样,根据以前的经验source.subscribe
方法最终会调用Observer
的onNext
方法,因此接下来直接来看MapObserver
的onNext
方法。
public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } downstream.onNext(v); } 复制代码
MapObserver
的onNext
方法里的逻辑很简单,在作了一些的判断后调用mapper.apply(t)
方法得到类型转换后的事件传递对象,最后就会调用观察者的downstream.onNext
方法,这里的downstream
就是订阅方法传入的观察者对象。跟踪mapper
能够找到,它是从MapObserver
构造时传入的一个Function
类型,便是在使用map
操做符时传入的那个Function
对象,又由于在使用时实现了Function
的apply
方法完成了数据的类型转换逻辑,因此这里调用mapper.apply(t)
方法就能够得到到转换后的数据。
以上就是关于RxJava
源码工做流程的相关总结,总而言之,观察者模式仍是其核心设计思想。除此以外,经过源码阅读还发现,不管在线程切换方面仍是其它功能的操做符的实现,根本上来讲都是在其原有的被观察者或观察者基础上包装成一个新的对象,功能逻辑由新对象中的方法来实现完成。