rxjava2源码解析(一)基本流程分析

写在前面

2020年开始了,给本身定了一个周更博客的计划。还好计划定的不算晚,我能够在第一个星期过去以前赶出第一篇,《rxjava2源码解析(一)基本流程分析》。这也是Read The Fucking Source Code系列的第一篇文章。java

先给你们说说我写博客的初衷。我写博客的目的只有一个:就是成为优质博主git

去年的我,把焦虑当作学习的驱动力,结果很惨。头发没了,对学习的兴趣也愈来愈低。因此今年的我决定换种方式,去TM的焦虑。我不爱学习,我也不爱焦虑,我只想成为优质博主。因此就也有了今年周更博客的计划。我要把写博客当成打怪升级,把大家的每一次阅读看成我补一个兵,每个点赞评论看成个人一次单杀。程序员

为了有更好的游戏体验,我必然会把每篇技术博客写到极致,作到老小皆宜,你们都爱看。但愿你们走过路过,点个赞再走啊!在此拜谢github

引言

Read The Fucking Source Code,是程序员圈子里的一个众所周知的梗。你们都知道读源码枯燥无趣,可又不得不作,非常痛苦。我作这个系列的目的就是想让你们在阅读源码时,也能体验到愉悦。开篇第一章,决定用rxjava2源码阅读开头。由于这个框架平常都在用,面试也常常会问,已经成为Android必备技能。可是知道怎么用并不够,面试官一问原理就蒙圈可不行。因此就有了RTFSC的第一卷,rxjava2源码阅读。我会尽可能把读源码这个枯燥的事情,给你们说的有趣一点,通俗一点。面试

从基本使用入手

首先随便写一个rxjava2的基本用法,咱们根据这个简单的示例来看看rxjava2整个流程是什么样的。bash

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG,"onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG,"s = "+ s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
复制代码

上面的部分,看起来太长,咱们能够先将其简化。app

Observable.create(ObservableOnSubscribe).subscribe(Observer);
复制代码

废话很少说,直接划重点:框架

  • 1.能够看到这里出现了三个类名很是相像的类:ObservableObservableOnSubscribeObserver。也就是咱们平常说的,被观察者,观察者。
  • 2.为了更好的区分,咱们将其形象化一点。Observable咱们称其为装饰器,ObservableOnSubscribe咱们也称其为发射源,Observer咱们称其为处理器。为何这么称呼,咱们能够边看源码边讲。
  • 3.咱们能够把上面的内容形象化为:装饰器Observable经过一个create方法和一个subscribe方法,将发射源和处理器链接起来。

接下来咱们看看这个链接在源码中是如何实现的。ide

装饰器Observable

首先从Observablecreate入手。函数

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");//判空做用
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
复制代码

划重点:

  • 1.create方法,须要传入一个发射源ObservableOnSubscribe<T>对象,返回一个Observable<T>对象。
  • 2.忽略掉判空的代码,onAssembly方法咱们也暂时放在一边,只须要知道是返回传入参数就好了。那create方法就是返回一个ObservableCreate对象。

那咱们来看看ObservableCreate这个类。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ....
}
复制代码

划重点:

  • 1.ObservableCreate这个类继承自Observable
  • 2.ObservableCreate的构造方法中直接将参数中的发射源ObservableOnSubscribe做为source存在本地。

OK,create方法看完了。很简单,一句话总结,建立了一个装饰器对象,将发射源存在本地备用。(有没有一种看王刚炒菜的感受?)

为何咱们称Observable为装饰器?由于rxjava在这里用到了装饰器模式,而Observable是装饰器模式下的基类。装饰器模式这里看还不明显,看到后面就知道了。

发射源ObservableOnSubscribe

上面create方法须要传入一个发射源ObservableOnSubscribe参数,咱们来看看源码:

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
复制代码

划重点:

  • 1.发射源ObservableOnSubscribe是一个接口,咱们在使用它时会重写subscribe方法。
  • 2.咱们会在subscribe方法中定义接下来要进行的一系列事件,因此咱们称ObservableOnSubscribe为事件发射源。
  • 3.subscribe方法有一个参数就是发射器ObservableEmitter(后面会详细说明)。

订阅(链接)

接下来讲说下一步:subscribe
前面说到,Observablecreate方法返回的是ObservableCreate对象,ObservableCreatesubscribe方法并无进行重写,咱们直接看Observable里的subscribe方法。

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
复制代码

让咱们抛开那些不重要的代码,直入主题。将其中的关键代码简化以后能够变为:

public final void subscribe(Observer<? super T> observer) {
    observer = RxJavaPlugins.onSubscribe(this, observer);
    subscribeActual(observer);
}
复制代码

RxJavaPlugins这个一样先甩在一边放着无论,跟前面的onAssembly同样,咱们只须要知道这是返回传入的observer就好了。
那么只有subscribeActual(observer)这一句关键代码了。ObservablesubscribeActual是一个抽象方法,具体实如今子类中。

其实,在这里咱们就能够看出来,这是一个装饰器模式。Observable是装饰器模式的基类,实际上全部操做都是它的子类完成的。因此咱们称其为装饰器。不仅是create方法,其余一些操做符,例如mapflatMap也是这样的。这个后面讲到操做符和线程切换的时候,大家应该会更有体会。

因此后面咱们分析Observablesubscribe方法时,直接看子类中的subscribeActual(observer)就行。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

划重点:

  • 1.先建立一个CreateEmitter对象parent,而后调用处理器observeronSubscribe方法持有它。
  • 2.再调用source.subscribe(parent)将其传入到source当中。这个source就是前面咱们说到备用的发射源ObservableOnSubscribe,其中的subscribe方法正好须要一个发射器CreateEmitter

那整条订阅线就很清晰了:

  • 1.Observable调用create方法,参数是一个发射源ObservableOnSubscribe(咱们对其subscribe方法进行重写),生成一个ObservableCreate对象。
  • 2.ObservableCreate调用subscribe方法,参数是一个处理器Observer
  • 3.在subscribe方法中咱们以Observer为参数生成了一个发射器CreateEmitter,而且将这个发射器做为参数,调用了发射源ObservableOnSubscribesubscribe方法。

这个CreateEmitter是什么?咱们来看看它的源码。

发射器CreateEmitter

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        .....
    }
复制代码

划重点:

  • 1.CreateEmitterObservableCreate中的一个静态内部类,继承自AtomicReference<Disposable>,ObservableEmitter<T>, Disposable,咱们称其为发射器。
  • 2.咱们从onNext方法中能够看出,这个发射器是直接与外部处理器对接的。
  • 3.发射器继承自Disposable接口,这个接口只有dispose()和isDisposed()两个方法,做用是切断发射过程。
  • 4.在上面的subscribeActual方法中咱们能够看到,Observer有调用onSubscribe方法持有这个CreateEmitter发射器对象。因此咱们能够在处理器中经过dispose()接口随时中断发射流程。
  • 5.同时咱们能够在代码中看到,onErroronComplete两个是互斥的。只会执行一个,由于一旦执行其中一个,会当即切断发射过程。

总结

总结一下出现的几个类:

  • Observable -> 装饰器模式的基类,咱们称其为装饰器。有一个create方法,参数是一个ObservableOnSubscribe发射源,会返回一个ObservableCreate对象。
  • ObservableCreate -> 装饰器实现类。有一个subscribe方法,参数是Observer处理器。在subscribe方法内部,咱们以Observer为参数生成了一个CreateEmitter发射器,而且将这个发射器做为参数,调用了发射源的subscribe方法。
  • ObservableOnSubscribe -> 发射源,自己只是一个接口,咱们重写了subscribe方法,定义了接下来要处理的事件,因此称其为发射源。
  • CreateEmitter -> 发射器,构造方法中包含一个处理器。处理器持有这个发射器对象,能够随时中断发射过程。发射器中的onErroronComplete两个是互斥的,只会执行一个。
  • Observer -> 处理器。用于处理发射器发送的数据。

再总结一下整个运行流程以下:

  • 1.Observable调用create方法,参数是一个发射源ObservableOnSubscribe(咱们对其subscribe方法进行重写),生成一个ObservableCreate对象。
  • 2.ObservableCreate调用subscribe方法,参数是一个处理器Observer
  • 3.在subscribe方法中咱们以Observer为参数生成了一个CreateEmitter发射器,而且将这个发射器做为参数,调用了发射源ObservableOnSubscribesubscribe方法。
  • 4.发射源ObservableOnSubscribesubscribe方法中定义了咱们要处理的事件,并将结果传递给发射器CreateEmitterCreateEmitter先判断事件流是否断开,不断开则将结果传递给处理器Observer
  • 5.处理器Observer处理结果。

拓展

这时候咱们再回头看咱们前面扔掉的东西,RxJavaPlugins.onAssemblyRxJavaPlugins.onSubscribe。咱们直接看源码。

/**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } 复制代码

方法介绍中有描述:Calls the associated hook function。
了解hook的应该就知道了,这里至关因而利用Java反射机制,对source进行了一层包装拦截。rxjava给咱们提供了一个注入hook的方法,咱们能够经过hook来实如今调用source以前,须要先调用咱们设置的拦截函数。咱们如今只须要知道有这个东西就好了,后面有这个须要再用。

最后

这一篇主要是讲rxjava基本使用中的源码流程,下一篇咱来讲说线程切换。

每周更新,敬请期待~

相关文章
相关标签/搜索