提高开发效率,下降维护成本一直是开发团队永恒不变的宗旨。近两年来国内的技术圈子中愈来愈多的开始说起 RxJava ,愈来愈多的应用和面试中都会有 RxJava ,而就目前的状况,Android 的网络库基本被 Retrofit + OkHttp 一统天下了,而配合上响应式编程 RxJava 可谓如鱼得水。想必你们确定被近期的 Kotlin 炸开了锅,笔者也在闲暇之时去了解了一番(做为一个与时俱进的有理想的青年怎么可能不与时俱进?),发现其中有个很是好的优势就是简洁,支持函数式编程。是的, RxJava 最大的优势也是简洁,但它不止是简洁,并且是** 随着程序逻辑变得愈来愈复杂,它依然可以保持简洁 **(这货洁身自好呀有木有)。
咳咳,要例子,猛戳这里:给 Android 开发者的 RxJava 详解html
上面咱们说起了响应式编程,很多新司机对它可谓一脸懵逼,那什么是响应式编程呢?响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它能够被观测,被过滤,被操做,或者为新的消费者与另一条流合并为一条新的流。java
响应式编程的一个关键概念是事件。事件能够被等待,能够触发过程,也能够触发其它事件。事件是惟一的以合适的方式将咱们的现实世界映射到咱们的软件中:若是屋里太热了咱们就打开一扇窗户。一样的,当咱们的天气app从服务端获取到新的天气数据后,咱们须要更新app上展现天气信息的UI;汽车上的车道偏移系统探测到车辆偏移了正常路线就会提醒驾驶者纠正,就是是响应事件。react
今天,响应式编程最通用的一个场景是UI:咱们的移动App必须作出对网络调用、用户触摸输入和系统弹框的响应。在这个世界上,软件之因此是事件驱动并响应的是由于现实生活也是如此。git
RxJava 这些年可谓愈来愈流行,而在去年的晚些时候发布了2.0正式版。大半年已过,虽然网上已经出现了大部分的 RxJava 教程(其实细心的你仍是会发现 1.x 的超级多),前些日子,笔者花了大约两周的闲暇之时写了 RxJava 2.x 系列教程,也获得了很多反馈,其中就有很多读者以为每一篇的教程过短,抑或是但愿更多的侧重适用场景的介绍,在这样的大前提下,这篇完结版教程就此诞生,仅供各位新司机采纳。
github
RxJava 2.x 已经按照 Reactive-Streams specification 规范彻底的重写了,maven也被放在了io.reactivex.rxjava2:rxjava:2.x.y
下,因此 RxJava 2.x 独立于 RxJava 1.x 而存在,而随后官方宣布的将在一段时间后终止对 RxJava 1.x 的维护,因此对于熟悉 RxJava 1.x 的老司机天然能够直接看一下 2.x 的文档和异同就能轻松上手了,而对于不熟悉的年轻司机,不要慌,本酱带你装逼带你飞,立刻就发车,坐稳了:https://github.com/nanchen2251/RxJava2Examples面试
你只须要在 build.gradle 中加上:compile 'io.reactivex.rxjava2:rxjava:2.1.1'
(2.1.1为写此文章时的最新版本)
数据库
RxJava 2.x 拥有了新的特性,其依赖于4个基础接口,它们分别是编程
其中最核心的莫过于 Publisher
和 Subscriber
。Publisher
能够发出一系列的事件,而 Subscriber
负责和处理这些事件。api
其中用的比较多的天然是 Publisher
的 Flowable
,它支持背压,有兴趣的能够看一下官方对于背压的讲解缓存
能够明显地发现,RxJava 2.x 最大的改动就是对于 backpressure
的处理,为此将原来的 Observable
拆分红了新的Observable
和Flowable
,同时其余相关部分也同时进行了拆分,但使人庆幸的是,是它,是它,仍是它,仍是咱们最熟悉和最喜欢的 RxJava。
在 RxJava 1.x 中,咱们最熟悉的莫过于 Observable
这个类了,笔者在刚刚使用 RxJava 2.x 的时候,建立了 一个 Observable
,瞬间一脸懵逼有木有,竟然连咱们最最熟悉的 Subscriber
都没了,取而代之的是 ObservableEmmiter
,俗称发射器。此外,因为没有了Subscriber
的踪迹,咱们建立观察者时需使用 Observer
。而 Observer
也不是咱们熟悉的那个 Observer
,又出现了一个 Disposable
参数带你装逼带你飞。
废话很少说,从会用开始,还记得 RxJava 的三部曲吗?
** 第一步:初始化 Observable
第二步:初始化 Observer
第三步:创建订阅关系 **
Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { Log.e(TAG, "Observable emit 1" + "\n"); e.onNext(1); Log.e(TAG, "Observable emit 2" + "\n"); e.onNext(2); Log.e(TAG, "Observable emit 3" + "\n"); e.onNext(3); e.onComplete(); Log.e(TAG, "Observable emit 4" + "\n" ); e.onNext(4); } }).subscribe(new Observer<Integer>() { // 第三步:订阅 // 第二步:初始化Observer private int i; private Disposable mDisposable; @Override public void onSubscribe(@NonNull Disposable d) { mDisposable = d; } @Override public void onNext(@NonNull Integer integer) { i++; if (i == 2) { // 在RxJava 2.x 中,新增的Disposable能够作到切断的操做,让Observer观察者再也不接收上游事件 mDisposable.dispose(); } } @Override public void onError(@NonNull Throwable e) { Log.e(TAG, "onError : value : " + e.getMessage() + "\n" ); } @Override public void onComplete() { Log.e(TAG, "onComplete" + "\n" ); } });
不难看出,RxJava 2.x 与 1.x 仍是存在着一些区别的。首先,建立 Observable
时,回调的是 ObservableEmitter
,字面意思即发射器,而且直接 throws Exception。其次,在建立的 Observer 中,也多了一个回调方法:onSubscribe
,传递参数为Disposable
,Disposable
至关于 RxJava 1.x 中的 Subscription
, 用于解除订阅。能够看到示例代码中,在 i 自增到 2 的时候,订阅关系被切断。
07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false 07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 1 07-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 1 07-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 2 07-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 2 07-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true 07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 3 07-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4
固然,咱们的 RxJava 2.x 也为咱们保留了简化订阅方法,咱们能够根据需求,进行相应的简化订阅,只不过传入对象改成了 Consumer
。
Consumer
即消费者,用于接收单个值,BiConsumer
则是接收两个值,Function
用于变换对象,Predicate
用于判断。这些接口命名大多参照了 Java 8 ,熟悉 Java 8 新特性的应该都知道意思,这里也再也不赘述。
关于线程切换这点,RxJava 1.x 和 RxJava 2.x 的实现思路是同样的。这里简单的说一下,以便于咱们的新司机入手。
同 RxJava 1.x 同样,subscribeOn
用于指定subscribe()
时所发生的线程,从源码角度能够看出,内部线程调度是经过 ObservableSubscribeOn
来实现的。
@SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
ObservableSubscribeOn
的核心源码在 subscribeActual
方法中,经过代理的方式使用 SubscribeOnObserver
包装 Observer
后,设置 Disposable
来将 subscribe
切换到 Scheduler
线程中。
observeOn
方法用于指定下游 Observer
回调发生的线程。
@SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
RxJava 内置的线程调度器的确可让咱们的线程切换驾轻就熟,但其中也有些须要注意的地方。
subscribeOn()
指定的就是发射事件的线程,observerOn
指定的就是订阅者接收事件的线程。subscribeOn()
只有第一次的有效,其他的会被忽略。observerOn()
,下游的线程就会切换一次。Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName()); e.onNext(1); e.onComplete(); } }).subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName()); } }) .observeOn(Schedulers.io()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName()); } });
输出:
07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1 07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main 07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
实例代码中,分别用 Schedulers.newThread()
和 Schedulers.io()
对发射线程进行切换,并采用 observeOn(AndroidSchedulers.mainThread()
和 Schedulers.io()
进行了接收线程的切换。能够看到输出中发射线程仅仅响应了第一个 newThread
,但每调用一次 observeOn()
,线程便会切换一次,所以若是咱们有相似的需求时,便知道如何处理了。
RxJava 中,已经内置了不少线程选项供咱们选择,例若有:
Schedulers.io()
表明io操做的线程, 一般用于网络,读写文件等io密集型的操做;Schedulers.computation()
表明CPU计算密集型的操做, 例如须要大量计算的操做;Schedulers.newThread()
表明一个常规的新线程;AndroidSchedulers.mainThread()
表明Android的主线程这些内置的 Scheduler
已经足够知足咱们开发的需求,所以咱们应该使用内置的这些选项,而 RxJava 内部使用的是线程池来维护这些线程,因此效率也比较高。
关于操做符,在官方文档中已经作了很是完善的讲解,而且笔者前面的系列教程中也着重讲解了绝大多数的操做符做用,这里受于篇幅限制,就很少作赘述,只挑选几个进行实际情景的讲解。
map
操做符能够将一个 Observable
对象经过某种关系转换为另外一个Observable
对象。在 2.x 中和 1.x 中做用几乎一致,不一样点在于:2.x 将 1.x 中的 Func1
和 Func2
改成了 Function
和 BiFunction
。
想必你们都知道,不少时候咱们在使用 RxJava 的时候老是和 Retrofit 进行结合使用,而为了方便演示,这里咱们就暂且采用 OkHttp3 进行演示,配合 map
,doOnNext
,线程切换进行简单的网络请求:
1)经过 Observable.create() 方法,调用 OkHttp 网络请求;
2)经过 map 操做符集合 gson,将 Response 转换为 bean 类;
3)经过 doOnNext() 方法,解析 bean 中的数据,并进行数据库存储等操做;
4)调度线程,在子线程中进行耗时操做任务,在主线程中更新 UI ;
5)经过 subscribe(),根据请求成功或者失败来更新 UI 。
Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception { Builder builder = new Builder() .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }).map(new Function<Response, MobileAddress>() { @Override public MobileAddress apply(@NonNull Response response) throws Exception { if (response.isSuccessful()) { ResponseBody body = response.body(); if (body != null) { Log.e(TAG, "map:转换前:" + response.body()); return new Gson().fromJson(body.string(), MobileAddress.class); } } return null; } }).observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<MobileAddress>() { @Override public void accept(@NonNull MobileAddress s) throws Exception { Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n"); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<MobileAddress>() { @Override public void accept(@NonNull MobileAddress data) throws Exception { Log.e(TAG, "成功:" + data.toString() + "\n"); }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "失败:" + throwable.getMessage() + "\n"); } });
concat
能够作到不交错的发射两个甚至多个 Observable 的发射事件,而且只有前一个 Observable
终止(onComplete
) 后才会订阅下一个 Observable
。
想必在实际应用中,不少时候(对数据操做不敏感时)都须要咱们先读取缓存的数据,若是缓存没有数据,再经过网络请求获取,随后在主线程更新咱们的UI。
concat
操做符简直就是为咱们这种需求量身定作。
利用 concat
的必须调用 onComplete
后才能订阅下一个 Observable
的特性,咱们就能够先读取缓存数据,假若获取到的缓存数据不是咱们想要的,再调用 onComplete()
以执行获取网络数据的Observable
,若是缓存数据能应咱们所需,则直接调用 onNext()
,防止过分的网络请求,浪费用户的流量。
Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() { @Override public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception { Log.e(TAG, "create当前线程:"+Thread.currentThread().getName() ); FoodList data = CacheManager.getInstance().getFoodListData(); // 在操做符 concat 中,只有调用 onComplete 以后才会执行下一个 Observable if (data != null){ // 若是缓存数据不为空,则直接读取缓存数据,而不读取网络数据 isFromNet = false; Log.e(TAG, "\nsubscribe: 读取缓存数据:" ); runOnUiThread(new Runnable() { @Override public void run() { mRxOperatorsText.append("\nsubscribe: 读取缓存数据:\n"); } }); e.onNext(data); }else { isFromNet = true; runOnUiThread(new Runnable() { @Override public void run() { mRxOperatorsText.append("\nsubscribe: 读取网络数据:\n"); } }); Log.e(TAG, "\nsubscribe: 读取网络数据:" ); e.onComplete(); } } }); Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list") .addQueryParameter("rows",10+"") .build() .getObjectObservable(FoodList.class); // 两个 Observable 的泛型应当保持一致 Observable.concat(cache,network) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<FoodList>() { @Override public void accept(@NonNull FoodList tngouBeen) throws Exception { Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() ); if (isFromNet){ mRxOperatorsText.append("accept : 网络获取数据设置缓存: \n"); Log.e(TAG, "accept : 网络获取数据设置缓存: \n"+tngouBeen.toString() ); CacheManager.getInstance().setFoodListData(tngouBeen); } mRxOperatorsText.append("accept: 读取数据成功:" + tngouBeen.toString()+"\n"); Log.e(TAG, "accept: 读取数据成功:" + tngouBeen.toString()); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "subscribe 失败:"+Thread.currentThread().getName() ); Log.e(TAG, "accept: 读取数据失败:"+throwable.getMessage() ); mRxOperatorsText.append("accept: 读取数据失败:"+throwable.getMessage()+"\n"); } });
有时候咱们的缓存可能还会分为 memory 和 disk ,实际上都差很少,无非是多写点 Observable ,而后经过 concat 合并便可。
想必这种状况也在实际状况中比比皆是,例如用户注册成功后须要自动登陆,咱们只须要先经过注册接口注册用户信息,注册成功后立刻调用登陆接口进行自动登陆便可。
咱们的 flatMap
刚好解决了这种应用场景,flatMap
操做符能够将一个发射数据的 Observable
变换为多个 Observables
,而后将它们发射的数据合并后放到一个单独的 Observable
,利用这个特性,咱们很轻松地达到了咱们的需求。
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list") .addQueryParameter("rows", 1 + "") .build() .getObjectObservable(FoodList.class) // 发起获取食品列表的请求,并解析到FootList .subscribeOn(Schedulers.io()) // 在io线程进行网络请求 .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理获取食品列表的请求结果 .doOnNext(new Consumer<FoodList>() { @Override public void accept(@NonNull FoodList foodList) throws Exception { // 先根据获取食品列表的响应结果作一些操做 Log.e(TAG, "accept: doOnNext :" + foodList.toString()); mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n"); } }) .observeOn(Schedulers.io()) // 回到 io 线程去处理获取食品详情的请求 .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() { @Override public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception { if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) { return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show") .addBodyParameter("id", foodList.getTngou().get(0).getId() + "") .build() .getObjectObservable(FoodDetail.class); } return null; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<FoodDetail>() { @Override public void accept(@NonNull FoodDetail foodDetail) throws Exception { Log.e(TAG, "accept: success :" + foodDetail.toString()); mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n"); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "accept: error :" + throwable.getMessage()); mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n"); } });
在实际应用中,咱们极有可能会在一个页面显示的数据来源于多个接口,这时候咱们的 zip
操做符为咱们排忧解难。
zip
操做符能够将多个 Observable
的数据结合为一个数据源再发射出去。
Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512") .build() .getObjectObservable(MobileAddress.class); Observable<CategoryResult> observable2 = Network.getGankApi() .getCategoryData("Android",1,1); Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() { @Override public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception { return "合并后的数据为:手机归属地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who; } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { Log.e(TAG, "accept: 成功:" + s+"\n"); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "accept: 失败:" + throwable+"\n"); } });
想必即时通信等须要轮训的任务在现在的 APP 中已经是很常见,而 RxJava 2.x 的 interval
操做符可谓完美地解决了咱们的疑惑。
这里就简单的意思一下轮训。
private Disposable mDisposable; @Override protected void doSomething() { mDisposable = Flowable.interval(1, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { Log.e(TAG, "accept: doOnNext : "+aLong ); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { Log.e(TAG, "accept: 设置文本 :"+aLong ); mRxOperatorsText.append("accept: 设置文本 :"+aLong +"\n"); } }); } /** * 销毁时中止心跳 */ @Override protected void onDestroy() { super.onDestroy(); if (mDisposable != null){ mDisposable.dispose(); } }
因为 RxJava 2.x 变化较大没法直接升级,幸运的是,官方为咱们提供了 RxJava2Interrop 这个库,能够方便地把 RxJava 1.x 升级到 RxJava 2.x,或者将 RxJava 2.x 转回到 RxJava 1.x。
本酱看你都看到这儿了,实为将来的栋梁之才,因此且送你一本经书:
https://github.com/nanchen2251/RxJava2Examples