RxJava2源码初探

前言

众所周知RxJava有许多优势好比强大的链式调用,方便的线程调度,可是我对其原理仍是了解的太少了,所以打算阅读下源码,先从一个最基本的例子开始java

1、例子

这个例子只是为了示例,正常状况下也不会这么写数组

fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }.subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }
        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }
        override fun onNext(t: Int) {
            println("onNext $t")
        }
        override fun onError(e: Throwable) {
            println("onError")
        } 
    })
}
复制代码

输出结果:缓存

onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 3
onComplete
复制代码

那么为何会按这个顺序输出呢?从代码中也能够看出从始至终也只调用了create、subscribe两个方法,先来看看create的源码bash

// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
复制代码

能够看出当咱们没有设置onObservableAssembly时其实就是直接建立了一个ObservableCreate实例返回,接着看看subscribe并发

public final void subscribe(Observer<? super T> observer) {
    subscribeActual(observer);
}
复制代码

能够看到内部就是调用了subscribeActual方法,而这个方法是个抽象方法,ObservableCreate实现了该方法app

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    source.subscribe(parent);
}
复制代码

内部主要就是先建立了一个CreateEmitter实例,而后调用observer的onSubscribe方法,最后再调用source的subscribe方法,这就解释了onObserverSubscribe和onSourceSubscribe的输出,而source的subscribe方法又调用了三次onNext方法和一次onComplete方法,先看看onNextide

// CreateEmitter.java
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
复制代码

若是还没dispose那么直接就调用了observer的onNext,这也就解释了onNext 一、onNext 二、onNext 3三个输出接着看onComplete函数

// CreateEmitter.java
public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}
复制代码

若是还没dispose就直接调用observer的onComplete,直接解释了onComplete的输出,咱们注意到Observer还有一个onError回调,该方法能够经过调用emitter.onError手动触发oop

// CreateEmitter.java
public void onError(Throwable t) {
    if (!tryOnError(t)) {
        RxJavaPlugins.onError(t);
    }
}
public boolean tryOnError(Throwable t) {
    if (t == null) {
        t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    }
    if (!isDisposed()) {
        try {
            observer.onError(t);
        } finally {
            dispose();
        }
        return true;
    }
    return false;
}
复制代码

能够看到当还没被dispose就会调用到observer的onError方法,至此这个基本demo的源码已经分析完毕。 总结下上述代码其实就分为以下几步ui

  1. 建立Observable实例
  2. 调用该实例的subscribeActual方法
  3. 回调observer的onSubscribe方法
  4. 调用source的subscribe方法
  5. 上述的subscribe方法内部能够执行若干次onNext,最多一个onError,最多一次onComplete

上述实例的总体流程图以下

下面来从源码的角度研究研究RxJava中的几个基本方法

2、基本方法

首先从最基本的map方法开始

1. map

map方法将每一个onNext事件都调用所传入的Function实例的apply方法来达到转化数据源的效果,以下图所示

示例代码以下所示

fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }
    .map {
       it + 1
    }
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }
        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }
        override fun onError(e: Throwable) {
            println("onError")
        }
        override fun onNext(t: Int) {
            println("onNext $t")
        }
    })
}
复制代码

输出结果:

onObserverSubscribe
onSourceSubscribe
onNext 2
onNext 3
onNext 4
onComplete
复制代码

很明显map方法会对全部的next的数据作一次变化这里是加1,接着看看map的源码实现

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
复制代码

内部建立了一个ObservableMap实例并将当前的Observable实例和Function实例传入,根据本文一开始的分析当调用Observable的subscribe方法其实会调用subscribeActual方法

// ObservableMap.java
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}
复制代码

建立了MapObserver实例将Observer实例进行包装而后调用source.subscribe,这个source其实就是上一级Observable实例本例中对应ObservableCreate实例,接着根据上文的分析会调用该MapObserver实例的onNext三次而后调用一次onComplete

// MapObserver.java
public void onNext(T t) {
    // 初始化的时候done为false
    if (done) {
        return;
    }
    // 初始化的时候就是NONE
    if (sourceMode != NONE) {
        downstream.onNext(null);
        return;
    }
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}
复制代码

咱们能够看到内部调用了mapper.apply方法,接着将拿到的结果v当作参数调用downstream的onNext方法,注意这里的downStream就是外界建立的一个Observer对象。上述实例是总体流程图以下。注:绿色框表示对象,蓝色框表示方法调用,括号内为简称

综上咱们能够知道map经过代理下游Observer实例完成数据转换,接着看看flatMap的源码实现

2. flatMap

flatMap方法用于将上游的每个onNext事件都转换成一个Observable实例,以下图所示

fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }
    .flatMap {
        Observable.just(it, it + 1)
    }
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }

        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }

        override fun onError(e: Throwable) {
            println("onError")
        }

        override fun onNext(t: Int) {
            println("onNext $t")
        }
    })
}
复制代码

输出结果:

onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 2
onNext 3
onNext 3
onNext 4
onComplete
复制代码

很显然flatMap将每个事件好比1转换成一个拥有一、2两个事件的Observable实例,来看看其源码实现

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码

默认delayErrors为false表示当一个事件出现异常就会中止整个事件序列,默认并发数为Int的最大值,默认缓存大小为128,而后根据这些参数和当前Observable实例构建出一个ObservableFlatMap实例,咱们看看其subscribeActual方法

// ObservableFlatMap.java
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码

内部又经过这些参数和下游的Observer实例构建了一个MergeObserver实例,直接看看其onSubscribe方法

// MergeObserver.java
public void onSubscribe(Disposable d) {
    // 只会回调一次下游的onSubscribe方法
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        downstream.onSubscribe(this);
    }
}
复制代码

若是已经有上游了就不作任何处理否则进行上游的赋值,而后回调了下游也就是自定义的那个Observer的onSubscribe方法,接着看看其onNext方法是怎么把一个输入源转化成一个Observable的

public void onNext(T t) {
    ObservableSource<? extends U> p;
    p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    subscribeInner(p);
}
复制代码

先是调用了传入的apply方法将每一个onNext数据源转化为Observable实例,接着调用subscribeInner方法

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) {
            ...
        } else {
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}
boolean addInner(InnerObserver<T, U> inner) {
    for (;;) {
        InnerObserver<?, ?>[] a = observers.get();
        int n = a.length;
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = inner;
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}
复制代码

为每一个Observable对象都建立了一个InnerObserver实例,而后将其放入到一个数组中去,最后调用subscribe方法进行订阅,因为apply方法返回了一个ObservableFromArray实例,因此看看其subscribeActual方法

// ObservableFromArray.java
public void subscribeActual(Observer<? super T> observer) {
    FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
    observer.onSubscribe(d);
    if (d.fusionMode) {
        return;
    }
    d.run();
}
复制代码

observer指代InnerObserver,看看其onSubscribe方法

public void onSubscribe(Disposable d) {
    // 第一次调用会返回true,d就是FromArrayDisposable实例其派生自QueueDisposable
    if (DisposableHelper.setOnce(this, d)) {
        if (d instanceof QueueDisposable) {
            QueueDisposable<U> qd = (QueueDisposable<U>) d;
            // 至关于传入了7
            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
            if (m == QueueDisposable.SYNC) {
                fusionMode = m;
                queue = qd;
                done = true;
                parent.drain();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                fusionMode = m;
                queue = qd;
            }
        }
    }
}
//FromArrayDisposable.java
public int requestFusion(int mode) {
    // 很明显7 & 1 != 0
    if ((mode & SYNC) != 0) {
        fusionMode = true;
        return SYNC;
    }
    return NONE;
}
复制代码

这里暂时还无法理解这个fusionMode(混合模式)是干什么用的,接着会调用到MergeObserver的drain方法

void drain() {
    // 只会执行一次,循环将全部事件取出
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}
// 当取消了或者出现了错误并其dealyErrors为false时会将全部InnerObserver都dispose掉
boolean checkTerminate() {
    if (cancelled) {
        return true;
    }
    Throwable e = errors.get();
    if (!delayErrors && (e != null)) {
        disposeAll();
        e = errors.terminate();
        if (e != ExceptionHelper.TERMINATED) {
            downstream.onError(e);
        }
        return true;
    }
    return false;
}
void drainLoop() {
    // 这里的downstream就是外界自定义的Observer实例
    final Observer<? super U> child = this.downstream;
    for (;;) {
        ...
        for (int i = 0; i < n; i++) {
            if (checkTerminate()) {
                return;
            }
            InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
            SimpleQueue<U> q = is.queue;
            if (q != null) {
                for (;;) {
                    U o;
                    try {
                        o = q.poll();
                    } catch (Throwable ex) {
                        ...
                        if (checkTerminate()) {
                            return;
                        }
                        continue sourceLoop;
                    }
                    // 每取出一个便会调用下游的onNext方法
                    child.onNext(o);
                }
                // 会把一个Observable源全部的数据都取完了之后才会进入下一个
                if (o == null) {
                    break;
                }
            }
            ...
        }
        ..
    }
}
复制代码

drainLoop内部会从数组中一个个取出InnerObserver实例,并取出所对应的数据源而后每取出一个回调下游Observer的onNext方法,下面用一张图来总结下实例的流程

3. zip

zip方法经过一个函数将多个Observables的发射物结合到一块儿,基于这个函数的结果为每一个结合体发射单个数据项。以下图所示

下面是一个使用zip操做符的一个简单例子

fun main() {
    Observable.zip(getObservable1(), getObservable2(), zipper())
        .subscribe(object : Observer<String> {
            override fun onComplete() {
                println("onComplete")
            }
            override fun onSubscribe(d: Disposable) {
                println("onSubscribe")
            }
            override fun onNext(t: String) {
                println("onNext $t")
            }
            override fun onError(e: Throwable) {
                println("onError $e")
            }
        })
}
fun getObservable1(): ObservableSource<String> {
    return Observable.create {
        it.onNext("A")
        it.onNext("B")
        it.onNext("C")
        Thread.sleep(1000)
    }
}
fun getObservable2(): ObservableSource<String> {
    return Observable.create {
        it.onNext("1")
        Thread.sleep(1000)
        it.onNext("2")
        Thread.sleep(1000)
        it.onNext("3")
    }
}
fun zipper(): BiFunction<String, String, String> {
    return BiFunction { s1, s2 -> s1 + s2 }
}
复制代码

输出结果以下,onSubscribe输出后过一秒输出A1,再过一秒输出A2,再过一秒输出A3

onSubscribe
onNext A1
onNext B2
onNext C3
复制代码

那么为何输出结果会是这个样子的呢?来看看zip的源码实现

public static <T1, T2, R> Observable<R> zip( ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper) {
    return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2);
}
public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource<? extends T>... sources) {
    return RxJavaPlugins.onAssembly(new ObservableZip<T, R>(sources, null, zipper, bufferSize, delayError));
}
复制代码

根据源码能够看出zip方法其实最终建立了一个ObservableZip实例,直接看其subscribeActual

// ObservableZip
public void subscribeActual(Observer<? super R> observer) {
    ObservableSource<? extends T>[] sources = this.sources;
    int count = sources.length;
    ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, zipper, count, delayError);
    zc.subscribe(sources, bufferSize);
}
ZipCoordinator(Observer<? super R> actual,
        Function<? super Object[], ? extends R> zipper,
        int count, boolean delayError) {
    this.downstream = actual;
    this.zipper = zipper;
    this.observers = new ZipObserver[count];
    this.row = (T[])new Object[count];
    this.delayError = delayError;
}
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
    ZipObserver<T, R>[] s = observers;
    int len = s.length;
    for (int i = 0; i < len; i++) {
        s[i] = new ZipObserver<T, R>(this, bufferSize);
    }
    this.lazySet(0);
    downstream.onSubscribe(this);
    for (int i = 0; i < len; i++) {
        if (cancelled) {
            return;
        }
        sources[i].subscribe(s[i]);
    }
}
复制代码

能够看出ZipCoordinator的subscribe内部建立了输入源大小的ZipObserver实例,而后调用每一个输入源的subscribe方法,这样当输入源发送事件时就会调用ZipObserver的onNext方法

// ZipObserver.java
public void onNext(T t) {
    queue.offer(t);
    parent.drain();
}
复制代码

主要看看ZipCoordinator的drain方法

public void drain() {
    final ZipObserver<T, R>[] zs = observers;
    final Observer<? super R> a = downstream;
    final T[] os = row;
    final boolean delayError = this.delayError;
    for (;;) {
        for (;;) {
            int i = 0;
            int emptyCount = 0;
            for (ZipObserver<T, R> z : zs) {
                if (os[i] == null) {
                    boolean d = z.done;
                    T v = z.queue.poll();
                    boolean empty = v == null;
                    if (!empty) {
                        os[i] = v;
                    } else {
                        emptyCount++;
                    }
                } else {
                    // 主要是错误判断
                }
                i++;
            }
            if (emptyCount != 0) {
                break;
            }
            R v = zipper.apply(os.clone());
            a.onNext(v);
            Arrays.fill(os, null);
        }
    }
}
复制代码

结合示例,首先Obserable1会发送一个A事件,将其放入到了一个队列中去,接着drain遍历全部的ZipObserver,第一个ZipObserver能够从队列中事件将其赋值给os[0],第二个取不到所以emptyCount++,而后退出循环。接着Observable1又发送了一个B事件,再将其放入队列中,而后执行drain,此次由于os[0]已经不为null因此不会从队列中取,os[1]仍是null,退出循环继续执行,接着Observable1再次发送一个C事件,这个跟B事件处理逻辑同样,再接着Observable2会发送一个1事件,将其放入队列,执行drainos[1]赋值成1,因为emptyCount等于0,所以会执行zipper.apply,这个方法内部会回调传入的BiFunction的apply方法(示例中仅仅进行了字符串拼接),获取到结果A1,回调下游的onNext方法,而后将row这个数组置空,接着线程睡眠1秒,而后再次发送事件2,将其放入队列中,执行drain,方法内部遍历两个ZipObserver而且都能从队列中取到事件,因此emptyCount等于0,接着就会执行apply而后获取到结果B2,调用下游的onNext,后面Observable2的3事件也跟2事件同样就不说了。

3、总结

经过分析map和flatMap两个方法能够总结出以下几个的结论

  1. subscribeActual方法老是会调用上游的subscribe方法
  2. onSubscribe方法老是会调用下游的onSubscribe方法
  3. Observer实例的onSubscribe会在事件发射前调用
  4. RxJava提供的一些操做符其实会在内部建立本身的Observable和Observer实例,其目的无非是为了对下游的Observer进行封装还有就是让下游subscribe调用的是本身建立的Observable实例
相关文章
相关标签/搜索