This article takes a look at the retry support in reactor-extra.
<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.1.4.RELEASE</version>
</dependency>
TcpClient client = TcpClient.create("localhost", 8888);
client.newHandler((inbound, outbound) -> {
    // send a greeting, then wait for the first reply and log it
    return outbound.sendString(Mono.just("Hello World!"))
            .then(inbound.receive()
                    .asString()
                    .next()
                    .log()
                    .then());
}).doOnError(e -> e.printStackTrace())
        .subscribe();
If the server has not been started, the TcpClient above fails to connect and reports the following error right away:
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8888
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    ... 10 more
client.newHandler((inbound, outbound) -> {
    return outbound.sendString(Mono.just("Hello World!"))
            .then(inbound.receive()
                    .asString()
                    .next()
                    .log()
                    .then());
}).doOnError(e -> e.printStackTrace())
        .retry(3) // resubscribe up to 3 times on error
        .subscribe();
The retry operator lets you specify the number of retries directly.
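To make the counting concrete, here is a minimal plain-Java sketch of what a retry count of 3 means: one initial attempt plus up to three further attempts. The class `RetryCountSketch` and its methods are hypothetical names used only for illustration. This is not how Reactor implements `retry`, which resubscribes to the source instead of looping.

```java
import java.net.ConnectException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java illustration only; this is not Reactor's implementation.
final class RetryCountSketch {

    // Runs the task once, then re-runs it up to maxRetries more times on failure.
    static <T> T callWithRetry(Callable<T> task, int maxRetries) throws Exception {
        int failures = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (failures >= maxRetries) {
                    throw e; // retries exhausted: surface the last error
                }
                failures++;
            }
        }
    }

    // Counts how many times an always-failing task gets invoked.
    static int attemptsForAlwaysFailing(int maxRetries) {
        AtomicInteger attempts = new AtomicInteger();
        try {
            callWithRetry(() -> {
                attempts.incrementAndGet();
                throw new ConnectException("Connection refused");
            }, maxRetries);
        } catch (Exception expected) {
            // the task never succeeds, so the final failure ends up here
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        // 1 initial attempt + 3 retries
        System.out.println(attemptsForAlwaysFailing(3)); // prints 4
    }
}
```

In the sketch, as with `retry(3)`, the error only reaches the caller after the last permitted attempt has failed.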
client.newHandler((inbound, outbound) -> {
    return outbound.sendString(Mono.just("Hello World!"))
            .then(inbound.receive()
                    .asString()
                    .next()
                    .log()
                    .then());
}).doOnError(e -> e.printStackTrace())
        .retryWhen(Retry.allBut(RuntimeException.class)
                .retryMax(5000)
                .fixedBackoff(Duration.ofSeconds(5))
                .doOnRetry(e -> {
                    e.exception().printStackTrace();
                })
        )
        .subscribe();
With the Retry helper class from the reactor-extra project, you can easily specify more advanced retry policies, such as fixedBackoff or exponentialBackoff.
client.newHandler((inbound, outbound) -> {
    return outbound.sendString(Mono.just("Hello World!"))
            .then(inbound.receive()
                    .asString()
                    .next()
                    .log()
                    .then());
}).doOnError(e -> e.printStackTrace())
        .retryWhen(Retry.allBut(RuntimeException.class)
                .retryMax(5000)
                .exponentialBackoffWithJitter(Duration.ofMillis(100), Duration.ofMillis(500))
                .doOnRetry(e -> {
                    e.exception().printStackTrace();
                })
        )
        .subscribe();
This example uses exponentialBackoffWithJitter. The first argument is firstBackoff and the second is maxBackoff, that is, the maximum backoff interval (maxBackoffInterval); if it is null, it is treated as Duration.ofSeconds(Long.MAX_VALUE).
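The general idea of a capped exponential backoff with jitter can be sketched in plain Java as follows. This is only an illustration under assumptions: the doubling factor, the jitter range of half to full delay, and the names `BackoffSketch`, `cappedExponential` and `withJitter` are all hypothetical, and reactor-extra's own formulas may differ.

```java
import java.time.Duration;
import java.util.Random;

// Illustration only; reactor-extra's own backoff and jitter formulas may differ.
final class BackoffSketch {

    // Doubles the delay on each retry (1-based), never exceeding maxBackoff.
    static Duration cappedExponential(Duration firstBackoff, Duration maxBackoff, int retry) {
        long max = maxBackoff.toMillis();
        long delay = firstBackoff.toMillis();
        for (int i = 1; i < retry && delay < max; i++) {
            // compare against max / 2 first so the doubling cannot overflow
            delay = (delay > max / 2) ? max : delay * 2;
        }
        return Duration.ofMillis(Math.min(delay, max));
    }

    // Picks a random delay in [base/2, base] so that clients do not retry in lockstep.
    static Duration withJitter(Duration base, Random random) {
        long millis = base.toMillis();
        long half = millis / 2;
        long spread = (long) (random.nextDouble() * (millis - half + 1));
        return Duration.ofMillis(half + spread);
    }

    public static void main(String[] args) {
        Duration first = Duration.ofMillis(100);
        Duration max = Duration.ofMillis(500);
        Random random = new Random();
        for (int retry = 1; retry <= 5; retry++) {
            Duration base = cappedExponential(first, max, retry);
            System.out.println("retry " + retry + ": base=" + base.toMillis()
                    + "ms, jittered=" + withJitter(base, random).toMillis() + "ms");
        }
    }
}
```

With a first backoff of 100 ms and a maximum of 500 ms, the base delays in this sketch are 100, 200, 400, 500 and 500 ms, which shows why the cap matters once the number of retries grows.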
reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/Retry.java
/**
 * Returns a retry function that retries errors resulting from all exceptions except
 * the specified non-retriable exceptions. More constraints may be added using
 * {@link #retryMax(int)} or {@link #timeout(Duration)}.
 *
 * @param nonRetriableExceptions exceptions that may not be retried
 * @return retry function that retries all exceptions except the specified non-retriable exceptions.
 */
@SafeVarargs
static <T> Retry<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) {
    Predicate<? super RetryContext<T>> predicate = context -> {
        Throwable exception = context.exception();
        if (exception == null)
            return true;
        for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
            if (clazz.isInstance(exception))
                return false;
        }
        return true;
    };
    return DefaultRetry.<T>create(predicate);
}
As you can see, the retry function is created through DefaultRetry.
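The filtering logic of allBut can be tried in isolation with a small plain-Java sketch. It mirrors the predicate shown above but works on a bare `Throwable` instead of a `RetryContext`; the class name `AllButSketch` is hypothetical. It also shows why `Retry.allBut(RuntimeException.class)` still retries the connection failure from earlier: `java.net.ConnectException` is an `IOException`, not a `RuntimeException`.

```java
import java.net.ConnectException;
import java.util.function.Predicate;

// Stand-alone illustration of the allBut filtering logic; not the reactor-extra class.
final class AllButSketch {

    // Accepts every error except instances of the listed non-retriable classes.
    @SafeVarargs
    static Predicate<Throwable> allBut(Class<? extends Throwable>... nonRetriable) {
        return error -> {
            if (error == null) {
                return true;
            }
            for (Class<? extends Throwable> type : nonRetriable) {
                if (type.isInstance(error)) {
                    return false;
                }
            }
            return true;
        };
    }

    public static void main(String[] args) {
        Predicate<Throwable> retriable = allBut(RuntimeException.class);
        // checked I/O failure: eligible for retry
        System.out.println(retriable.test(new ConnectException("Connection refused"))); // true
        // unchecked exception, likely a programming error: not retried
        System.out.println(retriable.test(new IllegalStateException("bug")));           // false
    }
}
```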
reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/DefaultRetry.java
public static <T> DefaultRetry<T> create(Predicate<? super RetryContext<T>> retryPredicate) {
    return new DefaultRetry<T>(retryPredicate,
            1,
            null,
            Backoff.zero(),
            Jitter.noJitter(),
            null,
            NOOP_ON_RETRY,
            (T) null);
}
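Note that maxIterations defaults to 1 here. In other words, if you do not call retryMax, the retry limit stays at that default and the advanced retry policy is effectively wasted, so this deserves extra attention.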
The Retry utility class provided by Reactor Extra is very handy and well worth trying out.