到目前为止笔者分析了Android中最热门的网络底层和封装框架:Android主流三方库源码分析(1、深刻理解OKHttp源码)和Android主流三方库源码分析(2、深刻理解Retrofit源码),Android中使用最普遍的图片加载框架Glide的加载流程:Android主流三方库源码分析(3、深刻理解Glide源码)以及Android中性能最好的数据库框架Android主流三方库源码分析(4、深刻理解GreenDao源码)。本篇,我将会对近几年比较热门的函数式编程框架RxJava的源码进行详细的分析。git
RxJava是基于Java虚拟机上的响应式扩展库,它经过使用可观察的序列将异步和基于事件的程序组合起来。 与此同时,它扩展了观察者模式来支持数据/事件序列,而且添加了操做符,这些操做符容许你声明性地组合序列,同时抽象出要关注的问题:好比低级线程、同步、线程安全和并发数据结构等。github
从RxJava的官方定义来看,咱们若是要想真正地理解RxJava,就必须对它如下两个部分进行深刻的分析:数据库
固然,RxJava操做符的源码也是很不错的学习资源,特别是FlatMap、Zip等操做符的源码,有不少能够借鉴的地方,可是它们内部的实现比较复杂,限于篇幅,本文只讲解RxJava的订阅流程和线程切换原理。接下来,笔者一一对以上RxJava的两个关键部分来进行详细地讲解。编程
首先给出RxJava消息订阅的例子:json
Observable.create(newObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String>emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
复制代码
能够看到,这里首先建立了一个被观察者,而后建立一个观察者订阅了这个被观察者,所以下面分两个部分对RxJava的订阅流程进行分析:缓存
首先,上面使用了Observable类的create()方法建立了一个被观察者,看看里面作了什么。安全
// 省略一些检测性的注解
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码
在Observable的create()里面其实是建立了一个新的ObservableCreate对象,同时,把咱们定义好的ObservableOnSubscribe对象传入了ObservableCreate对象中,最后调用了RxJavaPlugins.onAssembly()方法。接下来看看这个ObservableCreate是干什么的。微信
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...
}
复制代码
这里仅仅是把ObservableOnSubscribe这个对象保存在ObservableCreate中了。而后看看RxJavaPlugins.onAssembly()这个方法的处理。网络
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// 应用hook函数的一些处理,通常用到不到
...
return source;
}
复制代码
最终仅仅是把咱们的ObservableCreate给返回了。数据结构
从以上分析可知,Observable.create()方法仅仅是先将咱们自定义的ObservableOnSubscribe对象从新包装成了一个ObservableCreate对象。
接着,看看Observable.subscribe()的订阅过程是如何实现的。
public final void subscribe(Observer<? super T> observer) {
...
// 1
observer = RxJavaPlugins.onSubscribe(this,observer);
...
// 2
subscribeActual(observer);
...
}
复制代码
在注释1处,在Observable的subscribe()方法内部首先调用了RxJavaPlugins的onSubscribe()方法。
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
// 应用hook函数的一些处理,通常用到不到
...
return observer;
}
复制代码
除去hook应用的逻辑,这里仅仅是将observer返回了。接着来分析下注释2处的subscribeActual()方法,
protected abstract void subscribeActual(Observer<? super T> observer);
复制代码
这是一个抽象的方法,很明显,它对应的具体实现类就是咱们在第一步建立的ObservableCreate类,接下来看到ObservableCreate的subscribeActual()方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 2
observer.onSubscribe(parent);
try {
// 3
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
在注释1处,首先新建立了一个CreateEmitter对象,同时传入了咱们自定义的observer对象进去。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
...
}
复制代码
从上面能够看出,CreateEmitter经过继承了Java并发包中的原子引用类AtomicReference保证了事件流切断状态Dispose的一致性(这里不理解的话,看到后面讲解Dispose的时候就明白了),并实现了ObservableEmitter接口和Disposable接口,接着咱们分析下注释2处的observer.onSubscribe(parent),这个onSubscribe回调的含义其实就是告诉观察者已经成功订阅了被观察者。再看到注释3处的source.subscribe(parent)这行代码,这里的source实际上是ObservableOnSubscribe对象,咱们看到ObservableOnSubscribe的subscribe()方法。
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public voidsubscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
});
复制代码
这里面使用到了ObservableEmitter的onNext()方法将事件流发送出去,最后调用了onComplete()方法完成了订阅过程。ObservableEmitter是一个抽象类,实现类就是咱们传入的CreateEmitter对象,接下来咱们看看CreateEmitter的onNext()方法和onComplete()方法的处理。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
@Override
public void onNext(T t) {
...
if (!isDisposed()) {
//调用观察者的onNext()
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
...
}
复制代码
在CreateEmitter的onNext和onComplete方法中首先都要通过一个isDisposed的判断,做用就是看当前的事件流是否被切断(废弃)掉了,默认是不切断的,若是想要切断,能够调用Disposable的dispose()方法将此状态设置为切断(废弃)状态。咱们继续看看这个isDisposed内部的处理。
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
复制代码
注意到这里经过get()方法首先从ObservableEmitter的AtomicReference中拿到了保存的Disposable状态。而后交给了DisposableHelper进行判断处理。接下来看看DisposableHelper的处理。
public enum DisposableHelper implements Disposable {
DISPOSED;
public static boolean isDisposed(Disposable d) {
// 1
return d == DISPOSED;
}
public static boolean set(AtomicReference<Disposable> field, Disposable d) {
for (;;) {
Disposable current = field.get();
if (current == DISPOSED) {
if (d != null) {
d.dispose();
}
return false;
}
// 2
if (field.compareAndSet(current, d)) {
if (current != null) {
current.dispose();
}
return true;
}
}
}
...
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
// ...
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
...
}
复制代码
DisposableHelper是一个枚举类,内部只有一个值即DISPOSED, 从上面的分析可知它就是用来标记事件流被切断(废弃)状态的。先看到注释2和注释3处的代码field.compareAndSet(current, d)和field.getAndSet(d),这里使用了原子引用AtomicReference内部包装的CAS方法处理了标志Disposable的并发读写问题。最后看到注释3处,将咱们传入的CreateEmitter这个原子引用类保存的Dispable状态和DisposableHelper内部的DISPOSED进行比较,若是相等,就证实数据流被切断了。为了更进一步理解Disposed的做用,再来看看CreateEmitter中剩余的关键方法。
@Override
public void onNext(T t) {
...
// 1
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
// 2
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
...
// 3
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
// 4
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
// 5
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
// 6
dispose();
}
}
}
复制代码
在注释一、三、5处,onNext()和onError()、onComplete()方法首先都会判断事件流是否被切断,若是事件流此时被切断了,那么onNext()和onComplete()则会退出方法体,不作处理,onError()则会执行到RxJavaPlugins.onError(t)这句代码,内部会直接抛出异常,致使崩溃。若是事件流没有被切断,那么在onError()和onComplete()内部最终会调用到注释四、6处的这句dispose()代码,将事件流进行切断,由此可知,onError()和onComplete()只能调用一个,若是先执行的是onComplete(),再调用onError()的话就会致使异常崩溃。
首先给出RxJava线程切换的例子:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public voidsubscribe(ObservableEmitter<String>emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " +e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
复制代码
能够看到,RxJava的线程切换主要分为subscribeOn()和observeOn()方法,首先,来分析下subscribeOn()方法。
在Schedulers.io()方法中,咱们须要先传入一个Scheduler调度类,这里是传入了一个调度到io子线程的调度类,咱们看看这个Schedulers.io()方法内部是怎么构造这个调度器的。
static final Scheduler IO;
...
public static Scheduler io() {
// 1
return RxJavaPlugins.onIoScheduler(IO);
}
static {
...
// 2
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
// 3
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
// 4
static final Scheduler DEFAULT = new IoScheduler();
}
复制代码
Schedulers这个类的代码不少,这里我只拿出有关Schedulers.io这个方法涉及的逻辑代码进行讲解。首先,在注释1处,同前面分析的订阅流程的处理同样,只是一个处理hook的逻辑,最终返回的仍是传入的这个IO对象。再看到注释2处,在Schedulers的静态代码块中将IO对象进行了初始化,其实质就是新建了一个IOTask的静态内部类,在IOTask的call方法中,也就是注释3处,能够了解到使用了静态内部类的方式把建立的IOScheduler对象给返回出去了。绕了这么大圈子,Schedulers.io方法其实质就是返回了一个IOScheduler对象。
public final Observable<T> subscribeOn(Scheduler scheduler) {
...
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
复制代码
在subscribeOn()方法里面,又将ObservableCreate包装成了一个ObservableSubscribeOn对象。咱们关注到ObservableSubscribeOn类。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// 1
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
// 2
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
// 3
observer.onSubscribe(parent);
// 4
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
...
}
复制代码
首先,在注释1处,将传进来的source和scheduler保存起来。接着,等到实际订阅的时候,就会执行到这个subscribeActual方法,在注释2处,将咱们自定义的Observer包装成了一个SubscribeOnObserver对象。在注释3处,通知观察者订阅了被观察者。在注释4处,内部先建立了一个SubscribeTask对象,来看看它的实现。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
复制代码
SubscribeTask是ObservableSubscribeOn的内部类,它实质上就是一个任务类,在它的run方法中会执行到source.subscribe(parent)的订阅方法,这个source其实就是咱们在ObservableSubscribeOn构造方法中传进来的ObservableCreate对象。接下来看看scheduler.scheduleDirect()内部的处理。
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 1
final Worker w = createWorker();
// 2
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 3
DisposeTask task = new DisposeTask(decoratedRun, w);
// 4
w.schedule(task, delay, unit);
return task;
}
复制代码
这里最后会执行到上面这个scheduleDirect()重载方法。首先,在注释1处,会调用createWorker()方法建立一个工做者对象Worker,它是一个抽象类,这里的实现类就是IoScheduler,下面,咱们看看IoScheduler类的createWorker()方法。
final AtomicReference<CachedWorkerPool> pool;
...
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
...
@Override
public Worker createWorker() {
// 1
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
...
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
// 2
this.threadWorker = pool.get();
}
}
复制代码
首先,在注释1处调用了pool.get()这个方法,pool是一个CachedWorkerPool类型的原子引用对象,它的做用就是用于缓存工做者对象Worker的。而后,将获得的CachedWorkerPool传入新建立的EventLoopWorker对象中。重点关注一下注释2处,这里将CachedWorkerPool缓存的threadWorker对象保存起来了。
下面,咱们继续分析3.6处代码段的注释2处的代码,这里又是一个关于hook的封装处理,最终仍是返回的当前的Runnable对象。在注释3处新建了一个切断任务DisposeTask将decoratedRun和w对象包装了起来。最后在注释4处调用了工做者的schedule()方法。下面咱们来分析下它内部的处理。
@Override
public Disposable schedule(@NonNull Runnableaction, long delayTime, @NonNull TimeUnit unit){
...
return threadWorker.scheduleActual(action,delayTime, unit, tasks);
}
复制代码
内部调用了threadWorker的scheduleActual()方法,其实是调用到了父类NewThreadWorker的scheduleActual()方法,咱们继续看看NewThreadWorker的scheduleActual()方法中作的事情。
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 1
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
// 2
if (delayTime <= 0) {
// 3
f = executor.submit((Callable<Object>)sr);
} else {
// 4
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
复制代码
在NewThreadWorker的scheduleActual()方法的内部,在注释1处首先会新建一个ScheduledRunnable对象,将Runnable对象和parent包装起来了,这里parent是一个DisposableContainer对象,它实际的实现类是CompositeDisposable类,它是一个保存全部事件流是否被切断状态的容器,其内部的实现是使用了RxJava本身定义的一个简单的OpenHashSet类进行存储。最后注释2处,判断是否设置了延迟时间,若是设置了,则调用线程池的submit()方法当即进行线程切换,不然,调用schedule()方法进行延时执行线程切换。
从上面的分析,咱们能够很容易了解到被观察者被订阅时是从最外面的一层(ObservableSubscribeOn)通知到里面的一层(ObservableOnSubscribe),当连续执行了到屡次subscribeOn()的时候,其实就是先执行倒数第一次的subscribeOn()方法,直到最后一次执行的subscribeOn()方法,这样确定会覆盖前面的线程切换。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
....
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
能够看到,observeOn()方法内部最终也是返回了一个ObservableObserveOn对象,咱们直接来看看ObservableObserveOn的subscribeActual()方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1
if (scheduler instanceof TrampolineScheduler) {
// 2
source.subscribe(observer);
} else {
// 3
Scheduler.Worker w = scheduler.createWorker();
// 4
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
复制代码
首先,在注释1处,判断指定的调度器是否是TrampolineScheduler,这是一个不进行线程切换,当即执行当前代码的调度器。若是是,则会直接调用ObservableSubscribeOn的subscribe()方法,若是不是,则会在注释3处建立一个工做者对象。而后,在注释4处建立一个新的ObserveOnObserver将SubscribeOnobserver对象包装起来,并传入ObservableSubscribeOn的subscribe()方法进行订阅。接下来看看ObserveOnObserver类的重点方法。
@Override
public void onNext(T t) {
...
if (sourceMode != QueueDisposable.ASYNC) {
// 1
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
...
schedule();
}
@Override
public void onComplete() {
...
schedule();
}
复制代码
去除非主线逻辑的代码,在ObserveOnObserver的onNext()和onError()、onComplete()方法中最后都会调用到schedule()方法。接着看schedule()方法,其中onNext()还会把消息存放到队列中。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
复制代码
这里使用了worker进行调度ObserveOnObserver这个实现了Runnable的任务。worker就是在AndroidSchedulers.mainThread()中建立的,内部其实就是使用Handler进行线程切换的,此处再也不赘述了。接着看ObserveOnObserver的run()方法。
@Override
public void run() {
// 1
if (outputFused) {
drainFused();
} else {
// 2
drainNormal();
}
}
复制代码
在注释1处会先判断outputFused这个标志位,它表示事件流是否被融化掉,默认是false,因此,最后会执行到drainNormal()方法。接着看看drainNormal()方法内部的处理。
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
// 1
final Observer<? super T> a = downstream;
...
// 2
v = q.poll();
...
// 3
a.onNext(v);
...
}
复制代码
在注释1处,这里的downstream其实是从外面传进来的SubscribeOnObserver对象。在注释2处将队列中的消息取出来,接着在注释3处调用了SubscribeOnObserver的onNext方法。最终,会从咱们包装类的最外层一直调用到最里面的咱们自定义的Observer中的onNext()方法,因此,在observeOn()方法下面的链式代码都会执行到它所指定的线程中,噢,原来如此。
其实笔者使用了RxJava也已经有一年多的时间了,可是一直没有去深刻去了解过它的内部实现原理,现在细细品尝,的确是酣畅淋漓。从一开始的OkHttp到现现在的RxJava源码分析,到此为止,Android主流三方库源码分析系列文章已经发布了五篇了,咱们的征途已通过半,接下来,我将会对Android中的内存泄露框架LeakCanary源码进行深刻地讲解,尽请期待~
一、RxJava V2.2.5 源码
二、Android 进阶之光
欢迎关注个人微信:
bcce5360
微信群若是不能扫码加入,麻烦你们想进微信群的朋友们,加我微信拉你进群。
2千人QQ群,Awesome-Android学习交流群,QQ群号:959936182, 欢迎你们加入~