案例概述
在本文中,咱们讨论一下Resilience4j库。
该库经过管理远程通讯的容错性来帮助实现弹性系统。
这个库受到Hystrix的启发,但提供了更方便的API和许多其余特性,如速率限制器(阻塞太频繁的请求)、Bulkhead(避免太多并发请求)等。html
Maven设置
首先,咱们须要将目标模块添加到咱们的pom.xml中(例如,咱们添加了Circuit Breaker):java
<dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-circuitbreaker</artifactId> <version>0.12.1</version> </dependency>
在这里,咱们使用的是断路器模块。全部模块及其最新版本都可在Maven Central上找到。
在接下来的部分中,咱们将介绍库中最经常使用的模块。git
断路器
请注意,对于此模块,咱们须要上面显示的设置resilience4j-circuitbreaker依赖项。github
断路器模式能够帮助咱们在远程服务中断时防止一连串的故障。spring
在屡次失败的尝试以后,咱们能够认为服务不可用/重载,并急切地拒绝全部后续的请求。经过这种方式,咱们能够为可能失败的调用节省系统资源。缓存
让咱们看看咱们如何经过Resilience4j实现这一目标。服务器
首先,咱们须要定义要使用的设置。最简单的方法是使用默认设置:并发
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
也可使用自定义参数:app
CircuitBreakerConfig config = CircuitBreakerConfig.custom() .failureRateThreshold(20) .ringBufferSizeInClosedState(5) .build();
在这里,咱们将速率阈值设置为20%,并将尝试呼叫设置为最少5次。框架
而后,咱们建立一个CircuitBreaker对象并经过它调用远程服务:
interface RemoteService { int process(int i); } CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config); CircuitBreaker circuitBreaker = registry.circuitBreaker("my"); Function<Integer, Integer> decorated = CircuitBreaker .decorateFunction(circuitBreaker, service::process);
最后,让咱们经过JUnit测试看看它是如何工做的。
咱们将尝试调用服务10次。咱们应该可以验证该呼叫至少尝试了5次,而后在20%的呼叫失败后中止:
when(service.process(any(Integer.class))).thenThrow(new RuntimeException()); for (int i = 0; i < 10; i++) { try { decorated.apply(i); } catch (Exception ignore) {} } verify(service, times(5)).process(any(Integer.class));
断路器的状态和设置
断路器能够处于如下三种状态之一:
咱们能够配置如下设置:
速率限制器
与上一节相似,此功能须要resilience4j-ratelimiter依赖项。
顾名思义,此功能容许限制对某些服务的访问。它的API与CircuitBreaker很是类似- 有Registry,Config和Limiter类。
如下是它的示例:
RateLimiterConfig config = RateLimiterConfig.custom().limitForPeriod(2).build(); RateLimiterRegistry registry = RateLimiterRegistry.of(config); RateLimiter rateLimiter = registry.rateLimiter("my"); Function<Integer, Integer> decorated = RateLimiter.decorateFunction(rateLimiter, service::process);
如今,若是须要的话,全部对已修饰的服务块的调用都要符合速率限制器配置。
咱们能够配置以下参数:
Bulkhead
在这里,咱们首先须要relasticience4j-bulkhead依赖项。
能够限制对特定服务的并发调用数。
让咱们看一个使用Bulkhead API配置最多一个并发调用的示例:
BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(1).build(); BulkheadRegistry registry = BulkheadRegistry.of(config); Bulkhead bulkhead = registry.bulkhead("my"); Function<Integer, Integer> decorated = Bulkhead.decorateFunction(bulkhead, service::process);
要测试此配置,咱们将调用模拟服务的方法。
而后,咱们确保Bulkhead不容许任何其余调用:
CountDownLatch latch = new CountDownLatch(1); when(service.process(anyInt())).thenAnswer(invocation -> { latch.countDown(); Thread.currentThread().join(); return null; }); ForkJoinTask<?> task = ForkJoinPool.commonPool().submit(() -> { try { decorated.apply(1); } finally { bulkhead.onComplete(); } }); latch.await(); assertThat(bulkhead.isCallPermitted()).isFalse();
咱们能够配置如下设置:
重试
对于此功能,咱们须要将resilience4j-retry库添加到项目中。
咱们可使用Retry API 自动重试失败的呼叫:
RetryConfig config = RetryConfig.custom().maxAttempts(2).build(); RetryRegistry registry = RetryRegistry.of(config); Retry retry = registry.retry("my"); Function<Integer, Void> decorated = Retry.decorateFunction(retry, (Integer s) -> { service.process(s); return null; });
如今让咱们模拟在远程服务调用期间抛出异常的状况,并确保库自动重试失败的调用:
when(service.process(anyInt())).thenThrow(new RuntimeException()); try { decorated.apply(1); fail("Expected an exception to be thrown if all retries failed"); } catch (Exception e) { verify(service, times(2)).process(any(Integer.class)); }
咱们还能够配置如下内容:
缓存
Cache模块须要resilience4j-cache依赖项。
初始化看起来与其余模块略有不一样:
javax.cache.Cache cache = ...; // Use appropriate cache here Cache<Integer, Integer> cacheContext = Cache.of(cache); Function<Integer, Integer> decorated = Cache.decorateSupplier(cacheContext, () -> service.process(1));
这里的缓存是经过使用JSR-107 Cache实现完成的,Resilience4j提供了一种应用它的方法。
请注意,没有用于装饰功能的API(如Cache.decorateFunction(Function)),API仅支持 Supplier和Callable类型。
TimeLimiter
对于此模块,咱们必须添加resilience4j-timelimiter依赖项。
可使用TimeLimiter限制调用远程服务所花费的时间。
为了演示,让咱们设置一个配置超时为1毫秒的TimeLimiter:
long ttl = 1; TimeLimiterConfig config = TimeLimiterConfig.custom().timeoutDuration(Duration.ofMillis(ttl)).build(); TimeLimiter timeLimiter = TimeLimiter.of(config);
接下来,让咱们验证Resilience4j是否使用预期的超时调用Future.get():
Future futureMock = mock(Future.class); Callable restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, () -> futureMock); restrictedCall.call(); verify(futureMock).get(ttl, TimeUnit.MILLISECONDS);
咱们也能够将它与CircuitBreaker结合使用:
Callable chainedCallable = CircuitBreaker.decorateCallable(circuitBreaker, restrictedCall);
附加模块
Resilience4j还提供了许多附加模块,能够简化与流行框架和库的集成。
一些比较知名的集成是:
案例结论
在本文中,咱们了解了Resilience4j库的各个方面,并学习了如何使用它来解决服务器间通讯中的各类容错问题。