RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.java
以上是RxJava在Github上的介绍,大概意思是,针对于JVM(Java虚拟机)的响应式扩展实现,一个在Java VM上使用可观察的序列来组合实现异步的、基于事件编程的库。git
RxJava如今你们用的都应该已经很溜了,用法这里就再也不多说了。咱们都知道RxJava是对观察者模式的扩展,下面就从观察者模式的实现机制出发,了解一下RxJava2的实现逻辑。只有真正了解了RxJava 的实现原理,咱们才能在遇到问题的时候,更快速更准确的定位的到问题。github
这次源码分析基于 RxJava Release 2.1.7编程
这里简单回顾一下观察者模式的组成及使用方式,经过以前观察者模式一文中的分析,咱们知道观察者模式中有四个重要的角色:安全
当咱们建立好了具体主题和观察者类,就可使用观察者模式了,下面是一个最简单的测试demo。bash
public class TestObservePattern { public static void main(String[] args) { // 建立主题(被观察者) ConcreteSubject concreteSubject = new ConcreteSubject(); // 建立观察者 ObserverOne observerOne=new ObserverOne(); // 为主题添加观察者 concreteSubject.addObserver(observerOne); //主题通知全部的观察者 concreteSubject.notifyAllObserver("wake up,wake up"); } } 复制代码
以上就是观察者模式的使用方式,很简单是吧。如今就让咱们带着如下几个问题,看看RxJava是如何使用观察者模式的。markdown
用RxJava这么久了,你能够思考一下以下几个问题:app
若是对以上几个问题,你有明确的答案,恭喜你,如下内容你就不用再看了,O(∩_∩)O哈哈~。异步
不少开发者对RxJava的学习多是从上游和下游的角度开始,这里能够认为这样的叙述更偏重RxJava 事件序列的特征。本文从被观察者(主题)和观察者的角度出发,能够说是更偏向于RxJava 观察者模式的特征。这里的主题就是上游,观察者就是下游。不管从哪一个角度出发去理解,源码就那么一份,无所谓对错,只是每一个人的认知角度不一样而已,选择一种本身更容易了解的方式便可。async
好了,若是你看到了这里,说明你对以上几个问题,还有些许疑问,那么咱们就从这几个问题出发,了解一下RxJava的源码实现。
咱们就带着上述几个问题,依次来看看RxJava究竟是怎么一回事儿。为了方便叙述和记忆,咱们首先看一段RxJava2 最最基础的使用方式。
private void basicRxjava2() { Observable mObservable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext("1"); e.onNext("2"); e.onNext("3"); e.onNext("4"); e.onComplete(); } }); Observer mObserver = new Observer() { @Override public void onSubscribe(Disposable d) { Log.e(TAG, "onSubscribe: d=" + d); sb.append("\nonSubcribe: d=" + d); } @Override public void onNext(Object s) { Log.e(TAG, "onNext: " + s); sb.append("\nonNext: " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: " + e); sb.append("\nonError: " + e.toString()); logContent.setText(sb.toString()); } @Override public void onComplete() { Log.e(TAG, "onComplete"); sb.append("\nonComplete: "); logContent.setText(sb.toString()); } }; mObservable.subscribe(mObserver); } 复制代码
上面这段代码,应该很容易理解了,输出结果你们闭着眼睛也能想出来吧。咱们就以这段代码为基础,结合上面提到的问题依次展开对RxJava的分析。
首先看看RxJava 中四个重要的角色是如何定义的。
首先能够看看这个Observable类。
public abstract class Observable<T> implements ObservableSource<T> { …… } 复制代码
他实现了ObservableSource接口,接着看ObservableSource
public interface ObservableSource<T> { /** * Subscribes the given Observer to this ObservableSource instance. * @param observer the Observer, not null * @throws NullPointerException if {@code observer} is null */ void subscribe(@NonNull Observer<? super T> observer); } 复制代码
这里很明显了,ObservableSource 就是抽象主题(被观察者)的角色。按照以前观察者模式中约定的职责,subscribe 方法就是用来实现订阅观察者(Observer)角色的功能。从这里咱们也能够看出,抽象观察者的角色就是Observer了。
这里,你也许会有疑问,这么简单?抽象主题(上游)不是须要发送事件吗?onNext(),onComplete()以及onError()跑哪儿去了?别着急,咱们后面慢慢看。
回过头来继续看Observable,他实现了ObservableSource接口,而且实现了其subscribe方法,可是它并无真正的去完成主题和观察者之间的订阅关系,而是把这个功能,转接给了另外一个抽象方法subscribeActual(具体细节后面分析)。
所以,Observable依旧是一个抽象类,咱们知道抽象类是不能被实例化的,所以从理论上来讲,他好像不能做为具体主题的角色。其实否则,Observable内部提供了create,defer,fromXXX,repeat,just等一系列建立型操做符, 用来建立各类Observable。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码
在RxJava内有不少他的子类。
诚然,你能够认为,这些子类其实才是真正的具体主题。可是,换一个角度,从代理模式的角度出发,咱们能够把Observable当作是一个代理类,客户端你只管调用create 方法,想要什么样的 Observable告诉我一声就能够,不一样Observable之间的差别你不用管,包在我身上,保证给你返回你想要的Observable实例。
同时,Observable另外一个巨大的贡献,就是定义了不少的操做符,咱们平时经常使用的map,flatMap,distinct等,也是在这里定义。而且这些方法都是final类型的,所以他的全部子类都会继承同时也没法改变这些操做符的实现。
所以,Observable 就是具体主题。
在抽象主题里已经提过了,Observer就是抽象观察者的角色。
public interface Observer<T> { void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); } 复制代码
很是符合观察者模式中抽象观察者的职责描述,Observer 定义了观察者(下游)收到主题(上游)通知后该作什么事情。这里须要注意的是onSubscribe 也是定义在这里的。
这个具体的观察者,o(╯□╰)oo(╯□╰)o,就很少说了吧。你们平时使用应该都是直接用new一个Observer的实例。RxJava内部有不少Observer的子类,有兴趣的同窗能够具体了解一下。这里其实能够引伸出一个有意思的问题,一样是抽象类,为何接口能够直接实例化,而用abstract修饰过的类就不能够?
咱们看一下这段代码:
Observable mObservable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { } }); public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; // 是否有别的其余操做符运算,有的话,在此Observable上执行一遍 if (f != null) { return apply(f, source); } return source; } 复制代码
RxJava的代码里,不少时候会有ObjectHelper.requireNonNull这种空检查的地方,一概都是为了最大程度的防止NPE的出现,后面出现就再也不赘述了.
咱们使用create操做符建立Observable的过程当中,看似经历了不少方法,在不考虑任何其余操做符的前提下,整个过程简化一下的话就这么一句代码
Observable mObservable=new ObservableCreate(new ObservableOnSubscribe()) 复制代码
从以前的分析,咱们也看到了ObservableCreate 就是Observeable抽象类的一个子类。咱们简单看一下他的实现。
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { …… } } 复制代码
能够看到,他惟一的构造函数须要一个ObservableOnSubscribe实例,同时他实现subscribeActual方法,说明他真正处理主题和观察者之间实现订阅的逻辑。
看了半天,你可能一直很好奇,这个ObservableOnSubscribe是个什么东西呢?他其实很简单。
/** * A functional interface that has a {@code subscribe()} method that receives * an instance of an {@link ObservableEmitter} instance that allows pushing * events in a cancellation-safe manner. * * @param <T> the value type pushed */ public interface ObservableOnSubscribe<T> { /** * Called for each Observer that subscribes. * @param e the safe emitter instance, never null * @throws Exception on error */ void subscribe(@NonNull ObservableEmitter<T> e) throws Exception; } 复制代码
ε=(´ο`*)))唉,怎么又一个subscribe,这又是啥?不要慌,看注释。意思是说,这里的subscribe 接收到一个ObservableEmitter实例后,就会容许他以一种能够安全取消(也就是必定能取消)的形式发送事件。
就是说会有某个对象,给他一个ObservableEmitte的实例,没给他以前他是不会主动发送事件的,会一直憋着。,到这里,你是否是想到了什么,咱们知道在RxJava 中只有观察者(下游)订阅(subscribe)了主题(上游),主题才会发送事件。这就是和普通的观察者模式有区别的地方之一。
好了,最后再来看看这个神秘的ObservableEmitter是个什么鬼?
public interface ObservableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable d); void setCancellable(@Nullable Cancellable c); boolean isDisposed(); ObservableEmitter<T> serialize(); /** * Attempts to emit the specified {@code Throwable} error if the downstream * hasn't cancelled the sequence or is otherwise terminated, returning false * if the emission is not allowed to happen due to lifecycle restrictions. * <p> * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called * if the error could not be delivered. * @param t the throwable error to signal if possible * @return true if successful, false if the downstream is not able to accept further * events * @since 2.1.1 - experimental */ boolean tryOnError(@NonNull Throwable t); } 复制代码
这里能够关注一下tryOnError这个方法,能够看到他会把某些类型的error传递到下游。
o(╥﹏╥)o,又是一个接口,并且还继承了另外一个接口,什么状况?继续看
public interface Emitter<T> { void onNext(@NonNull T value); void onError(@NonNull Throwable error); void onComplete(); } 复制代码
惊不惊喜,意不意外? 哈哈,终于找到你了,熟悉的onNext,onError,onComplete.原来在这里。
这里有个问题能够思考一下,在抽象观察者中,定义了四个处理事件的方法,这里只有三个,按照对应关系来讲彷佛缺了一个onSubscribe,这又是怎么回事呢?后面会有分析,能够本身先想一想
这两个接口的含义很明显了,总结一下:
好了,绕了一大圈,就为了一行代码:
Observable mObservable=new ObservableCreate(new ObservableOnSubscribe()) 复制代码
总结一下具体主题(上游)的到底干了啥:
为了方便叙述,把问题3和4连在一块儿说了。
经过上面的叙述,如今具体主题和具体的观察者都建立好了,接下来就是实现两者的订阅关系。
mObservable.subscribe(mObserver);
复制代码
这里须要明确的一点是,是观察者(下游)订阅了主题(上游),虽然从代码上看好像了前者订阅了后者,不要搞混了。
咱们看Observable的subscribe() 方法:
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { …… } } 复制代码
这个前面已经提到过了,Observable并无真正的去实现subscribe,而是把他转接给了subscribeActual()方法。
前面已经说过,Observable的实例是一个ObservableCreate对象,那么咱们就到这个类里去看看subscribeActual()的实现。
// 为了方便,顺便再看一眼构造函数 public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码
CreateEmitter 实现了以前提到的ObservableEmitter接口。这里有一句关键的代码:
observer.onSubscribe(parent);
复制代码
以前在看到Emitter的定义时,咱们说缺乏了onSubscribe方法,到这里就明白了。onSubscribe并非由主题(上游)主动发送的事件,而是有观察者(下游)本身调用的一个事件,只是为了方便获取Emitter的实例对象,准确的说应该是Disposable的实例对象,这样下游就能够控制上游了。
接下来就更简单了,source 是ObservableOnSubscribe,按照以前的逻辑,调用其subscribe方法,给他一个ObservableEmitter对象实例,ObservableEmitter就会开始发送事件序列。这样,一旦开始订阅了,主题(上游)就开始发送事件了。也就是咱们熟悉的onNext,onComplete,onError 方法真正的开始执行了。
接着看看CreateEmitter的实现。
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); …… } static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } @Override public void setDisposable(Disposable d) { DisposableHelper.set(this, d); } @Override public void setCancellable(Cancellable c) { setDisposable(new CancellableDisposable(c)); } @Override public ObservableEmitter<T> serialize() { return new SerializedEmitter<T>(this); } @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } } } 复制代码
最后再来简单说一下,RxJava中对常规的观察者模式作了怎样的调整,有什么值得借鉴的地方。大部分优势在上面已经说起了,这里就来总结一下。
好了,以上就是从观察者模式的角度出发,对RxJava的一次解读,有什么疏漏或理解错误的地方,欢迎读者指出,共同进步!