史上最浅显易懂的RxJava入门教程

RxJava是一个神奇的框架,用法很简单,但内部实现有点复杂,代码逻辑有点绕。我读源码时,确实有点似懂非懂的感受。网上关于RxJava源码分析的文章,源码贴了一大堆,代码逻辑绕来绕去的,让人看得云里雾里的。既然用拆轮子的方式来分析源码比较难啃,不如换种方式,以造轮子的方式,将源码中与性能、兼容性、扩展性有关的代码剔除,留下核心代码带你们揭秘 RxJava 的实现原理。java

什么是RxJavareact

  • • Rx是Reactive Extensions的简写,翻译为响应的扩展。也就是经过由一方发出信息,另外一方响应信息并做出处理的核心框架代码。
  • • 该框架由微软的架构师Erik Meijer领导的团队开发,并在2012年11月开源。
  • • Rx库支持.NET、JavaScript和C++等,如今已经支持几乎所有的流行编程语言了。
  • • Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。
  • • RxJava做为一个流行的框架,其源码依托在GitHub,除了支持RxJava,针对安卓系统也除了一个支持框架RxAndroid
    2.RxJava简化代码
    通常咱们在安卓项目中,若是想从后台获取数据并刷新界面,代码大概以下,下面咱们来看一个例子:
    new Thread() {br/>@Override
    public void run() {
    super.run();
    for (File folder : folders) {
    File[] files = folder.listFiles();
    for (File file : files) {
    if (file.getName().endsWith(".png")) {
    final Bitmap bitmap = getBitmapFromFile(file);
    getActivity().runOnUiThread(new Runnable() {br/>@Override
    public void run() {
    imageCollectorView.addImage(bitmap);
    }
    });
    }
    }
    }
    }
    }.start();
    上面的代码通过多层嵌套后 可读性太差了!若是你用了RxJava 能够这样写:
    Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {br/>@Override
    public Observable<File> call(File file) {
    return Observable.from(file.listFiles());
    }
    })
    .filter(new Func1<File, Boolean>() {br/>@Override
    public Boolean call(File file) {
    return file.getName().endsWith(".png");
    }
    })
    .map(new Func1<File, Bitmap>() {br/>@Override
    public Bitmap call(File file) {
    return getBitmapFromFile(file);
    }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {br/>@Override
    public void call(Bitmap bitmap) {
    imageCollectorView.addImage(bitmap);
    }
    });
    这样写的好处就是减小层次嵌套 提升了代码的可读性,除了简化代码,RxJava还能够为每一个方法提供特定的运行线程。
    3.引入框架
    目前RxJava已经升级为2.0版本,但为了可以更好的理解RxJava,咱们能够从1.0版本开始学习。也为了让咱们的安卓项目可以更好的使用RxJava,能够在项目中引入gradle脚本依赖:
    compile 'io.reactivex:rxandroid:1.2.1'
    compile 'io.reactivex:rxjava:1.1.6'
    如今 咱们的项目已经支持RxJava的功能了。
    4.响应式的核心
    所谓的响应式,无非就是存在这样的2个部分,一部分负责发送事件/消息,另外一部分负责响应事件/消息。
    之前若是咱们想看新闻,通常须要经过看报纸。好比,你对某个报刊杂志比较感兴趣,那么你首先要作3件事:
      1. 提供你家的地址
      1. 找到对应的报社
      1. 去报社订阅整个月的报纸
        通过了上面的流程,之后天天只要有新的报刊资料出来了,报社都会将杂志发送到你家。
        史上最浅显易懂的RxJava入门教程

将上面的例子进行代码抽象,步骤以下:android

    1. 提供观察者(由于你是关心杂志内容的人 因此你是观察该事件的人)
    1. 提供被观察者(只要有新的杂志出来 就须要通知关心的人 因此报社是被观察的对象)
    1. 订阅(也就是 观察者&被观察者之间要相互关联 以便被观察的对象一变化 就会立刻通知观察该事件的对象)

史上最浅显易懂的RxJava入门教程
上面示例的演示代码以下:
//1.建立被观察者
Observable<String> observable =
Observable.create(new Observable.OnSubscribe<String>() {br/>@Override
public void call(Subscriber<? super String> subscriber) {
//4.开始发送事件
//事件有3个类型 分别是onNext() onCompleted() onError()
//onCompleted() onError() 通常都是用来通知观察者 事件发送完毕了,二者只取其一。
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Java !");
subscriber.onNext("Hello C !");
subscriber.onCompleted();
}
});编程

//2.建立观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

//3.订阅
observable.subscribe(subscriber);

输出以下:
com.m520it.rxjava I/IT520: onNext: Hello Android !
com.m520it.rxjava I/IT520: onNext: Hello Java !
com.m520it.rxjava I/IT520: onNext: Hello C !
com.m520it.rxjava I/IT520: onCompleted
代码运行的原理markdown

  • • 上面的代码中,当观察者subscriber订阅了被观察者observable以后,系统会自动回调observable对象内部的call()。
  • • 在observable的call()方法实体中,发送了如onNext/onCompleted/onError事件后。
  • • 接着subscriber就能回调到到对应的方法。
    5.被观察者变种
    普通的Observable发送须要三个方法onNext, onError, onCompleted,而Single做为Observable的变种,只须要两个方法:
  • • onSuccess - Single发射单个的值到这个方法
  • • onError - 若是没法发射须要的值,Single发射一个Throwable对象到这个方法
    Single只会调用这两个方法中的一个,并且只会调用一次,调用了任何一个方法以后,订阅关系终止。
    final Single<String> single = Single.create(new Single.OnSubscribe<String>() {br/>@Override
    public void call(SingleSubscriber<? super String> singleSubscriber) {
    //先调用onNext() 最后调用onCompleted()
    //singleSubscriber.onSuccess("Hello Android !");
    //只调用onError();
    singleSubscriber.onError(new NullPointerException("mock Exception !"));
    }
    });架构

    Observer<String> observer = new Observer<String>() {br/>@Override
    public void onCompleted() {
    Log.i(TAG, "onCompleted ");
    }框架

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }
    
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }

    };
    single.subscribe(observer);
    6.观察者变种
    Observer观察者对象,上面咱们用Subscriber对象代替。由于该对象自己就是继承了Observer。
    该对象实现了onNext()&onCompleted()&onError()事件,咱们若是对哪一个事件比较关心,只须要实现对应的方法便可,代码以下:
    //建立观察者
    Subscriber<String> subscriber = new Subscriber<String>() {br/>@Override
    public void onCompleted() {
    Log.i(TAG, "onCompleted ");
    }编程语言

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }
    
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }

    };ide

    //订阅
    observable.subscribe(subscriber);
    上面的代码中,若是你只关心onNext()事件,但却不得不实现onCompleted()&onError()事件.这样的代码就显得很臃肿。鉴于这种需求,RxJava框架在订阅方面作了特定的调整,代码以下:
    //为指定的onNext事件建立独立的接口
    Action1<String> onNextAction = new Action1<String>() {br/>@Override
    public void call(String s) {
    Log.i(TAG, "call: "+s);
    }
    };函数

    //订阅
    observable.subscribe(onNextAction);

     

不知道你们注意到没有,subscribe()订阅的再也不是观察者,而是特定的onNext接口对象。相似的函数以下,咱们能够根据须要实现对应的订阅:

public Subscription subscribe(final Observer observer)
public Subscription subscribe(final Action1 onNext)
public Subscription subscribe(final Action1 onNext, Action1 onError)
public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)

这里还有一个forEach函数有相似的功能:

public void forEach(final Action1 onNext)
public void forEach(final Action1 onNext, Action1 onError)
public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)

##7.Subject变种

上面2节中既介绍了被观察者变种,又介绍了观察者变种,这里再介绍一种雌雄同体的对象(既做为被观察者使用,也能够做为观察者)。

针对不一样的场景一共有四种类型的Subject。他们并非在全部的实现中所有都存在。

###AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。它会把这最后一个值发射给任何后续的观察者。

如下贴出代码:
//建立被观察者final AsyncSubject<String> subject = AsyncSubject.create();//建立观察者
Subscriber<String> subscriber = new Subscriber<String>() {br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
    Log.i(TAG, "onError");
}

@Override
public void onNext(String s) {
    Log.i(TAG, "s:" + s);

}

};//订阅事件
subject.subscribe(subscriber);//被观察者发出事件 若是调用onCompleted(),onNext()则会打印最后一个事件;若是没有,onNext()则不打印任何事件。
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
subject.onCompleted();
输出:
s:Hello Java onCompleted
然而,若是原始的Observable由于发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

上面的观察者被观察者代码相同,如今发出一系列信号,并在最后发出异常 代码以下:

subject.onNext("Hello Android ");
subject.onNext("Hello Java ");//由于发送了异常 因此onNext()没法被打印
subject.onError(null);
###BehaviorSubject

当观察者订阅BehaviorSubject时,他会将订阅前最后一次发送的事件和订阅后的全部发送事件都打印出来,若是订阅前无发送事件,则会默认接收构造器create(T)里面的对象和订阅后的全部事件,代码以下:
BehaviorSubject subject=BehaviorSubject.create("NROMAL");

Subscriber subscriber = new Subscriber() {br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
    Log.i(TAG, "onError");
}

@Override
public void onNext(Object o) {
    Log.i(TAG, "onNext: " + o);
}

};
//subject.onNext("Hello Android !");//subject.onNext("Hello Java !");//subject.onNext("Hello C !");//这里开始订阅 若是上面的3个注释没去掉,则Hello C的事件和订阅后面的事件生效//若是上面的三个注释去掉 则打印构造器NORMAL事件生效后和订阅后面的事件生效
subject.subscribe(subscriber);

subject.onNext("Hello CPP !");
subject.onNext("Hello IOS !");
PublishSubject

PublishSubject只会把在订阅发生的时间点以后来自原始Observable的数据发射给观察者。

须要注意的是,PublishSubject可能会一建立完成就马上开始发射数据,所以这里有一个风险:在Subject被建立后到有观察者订阅它以前这个时间段内,一个或多个数据可能会丢失。

代码以下:
PublishSubject subject= PublishSubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction1 call: "+s);
}

};

Action1<String> onNextAction2 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction2 call: "+s);
}

};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

输出以下:

onNextAction1 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
ReplaySubject

ReplaySubject会发射全部来自原始Observable的数据给观察者,不管它们是什么时候订阅的。

代码以下:

ReplaySubject subject= ReplaySubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction1 call: "+s);
}

};

Action1<String> onNextAction2 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction2 call: "+s);
}

};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

输出以下:

onNextAction1 call: Hello Android !
onNextAction1 call: Hello Java !
onNextAction2 call: Hello Android !
onNextAction2 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
###Subject总结

AsyncSubject不管什么时候订阅 只会接收最后一次onNext()事件,若是最后出现异常,则不会打印任何onNext()BehaviorSubject会从订阅前最后一次oNext()开始打印直至结束。若是订阅前无调用onNext(),则调用默认creat(T)传入的对象。若是异常后才调用,则不打印onNext()PublishSubject只会打印订阅后的任何事件。ReplaySubject不管订阅在什么时候都会调用发送的事件。

相关文章
相关标签/搜索