需求了解:java
Rxjava 中当数据处理派发中发生了异常 ,观察者会接受到一个 Error
的通知,那若是不想发射这个异常的通知,本身处理掉呢?答案固然是能够的,在 Rxjava 中不少操做符可用于对 Observable 发射的 onError 通知作出响应或者从错误中恢复。react
例如:git
Rxjava中常见的错误处理操做符有以下几类:github
从 onError 通知中恢复发射数据。并发
Catch
操做符拦截原始Observable的 onError 通知,将它替换为其它的数据项或数据序列,让产生的Observable可以正常终止或者根本不终止。app
onErrorReturn
方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的 onError 调用,不会将错误传递给观察者,做为替代,它会发发射一个特殊的项并调用观察者的 onCompleted 方法。ide
示例代码:函数
// 建立一个能够发射异常的Observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(1 / 0); // 产生一个异常 emitter.onNext(3); emitter.onNext(4); } }); /** 1. onErrorReturnItem(T item) * 让Observable遇到错误时发射一个指定的项(item)而且正常终止。 */ observable.onErrorReturnItem(888) // 源Observable发生异常时发射指定的888数据 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(1)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(1): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(1): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(1)"); } }); System.out.println("-----------------------------------------------"); /** * 2. onErrorReturn(Function<Throwable, T> valueSupplier) * 让Observable遇到错误时经过一个函数Function来接受Error参数并进行判断返回指定的类型数据,而且正常终止。 */ observable.onErrorReturn(new Function<Throwable, Integer>() { @Override public Integer apply(Throwable throwable) throws Exception { System.out.println("--> apply(1): e = " + throwable); return 888; // 源Observable发生异常时发射指定的888数据 } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(2)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(2): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(2): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(2)"); } });
输出:.net
--> onSubscribe(1) --> onNext(1): 1 --> onNext(1): 2 --> onNext(1): 888 --> onCompleted(1) ----------------------------------------------- --> onSubscribe(2) --> onNext(2): 1 --> onNext(2): 2 --> apply(1): e = java.lang.ArithmeticException: / by zero --> onNext(2): 888 --> onCompleted(2)
Javadoc: onErrorReturnItem(T item)
Javadoc: onErrorReturn(Function<Throwable, T> valueSupplier)
onErrorResumeNext
方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的 onError 调用,不会将错误传递给观察者,做为替代,它会开始另外一个指定的备用Observable。
示例代码:
// 建立一个能够发射异常的Observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(1 / 0); // 产生一个异常 emitter.onNext(3); emitter.onNext(4); } }); /** * 3. onErrorResumeNext(ObservableSource next) * 让Observable在遇到错误时开始发射第二个指定的Observable的数据序列 */ observable.onErrorResumeNext(Observable.just(888)) // 当发生异常的时候继续发射此项Observable .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(3): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(3)"); } }); System.out.println("-----------------------------------------------"); /** * 4. onErrorResumeNext(Function<Throwable, ObservableSource<T>> resumeFunction) * 让Observable在遇到错误时经过一个函数Function来接受Error参数并进行判断返回指定的第二个Observable的数据序列 */ observable.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception { System.out.println("--> apply(4): " + throwable); return Observable.just(888); // 当发生异常的时候继续发射此项Observable } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(4)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(4): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(4): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(4)"); } });
输出:
--> onSubscribe(3) --> onNext(3): 1 --> onNext(3): 2 --> onNext(3): 888 --> onCompleted(3) ----------------------------------------------- --> onSubscribe(4) --> onNext(4): 1 --> onNext(4): 2 --> apply(4): java.lang.ArithmeticException: / by zero --> onNext(4): 888 --> onCompleted(4)
Javadoc: onErrorResumeNext(ObservableSource next)
Javadoc: onErrorResumeNext(Function<Throwable, ObservableSource<T>> resumeFunction)
与 onErrorResumeNext 相似, onExceptionResumeNext
方法返回一个镜像原有Observable行为的新Observable,也使用一个备用的Observable,不一样的是,若是 onError 收到的 Throwable 不是一个 Exception ,它会将错误传递给观察者的 onError 方法,不会使用备用的Observable。
解析: onExceptionResumeNext
只会对Exception
类型的异常进行处理,若是onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable 。
示例代码:
// 建立一个能够发射异常的Observable Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); // emitter.onError(new Throwable("This is Throwable!")); // Throwable类型异常,直接通知观察者 // emitter.onError(new Error("This is Error!")); // Error类型异常,直接通知观察者 emitter.onError(new Exception("This is Exception!")); // Exception类型异常,进行处理,发送备用的Observable数据 // emitter.onNext(1 / 0); // 会产生一个ArithmeticException异常,异常会被处理,发送备用的Observable数据 emitter.onNext(3); emitter.onNext(4); } }); /** * 5. onExceptionResumeNext(ObservableSource next) * 若是onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable * 只对Exception类型的异常通知进行备用Observable处理 */ observable1.onExceptionResumeNext(Observable.just(888)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(5)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(5): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(5): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(5)"); } });
输出:
--> onSubscribe(5) --> onNext(5): 1 --> onNext(5): 2 --> onNext(5): 888 --> onCompleted(5)
若是原始Observable遇到错误,从新订阅它指望它能正常终止。
Retry
操做符不会将原始 Observable 的 onError
通知传递给观察者,它会订阅这个Observable,再给它机会无错误地完成它的数据序列。 Retry 老是传递 onNext 通知给观察者,因为从新订阅,可能会形成数据项重复状况。
retry():不管收到多少次 onError
通知,无参数版本的 retry
都会继续订阅并发射原始Observable。
注意: 由于若是遇到异常,将会无条件的从新订阅原始的Observable,知道没有异常的发射所有的数据序列为止。因此若是你的异常发生后从新订阅也不会恢复正常的话,会一直订阅下去,有内存泄露的风险。
retry(long times):接受单个 count
参数的 retry 会最多从新订阅指定的次数,若是次数超了,它不会尝试再次订阅,它会把最新的一个 onError
通知传递给它的观察者。
retry(long times, Predicate<Throwable> predicate):遇到异常后最多从新订阅 times
次,每次从新订阅通过函数predicate
最终判断是否继续从新订阅,若是 times 到达上限或者 predicate 返回 false 中任意一个最早知足条件,都会终止从新订阅,retry 会将最新的一个 onError 通知传递给它的观察者。
retry(Predicate<Throwable> predicate):接受一个谓词函数做为参数,这个函数的两个参数是:重试次数和致使发射 onError
通知的 Throwable 。这个函数返回一个布尔值,若是返回 true
, retry 应该再次订阅和镜像原始的Observable,若是返回 false
, retry 会将最新的一个 onError 通知传递给它的观察者
retry(BiPredicate<Integer, Throwable> predicate):遇到异常时,经过函数 predicate
判断是否从新订阅源Observable,而且经过参数 Integer
传递给 predicate 从新订阅的次数,retry 会将最新的一个 onError 通知传递给它的观察者。
retryUntil(BooleanSupplier stop):重试从新订阅,直到给定的中止函数 stop
返回 true
,retry 会将最新的一个 onError 通知传递给它的观察者。
retryWhen(Function<Observable<Throwable>, ObservableSource> handler):retryWhen
和 retry 相似,区别是, retryWhen 将 onError 中的 Throwable 传递给一个函数,这个函数产生另外一个 Observable, retryWhen 观察它的结果再决定是否是要从新订阅原始的Observable。若是这个Observable发射了一项数据,它就从新订阅,若是这个Observable发射的是 onError 通知,它就将这个通知传递给观察者而后终止。
实例代码:
// flag for emitted onError times public static int temp = 0; // 建立能够发送Error通知的Observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); if (temp <= 2) { emitter.onError(new Exception("Test Error!")); temp++; } emitter.onNext(3); emitter.onNext(4); } }); /** * 1. retry() * 不管收到多少次onError通知, 都会去继续订阅并发射原始Observable。 */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(1)"); } }).retry().subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(1): " + integer); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 2. retry(long times) * 遇到异常后,最多从新订阅源Observable times次 */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(2)"); } }).retry(1) // 遇到异常后,重复订阅的1次 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(2)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(2): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(2): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(2)"); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 3. retry(long times, Predicate<Throwable> predicate) * 遇到异常后最多从新订阅times次,每次从新订阅通过函数predicate最终判断是否继续从新订阅 * 若是times到达上限或者predicate返回false中任意一个最早知足条件,都会终止从新订阅 */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(3)"); } }).retry(2, new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) throws Exception { System.out.println("--> test(3)"); if(throwable instanceof Exception) { return true; // 遇到异常通知后是否继续继续订阅 } return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(3): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(3)"); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 4. retry(Predicate<Throwable> predicate) * 遇到异常时,经过函数predicate判断是否从新订阅源Observable */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(4)"); } }).retry(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) throws Exception { if (throwable instanceof Exception) { return true; // 遇到异常通知后是否继续继续订阅 } return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(4)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(4): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(4): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(4)"); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 5. retry(BiPredicate<Integer, Throwable> predicate) * 遇到异常时,经过函数predicate判断是否从新订阅源Observable,而且经过参数integer传递给predicate从新订阅的次数 */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(5)"); } }).retry(new BiPredicate<Integer, Throwable>() { @Override public boolean test(Integer integer, Throwable throwable) throws Exception { System.out.println("--> test(5): " + integer); if (throwable instanceof Exception) { return true; // 遇到异常通知后是否继续继续订阅 } return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(5)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(5): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(5): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(5)"); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 6. retryUntil(BooleanSupplier stop) * 重试从新订阅,直到给定的中止函数stop返回true */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(6)"); } }).retryUntil(new BooleanSupplier() { @Override public boolean getAsBoolean() throws Exception { System.out.println("--> getAsBoolean(6)"); if(temp == 1){ // 知足条件,中止从新订阅 return true; } return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(6)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(6): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(6): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(6)"); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 7. retryWhen(Function<Observable<Throwable>, ObservableSource> handler) * 将onError中的Throwable传递给一个函数handler,这个函数产生另外一个Observable, * retryWhen观察它的结果再决定是否是要从新订阅原始的Observable。 * 若是这个Observable发射了一项数据,它就从新订阅, * 若是这个Observable发射的是onError通知,它就将这个通知传递给观察者而后终止。 */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(7)"); } }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception { System.out.println("--> apply(7)"); // 根据产生的Error的Observable是否正常发射数据来进行从新订阅,若是发射Error通知,则直接传递给观察者后终止 return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Throwable throwable) throws Exception { if (temp == 1) { return Observable.error(throwable); // 知足条件后,传递这个Error,终止从新订阅 } return Observable.timer(1, TimeUnit.MILLISECONDS); // 正常发射数据,能够从新订阅 } }); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(7)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(7): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(7): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(7)"); } }); System.in.read();
输出:
----> doOnSubscribe(1) --> accept(1): 1 --> accept(1): 2 ----> doOnSubscribe(1) --> accept(1): 1 --> accept(1): 2 ----> doOnSubscribe(1) --> accept(1): 1 --> accept(1): 2 ----> doOnSubscribe(1) --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --------------------------------------------- --> onSubscribe(2) ----> doOnSubscribe(2) --> onNext(2): 1 --> onNext(2): 2 ----> doOnSubscribe(2) --> onNext(2): 1 --> onNext(2): 2 --> onError(2): java.lang.Exception: Test Error! --------------------------------------------- --> onSubscribe(3) ----> doOnSubscribe(3) --> onNext(3): 1 --> onNext(3): 2 --> test(3) ----> doOnSubscribe(3) --> onNext(3): 1 --> onNext(3): 2 --> test(3) ----> doOnSubscribe(3) --> onNext(3): 1 --> onNext(3): 2 --> onError(3): java.lang.Exception: Test Error! --------------------------------------------- --> onSubscribe(4) ----> doOnSubscribe(4) --> onNext(4): 1 --> onNext(4): 2 ----> doOnSubscribe(4) --> onNext(4): 1 --> onNext(4): 2 ----> doOnSubscribe(4) --> onNext(4): 1 --> onNext(4): 2 ----> doOnSubscribe(4) --> onNext(4): 1 --> onNext(4): 2 --> onNext(4): 3 --> onNext(4): 4 --------------------------------------------- --> onSubscribe(5) ----> doOnSubscribe(5) --> onNext(5): 1 --> onNext(5): 2 --> test(5): 1 ----> doOnSubscribe(5) --> onNext(5): 1 --> onNext(5): 2 --> test(5): 2 ----> doOnSubscribe(5) --> onNext(5): 1 --> onNext(5): 2 --> test(5): 3 ----> doOnSubscribe(5) --> onNext(5): 1 --> onNext(5): 2 --> onNext(5): 3 --> onNext(5): 4 --------------------------------------------- --> onSubscribe(6) ----> doOnSubscribe(6) --> onNext(6): 1 --> onNext(6): 2 --> getAsBoolean(6) ----> doOnSubscribe(6) --> onNext(6): 1 --> onNext(6): 2 --> getAsBoolean(6) --> onError(6): java.lang.Exception: Test Error! --------------------------------------------- --> apply(7) --> onSubscribe(7) ----> doOnSubscribe(7) --> onNext(7): 1 --> onNext(7): 2 ----> doOnSubscribe(7) --> onNext(7): 1 --> onNext(7): 2 --> onError(7): java.lang.Exception: Test Error!
Javadoc: retry()
Javadoc: retry(long times)
Javadoc: retry(long times, Predicate<Throwable> predicate)
Javadoc: retry(Predicate<Throwable> predicate)
Javadoc: retry(BiPredicate<Integer, Throwable> predicate)
Javadoc: retryUntil(BooleanSupplier stop)
Javadoc: retryWhen(Function<Observable<Throwable>, ObservableSource> handler)
本节主要介绍了 Rxjava 中关于 Error
通知的处理,主要是在遇到异常通知时,无条件或者指定条件的去从新订阅原始 Observable 直到没有异常(正常发射全部数据序列)或者知足指定的条件后终止从新订阅,发射异常通知给观察者。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例
实例代码: