友好 RxJava2.x 源码解析(一)基本订阅流程

系列文章:app

本文 csdn 地址:友好 RxJava2.x 源码解析(一)基本订阅流程ide

本文基于 RxJava 2.1.3源码分析

前言

本文基于读者会使用 RxJava 2.x 而讲解,基本原理不涉及,示例只纯粹为示例而示例。

示例代码

示例源码:
Observable
            .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("1");
                    emitter.onNext("2");
                    emitter.onNext("3");
                    emitter.onComplete();
                }
            })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("TAG", "onSubscribe():  ");
                }

                @Override
                public void onNext(String s) {
                    Log.e("TAG", "onNext():  " + s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.e("TAG", "onComplete():  ");
                }
            });
复制代码

输出结果:post

E/TAG: onSubscribe():  
E/TAG: onNext():  1
E/TAG: onNext():  2
E/TAG: onNext():  3
E/TAG: onComplete():  
复制代码

订阅流程解析

咱们知道 subscribe() 方法是 Observable 和 Observer 的链接点,因此首先戳进 subscribe(Observer observer) 中,能够发现该方法是 Observable 类的方法,传入了一个 Observer 对象,那首先咱们须要弄明白这里的 Observable 和 Observer 分别是什么,观察上方示例代码咱们能够知道 Observer 是 new 出来的,因此咱们只须要知道 Observable 是什么,固然,这里也很清晰,Observable 就是咱们调用 Observable.create(ObservableOnSubscribe) 所建立出来的 Observable,来一张图 ——spa

Observable.create(ObservableOnSubscribe)

Observable 和 Observer 咱们都弄清楚了,接下来就是查看 subscribe(Observer) 具体的实现了,以下 ——.net

@Override
public final void subscribe(Observer<? super T> observer) {
	// 略去其余源码
    subscribeActual(observer);
	// 略去其余源码
}
复制代码

略去非关键源码后咱们发现它只作了一件事,就是调用 Observable#subscribeActual(observer),而在 Observable 中该方法是一个抽象方法:线程

protected abstract void subscribeActual(Observer<? super T> observer);
复制代码

这意味着咱们须要去找它的子类,咱们要看看它的 subscribeActual(Observer) 方法,那咱们就得从 create(ObservableOnSubscribe) 着手,看它是如何将一个 ObservableOnSubscribe 对象转换成一个 Observable 对象的——3d

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
	// 略去其余源码
    return new ObservableCreate<T>(source);
}
复制代码

一样的,删除非关键源码以后,咱们就剩下这么一行代码,这也就是意味着咱们须要从 ObservableCreate 这个类中去寻找 subscribeActual(Observer) 的实现了,这里笔者须要说起两点——code

  1. 从上述方法能够看出 ObservableCreate 是 Observable 的一个子类cdn

  2. 咱们自定义的 ObservableOnSubscribe 做为一个名为 source 字段被传入了。事实上在 Observable 的子类实现中,它们都有一个名为 source 的字段,指代上游 Observable(其实是 ObservableOnSubscribe,可是咱们不妨理解成就是 Observable)。

ObservableCreate#subscribeActual() 实现以下:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 触发 Observer#onSubscribe(Disposable)
    observer.onSubscribe(parent);

    try {
        // 发射事件
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
复制代码

第5行调用了 Observer#onSubscribe(Disposable) ,因此咱们能够知道 Observer#onSubscribe(Disposable) 是先被调用的,而此时 Observable 甚至尚未开始发射事件!接下来就是调用了 source.subscribe(ObservableEmitter),这个方法是交由开发者去实现的,在示例代码是以下所写 ——

@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
      emitter.onNext("1");
      emitter.onNext("2");
      emitter.onNext("3");
      emitter.onComplete();
  }
复制代码

在代码中咱们调用了 CreateEmitter 对象的 onNext() 方法,因此咱们须要戳入 CreateEmitter 类中看一下 onNext(T) 的具体实现(固然 onComplete() 方法等同,此处就不作扩展了),源码以下:

@Override
    public void onNext(T t) {
		// 略去其余源码
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
复制代码

一目了然,当当前对象并不处于 DISPOSED 状态时,那么就将会调用下游 Observer 的 onNext(T) 方法,而下游 Observer 的 onNext(T) 方法也就是咱们上面示例代码中所写的——

public void onNext(String s) {
    Log.e("TAG", "onNext():  ");
}
复制代码

至此,基本订阅流程咱们就理清楚了。咱们从 Observable#subscribe(Observer) 开始,将 Observer 传给 Observable,而 Observable 又会在 onNext(T) 方法中激活 Observer 的 onNext(T) 方法。咱们在示例只涉及了少许的 Observable/Observer,事实上,咱们在 RxJava 中运用的操做符都会在内部建立一个 Observable 和 Observer,虽然在 Observable#subscribeActual(Observer) 中都有本身特定的实现,可是它们大部分都是作两个操做,一是将「下游」传来的 Observer 根据需求进行封装;二就是让「上游」的 Observable subscribe() 该 Observer。

订阅流程

通过了如上的分析后,笔者但愿读者可以理解 RxJava2.x 的基本订阅流程是从 Observable#subscribe(Observer) 开始的,而该方法会触发「上游」 Observable 的 Observable#subscribeActual(Observer) 方法,而在该「上游」 Observable 中又会触发「上游的上游」Observable 的 Observable#subscribeActual(Observer) 方法。咱们不妨用如下述源码举例:

Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("1");
        }
    })
    .flatMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String s) throws Exception {
            return Observable.just(s);
        }
    })
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String s) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
复制代码

另附一张图,图中标明了后面讲到的「第一个 Observable」、「第二个 Observable」等名词:

例图

接下来用图展现整个订阅的流程——

Observable#subscribe(Observer) 流程

Observable#subscribe(Observer) 以前:

Observable#subscribe(Observer) 之前

准备触发 Observable#subscribe(Observer)

准备触发 Observable#subscribe(Observer)

Observable#subscribe(Observer) 将会致使其上游 Observable 的 subscribe(Observer) 方法被调用:

Observable#subscribe(Observer) 以后

上游 Observable 的 subscribe(Observer) 方法内部又会调用上游的上游 Observable 的 subscribe(Observer)

触发上级 Observer

Observable#subscribe(Observer) 会调用 Observable#subscribeActual(Observer) ,该方法是一个抽象方法,由子类覆写,因此展示了 Observable 的多态性,并且如何激活上游 Observable 的 subscribe(Observer)/subscribeActual(Observer) 方法的关键点也在此。实现方式就在于 Observable#subscribeActual(Observer) 方法虽然是一个抽象方法,可是它的子类实现中都包含有一句 source.subscribe(Observer),其中 source 就是上游 Observable(其实是 ObservableSource,可是咱们此处不妨就理解成 Observable,毕竟咱们对这个对象更熟悉一些,Observable 是 ObservableSource 接口的实现),因此就能够理解在每个 Observable 的 subscribeActual(Observer) 方法中它都会调用上游的 subscribe(Observer)/subscribeActual(Observer) 方法,直至到达第一个 Observable 的 subscribe(Observer)/subscribeActual(Observer) 中。

Observer#onSubscribe(Disposable) 流程

订阅的关系链理清了,可是尚未发射事件的流程还没出来啊,咱们继续往下走——

到达顶部 Observable 的时候,已经不能再往上走了,就要准备搞事情(准备发射事件了),此处咱们就以示例代码中的 Observable 为例,它的 subscribeActual(Observer) 中——

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
复制代码

它首先封装了一个 Disposable,接下来将调用 Observer#onSubscribe(Disposable) 将 Disposable 做为参数传给下一层 Observer。

Disposable 初始化

到了下一层的 Observer 的 onSubscribe(Disposable) 中,该方法中针对上一层 Disposable 作一些操做(判断、封装等),而后再封装一个 Disposable 做为参数传递给 Observer#onSubscribe(Disposable)

Disposable 传递

而此时的 Observer 就是咱们所自定义的 Observer——

Observer#onSubscribe(Disposable)

Observer#onNext(T) 流程

Observer#onSubscribe(Disposable) 流程结束后,就执行到第7行代码 Observeable.subscribe(Observer),实质上也就是——

new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("1");
    }
})
复制代码

ps:为了方便起见,此处只分析 onNext() 执行流程。

ObservableEmitter#onNext(T) 的内部实际上会触发 Observer 的 onNext(T) 方法——

发射事件

再向下触发就是咱们所自定义的最底层的 Observer 了——

发射事件

以示例代码来讲,顶游 Observable 会触发 ObservableEmitter#onNext(T) 方法,在该方法的内部又触发了「下游」 Observer 的 onNext(T) 方法,而在该方法内部又会触发「下游的下游」 Observer 的 onNext(T) 方法,直至最底层的 Observer —— 咱们所自定义的 Observer ——

Observer#onNext(T)

到此,一套订阅流程就执行完毕了。