手写极简版的Rxjava

本文已受权微信公众号:鸿洋(hongyangAndroid)在微信公众号平台原创首发

你是否是看过了不少分析Rxjava源码的文章,但依旧没法在心中勾勒出Rxjava原理的样貌。是什么让咱们阅读Rxjava源码变得如此艰难?是Rxjava的代码封装,以及各类细节问题的解决。本文我把Rxjava的各类封装、抽象通通剥去,只专一于基本的事件变换。在理解了事件变换大概是作了件什么事情时,再去看源码,考虑一些其它问题就会更加容易。java

说明:这是一篇Rxjava源码分析的入门文章。旨在让读者脑中有个概念Rxjava最主要干了件什么事情,几个经常使用操做符的主要原理。从此再去看其它源码分析文章或源码可以更容易理解。所以本文先不去考虑Rxjava源码中复杂的抽象封装,线程间通讯,onComplete、onError、dispose等方法,仅专一于“onNext”的最基本调用方式。

项目源码git


本文目录:

  1. 手写Rxjava核心代码,create,nullMap(核心)操做符
  2. map,observeOn,subscribeOn,flatMap操做符
  3. 响应式编程思想的理解

手写Rxjava核心代码,create,nullMap操做符

Create操做符

咱们先来看一个最简单调用github

MainActivity.java

Observable.create(new Observable<String>() {
            @Override
            public void subscribe(Observer<String> observer) {
                observer.onNext("hello");
                observer.onNext("world");
                observer.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onNext(String s) {
                Log.e("yxj",s);
            }

            @Override
            public void onComplete() {
                Log.e("yxj","onComplete");
            }
        });
复制代码
Observable.java

public abstract class Observable<T> {

    public abstract void subscribe(Observer<T> observer);

    public static <T> Observable<T> create(Observable<T> observable){
        return observable;
    }

}
复制代码
Observer.java

public interface Observer<T> {

    void onNext(T t);
    void onComplete();
}
复制代码
本篇文章我把Observable称为“节点”,Observer称为“处理者”,一是由于我被观察者、被观察者、谁订阅谁给绕晕了,更重要的是我以为这个名称比较符合Rxjava的设计思想。

Observable调用create方法建立一个本身,重写subscribe方法说:若是 我有一个处理者Observer,我就把“hello”,“world”交给它处理。编程

Observable调用了subscribe方法,真的找到了Observer。因而兑现承诺,完成整个调用逻辑。bash

这里是“若是”有处理者,须要subscribe方法被调用时,“若是”才成立。Rxjava就是创建在一系列的“若是”(回调)操做上的。微信

“nullMap”操做符(核心)

1.建立一个observable
2.调用空map操做符作变换
3.交给observer处理

MainActivity.java

Observable.create(new Observable<String>() {
            @Override
            public void subscribe(Observer<String> observer) {
                observer.onNext("hello");
                observer.onNext("world");
                observer.onComplete();
            }
        })
        .nullMap()
        .subscribe(new Observer<String>() {
            @Override
            public void onNext(String s) {
                Log.e("yxj",s);
            }

            @Override
            public void onComplete() {
                Log.e("yxj","onComplete");
            }
        });

复制代码
nullMap()等价于 下面这段代码
即把上个节点的数据不作任何修改的传递给下一节点的map操做
 
.map(new Function<String, String>() {
    @Override
    public String apply(String s) throws Exception {
        return s;
    }
})

复制代码

"nullMap"操做符在Rxjava源码里并不存在,是我方便你们理解Rxjava运行机制写出来的。 由于nullMap操做是一个 base变换操做,map,flatMap,subscribeOn,observeOn操做符都是在nullMap上修改而来。因此Rxjava的变换的基础就是nullMap操做符。app

Observable.java
// 这就是Rxjava的变换核心

public Observable<T> nullMap() {

        return new Observable<T>() {
            @Override
            public void subscribe(final Observer<T> observerC) {

                Observer<T> observerB = new Observer<T>() {
                    @Override
                    public void onNext(T t) {
                        observerC.onNext(t);
                    }

                    @Override
                    public void onComplete() {
                        observerC.onComplete();
                    }
                };
                Observable.this.subscribe(observerB);
            }
        };
    }
复制代码

“nullMap”操做符作了件什么事情:异步

  1. 上一个节点Observable A调用nullMap(),在内部new一个新的节点Observable B。
  2. 节点B重写subscribe方法,说"若是"本身有操做者Observer C,就new一个操做者Observer B,而后让节点A subscribe 操做者B。
  3. 节点A subscribe 操做者B,让操做者B执行onNext方法。操做者B的onNext方法内部,调用了操做者C的onNext。从而完成了整个调用。

请注意2中的”若是“。意味着,当节点B中的subscribe方法没有被调用的时候,2,3步骤都不会执行(他们都是回调),没有Observer B,节点A也不会调用subscribe方法。 接下来分两种状况:ide

  • 节点B调用了subscribe方法,则执行2,3,完成整个流程。
  • 节点B调用nullMap,重新走一遍1,2,3步骤,至关于节点B把任务交给了下一个节点C。

概况一下就是:源码分析

Observable每调用一次操做符,其实就是建立一个新的Observable。新Observable内部经过subscribe方法“逆向的”与上一Observable关联。在新Observable中的new出来的Observer内的onNext方法中作了和下一个Observer之间的关联。

github上有nullMap详细注释版的代码


map,observeOn,subscribeOn,flatMap操做符

接下来让咱们看看这4个操做符,仅仅是在nullMap中作了小改动而已。 操做符源码

map操做符

Observable.java

public <R> Observable<R> map(final Function<T, R> function) {

        return new Observable<R>() {
            @Override
            public void subscribe(final Observer<R> observer1) {
                Observable.this.subscribe(new Observer<T>() {
                    @Override
                    public void onNext(T t) {
                        R r = function.apply(t); // 仅仅在这里加了变换操做
                        observer1.onNext(r);
                    }

                    @Override
                    public void onComplete() {
                        observer1.onComplete();
                    }
                });
            }
        };
    }
复制代码

和“nullMap”相比,仅仅加了一行代码function.apply() 方法的调用。

observeOn操做符

Observable.java

public Observable<T> observeOn() {
        return new Observable<T>() {
            @Override
            public void subscribe(final Observer<T> observer) {
                Observable.this.subscribe(new Observer<T>() {
                    @Override
                    public void onNext(final T t) {
	        	//模拟切换到主线程(一般上个节点是运行在子线程的状况)
                        handler.post(new Runnable() {
                            @Override
                            public void run() {
                                observer.onNext(t);
                            }
                        });
                    }

                    @Override
                    public void onComplete() {
                        
                    }
                });
            }
        };
    }
复制代码

与“nullMap”相比,修改了最内部的onNext方法执行所在的线程。Rxjava源码会更加灵活,observerOn方法参数让你能够指定切换到的线程,其实就是传入了一个线程调度器,用于指定observer.onNext()方法要在哪一个线程执行。原理是同样的。我这里就简写,直接写了切换到主线程,这你确定能看明白。

subscribeOn操做符

Observable.java

public Observable<T> subscribeOn() {
        return new Observable<T>() {
            @Override
            public void subscribe(final Observer<T> observer) {
                
                new Thread() {
                    @Override
                    public void run() {
                    // 这里简写了,没有new Observer作中转,github上有完整代码
                        Observable.this.subscribe(observer);
                    }
                }.start();
            }
        };
    }
复制代码

将上一个节点切换到新的线程,修改了Observable.this.subscribe()运行的线程,Observable.this指的是调用subscribeOn()的Observable,即上一个节点。所以subscribeOn操做符修改了上一个节点的运行所在的线程

flatMap操做符

public <R> Observable<R> flatMap(final Function<T, Observable<R>> function) {

        return new Observable<R>() {
            @Override
            public void subscribe(final Observer<R> observer) {
                Observable.this.subscribe(new Observer<T>() {
                    @Override
                    public void onNext(T t) {
                        try {
                            Observable<R> observable = function.apply(t);
                            observable.subscribe(observer);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }
        };

    }
复制代码

flatmap和map极为类似,只不过function.apply()的返回值是一个Observable。

Observable是一个节点,既能够用来封装异步操做,也能够用来封装同步操做(封装同步操做 == map操做符)。因此这样就能够很方便的写出一个 耗时1操做 —> 耗时2操做 —> 耗时3操做...的操做

到这里相信你们已经对Rxjava怎样运行,几个常见的操做符内部基本原理有了初步的理解,本文的目的就已经达到了。在以后看Rxjava源码或者其它分析文章时,就能少受各类变换的干扰。接下来就能够思考Rxjava是如何对各个Observable作封装,线程之间如何通讯,onComplete、onError、dispose等方法如何实现了。

响应式编程思想的理解

响应式编程是一种面向数据流和变化传播的编程范式。

直接看这句话其实不太容易理解。让咱们换个说法,实际编程中是什么会干扰咱们,使咱们没法专一于数据流和变化传播呢?答案是:异步,它会让咱们的代码造成嵌套,不够顺序化。

由于异步,咱们的业务逻辑会写成回调嵌套的形式,致使过一段时间看本身代码看不懂,语义化不强,不是按着顺序一个节点一个节点的往下执行的。

Rxjava将全部的业务操做变成一步一步,每一步无论你是同步、异步,通通用一个节点包裹起来,节点与节点之间是同步调用的关系。如此,整个代码的节点都是按顺序执行的。

限于做者我的水平有限,本文部分表述不免有不对之处,请留言指出,相互交流。
相关文章
相关标签/搜索