众所周知RxJava有许多优势好比强大的链式调用,方便的线程调度,可是我对其原理仍是了解的太少了,所以打算阅读下源码,先从一个最基本的例子开始java
这个例子只是为了示例,正常状况下也不会这么写数组
fun main() {
Observable.create {
emitter: ObservableEmitter<Int> ->
println("onSourceSubscribe")
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}.subscribe(object : Observer<Int> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onObserverSubscribe")
}
override fun onNext(t: Int) {
println("onNext $t")
}
override fun onError(e: Throwable) {
println("onError")
}
})
}
复制代码
输出结果:缓存
onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 3
onComplete
复制代码
那么为何会按这个顺序输出呢?从代码中也能够看出从始至终也只调用了create、subscribe两个方法,先来看看create的源码bash
// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
复制代码
能够看出当咱们没有设置onObservableAssembly时其实就是直接建立了一个ObservableCreate实例返回,接着看看subscribe并发
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
复制代码
能够看到内部就是调用了subscribeActual方法,而这个方法是个抽象方法,ObservableCreate实现了该方法app
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
复制代码
内部主要就是先建立了一个CreateEmitter实例,而后调用observer的onSubscribe方法,最后再调用source的subscribe方法,这就解释了onObserverSubscribe和onSourceSubscribe的输出,而source的subscribe方法又调用了三次onNext方法和一次onComplete方法,先看看onNextide
// CreateEmitter.java
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
复制代码
若是还没dispose那么直接就调用了observer的onNext,这也就解释了onNext 一、onNext 二、onNext 3三个输出接着看onComplete函数
// CreateEmitter.java
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
复制代码
若是还没dispose就直接调用observer的onComplete,直接解释了onComplete的输出,咱们注意到Observer还有一个onError回调,该方法能够经过调用emitter.onError手动触发oop
// CreateEmitter.java
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
复制代码
能够看到当还没被dispose就会调用到observer的onError方法,至此这个基本demo的源码已经分析完毕。 总结下上述代码其实就分为以下几步ui
上述实例的总体流程图以下
下面来从源码的角度研究研究RxJava中的几个基本方法
首先从最基本的map方法开始
map方法将每一个onNext事件都调用所传入的Function实例的apply方法来达到转化数据源的效果,以下图所示
示例代码以下所示
fun main() {
Observable.create {
emitter: ObservableEmitter<Int> ->
println("onSourceSubscribe")
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}
.map {
it + 1
}
.subscribe(object : Observer<Int> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onObserverSubscribe")
}
override fun onError(e: Throwable) {
println("onError")
}
override fun onNext(t: Int) {
println("onNext $t")
}
})
}
复制代码
输出结果:
onObserverSubscribe
onSourceSubscribe
onNext 2
onNext 3
onNext 4
onComplete
复制代码
很明显map方法会对全部的next的数据作一次变化这里是加1,接着看看map的源码实现
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
复制代码
内部建立了一个ObservableMap实例并将当前的Observable实例和Function实例传入,根据本文一开始的分析当调用Observable的subscribe方法其实会调用subscribeActual方法
// ObservableMap.java
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
复制代码
建立了MapObserver实例将Observer实例进行包装而后调用source.subscribe,这个source其实就是上一级Observable实例本例中对应ObservableCreate实例,接着根据上文的分析会调用该MapObserver实例的onNext三次而后调用一次onComplete
// MapObserver.java
public void onNext(T t) {
// 初始化的时候done为false
if (done) {
return;
}
// 初始化的时候就是NONE
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
复制代码
咱们能够看到内部调用了mapper.apply方法,接着将拿到的结果v当作参数调用downstream的onNext方法,注意这里的downStream就是外界建立的一个Observer对象。上述实例是总体流程图以下。注:绿色框表示对象,蓝色框表示方法调用,括号内为简称
综上咱们能够知道map经过代理下游Observer实例完成数据转换,接着看看flatMap的源码实现
flatMap方法用于将上游的每个onNext事件都转换成一个Observable实例,以下图所示
fun main() {
Observable.create {
emitter: ObservableEmitter<Int> ->
println("onSourceSubscribe")
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}
.flatMap {
Observable.just(it, it + 1)
}
.subscribe(object : Observer<Int> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onObserverSubscribe")
}
override fun onError(e: Throwable) {
println("onError")
}
override fun onNext(t: Int) {
println("onNext $t")
}
})
}
复制代码
输出结果:
onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 2
onNext 3
onNext 3
onNext 4
onComplete
复制代码
很显然flatMap将每个事件好比1转换成一个拥有一、2两个事件的Observable实例,来看看其源码实现
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码
默认delayErrors为false表示当一个事件出现异常就会中止整个事件序列,默认并发数为Int的最大值,默认缓存大小为128,而后根据这些参数和当前Observable实例构建出一个ObservableFlatMap实例,咱们看看其subscribeActual方法
// ObservableFlatMap.java
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码
内部又经过这些参数和下游的Observer实例构建了一个MergeObserver实例,直接看看其onSubscribe方法
// MergeObserver.java
public void onSubscribe(Disposable d) {
// 只会回调一次下游的onSubscribe方法
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
复制代码
若是已经有上游了就不作任何处理否则进行上游的赋值,而后回调了下游也就是自定义的那个Observer的onSubscribe方法,接着看看其onNext方法是怎么把一个输入源转化成一个Observable的
public void onNext(T t) {
ObservableSource<? extends U> p;
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
subscribeInner(p);
}
复制代码
先是调用了传入的apply方法将每一个onNext数据源转化为Observable实例,接着调用subscribeInner方法
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
...
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
boolean addInner(InnerObserver<T, U> inner) {
for (;;) {
InnerObserver<?, ?>[] a = observers.get();
int n = a.length;
InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = inner;
if (observers.compareAndSet(a, b)) {
return true;
}
}
}
复制代码
为每一个Observable对象都建立了一个InnerObserver实例,而后将其放入到一个数组中去,最后调用subscribe方法进行订阅,因为apply方法返回了一个ObservableFromArray实例,因此看看其subscribeActual方法
// ObservableFromArray.java
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
复制代码
observer指代InnerObserver,看看其onSubscribe方法
public void onSubscribe(Disposable d) {
// 第一次调用会返回true,d就是FromArrayDisposable实例其派生自QueueDisposable
if (DisposableHelper.setOnce(this, d)) {
if (d instanceof QueueDisposable) {
QueueDisposable<U> qd = (QueueDisposable<U>) d;
// 至关于传入了7
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
fusionMode = m;
queue = qd;
done = true;
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
//FromArrayDisposable.java
public int requestFusion(int mode) {
// 很明显7 & 1 != 0
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
复制代码
这里暂时还无法理解这个fusionMode(混合模式)是干什么用的,接着会调用到MergeObserver的drain方法
void drain() {
// 只会执行一次,循环将全部事件取出
if (getAndIncrement() == 0) {
drainLoop();
}
}
// 当取消了或者出现了错误并其dealyErrors为false时会将全部InnerObserver都dispose掉
boolean checkTerminate() {
if (cancelled) {
return true;
}
Throwable e = errors.get();
if (!delayErrors && (e != null)) {
disposeAll();
e = errors.terminate();
if (e != ExceptionHelper.TERMINATED) {
downstream.onError(e);
}
return true;
}
return false;
}
void drainLoop() {
// 这里的downstream就是外界自定义的Observer实例
final Observer<? super U> child = this.downstream;
for (;;) {
...
for (int i = 0; i < n; i++) {
if (checkTerminate()) {
return;
}
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
SimpleQueue<U> q = is.queue;
if (q != null) {
for (;;) {
U o;
try {
o = q.poll();
} catch (Throwable ex) {
...
if (checkTerminate()) {
return;
}
continue sourceLoop;
}
// 每取出一个便会调用下游的onNext方法
child.onNext(o);
}
// 会把一个Observable源全部的数据都取完了之后才会进入下一个
if (o == null) {
break;
}
}
...
}
..
}
}
复制代码
drainLoop内部会从数组中一个个取出InnerObserver实例,并取出所对应的数据源而后每取出一个回调下游Observer的onNext方法,下面用一张图来总结下实例的流程
zip方法经过一个函数将多个Observables的发射物结合到一块儿,基于这个函数的结果为每一个结合体发射单个数据项。以下图所示
下面是一个使用zip操做符的一个简单例子
fun main() {
Observable.zip(getObservable1(), getObservable2(), zipper())
.subscribe(object : Observer<String> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onSubscribe")
}
override fun onNext(t: String) {
println("onNext $t")
}
override fun onError(e: Throwable) {
println("onError $e")
}
})
}
fun getObservable1(): ObservableSource<String> {
return Observable.create {
it.onNext("A")
it.onNext("B")
it.onNext("C")
Thread.sleep(1000)
}
}
fun getObservable2(): ObservableSource<String> {
return Observable.create {
it.onNext("1")
Thread.sleep(1000)
it.onNext("2")
Thread.sleep(1000)
it.onNext("3")
}
}
fun zipper(): BiFunction<String, String, String> {
return BiFunction { s1, s2 -> s1 + s2 }
}
复制代码
输出结果以下,onSubscribe输出后过一秒输出A1,再过一秒输出A2,再过一秒输出A3
onSubscribe
onNext A1
onNext B2
onNext C3
复制代码
那么为何输出结果会是这个样子的呢?来看看zip的源码实现
public static <T1, T2, R> Observable<R> zip( ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper) {
return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2);
}
public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource<? extends T>... sources) {
return RxJavaPlugins.onAssembly(new ObservableZip<T, R>(sources, null, zipper, bufferSize, delayError));
}
复制代码
根据源码能够看出zip
方法其实最终建立了一个ObservableZip
实例,直接看其subscribeActual
// ObservableZip
public void subscribeActual(Observer<? super R> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = sources.length;
ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
ZipCoordinator(Observer<? super R> actual,
Function<? super Object[], ? extends R> zipper,
int count, boolean delayError) {
this.downstream = actual;
this.zipper = zipper;
this.observers = new ZipObserver[count];
this.row = (T[])new Object[count];
this.delayError = delayError;
}
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
ZipObserver<T, R>[] s = observers;
int len = s.length;
for (int i = 0; i < len; i++) {
s[i] = new ZipObserver<T, R>(this, bufferSize);
}
this.lazySet(0);
downstream.onSubscribe(this);
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
sources[i].subscribe(s[i]);
}
}
复制代码
能够看出ZipCoordinator的subscribe
内部建立了输入源大小的ZipObserver实例,而后调用每一个输入源的subscribe
方法,这样当输入源发送事件时就会调用ZipObserver的onNext
方法
// ZipObserver.java
public void onNext(T t) {
queue.offer(t);
parent.drain();
}
复制代码
主要看看ZipCoordinator的drain
方法
public void drain() {
final ZipObserver<T, R>[] zs = observers;
final Observer<? super R> a = downstream;
final T[] os = row;
final boolean delayError = this.delayError;
for (;;) {
for (;;) {
int i = 0;
int emptyCount = 0;
for (ZipObserver<T, R> z : zs) {
if (os[i] == null) {
boolean d = z.done;
T v = z.queue.poll();
boolean empty = v == null;
if (!empty) {
os[i] = v;
} else {
emptyCount++;
}
} else {
// 主要是错误判断
}
i++;
}
if (emptyCount != 0) {
break;
}
R v = zipper.apply(os.clone());
a.onNext(v);
Arrays.fill(os, null);
}
}
}
复制代码
结合示例,首先Obserable1会发送一个A事件,将其放入到了一个队列中去,接着drain
遍历全部的ZipObserver,第一个ZipObserver能够从队列中事件将其赋值给os[0]
,第二个取不到所以emptyCount++
,而后退出循环。接着Observable1又发送了一个B事件,再将其放入队列中,而后执行drain
,此次由于os[0]
已经不为null因此不会从队列中取,os[1]
仍是null,退出循环继续执行,接着Observable1再次发送一个C事件,这个跟B事件处理逻辑同样,再接着Observable2会发送一个1事件,将其放入队列,执行drain
将os[1]
赋值成1,因为emptyCount
等于0,所以会执行zipper.apply
,这个方法内部会回调传入的BiFunction的apply
方法(示例中仅仅进行了字符串拼接),获取到结果A1,回调下游的onNext方法,而后将row这个数组置空,接着线程睡眠1秒,而后再次发送事件2,将其放入队列中,执行drain
,方法内部遍历两个ZipObserver而且都能从队列中取到事件,因此emptyCount
等于0,接着就会执行apply
而后获取到结果B2,调用下游的onNext
,后面Observable2的3事件也跟2事件同样就不说了。
经过分析map和flatMap两个方法能够总结出以下几个的结论