系列文章:html
本文基于 RxJava 2.1.9java
距离前两篇文章已通过去三个月之久了,终于补上第三篇了。第三篇预期就是针对某一个操做符的源码进行解析,选择了 Observable.zip
的缘由一是司里这块用的比较多,再一个笔者以为这个操做符十分强大,想去探索一番 zip 操做符是如何实现这样的骚操做,若是读者还不了解 zip 操做符,建议查看文档并上手一番,文档地址:Zip · ReactiveX文档中文翻译react
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.BiFunction;
public class Test {
@SuppressWarnings("ResultOfMethodCallIgnored")
public static void main(String[] args) {
Observable.zip(first(), second(), zipper())
.subscribe(System.out::println);
}
private static ObservableSource<String> first() {
return Observable.create(emitter -> {
Thread.sleep(1000);
emitter.onNext("11");
emitter.onNext("12");
emitter.onNext("13");
}
);
}
private static ObservableSource<String> second() {
return Observable.create(emitter -> {
emitter.onNext("21");
Thread.sleep(2000);
emitter.onNext("22");
Thread.sleep(3000);
emitter.onNext("23");
}
);
}
private static BiFunction<String, String, String> zipper() {
return (s1, s2) -> s1 + "," + s2;
}
}
复制代码
hello world 级别的代码就是为了 hello world. —— 鲁迅git
如上所示,操做过 zip 操做符的读者们应该都知道,会在一秒后输出【11,21】,紧接着两秒后输出【12,22】,再紧接着三秒后输出【13,23】。数组
通过前两篇文章的阅读,笔者相信读者们能很快地找到 ObservableZip
这个类,这个类就是实现具体 zip 操做的核心类,一样地,直接针对该类的 subscribeActual(Observer)
解析,简化后源码以下:并发
public void subscribeActual(Observer<? super R> s) {
// sources 是上游 ObservableSource 数组
// 在本案例中也就是上面 first() 和 second() 方法传回的 ObservableSource
ObservableSource<? extends T>[] sources = this.sources;
ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(s, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
复制代码
简化后能够看到仍是很简单的,因此下步就是了解 ZipCoordinator
类和其 subscribe()
方法的实现了,ZipCoordinator
构造函数和 ZipCoordinator#subscribe()
代码简化以下 ——app
ZipCoordinator(Observer<? super R> actual, int count) {
this.actual = actual;
this.observers = new ZipObserver[count];
this.row = (T[])new Object[count];
}
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
ZipObserver<T, R>[] s = observers;
int len = s.length;
for (int i = 0; i < len; i++) {
s[i] = new ZipObserver<T, R>(this, bufferSize);
}
actual.onSubscribe(this);
for (int i = 0; i < len; i++) {
sources[i].subscribe(s[i]);
}
}
复制代码
大体作了如下几件事:ide
ZipCoordinator#subscribe()
中初始化了 ZipObserver 数组并让上游 ObservableSource 分别订阅了对应的 ZipObserver。通过前面的文章分析咱们知道,上游的 onNext(T)
方法会触发下游的 onNext(T)
方法,因此下一步来看看 ZipObserver 的 onNext(T)
方法实现 ——函数
@Override
public void onNext(T t) {
queue.offer(t);
parent.drain();
}
复制代码
能够看到,源码十分的简单,一是入队,二是调用 ZipCoordinator#drain()
方法,精简以下 ——高并发
public void drain() {
final ZipObserver<T, R>[] zs = observers;
final Observer<? super R> a = actual;
// row 在咱们前面提到过
final T[] os = row;
for (; ; ) {
int i = 0;
int emptyCount = 0;
for (ZipObserver<T, R> z : zs) {
if (os[i] == null) {
boolean d = z.done;
T v = z.queue.poll();
boolean empty = v == null;
if (!empty) {
os[i] = v;
} else {
emptyCount++;
}
} else {
// ...
}
i++;
}
if (emptyCount != 0) {
break;
}
R v = zipper.apply(os.clone();
a.onNext(v);
Arrays.fill(os, null);
}
}
复制代码
先从实际场景解析流程,再来总结 ——
第一个事件应该是上游 first()
返回的 ObservableSource 中发射的【11】,最终在 ZipObserver#onNext(T)
方法中,该事件首先被塞入队列,再触发上述的 ZipCoordinator#drain()
,在 drain()
方法中会进入 ZipObserver 的遍历 ——
for 循环跳出后,因为 emptyCount 不为0,死循环结束。
第二个事件也是由 first()
发射过来的(【12】), 当第二个事件发射过来的时候——
一样地,第三个事件(【13】)发射过来的时候,走一样的逻辑。
可是1000毫秒后,第「四」个事件由 second()
发射(也就是【21】)的时候,事情就不同了——
for 循环跳出后,通过 zipper 操做合并后两个事件被传输给下游 Observer 的 onNext(T)
中,此时打印台就输出了【11,21】了。固然,最后还会将 os 数组中元素所有填充为 null,为下一次数据填充作准备。
因此实际上 zip 操做符的原理在于就是依靠队列+数组,当一个事件被发射过来的时候,首先进入队列,再去查看数组的每一个元素是否为空 ——
直到最后,断定 emptyCount 是否不为0,不为0则意味着数组没有被填满,某些队列中尚未值,因此只能结束这次操做,等待下一次上游发射事件了。而若是 emptyCount 为0,那么说明数组中的值被填满了,这意味着符合触发下游 Observer#onNext(T)
的要求了,固然,不要忘了将数组内部元素置 null,为下次数据填充作准备。
妈个鸡,是否是还没懂?笔者也以为挺难懂的,谁要跟我这么说我也听不懂啊!画图吧 ——
第一次事件由「第一个」事件源发出:
当【11】入队后,数组开始遍历,数组 0 的位置试图将第一个队列 poll 的值填入,此时为【11】;数组 1 的位置试图将第二个队列 poll 的值填入,可是此时为 null,因此最终结束操做,等待下一次上游的事件发射。
第二次事件仍然是由「第一个」事件源发出的 ——
当【12】入队后,数组开始遍历,数组 0 位置已经被填入值,数组 1 的位置试图将第二个队列 poll 的值填入,可是此时为 null,结束操做。
另外一种状况则是第二次事件是由「第二个」事件源发出:
当【21】入队后,数组开始遍历,数组 0 位置已经被填入值,数组 1 的位置试图将第二个队列 poll 的值填入,此时为【21】。循环结束后,emptyCount 依旧为0,符合条件,触发下游 Observer#onNext(T)
,而后将数组中元素置 null,为下一次数据填充作准备。
ZipCoordinator 为了应对高并发引入了 CAS,同时也利用 CAS 优化 ZipCoordinator#drain()
实现,另外若是各位读者对 rxjava 有必定的了解,必定知道有一些和 zip 一类的操做符被称为组合操做符,而里面的 concat 操做符的实现,和 zip 操做符的实现有着殊途同归之妙,感兴趣的读者能够去自行去源码中一探究竟,感觉下 rxjava 的魅力。