不论是在响应式编程仍是普通的程序设计中,异常处理都是一个很是重要的方面。今天将会给你们介绍Reactor中异常的处理流程。java
先举一个例子,咱们建立一个Flux,在这个Flux中,咱们产生一个异常,看看是什么状况:react
Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)); flux2.subscribe(System.out::println);
咱们会获得一个异常ErrorCallbackNotImplemented:git
100 / 1 = 100 100 / 2 = 50 reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
那怎么处理这个异常呢?github
有两种方式,第一种方式就是咱们以前文章讲过的,在subscribe的时候指定onError方法:编程
Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)); flux2.subscribe(System.out::println, error -> System.err.println("Error: " + error));
仍是刚才的代码,可是此次咱们在subscribe的时候,添加了onError处理器,看下运行结果:app
Divided by zero :( 100 / 1 = 100 100 / 2 = 50 Error: java.lang.ArithmeticException: / by zero
能够看到异常已经被咱们捕获了,而且进行了合适的处理。ide
除了在subscribe中进行处理,咱们还能够在publish的时候,就指定异常的处理模式,这就是咱们要介绍的第二种方法:oop
Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorReturn("Divided by zero :("); flux.subscribe(System.out::println);
上面的例子中,在建立Flux的时候,手动指定了其onErrorReturn方法,咱们看下输出结果:设计
100 / 1 = 100 100 / 2 = 50 Divided by zero :(
注意,对于Flux或者Mono来讲,全部的异常都是一个终止的操做,即便你使用了异常处理,原生成序列也不会继续。可是若是你对异常进行了处理,那么它会将oneError信号转换成为新的序列的开始,并将替换掉以前上游产生的序列。code
在通常的程序中,咱们的异常应该怎么处理呢?你们很容易想到的是try catch。而Reactor中subscribe的onError方法,就是try catch的一个具体应用:
Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)); flux2.subscribe(System.out::println, error -> System.err.println("Error: " + error));
仍是上的例子,咱们在onError方法中,对异常进行了处理。
若是转换成为常规代码,应该是下面的样子:
public void normalErrorHandle(){ try{ Arrays.asList(1,2,0).stream().map(i -> "100 / " + i + " = " + (100 / i)).forEach(System.out::println); }catch (Exception e){ System.err.println("Error: " + e); } }
除了这种最基本的异常处理方法以外,Reactor还提供了不少种不一样的异常处理方法,下面咱们来一一介绍一下。
Static Fallback Value的意思是,在遇到异常的时候会fallback到一个静态的默认值。好比咱们以前讲到的onErrorReturn。
Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorReturn("Divided by zero :(");
固然onErrorReturn还支持一个Predicate参数,用来判断要falback的异常是否知足条件。
public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
除了fallback Value以外,还支持Fallback Method。也就是说若是你想在捕获异常以后调用其余的方法,就可使用Fallback Method。
这里Fallback Method是用onErrorResume来表示的。
public void useFallbackMethod(){ Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(e -> System.out::println); flux.subscribe(System.out::println); }
所谓的动态Fallback Value就是根据你抛出的异常进行判断,经过定位不一样的Error从而fallback到不一样的值:
public void useDynamicFallback(){ Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(error -> Mono.just( MyWrapper.fromError(error))); } public static class MyWrapper{ public static String fromError(Throwable error){ return "That is a new Error"; } }
一样的,咱们能够在捕获异常以后进行rethrow:
Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(error -> Flux.error( new RuntimeException("oops, ArithmeticException!", error))); Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorMap(error -> new RuntimeException("oops, ArithmeticException!", error));
有两种方式,第一种就是在onErrorResume中使用Flux.error构建一个新的Flux,另一种就是直接在onErrorMap中进行处理。
有时候你只是想记录一下异常信息,并不想破坏原来的React结构,那么能够试着使用doOnError。
public void useDoOnError(){ Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .doOnError(error -> System.out.println("we got the error: "+ error)); }
若是咱们在代码中使用了某些资源,通常状况下咱们须要在finally中对其进行关闭,或者使用JDK7中引入的 try-with-resource 。
举个例子,下面的是使用finally的方式:
Stats stats = new Stats(); stats.startTimer(); try { doSomethingDangerous(); } finally { stats.stopTimerAndRecordTiming(); }
下面是使用try-with-resource的方式:
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) { return disposableInstance.toString(); }
那么在Reactor中,咱们也有两种方式和其对应。
第一种就是doFinally方法:
Stats stats = new Stats(); LongAdder statsCancel = new LongAdder(); Flux<String> flux = Flux.just("foo", "bar") .doOnSubscribe(s -> stats.startTimer()) .doFinally(type -> { stats.stopTimerAndRecordTiming(); if (type == SignalType.CANCEL) statsCancel.increment(); }) .take(1);
上面的例子中,doFinally实际上作的就是finally block作的事情。
第二种是使用using,咱们先看一个using的定义:
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
能够看到using支持三个参数,resourceSupplier是一个生成器,用来在subscribe的时候生成要发送的resource对象。
sourceSupplier是一个生成Publisher的工厂,接收resourceSupplier传过来的resource,而后生成Publisher对象。
resourceCleanup用来对resource进行收尾操做。
那么咱们怎么用呢?
举个例子:
public void useUsing(){ AtomicBoolean isDisposed = new AtomicBoolean(); Disposable disposableInstance = new Disposable() { @Override public void dispose() { isDisposed.set(true); } @Override public String toString() { return "DISPOSABLE"; } }; Flux<String> flux = Flux.using( () -> disposableInstance, disposable -> Flux.just(disposable.toString()), Disposable::dispose); }
上面的例子中,咱们建立了一个Disposable对象,做为resource,而后对这个resource进行加工,返回一个Flux<String>对象,最后经过调用Disposable::dispose方法,对resource进行销毁。
有时候咱们遇到了异常,可能须要重试几回,Reactor为咱们提供了retry方法,先看一个例子:
public void testRetry(){ Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3){ return "tick " + input; } throw new RuntimeException("boom"); }) .retry(1) .elapsed() .subscribe(System.out::println, System.err::println); try { Thread.sleep(2100); } catch (InterruptedException e) { e.printStackTrace(); } }
看下输出结果:
[264,tick 0] [255,tick 1] [241,tick 2] [506,tick 0] [252,tick 1] [253,tick 2] java.lang.RuntimeException: boom
retry的做用就是当遇到异常的时候,重启一个新的序列。
elapsed是用来展现产生的value时间之间的duration。
从结果咱们能够看到,retry以前是不会产生异常信息的。
本文的例子learn-reactive
本文做者:flydean程序那些事本文连接:http://www.flydean.com/reactor-handle-errors/
本文来源:flydean的博客
欢迎关注个人公众号:「程序那些事」最通俗的解读,最深入的干货,最简洁的教程,众多你不知道的小技巧等你来发现!