RxJava2源码分析(一):基本流程分析

前言:到如今这个阶段,网上关于RxJava2源码分析的文章已经满天飞了,我写这篇文章的目的并非说我会分析的比他们好,比他们透彻,这篇文章的目的只是单纯的记录本身分析RxJava2源码的成功及收获。java

概述

  对于一个编程人的技术成长,通常会经历三个阶段,首先是学会使用开源库,而后是知道开源库的原理,最后就是本身写一个开源库。虽然在平常的开发中使用RxJava2已经达到了驾轻就熟的地步,可是不了解具体的原理,总感受有点虚。因而就想静下心来,好好的分析一下RxJava源码,达到不只知其然更知其因此然的地步。git

  下图是分析RxJava基本流程后,画的UML图,对于已经分析过源码的大神,能够看下图画的是否正确,对于没有分析过源码的人,能够看下,先有个映像,而后再跟着文章的内容,一点点的理解。(点击图片查看大图)github

基本流程分析,UML图

源码分析

  先看RxJava2基础用法的代码编程

private void basicUseRxJava() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("wizardev", "onNext: "+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }
复制代码

以上代码,只是RxJava2的基本使用,并无涉及任何的操做符代码,下面咱们就按方法顺序开始分析源码。app

create方法分析

  看下create()方法的代码ide

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
     //一、判空
        ObjectHelper.requireNonNull(source, "source is null");
     //二、
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
复制代码

从以上代码能够看出,create方法的返回值类型是Observable,参数是ObservableOnSubscribe<T>,能够先看下这个ObservableOnSubscribe类,源码以下源码分析

public interface ObservableOnSubscribe<T> {

    /** * Called for each Observer that subscribes. * @param emitter the safe emitter instance, never null * @throws Exception on error */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
复制代码

能够发现ObservableOnSubscribe类是一个接口,里面有一个subscribe方法。如今继续看create方法中的代码,在“1”处代码是判断传入的参数是否为空。这里主要看下“2”处,这句RxJavaPlugins.onAssembly实际上是一个Hook方法,**“2”处代码实质就是return new ObservableCreate<T>(source);,**不信的话,能够看下onAssembly方法,以下ui

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
复制代码

经调试,onObservableAssembly为null,因此上面的代码就直接返回了new ObservableCreate<T>(source)this

  如今看下ObservableCreate类,以下spa

public final class ObservableCreate<T> extends Observable<T> {
    //一、全局变量
    final ObservableOnSubscribe<T> source;

    //二、构造方法中将source赋值
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    //三、这个方法是在调用subscribe方法才调用的
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    //...省略部分代码
}
复制代码

从上面的代码能够知道,ObservableCreate类继承自Observable,在实例化的时候将create方法中的ObservableOnSubscribe<T> source参数注入了进来,做为成员变量source

结论

  经过分析Observable类的create方法,能够有如下结论:

  1. create方法的返回值类型是Observable
  2. create方法的参数的类型是接口;
  3. create方法实际返回的是ObservableCreate类,而ObservableCreate类是Observable的子类;
  4. 在实例化ObservableCreate类的时候将create的方法的参数注入到了ObservableCreate类中,做为它的成员变量source

这里重点看下第4个结论,在这里create方法的参数实际就是下面的代码

new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }
复制代码

subscribe方法分析

  分析完了create方法,接着来分析subscribe方法,其方法代码以下

public final void subscribe(Observer<? super T> observer) {
     //一、判空
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //二、Hook方法,实质就是observer
            observer = RxJavaPlugins.onSubscribe(this, observer);
//判空
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
			//四、重点,
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
复制代码

这里重点看下“4”处, 这里调用了ObeservablesubscribeActual方法,能够看下Obeservable类中的这个方法,以下

protected abstract void subscribeActual(Observer<? super T> observer);
复制代码

这个方法是抽象的,实际调用的是它子类中的方法,经过上文的分析,咱们知道ObservableCreateObeservable类的子类,因此,这里调用的实际就是ObservableCreate类中的subscribeActual方法。如今,咱们再看下这个方法中的代码,以下

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        //一、实例化CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //二、回调方法
        observer.onSubscribe(parent);

        try {
            //三、回调方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

咱们一步步的分析这个方法中的代码,先看“1”处的代码,这里实例化了CreateEmitter这个类,在实例化的同时将observer传了进去。看下CreateEmitter这个类的代码,以下

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
//...省略部分代码
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

       //...省略部分代码
    }
复制代码

经过上面的代码,能够发现CreateEmitter这个类实现了ObservableEmitter这个接口,而这个接口是ObservableOnSubscribe接口中subscribe方法的参数,是否是发现什么了?如今继续往下看,看下“2”处的代码,这里回调了ObserveronSubscribe方法,分析到这里,能够得出下面的结论

onSubscribe()回调所在的线程是ObservableCreate执行subscribe()所在的线程,和subscribeOn()/observeOn()无关!

重点来了,这里看下“3”处的代码,还记得source是谁吗?**它就是执行Observable.create方法时,咱们注入给ObservableCreate类的成员变量,是ObservableOnSubscribe接口的实例。**这里调用的subscribe方法,实际就是下面代码的subscribe方法,

public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
复制代码

这段代码中的subscribe方法的参数实质就是CreateEmitter,调用的onNext方法就是CreateEmitter类中的onNext方法。继续看下CreateEmitter类中的onNext方法,代码以下

@Override
        public void onNext(T t) {
            //一、判断传入的参数是否为null
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //二、调用Observer中的onNext方法
                observer.onNext(t);
            }
        }
复制代码

分析到这里,就能够得出如下结论了

subscribe方法中发射器所调用的onNext方法,若是代码没有出错的话,最终调用的就是Observer中的onNext方法。

分析CreateEmitter中的其余方法,还能够知道为何Observer中的onErroronComplete方法只有一个会回调的缘由了,缘由就是不管调用的是哪个方法都会调用dispose()方法取消订阅。

结论

  对Observable.subscribe方法的分析能够得出如下结论

  1. subscribe方法最终调用了ObservableCreate类中的subscribeActual方法。
  2. subscribeActual方法中,实例化了发射器,并开始发射数据。
  3. subscribe方法中发射器所调用的onNext方法,若是代码没有出错的话,最终调用的就是Observer接口中的onNext方法。

总结

  经过对RxJava基本流程的源码分析,是否是对RxJava的原理有了更清晰的认识呢?分析完以后,咱们再看下这张图,是否是感受如今看起来就明白多了呢?

结束语

  想要了解一些开源库的原理,咱们必需要阅读其源码,只有从源码中才能获得想要的答案,才能对库的原理有更清晰的认识。

  再说下,阅读开源库的注意事项,阅读源码时,咱们最好带着问题来阅读,阅读前先有个目标,好比我此次阅读要搞懂什么问题,而后再开始阅读,否则就会很容易在茫茫代码中迷失。还有就是不要想着每句代码都搞懂,搞懂与本身想要获取的答案有关的代码便可。

  转载请注明出处:www.wizardev.cn

欢迎关注个人公众号
扫码关注公众号,回复“获取资料”有惊喜
相关文章
相关标签/搜索