RxJava2.0(三)谈一谈基础功能源码实现

前言

咱们在使用RxJava的时候最经常使用的功能就是写一个被观察者、一个观察者。在被观察者中发射数据,在观察者中接收数据,最后用subscribe将二者给订阅起来实现最基础的功能。例以下面这种:git

//被观察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("aa");
                e.onNext("bb");
                e.onNext("cc");
                e.onNext("dd");
                e.onComplete();
            }
        });

        //观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                //TODO 初始化数据
                d.dispose();
            }

            @Override
            public void onNext(String value) {
                //TODO 接收被观察者发送的数据
                Log.i("result:",value);

            }

            @Override
            public void onError(Throwable e) {
                //TODO 错误

            }

            @Override
            public void onComplete() {
                //TODO 完成以后回调
            }
        };

        //订阅
        observable.subscribe(observer);
复制代码

那么在这种状况下,被观察者是如何发送数据给观察者?观察者又是如何接收数据?二者又是如何被subscribe订阅起来的呢?下面咱们经过源码的分析来查看这一切的操做流程。github

Observable.creat()

首先,在建立被观察者的时候,通常来讲都是经过Observable.creat()来建立。那么咱们进入creat()方法去看看里面作了什么操做。bash

@SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    
        //非空判断
        ObjectHelper.requireNonNull(source, "source is null");
        
        //返回一个Observable
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
复制代码

咱们跳过非空判断逻辑,直接查看return。这里会经过RxJavaPlugins.onAssembly()返回一个Observable对象。那么咱们跳进onAssembly去查看里面是如何进行Observable的建立的。ide

public static <T> Observable<T> onAssembly(Observable<T> source) {
       
       ...
       
        //返回传入的参数对象
        return source;
    }
复制代码

这里其实没作什么很特别的操做,仅仅只是返回了参数对象。也就是说咱们如今应该返回去重点研究的是这个参数对象source。而这个source根据前面的源码查看,能够看到实际上是new ObservableCreate()。咱们跳进这里查看源码分析

ObservableCreat()

//ObservableCreat继承了Observable,为被观察者Observable的子类
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    //构造方法,传入参数ObservableOnSubscribe,也就是咱们在里面进行onNext、onComplete与onError的方法。
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    
    //此方法是在被订阅subscribe时候才调用,具体后面再说。
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}
复制代码

代码进行了一些删减。学习

经过上面代码逻辑注释能够看到:ui

1.ObservableCreat为Observable的子类,由于他拥有Observable的所有特性。this

2.在ObservableCreat构造方法中传入了ObservableOnSubscribe,这个具体的做用咱们下面讲。spa

3.有一个subscribeActual方法,这个方法实际上是后面用做订阅的方法subscribe来实现的具体方式。线程

如今咱们再来看看传入ObservableCreat中参数ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}
复制代码

发现ObservableOnSubscribe实际上是一个接口,而这个接口里有一个方法,专门用来实现咱们的向观察者发送消息的方法。到此,被观察者在进行creat()的源码分析完毕,咱们来总结一下。

总结:

1.在进行creat的时候,内部返回了一个Observable,而这个Observable其实是一个继承了ObserVable的ObservableCreat类

2.ObserVable的ObservableCreat构造方法中传入了一个接口ObservableOnSubscribe,咱们通常进行数据的发送都是经过这个接口中的subscribe方法里的ObservableEmitter来进行onNext、onComplete与onError。

Obsevable.subscribe()

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
        
            //获取observer
            observer = RxJavaPlugins.onSubscribe(this, observer);
            
            //非空判断
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            
            //订阅方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
复制代码

抛开上面的逻辑判断不谈,咱们直接看订阅方法subscribeActual()。不知道你们还记不记得,在咱们讲Observable.creat的时候,在ObservableCreat这个类里面有两个用到的方法,一个是构造方法,传入接口ObservableOnSubscribe,另一个是subscribeActual。而如今Observable.subscribe实际上就是在执行这个方法。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        //实例化CreatEmitter对象
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        
        //执行观察者中的onSubscribe方法
        //onSubscribe()回调所在的线程是ObservableCreate执行subscribe()所在的线程
        //和subscribeOn()与observeOn()无关!
        observer.onSubscribe(parent);

        try {
            //真正的订阅方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

咱们在来看下这方法中实例化CreatEmitter对象的这个类。

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //当没有被取消订阅的时候就执行onNext()方法用于发送数据
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            //出现错误时调用这个方法,用于抛出异常,而且在抛出以后的finally中调用dispose用于取消订阅
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
        //判断若是没有执行disposed方法就调用onComplete而且dispose。这也就是为何onComplete与onError为何只会执行其中一个
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }


复制代码

里面的逻辑能够说是一点都不复杂,就是咱们平时常常使用的onNext、onComplete与onError方法。

onNext():先判断发送的消息是否为null,若是为空则调用onError方法来抛出异常。若不为空而且并未取消订阅,则发送数据。

onError():出现错误的时候执行这个方法。当抛去异常以后经过finally强行执行dispose()方法,来强制结束掉订阅。

onComplete():判断若是没有执行disposed方法就调用onComplete而且dispose。这也就是为何onComplete与onError为何只会执行其中一个。

总结

分析一下各个类的职责:

Observable :我的理解是装饰器模式下的基类,实际上全部操做都是Observable的子类进行的实现

ObservableOnSubscribe: 接口,定义了数据源的发射行为

ObservableCreate: 装饰器模式的具体体现,内部存储了数据源的发射事件,和subscribe订阅事件

ObservableEmitter: 数据源发射器,内部存储了observer

Observer: 接收到数据源后的回调(好比打印数据等)

1.Observable.create(),实例化ObservableCreate和ObservableOnSubscribe,并存储数据源发射行为,准备发射(我已经准备好数据源,等待被订阅)

2.Observable.subscribe(),实例化ObservableEmitter(发射器ObservableEmitter准备好!数据发射后,数据处理方式Observer已准备好!)

3.执行Observer.onSubscribe()回调,ObservableEmitter做为Disposable参数传入

4.执行ObservableOnSubscribe.subscribe()方法(ObservableEmitter发射数据,ObservableEmitter内部的Observer处理数据)

具体其余的一些操做符的用法,请参考个人github:RxJavaDemo

有兴趣能够关注个人小专栏,学习更多知识:小专栏

相关文章
相关标签/搜索