Android进阶:RxJava2 源码解析 1

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同窗阅读java

一.Rxjava是什么

Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。bash

通俗来讲,Rxjava是一个采用了观察者模式设计处理异步的框架。链式调用设计让代码优雅易读。app

举个例子:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
            }
        });


        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
复制代码

这是Rxjava2最简单的用法: 1.建立一个Observable,重写subscribe方法,这里主要处理被观察的事件。 2.订阅这个Observable,事件会回调observer的方法,咱们能够对事件作响应的处理框架

二.Rxjava源码解析

2.1. 建立Observable:

建立Observable用的是Observable.create(ObservableOnSubscribe source)方法。这个方法的参数是ObservableOnSubscribe:异步

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
复制代码

ObservableOnSubscribe是一个接口,惟一的方法是subscribe,参数是ObservableEmitter e。ObservableEmitter是一个继承了Emitter的接口,接口Emitter里定义了onNext、onError、onComplete等方法,和Observer(观察者)的方法相对应。async

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}
复制代码

ObservableEmitter对接口Emitter进行扩展,增长了setDisposable、setCancellable等方法 基本参数了解了,如今看看create方法里面作了什么,代码以下:ide

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
复制代码

调用了RxJavaPlugins的onAssembly方法。又有一个新参数ObservableCreate(source),咱们看看它是什么:函数

final class ObservableCreate<T> extends Observable<T> {

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

}
复制代码

继承了Observable,因此也是个被观察对象,在构造函数中咱们看到咱们new的ObservableOnSubscribe对象,被存在了ObservableCreate的source里面 那咱们继续看看onAssembly方法作什么:ui

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
复制代码

一个Hook方法。onObservableAssembly是一个静态变量,咱们没有设置,默认为空,因此直接返回source对象。也就是说,Observable的create方法其实就是把咱们ObservableOnSubscribe对象,存储在ObservableCreate对象的source里面,而后返回ObservableCreate对象。 咱们知道ObservableCreate是继承Observable的,因此建立了ObservableCreate对象,咱们的Observable也就建立完了。this

2.2 订阅事件(被观察者)

订阅被观察者的操做是observable.subscribe(new Observer())。这个操做符实际上是个“被动”,就是事件被观察者观察。由于subscribe方法里的参数Observer才是观察者。咱们也会在Observer里的各个会调方法里接收到事件相关的返回值。 咱们看看subscribe方法的源码:

public final void subscribe(Observer<? super T> observer) {
        try {
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            RxJavaPlugins.onError(e);
        }
    }
复制代码

看代码咱们知道最主要调用的方法是:subscribeActual(observer);,这个方法是Observable里的抽象方法,而此时咱们的Observable是一个ObservableCreate对象(前面create方法返回的对象)。因此咱们去看一下ObservableCreate里面是如何重写这个方法的。代码以下:

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

咱们一看到这个方法主要作了三件事: ①建立一个CreateEmitter对象parent。 ②把parent传给source的subscribe方法。上面咱们知道source就是刚才存的ObservableOnSubscribe对象,subscribe也就是咱们重写的方法:

@Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
            }
复制代码

因此咱们在这个方法里就能收到一个CreateEmmiter,经过CreateEmitter能够回调相应的方法。CreateEmitter是实现ObservableEmitter接口,咱们看看它内部实现,如:onNext源码以下:

@Override
public void onNext(T t) {
    observer.onNext(t);
}
复制代码

也就是说,当咱们在ObservableOnSubscribe的subscribe方法里调用ObservableEmitter的onNext方法的时候,它里面会调用observer的onNext。因而经过这样的传递,咱们就能在observer里响应的回调方法里收到事件的相关状态。

至此一个简单Rxjava流式传递原理已经讲完了,总结流程以下:
  • 使用Observbable.create方法,产生一个ObservableCreate对象,对象里存着ObservableOnSubscribe对象source。
  • 调用ObservableCreate.subscribe方法,实际调用的是subscribeActual方法,传入一个Observer对象。
  • subscribeActual方法中建立一个CreateEmmiter对象,调用source.subscribe方法,传入CreateEmmiter对象。
  • 因而咱们在ObservableOnSubscribe中就接收到了一个CreateEmmiter,CreateEmmiter是ObservableEmmiter的子类。咱们能够在这里调用CreateEmmiter的方法进行事件回调。
  • 调用CreateEmmiter方法,实际上会调用Observer的响应的方法。也就是CreateEmmiter把事件状态传递给观察者。
相关文章
相关标签/搜索