A look at retry in reactor extra

This article takes a look at the retry support in reactor extra.

maven

<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.1.4.RELEASE</version>
</dependency>

Example

TcpClient client = TcpClient.create("localhost", 8888);
client.newHandler((inbound, outbound) -> {
            return outbound.sendString(Mono.just("Hello World!"))
                    .then(inbound.receive().asString().next().log().then());
        })
        .doOnError(e -> e.printStackTrace())
        .subscribe();

With the TcpClient above, the connection attempt fails immediately with an error if the server has not been started:

io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8888
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
	... 10 more

Reconnecting after a connection failure

Simple retry

client.newHandler((inbound, outbound) -> {
            return outbound.sendString(Mono.just("Hello World!"))
                    .then(inbound.receive().asString().next().log().then());
        })
        .doOnError(e -> e.printStackTrace())
        .retry(3)
        .subscribe();

retry lets you specify the number of retries directly.
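To make the retry count concrete, here is a minimal plain-Java sketch (not Reactor code) of what a bounded retry amounts to: the operation runs once and is then re-run up to the given number of times while it keeps failing. The class BoundedRetrySketch and its methods are hypothetical names used only for illustration.

```java
import java.net.ConnectException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of Reactor.
class BoundedRetrySketch {

    // Runs the task once, then re-runs it up to maxRetries times while it keeps throwing.
    static <T> T callWithRetry(Callable<T> task, int maxRetries) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if retries remain
            }
        }
        throw last; // every attempt failed: propagate the last error
    }

    // Counts how often an always-failing task is attempted for a given retry budget.
    static int attemptsWhenAlwaysFailing(int maxRetries) {
        AtomicInteger attempts = new AtomicInteger();
        try {
            callWithRetry(() -> {
                attempts.incrementAndGet();
                throw new ConnectException("Connection refused");
            }, maxRetries);
        } catch (Exception expected) {
            // the error surfaces only after the retry budget is used up
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsWhenAlwaysFailing(3)); // prints 4
    }
}
```

With a budget of 3, the always-failing task runs four times: the initial attempt plus three retries.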

Advanced retry

client.newHandler((inbound, outbound) -> {
            return outbound.sendString(Mono.just("Hello World!"))
                    .then(inbound.receive().asString().next().log().then());
        })
        .doOnError(e -> e.printStackTrace())
        // retry every error except RuntimeException, with a fixed 5 second pause
        .retryWhen(Retry.allBut(RuntimeException.class)
                .retryMax(5000)
                .fixedBackoff(Duration.ofSeconds(5))
                .doOnRetry(e -> {
                    e.exception().printStackTrace();
                })
        )
        .subscribe();

With the Retry helper class from the reactor extra project, it is easy to specify advanced retry strategies such as fixedBackoff or exponentialBackoff.

client.newHandler((inbound, outbound) -> {
            return outbound.sendString(Mono.just("Hello World!"))
                    .then(inbound.receive().asString().next().log().then());
        })
        .doOnError(e -> e.printStackTrace())
        .retryWhen(Retry.allBut(RuntimeException.class)
                .retryMax(5000)
                .exponentialBackoffWithJitter(Duration.ofMillis(100), Duration.ofMillis(500))
                .doOnRetry(e -> {
                    e.exception().printStackTrace();
                })
        )
        .subscribe();

This example uses exponentialBackoffWithJitter. The first parameter is the firstBackoff duration and the second is maxBackoff, i.e. maxBackoffInterval; if it is null, it is equivalent to Duration.ofSeconds(Long.MAX_VALUE).
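As a rough illustration of how a first backoff and a maximum backoff interact, the following plain-Java sketch models a capped exponential schedule with random jitter. It is a generic model and not reactor-extra's implementation: the doubling factor, the jitter range (between the first backoff and the computed delay) and the names BackoffSketch, exponentialDelay and withJitter are all assumptions made for this example.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration class; a generic model, not reactor-extra's implementation.
class BackoffSketch {

    // Un-jittered delay before the given 1-based retry:
    // firstBackoff doubled on each retry (assumed factor 2), capped at maxBackoff.
    static Duration exponentialDelay(Duration firstBackoff, Duration maxBackoff, int retry) {
        long maxMs = maxBackoff.toMillis();
        long delayMs = Math.min(firstBackoff.toMillis(), maxMs);
        for (int i = 1; i < retry && delayMs < maxMs; i++) {
            // double the delay without overflowing past the cap
            delayMs = (delayMs > maxMs / 2) ? maxMs : delayMs * 2;
        }
        return Duration.ofMillis(delayMs);
    }

    // Assumed jitter scheme: a random delay between firstBackoff and the computed delay (inclusive).
    static Duration withJitter(Duration firstBackoff, Duration delay) {
        long lowMs = firstBackoff.toMillis();
        long highMs = delay.toMillis();
        if (highMs <= lowMs) {
            return delay; // nothing to randomize over
        }
        return Duration.ofMillis(ThreadLocalRandom.current().nextLong(lowMs, highMs + 1));
    }

    public static void main(String[] args) {
        Duration first = Duration.ofMillis(100);
        Duration max = Duration.ofMillis(500);
        for (int retry = 1; retry <= 5; retry++) {
            Duration base = exponentialDelay(first, max, retry);
            System.out.println("retry " + retry + ": base=" + base.toMillis()
                    + "ms, jittered=" + withJitter(first, base).toMillis() + "ms");
        }
    }
}
```

Under these assumptions, a first backoff of 100 ms and a maximum of 500 ms give base delays of 100, 200, 400, 500 and 500 ms for the first five retries, and the jittered value always stays between 100 ms and the base delay.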

Retry

reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/Retry.java

/**
	 * Returns a retry function that retries errors resulting from all exceptions except
	 * the specified non-retriable exceptions. More constraints may be added using
	 * {@link #retryMax(int)} or {@link #timeout(Duration)}.
	 *
	 * @param nonRetriableExceptions exceptions that may not be retried
	 * @return retry function that retries all exceptions except the specified non-retriable exceptions.
	 */
	@SafeVarargs
	static <T> Retry<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) {
		Predicate<? super RetryContext<T>> predicate = context -> {
			Throwable exception = context.exception();
			if (exception == null)
				return true;
			for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
				if (clazz.isInstance(exception))
					return false;
			}
			return true;
		};
		return DefaultRetry.<T>create(predicate);
	}

As you can see, the Retry instance is created via DefaultRetry.
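The predicate logic of allBut can be tried out in isolation. The sketch below mirrors it in plain Java with a simple Predicate<Throwable> instead of the library's RetryContext; AllButSketch is a hypothetical class written for this illustration. It shows why the connection error from the TcpClient example is retried under allBut(RuntimeException.class): ConnectException is an IOException, not a RuntimeException.

```java
import java.net.ConnectException;
import java.util.function.Predicate;

// Hypothetical illustration class mirroring the allBut predicate logic in plain Java.
class AllButSketch {

    // Builds a predicate that accepts every exception except instances of the given classes.
    @SafeVarargs
    static Predicate<Throwable> allBut(Class<? extends Throwable>... nonRetriable) {
        return exception -> {
            for (Class<? extends Throwable> clazz : nonRetriable) {
                if (clazz.isInstance(exception)) {
                    return false; // explicitly excluded, subclasses included
                }
            }
            return true; // everything else is retried
        };
    }

    public static void main(String[] args) {
        Predicate<Throwable> retriable = allBut(RuntimeException.class);
        System.out.println(retriable.test(new ConnectException("Connection refused"))); // prints true
        System.out.println(retriable.test(new IllegalStateException("bug")));           // prints false
    }
}
```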

reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/DefaultRetry.java

public static <T> DefaultRetry<T> create(Predicate<? super RetryContext<T>> retryPredicate) {
		return new DefaultRetry<T>(retryPredicate,
				1,
				null,
				Backoff.zero(),
				Jitter.noJitter(),
				null,
				NOOP_ON_RETRY,
				(T) null);
	}

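Note that maxIterations defaults to 1 here. In other words, if you do not specify retryMax, the advanced retry strategy is effectively wasted, so pay particular attention to this.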

Summary

The Retry utility class provided by Reactor Extra is very handy and worth trying out.

