Rxjava
,因为其基于事件流的链式调用、逻辑简洁 & 使用简单的特色,深受各大 Android
开发者的欢迎。若是还不了解
RxJava
,请看文章:Android:这是一篇 清晰 & 易懂的Rxjava 入门教程java
RxJava
如此受欢迎的缘由,在于其提供了丰富 & 功能强大的操做符,几乎能完成全部的功能需求RxJava
操做符中最经常使用的 功能性操做符,并附带 Retrofit 结合 RxJava的实例Demo教学,但愿大家会喜欢。
- 本系列文章主要基于
Rxjava 2.0
- 接下来的时间,我将持续推出
Android
中Rxjava 2.0
的一系列文章,包括原理、操做符、应用场景、背压等等 ,有兴趣能够继续关注Carson_Ho的安卓开发笔记!!
辅助被观察者(Observable
) 在发送事件时实现一些功能性需求react
如错误处理、线程调度等等android
RxJava 2
中,常见的功能性操做符 主要有:注:在使用RxJava 2
操做符前,记得在项目的Gradle
中添加依赖:git
dependencies { compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxjava:2.0.7' // 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在 }
需求场景
即便得被观察者 & 观察者 造成订阅关系github
对应操做符web
做用
订阅,即链接观察者 & 被观察者微信
具体使用网络
observable.subscribe(observer); // 前者 = 被观察者(observable);后者 = 观察者(observer 或 subscriber) <-- 1. 分步骤的完整调用 --> // 步骤1: 建立被观察者 Observable 对象 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }); // 步骤2:建立观察者 Observer 并 定义响应事件行为 Observer<Integer> observer = new Observer<Integer>() { // 经过复写对应方法来 响应 被观察者 @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } // 默认最早调用复写的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "对Next事件"+ value +"做出响应" ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }; // 步骤3:经过订阅(subscribe)链接观察者和被观察者 observable.subscribe(observer); <-- 2. 基于事件流的链式调用 --> Observable.create(new ObservableOnSubscribe<Integer>() { // 1. 建立被观察者 & 生产事件 @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).subscribe(new Observer<Integer>() { // 2. 经过经过订阅(subscribe)链接观察者和被观察者 // 3. 建立观察者 & 定义响应事件的行为 @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } // 默认最早调用复写的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "对Next事件"+ value +"做出响应" ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); } }
<-- Observable.subscribe(Subscriber) 的内部实现 --> public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); // 在观察者 subscriber抽象类复写的方法 onSubscribe.call(subscriber),用于初始化工做 // 经过该调用,从而回调观察者中的对应方法从而响应被观察者生产的事件 // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式 // 同时也看出:Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时 }
需求场景
快速、方便指定 & 控制被观察者 & 观察者 的工做线程app
对应操做符使用
因为该部份内容较多 & 重要,因此已独立一篇文章,请看文章:Android RxJava:细说 线程控制(切换 / 调度 )(含Retrofit实例讲解)ide
需求场景
即在被观察者发送事件前进行一些延迟的操做
对应操做符使用
做用
使得被观察者延迟一段时间再发送事件
方法介绍delay()
具有多个重载方法,具体以下:
// 1. 指定延迟时间 // 参数1 = 时间;参数2 = 时间单位 delay(long delay,TimeUnit unit) // 2. 指定延迟时间 & 调度器 // 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器 delay(long delay,TimeUnit unit,mScheduler scheduler) // 3. 指定延迟时间 & 错误延迟 // 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常 // 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数 delay(long delay,TimeUnit unit,boolean delayError) // 4. 指定延迟时间 & 调度器 & 错误延迟 // 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数 delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知能够设置是否延迟
Observable.just(1, 2, 3) .delay(3, TimeUnit.SECONDS) // 延迟3s再发送,因为使用相似,因此此处不做所有展现 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
如发送事件前的初始化、发送事件后的回调请求等
do()
操做符有不少个,具体以下:Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onError(new Throwable("发生错误了")); } }) // 1. 当Observable每发送1次数据事件就会调用1次 .doOnEach(new Consumer<Notification<Integer>>() { @Override public void accept(Notification<Integer> integerNotification) throws Exception { Log.d(TAG, "doOnEach: " + integerNotification.getValue()); } }) // 2. 执行Next事件前调用 .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "doOnNext: " + integer); } }) // 3. 执行Next事件后调用 .doAfterNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "doAfterNext: " + integer); } }) // 4. Observable正常发送事件完毕后调用 .doOnComplete(new Action() { @Override public void run() throws Exception { Log.e(TAG, "doOnComplete: "); } }) // 5. Observable发送错误事件时调用 .doOnError(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "doOnError: " + throwable.getMessage()); } }) // 6. 观察者订阅时调用 .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { Log.e(TAG, "doOnSubscribe: "); } }) // 7. Observable发送事件完毕后调用,不管正常发送完毕 / 异常终止 .doAfterTerminate(new Action() { @Override public void run() throws Exception { Log.e(TAG, "doAfterTerminate: "); } }) // 8. 最后执行 .doFinally(new Action() { @Override public void run() throws Exception { Log.e(TAG, "doFinally: "); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
需求场景
发送事件过程当中,遇到错误时的处理机制
对应操做符类型
可捕获在它以前发生的异常
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new Throwable("发生错误了")); } }) .onErrorReturn(new Function<Throwable, Integer>() { @Override public Integer apply(@NonNull Throwable throwable) throws Exception { // 捕捉错误异常 Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() ); return 666; // 发生错误事件后,发送一个"666"事件,最终正常结束 } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
Observable
注:
onErrorResumeNext()
拦截的错误 =Throwable
;若需拦截Exception
请用onExceptionResumeNext()
- 若
onErrorResumeNext()
拦截的错误 =Exception
,则会将错误传递给观察者的onError
方法
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new Throwable("发生错误了")); } }) .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception { // 1. 捕捉错误异常 Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() ); // 2. 发生错误事件后,发送一个新的被观察者 & 发送事件序列 return Observable.just(11,22); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
Observable
注:
onExceptionResumeNext()
拦截的错误 =Exception
;若需拦截Throwable
请用onErrorResumeNext()
- 若
onExceptionResumeNext()
拦截的错误 =Throwable
,则会将错误传递给观察者的onError
方法
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new Exception("发生错误了")); } }) .onExceptionResumeNext(new Observable<Integer>() { @Override protected void subscribeActual(Observer<? super Integer> observer) { observer.onNext(11); observer.onNext(22); observer.onComplete(); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
Observable
)从新发射数据
- 接收到 onError()时,从新订阅 & 发送事件
Throwable
和Exception
均可拦截
共有5种重载方法
<-- 1. retry() --> // 做用:出现错误时,让被观察者从新发送数据 // 注:若一直错误,则一直从新发送 <-- 2. retry(long time) --> // 做用:出现错误时,让被观察者从新发送数据(具有重试次数限制 // 参数 = 重试次数 <-- 3. retry(Predicate predicate) --> // 做用:出现错误后,判断是否须要从新发送数据(若须要从新发送& 持续遇到错误,则持续重试) // 参数 = 判断逻辑 <-- 4. retry(new BiPredicate<Integer, Throwable>) --> // 做用:出现错误后,判断是否须要从新发送数据(若须要从新发送 & 持续遇到错误,则持续重试 // 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息) <-- 5. retry(long time,Predicate predicate) --> // 做用:出现错误后,判断是否须要从新发送数据(具有重试次数限制 // 参数 = 设置重试次数 & 判断逻辑
<-- 1. retry() --> // 做用:出现错误时,让被观察者从新发送数据 // 注:若一直错误,则一直从新发送 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new Exception("发生错误了")); e.onNext(3); } }) .retry() // 遇到错误时,让被观察者从新发射数据(若一直错误,则一直从新发送 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); <-- 2. retry(long time) --> // 做用:出现错误时,让被观察者从新发送数据(具有重试次数限制 // 参数 = 重试次数 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new Exception("发生错误了")); e.onNext(3); } }) .retry(3) // 设置重试次数 = 3次 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); <-- 3. retry(Predicate predicate) --> // 做用:出现错误后,判断是否须要从新发送数据(若须要从新发送& 持续遇到错误,则持续重试) // 参数 = 判断逻辑 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new Exception("发生错误了")); e.onNext(3); } }) // 拦截错误后,判断是否须要从新发送请求 .retry(new Predicate<Throwable>() { @Override public boolean test(@NonNull Throwable throwable) throws Exception { // 捕获异常 Log.e(TAG, "retry错误: "+throwable.toString()); //返回false = 不从新从新发送数据 & 调用观察者的onError结束 //返回true = 从新发送请求(若持续遇到错误,就持续从新发送) return true; } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); <-- 4. retry(new BiPredicate<Integer, Throwable>) --> // 做用:出现错误后,判断是否须要从新发送数据(若须要从新发送 & 持续遇到错误,则持续重试 // 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息) Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new Exception("发生错误了")); e.onNext(3); } }) // 拦截错误后,判断是否须要从新发送请求 .retry(new BiPredicate<Integer, Throwable>() { @Override public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception { // 捕获异常 Log.e(TAG, "异常错误 = "+throwable.toString()); // 获取当前重试次数 Log.e(TAG, "当前重试次数 = "+integer); //返回false = 不从新从新发送数据 & 调用观察者的onError结束 //返回true = 从新发送请求(若持续遇到错误,就持续从新发送) return true; } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } }); <-- 5. retry(long time,Predicate predicate) --> // 做用:出现错误后,判断是否须要从新发送数据(具有重试次数限制 // 参数 = 设置重试次数 & 判断逻辑 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new Exception("发生错误了")); e.onNext(3); } }) // 拦截错误后,判断是否须要从新发送请求 .retry(3, new Predicate<Throwable>() { @Override public boolean test(@NonNull Throwable throwable) throws Exception { // 捕获异常 Log.e(TAG, "retry错误: "+throwable.toString()); //返回false = 不从新从新发送数据 & 调用观察者的onError()结束 //返回true = 从新发送请求(最多从新发送3次) return true; } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
- 若须要从新发送 & 持续遇到错误,则持续重试
- 做用相似于
retry(Predicate predicate)
retry(Predicate predicate)
,惟一区别:返回 true
则不从新发送数据事件。此处不做过多描述做用
遇到错误时,将发生的错误传递给一个新的被观察者(Observable
),并决定是否须要从新订阅原始被观察者(Observable
)& 发送事件
具体使用
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new Exception("发生错误了")); e.onNext(3); } }) // 遇到error事件才会回调 .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception { // 参数Observable<Throwable>中的泛型 = 上游操做符抛出的异常,可经过该条件来判断异常的类型 // 返回Observable<?> = 新的被观察者 Observable(任意类型) // 此处有两种状况: // 1. 若 新的被观察者 Observable发送的事件 = Error事件,那么 原始Observable则不从新发送事件: // 2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则从新发送事件: return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception { // 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不从新发送事件 // 该异常错误信息可在观察者中的onError()中得到 return Observable.error(new Throwable("retryWhen终止啦")); // 2. 若返回的Observable发送的事件 = Next事件,则原始的Observable从新发送事件(若持续遇到错误,则持续重试) // return Observable.just(1); } }); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应" + e.toString()); // 获取异常错误信息 } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
需求场景
重复不断地发送被观察者事件
对应操做符类型repeat()
& repeatWhen()
具有重载方法,可设置重复建立次数
// 不传入参数 = 重复发送次数 = 无限次 repeat(); // 传入参数 = 重复发送次数有限 repeatWhen(Integer int ); // 注: // 1. 接收到.onCompleted()事件后,触发从新订阅 & 发送 // 2. 默认运行在一个新的线程上 // 具体使用 Observable.just(1, 2, 3, 4) .repeat(3) // 重复建立次数 =- 3次 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
做用
有条件地、重复发送 被观察者事件
原理
将原始 Observable
中止发送事件的标识(Complete()
/ Error()
)转换成1个 Object
类型数据传递给1个新被观察者(Observable
),以此决定是否从新订阅 & 发送原来的 Observable
- 若新被观察者(
Observable
)返回1个Complete
/Error
事件,则不从新订阅 & 发送原来的Observable
- 若新被观察者(
Observable
)返回其他事件时,则从新订阅 & 发送原来的Observable
Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() { @Override // 在Function函数中,必须对输入的 Observable<Object>进行处理,这里咱们使用的是flatMap操做符接收上游的数据 public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception { // 将原始 Observable 中止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable) // 以此决定是否从新订阅 & 发送原来的 Observable // 此处有2种状况: // 1. 若新被观察者(Observable)返回1个Complete() / Error()事件,则不从新订阅 & 发送原来的 Observable // 2. 若新被观察者(Observable)返回其他事件,则从新订阅 & 发送原来的 Observable return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Object throwable) throws Exception { // 状况1:若新被观察者(Observable)返回1个Complete() / Error()事件,则不从新订阅 & 发送原来的 Observable return Observable.empty(); // Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete() // return Observable.error(new Throwable("再也不从新订阅事件")); // 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。 // 状况2:若新被观察者(Observable)返回其他事件,则从新订阅 & 发送原来的 Observable // return Observable.just(1); // 仅仅是做为1个触发从新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() / Error()事件 } }); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe链接"); } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件做出响应:" + e.toString()); } @Override public void onComplete() { Log.d(TAG, "对Complete事件做出响应"); } });
至此,RxJava 2
中的功能性操做符讲解完毕。
Retrofit
& RxJava
,讲解功能性操做符的3个实际需求案例场景:
UI
Retrofit
与RxJava
用一个具体实例来实现轮询需求需求场景说明
功能说明
下面我将结合 Retrofit
与RxJava
用一个具体实例来实现 发送网络请求时的 差错重试机制需求
上述全部的Demo源代码都存放在:Carson_Ho的Github地址:RxJava2_功能性操做符
RxJava2
中经常使用的功能性操做符RxJava2
的其余操做符进行深刻讲解 ,感兴趣的同窗能够继续关注carson_ho的微信公众号**