Rxjava
,因为其基于事件流的链式调用、逻辑简洁 & 使用简单的特色,深受各大 Android
开发者的欢迎。若是还不了解
RxJava
,请看文章:Android:这是一篇 清晰 & 易懂的Rxjava 入门教程java
RxJava
如此受欢迎的缘由,在于其提供了丰富 & 功能强大的操做符,几乎能完成全部的功能需求RxJava
操做符中最经常使用的建立操做符,并附带 Retrofit 结合 RxJava的实例Demo教学,但愿大家会喜欢。
- 本系列文章主要基于
Rxjava 2.0
- 接下来的时间,我将持续推出
Android
中Rxjava 2.0
的一系列文章,包括原理、操做符、应用场景、背压等等 ,有兴趣能够继续关注Carson_Ho的安卓开发笔记!!
建立 被观察者( Observable
) 对象 & 发送事件。react
注:在使用RxJava 2
操做符前,记得在项目的Gradle
中添加依赖:android
dependencies { compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxjava:2.0.7' // 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在 }
需求场景
完整的建立被观察者对象git
对应操做符类型github
create()web
Observable
)
RxJava
中建立被观察者对象最基本的操做符数组
/ ** * 1. 经过creat()建立被观察者 Observable 对象 */ Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { // 传入参数: OnSubscribe 对象 // 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发 // 即观察者会依次调用对应事件的复写方法从而响应事件 // 从而实现由被观察者向观察者的事件传递 & 被观察者调用了观察者的回调方法 ,即观察者模式 / ** * 2. 在复写的subscribe()里定义须要发送的事件 */ @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { // 经过 ObservableEmitter类对象 产生 & 发送事件 // ObservableEmitter类介绍 // a. 定义:事件发射器 // b. 做用:定义须要发送的事件 & 向观察者发送事件 // 注:建议发送事件前检查观察者的isUnsubscribed状态,以便在没有观察者时,让Observable中止发射数据 if (!observer.isUnsubscribed()) { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } emitter.onComplete(); } }); // 至此,一个完整的被观察者对象(Observable)就建立完毕了。
在具体使用时,通常采用 链式调用 来建立微信
// 1. 经过creat()建立被观察者对象 Observable.create(new ObservableOnSubscribe<Integer>() { // 2. 在复写的subscribe()里定义须要发送的事件 @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } // 至此,一个被观察者对象(Observable)就建立完毕 }).subscribe(new Observer<Integer>() { // 如下步骤仅为展现一个完整demo,能够忽略 // 3. 经过经过订阅(subscribe)链接观察者和被观察者 // 4. 建立观察者 & 定义响应事件的行为 @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } // 默认最早调用复写的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); }
需求场景
快速的建立被观察者对象网络
对应操做符类型ide
just()
Observable
)注:最多只能发送10个参数
应用场景
快速建立 被观察者对象(Observable
) & 发送10个如下事件
具体使用
// 1. 建立时传入整型一、二、三、4 // 在建立后就会发送这些对象,至关于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4) Observable.just(1, 2, 3,4) // 至此,一个Observable对象建立完毕,如下步骤仅为展现一个完整demo,能够忽略 // 2. 经过经过订阅(subscribe)链接观察者和被观察者 // 3. 建立观察者 & 定义响应事件的行为 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } // 默认最早调用复写的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); }
Observable
)会将数组中的数据转换为
Observable
对象
应用场景
Observable
) & 发送10个以上事件(数组形式)具体使用
// 1. 设置须要传入的数组 Integer[] items = { 0, 1, 2, 3, 4 }; // 2. 建立被观察者对象(Observable)时传入数组 // 在建立后就会将该数组转换成Observable & 发送该对象中的全部数据 Observable.fromArray(items) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); } // 注: // 可发送10个以上参数 // 若直接传递一个list集合进去,不然会直接把list当作一个数据元素发送 /* * 数组遍历 **/ // 1. 设置须要传入的数组 Integer[] items = { 0, 1, 2, 3, 4 }; // 2. 建立被观察者对象(Observable)时传入数组 // 在建立后就会将该数组转换成Observable & 发送该对象中的全部数据 Observable.fromArray(items) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "数组遍历"); } @Override public void onNext(Integer value) { Log.d(TAG, "数组中的元素 = "+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "遍历结束"); } });
Observable
)List
数据会将数组中的数据转换为
Observable
对象
应用场景
Observable
) & 发送10个以上事件(集合形式)具体使用
/* * 快速发送集合 **/ // 1. 设置一个集合 List<Integer> list = new ArrayList<>(); list.add(1); list.add(2); list.add(3); // 2. 经过fromIterable()将集合中的对象 / 数据发送出去 Observable.fromIterable(list) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); /* * 集合遍历 **/ // 1. 设置一个集合 List<Integer> list = new ArrayList<>(); list.add(1); list.add(2); list.add(3); // 2. 经过fromIterable()将集合中的对象 / 数据发送出去 Observable.fromIterable(list) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "集合遍历"); } @Override public void onNext(Integer value) { Log.d(TAG, "集合中的数据元素 = "+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "遍历结束"); } });
// 下列方法通常用于测试使用 <-- empty() --> // 该方法建立的被观察者对象发送事件的特色:仅发送Complete事件,直接通知完成 Observable observable1=Observable.empty(); // 即观察者接收后会直接调用onCompleted() <-- error() --> // 该方法建立的被观察者对象发送事件的特色:仅发送Error事件,直接通知异常 // 可自定义异常 Observable observable2=Observable.error(new RuntimeException()) // 即观察者接收后会直接调用onError() <-- never() --> // 该方法建立的被观察者对象发送事件的特色:不发送任何事件 Observable observable3=Observable.never(); // 即观察者接收后什么都不调用
Observer
)订阅时,才动态建立被观察者对象(Observable
) & 发送事件
- 经过
Observable
工厂方法建立被观察者对象(Observable
)- 每次订阅后,都会获得一个刚建立的最新的
Observable
对象,这能够确保Observable
对象里的数据是最新的
应用场景
动态建立被观察者对象(Observable
) & 获取最新的Observable
对象数据
具体使用
<-- 1. 第1次对i赋值 ->> Integer i = 10; // 2. 经过defer 定义被观察者对象 // 注:此时被观察者对象还没建立 Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> call() throws Exception { return Observable.just(i); } }); <-- 2. 第2次对i赋值 ->> i = 15; <-- 3. 观察者开始订阅 ->> // 注:此时,才会调用defer()建立被观察者对象(Observable) observable.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } @Override public void onNext(Integer value) { Log.d(TAG, "接收到的整数是"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
由于是在订阅时才建立,因此i值会取第2次的赋值
Observable
)Long
类型)本质 = 延迟指定时间后,调用一次
onNext(0)
应用场景
延迟指定事件,发送一个0,通常用于检测
具体使用
// 该例子 = 延迟2s后,发送一个long类型数值 Observable.timer(2, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } @Override public void onNext(Long value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); // 注:timer操做符默认运行在一个新线程上 // 也可自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)
Observable
)发送的事件序列 = 从0开始、无限递增1的的整数序列
// 参数说明: // 参数1 = 第1次延迟时间; // 参数2 = 间隔时间数字; // 参数3 = 时间单位; Observable.interval(3,1,TimeUnit.SECONDS) // 该例子发送的事件序列特色:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } // 默认最早调用复写的 onSubscribe() @Override public void onNext(Long value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); // 注:interval默认在computation调度器上执行 // 也可自定义指定线程调度器(第3个参数):interval(long,TimeUnit,Scheduler)
Observable
)a. 发送的事件序列 = 从0开始、无限递增1的的整数序列
b. 做用相似于interval()
,但可指定发送的数据的数量
// 参数说明: // 参数1 = 事件序列起始点; // 参数2 = 事件数量; // 参数3 = 第1次事件延迟发送时间; // 参数4 = 间隔时间数字; // 参数5 = 时间单位 Observable.intervalRange(3,10,2, 1, TimeUnit.SECONDS) // 该例子发送的事件序列特色: // 1. 从3开始,一共发送10个事件; // 2. 第1次延迟2s发送,以后每隔2秒产生1个数字(从0开始递增1,无限个) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } // 默认最早调用复写的 onSubscribe() @Override public void onNext(Long value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
Observable
)a. 发送的事件序列 = 从0开始、无限递增1的的整数序列
b. 做用相似于intervalRange()
,但区别在于:无延迟发送事件
// 参数说明: // 参数1 = 事件序列起始点; // 参数2 = 事件数量; // 注:若设置为负数,则会抛出异常 Observable.range(3,10) // 该例子发送的事件序列特色:从3开始发送,每次发送事件递增1,一共发送10个事件 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } // 默认最早调用复写的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
range()
,区别在于该方法支持数据类型 = Long
range()
相似,此处不做过多描述至此,关于 RxJava2
中的建立操做符讲解完毕。
Retrofit
和 RxJava
进行讲解上述全部的Demo
源代码都存放在:Carson_Ho的Github地址:RxJava2_建立操做符
RxJava2
中经常使用的建立操做符Android
中 Rxjava 2.0
的一系列文章,包括原理、操做符、应用场景、背压等等