Android进阶:5、RxJava2源码解析 2

上一篇文章Android进阶:4、RxJava2 源码解析 1里咱们讲到Rxjava2 从建立一个事件到事件被观察的过程原理,这篇文章咱们讲Rxjava2中链式调用的原理。本文不讲用法,仍然须要读者熟悉Rxjava基本的用法。java

一.Rxjava2 的基本用法
Rxjava是解决异步问题的,它的链式调用让代码看起来很是流畅优雅。如今咱们带上线程切换以及链式调用来看看。下面代码是示例:性能优化

Observable
                .create(new ObservableOnSubscribe<String>() {

                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("a");
                    }
                })
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return 1;
                    }
                })
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Object o) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

咱们建立一个事件(观察者),想输出一个字符串 "a"。这个事件发生在IO线程,结束也在IO线程,事件的状态回调发生在主线程。示例的用法你们应该都能懂,咱们主要讨论这个链式的原理流程。为何这么说呢?由于这个链式跟通常的链式不太同样架构

二.create方法
这个方法咱们以前看过,返回一个ObservableCreate对象,ObservableCreate继承自Observable,里面的source存着咱们建立的ObservableOnSubscribe匿名对象。app

三.subscribeOn方法
这是Obserbvable的方法,先看源码:异步

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }


    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

代码结构跟create的差很少,在钩子函数里直接返回咱们建立的对象ObservableSubscribeOn<T>(this, scheduler),并传入当前的Observable也就是ObservableCreate对象。因此咱们看一下这个类的代码:ide

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
}

这个类继承自AbstractObservableWithUpstream类,构造函数的参数是ObservableSource,因此这里咱们须要介绍两个类:函数

  • ObservableSource
    ObservableSource是一个接口,全部的Observable都实现了这个接口,它里面只有:
void subscribe(@NonNull Observer<? super T> observer);

这一个方法。很明显这个方法是为了让Observer订阅Observable的,或者说为了Observable把事件状态传递给Observer的。性能

  • AbstractObservableWithUpstream
    这个类继承了Observbable
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {

    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
}

从源码能够看出这个类有变量source,它在构造函数里传入值,存储ObservableSource对象。学习

因此当咱们调用Observable的subscribeOn方法的时候会建立一个ObservableSubscribeOn对象,并用变量source存储当前的Observable对象,而后返回ObservableSubscribeOn对象。优化

四.unsubscribeOn方法

public final Observable<T> unsubscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableUnsubscribeOn<T>(this, scheduler));
    }

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

这个方法跟上面的方法是一个模子刻的。因此咱们主要看ObservableUnsubscribeOn这个类就好。

public final class ObservableUnsubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableUnsubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
}

这个类跟刚才的ObservableSubscribeOn也几乎如出一辙,继承自AbstractObservableWithUpstream类,使用source存了当前Observable对象。而此时的Observbvable对象是上一个方法建立的对象,也就是ObservableSubscribeOn对象。

五.observeOn方法和map方法
因为这些方法的内容基本同样我就省略代码的解释。
observeOn方法是建立了ObservableObserveOn对象,并保存上一个方法建立的Observable。map方法是建立ObservableMap对象,并保存上一个方法建立的Observable
因此总结一下可知:链式调用这些方法的时候,都会建立一个相关的对象,而后用变量source存储上一个方法建立的Observable子类对象。

六.subscribe方法
上次文章讲到,这个方法内部会调用一个抽象方法,subscribeActual方法,做为真实的订阅。而这个方法的逻辑须要看子类如何实现。
而第一次调用该这个subscribe方法的对象是ObservableMap对象。因此咱们看看它内部如何实现的。
ObservableMap的subscribeActual方法实现:

public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

内部调用了source的subscribe方法。此时ObservableMap对象里存的source是上一个方法建立的observable,也就是ObservableObserveOn对象。因此咱们要看看ObservableObserveOn是如何实现subscribeActual方法的:

protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
``
同理他最终也是调用了上一个Observable的subscribe。

因而咱们知道当咱们调用subscribe方法的时候,会递归式的调用source存储的上一个方法建立的Observable的subscribeActual方法,一直到ObsservableCreate的subscribeActual的方法,把事件状态传递给观察者。这个上一篇文章已经讲过。


七.总结
咱们常见的普通的链式调用通常都会返回当前同一个对象。和普通的链式调用不一样当咱们调用Rxjava2的链式调用时,他们会返回本身对应的Observable子类对象,每一个对象都不同,而后在subscribeActual方法中递归式的调用每一个对象的subscribeActual方法,完成一个链式的调用

八.写在最后
若是喜欢个人文章,想与一群资深开发者一块儿交流学习的话,欢迎加入个人合做群Android Senior Engineer技术交流群。有flutter—性能优化—移动架构—资深UI工程师 —NDK相关专业人员和视频教学资料
群号:925019412
相关文章
相关标签/搜索