目录java
在不少业务场景中,为了排除系统中的各类不稳定因素,以及逻辑上的错误,并最大几率保证得到预期的结果,重试机制都是必不可少的。git
尤为是调用远程服务,在高并发场景下,极可能由于服务器响应延迟或者网络缘由,形成咱们得不到想要的结果,或者根本得不到响应。这个时候,一个优雅的重试调用机制,可让咱们更大几率保证获得预期的响应。github
一般状况下,咱们会经过定时任务进行重试。例如某次操做失败,则记录下来,当定时任务再次启动,则将数据放到定时任务的方法中,从新跑一遍。最终直至获得想要的结果为止。web
不管是基于定时任务的重试机制,仍是咱们本身写的简单的重试器,缺点都是重试的机制太单一,并且实现起来不优雅。spring
一个完备的重试实现,要很好地解决以下问题:apache
而且,为了更好地封装性,重试的实现通常分为两步:springboot
一个完整的重试流程能够简单示意为:bash
guava-retrying是基于谷歌的核心类库guava的重试机制实现,能够说是一个重试利器。服务器
下面就快速看一下它的用法。网络
1.Maven配置
<!-- https://mvnrepository.com/artifact/com.github.rholder/guava-retrying -->
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>
复制代码
须要注意的是,此版本依赖的是27.0.1版本的guava。若是你项目中的guava低几个版本没问题,可是低太多就不兼容了。这个时候你须要升级你项目的guava版本,或者直接去掉你本身的guava依赖,使用guava-retrying传递过来的guava依赖。
2.实现Callable
Callable<Boolean> callable = new Callable<Boolean>() {
public Boolean call() throws Exception {
return true; // do something useful here
}
};
复制代码
Callable的call方法中是你本身实际的业务调用。
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(Predicates.<Boolean>isNull())
.retryIfExceptionOfType(IOException.class)
.retryIfRuntimeException()
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
复制代码
retryer.call(callable);
复制代码
下面是完整的参考实现。
public Boolean test() throws Exception {
//定义重试机制
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
//retryIf 重试条件
.retryIfException()
.retryIfRuntimeException()
.retryIfExceptionOfType(Exception.class)
.retryIfException(Predicates.equalTo(new Exception()))
.retryIfResult(Predicates.equalTo(false))
//等待策略:每次请求间隔1s
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
//中止策略 : 尝试请求6次
.withStopStrategy(StopStrategies.stopAfterAttempt(6))
//时间限制 : 某次请求不得超过2s , 相似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))
.build();
//定义请求实现
Callable<Boolean> callable = new Callable<Boolean>() {
int times = 1;
@Override
public Boolean call() throws Exception {
log.info("call times={}", times);
times++;
if (times == 2) {
throw new NullPointerException();
} else if (times == 3) {
throw new Exception();
} else if (times == 4) {
throw new RuntimeException();
} else if (times == 5) {
return false;
} else {
return true;
}
}
};
//利用重试器调用请求
return retryer.call(callable);
}
复制代码
guava-retrying的核心是Attempt类、Retryer类以及一些Strategy(策略)相关的类。
Attempt既是一次重试请求(call),也是请求的结果,并记录了当前请求的次数、是否包含异常和请求的返回值。
/** * An attempt of a call, which resulted either in a result returned by the call, * or in a Throwable thrown by the call. * * @param <V> The type returned by the wrapped callable. * @author JB */
public interface Attempt<V> 复制代码
Retryer经过RetryerBuilder这个工厂类进行构造。RetryerBuilder负责将定义的重试策略赋值到Retryer对象中。
在Retryer执行call方法的时候,会将这些重试策略一一使用。
下面就看一下Retryer的call方法的具体实现。
/** * Executes the given callable. If the rejection predicate * accepts the attempt, the stop strategy is used to decide if a new attempt * must be made. Then the wait strategy is used to decide how much time to sleep * and a new attempt is made. * * @param callable the callable task to be executed * @return the computed result of the given callable * @throws ExecutionException if the given callable throws an exception, and the * rejection predicate considers the attempt as successful. The original exception * is wrapped into an ExecutionException. * @throws RetryException if all the attempts failed before the stop strategy decided * to abort, or the thread was interrupted. Note that if the thread is interrupted, * this exception is thrown and the thread's interrupt status is set. */
public V call(Callable<V> callable) throws ExecutionException, RetryException {
long startTime = System.nanoTime();
//说明: 根据attemptNumber进行循环——也就是重试多少次
for (int attemptNumber = 1; ; attemptNumber++) {
//说明:进入方法不等待,当即执行一次
Attempt<V> attempt;
try {
//说明:执行callable中的具体业务
//attemptTimeLimiter限制了每次尝试等待的时常
V result = attemptTimeLimiter.call(callable);
//利用调用结果构造新的attempt
attempt = new ResultAttempt<V>(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
} catch (Throwable t) {
attempt = new ExceptionAttempt<V>(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}
//说明:遍历自定义的监听器
for (RetryListener listener : listeners) {
listener.onRetry(attempt);
}
//说明:判断是否知足重试条件,来决定是否继续等待并进行重试
if (!rejectionPredicate.apply(attempt)) {
return attempt.get();
}
//说明:此时知足中止策略,由于尚未获得想要的结果,所以抛出异常
if (stopStrategy.shouldStop(attempt)) {
throw new RetryException(attemptNumber, attempt);
} else {
//说明:执行默认的中止策略——线程休眠
long sleepTime = waitStrategy.computeSleepTime(attempt);
try {
//说明:也能够执行定义的中止策略
blockStrategy.block(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RetryException(attemptNumber, attempt);
}
}
}
}
复制代码
Retryer执行过程以下。
基于guava-retrying的实现原理,咱们能够根据实际业务来肯定本身的重试策略。
下面以数据同步
这种常规系统业务为例,自定义重试策略。
以下实现基于Spring Boot 2.1.2.RELEASE版本。
并使用Lombok简化Bean。
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
复制代码
当商品建立之后,须要另外设置商品的价格。因为两个操做是有两我的进行的,所以会出现以下问题,即商品没有建立,可是价格数据却已经建好了。遇到这种状况,价格数据须要等待商品正常建立之后,继续完成同步。
咱们经过一个http请求进行商品的建立,同时经过一个定时器来修改商品的价格。
当商品不存在,或者商品的数量小于1的时候,商品的价格不能设置。须要等商品成功建立且数量大于0的时候,才能将商品的价格设置成功。
默认的阻塞策略是线程休眠,这里使用自旋锁实现,不阻塞线程。
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy;
import com.github.rholder.retry.BlockStrategy;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.LocalDateTime;
/** * 自旋锁的实现, 不响应线程中断 */
@Slf4j
@NoArgsConstructor
public class SpinBlockStrategy implements BlockStrategy {
@Override
public void block(long sleepTime) throws InterruptedException {
LocalDateTime startTime = LocalDateTime.now();
long start = System.currentTimeMillis();
long end = start;
log.info("[SpinBlockStrategy]...begin wait.");
while (end - start <= sleepTime) {
end = System.currentTimeMillis();
}
//使用Java8新增的Duration计算时间间隔
Duration duration = Duration.between(startTime, LocalDateTime.now());
log.info("[SpinBlockStrategy]...end wait.duration={}", duration.toMillis());
}
}
复制代码
RetryListener能够监控屡次重试过程,并可使用attempt
作一些额外的事情。
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RetryLogListener implements RetryListener {
@Override
public <V> void onRetry(Attempt<V> attempt) {
// 第几回重试,(注意:第一次重试实际上是第一次调用)
log.info("retry time : [{}]", attempt.getAttemptNumber());
// 距离第一次重试的延迟
log.info("retry delay : [{}]", attempt.getDelaySinceFirstAttempt());
// 重试结果: 是异常终止, 仍是正常返回
log.info("hasException={}", attempt.hasException());
log.info("hasResult={}", attempt.hasResult());
// 是什么缘由致使异常
if (attempt.hasException()) {
log.info("causeBy={}" , attempt.getExceptionCause().toString());
} else {
// 正常返回时的结果
log.info("result={}" , attempt.getResult());
}
log.info("log listen over.");
}
}
复制代码
有些异常须要重试,有些不须要。
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception;
/** * 当抛出这个异常的时候,表示须要重试 */
public class NeedRetryException extends Exception {
public NeedRetryException(String message) {
super("NeedRetryException can retry."+message);
}
}
复制代码
使用call方法调用本身的业务。
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.math.BigDecimal;
/** * 商品model */
@Data
@AllArgsConstructor
public class Product {
private Long id;
private String name;
private Integer count;
private BigDecimal price;
}
复制代码
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
import org.springframework.stereotype.Repository;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/** * 商品DAO */
@Repository
public class ProductRepository {
private static ConcurrentHashMap<Long,Product> products=new ConcurrentHashMap();
private static AtomicLong ids=new AtomicLong(0);
public List<Product> findAll(){
return new ArrayList<>(products.values());
}
public Product findById(Long id){
return products.get(id);
}
public Product updatePrice(Long id, BigDecimal price){
Product p=products.get(id);
if (null==p){
return p;
}
p.setPrice(price);
return p;
}
public Product addProduct(Product product){
Long id=ids.addAndGet(1);
product.setId(id);
products.put(id,product);
return product;
}
}
复制代码
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository.ProductRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
/** * 业务方法实现 */
@Component
@Slf4j
public class ProductInformationHander implements Callable<Boolean> {
@Autowired
private ProductRepository pRepo;
private static Map<Long, BigDecimal> prices = new HashMap<>();
static {
prices.put(1L, new BigDecimal(100));
prices.put(2L, new BigDecimal(200));
prices.put(3L, new BigDecimal(300));
prices.put(4L, new BigDecimal(400));
prices.put(8L, new BigDecimal(800));
prices.put(9L, new BigDecimal(900));
}
@Override
public Boolean call() throws Exception {
log.info("sync price begin,prices size={}", prices.size());
for (Long id : prices.keySet()) {
Product product = pRepo.findById(id);
if (null == product) {
throw new NeedRetryException("can not find product by id=" + id);
}
if (null == product.getCount() || product.getCount() < 1) {
throw new NeedRetryException("product count is less than 1, id=" + id);
}
Product updatedP = pRepo.updatePrice(id, prices.get(id));
if (null == updatedP) {
return false;
}
prices.remove(id);
}
log.info("sync price over,prices size={}", prices.size());
return true;
}
}
复制代码
将上面的实现做为参数,构造Retryer。
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import com.github.rholder.retry.*;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener.RetryLogListener;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy.SpinBlockStrategy;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/** * 构造重试器 */
@Component
public class ProductRetryerBuilder {
public Retryer build() {
//定义重试机制
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
//retryIf 重试条件
//.retryIfException()
//.retryIfRuntimeException()
//.retryIfExceptionOfType(Exception.class)
//.retryIfException(Predicates.equalTo(new Exception()))
//.retryIfResult(Predicates.equalTo(false))
.retryIfExceptionOfType(NeedRetryException.class)
//等待策略:每次请求间隔1s
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
//中止策略 : 尝试请求3次
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
//时间限制 : 某次请求不得超过2s , 相似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))
//默认的阻塞策略:线程睡眠
//.withBlockStrategy(BlockStrategies.threadSleepStrategy())
//自定义阻塞策略:自旋锁
.withBlockStrategy(new SpinBlockStrategy())
//自定义重试监听器
.withRetryListener(new RetryLogListener())
.build();
return retryer;
}
}
复制代码
定时任务只须要跑一次,可是实际上实现了全部的重试策略。这样大大简化了定时器的设计。
首先使用@EnableScheduling
声明项目支持定时器注解。
@SpringBootApplication
@EnableScheduling
public class DemoRetryerApplication {
public static void main(String[] args) {
SpringApplication.run(DemoRetryerApplication.class, args);
}
}
复制代码
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.task;
import com.github.rholder.retry.Retryer;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductInformationHander;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductRetryerBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/** * 商品信息定时器 */
@Component
public class ProductScheduledTasks {
@Autowired
private ProductRetryerBuilder builder;
@Autowired
private ProductInformationHander hander;
/** * 同步商品价格定时任务 * @Scheduled(fixedDelay = 30000) :上一次执行完毕时间点以后30秒再执行 */
@Scheduled(fixedDelay = 30*1000)
public void syncPrice() throws Exception{
Retryer retryer=builder.build();
retryer.call(hander);
}
}
复制代码
执行结果:因为并无商品,所以重试之后,抛出异常。
2019-二月-28 14:37:52.667 INFO [scheduling-1] n.i.t.f.s.i.d.r.g.l.RetryLogListener - log listen over.
2019-二月-28 14:37:52.672 ERROR [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler - Unexpected error occurred in scheduled task.
com.github.rholder.retry.RetryException: Retrying failed to complete successfully after 3 attempts.
at com.github.rholder.retry.Retryer.call(Retryer.java:174)
复制代码
你也能够增长一些商品数据,看一下重试成功的效果。
完整示例代码在这里。
因为项目中依赖的guava版本太低,启动项目时出现了以下异常。
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService;
at org.apache.curator.framework.listen.ListenerContainer.addListener(ListenerContainer.java:41)
at com.bzn.curator.ZkOperator.getZkClient(ZkOperator.java:207)
at com.bzn.curator.ZkOperator.checkExists(ZkOperator.java:346)
at com.bzn.curator.watcher.AbstractWatcher.initListen(AbstractWatcher.java:87)
at com.bzn.web.listener.NebulaSystemInitListener.initZkWatcher(NebulaSystemInitListener.java:84)
at com.bzn.web.listener.NebulaSystemInitListener.contextInitialized(NebulaSystemInitListener.java:33)
at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4939)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5434)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1559)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1549)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
复制代码
所以,要排除项目中低版本的guava依赖。
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
复制代码
同时,因为Guava在新版本中移除了sameThreadExecutor
方法,但目前项目中的ZK须要此方法,所以须要手动设置合适的guava版本。
果真,在19.0版本中MoreExecutors的此方法依然存在,只是标注为过时了。
@Deprecated
@GwtIncompatible("TODO")
public static ListeningExecutorService sameThreadExecutor() {
return new DirectExecutorService();
}
复制代码
声明依赖的guava版本改成19.0便可。
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
复制代码
在实际使用过程当中,有时常常须要调整重试的次数、等待的时间等重试策略,所以,将重试策略的配置参数化保存,能够动态调节。
例如在秒杀、双十一购物节等时期增长等待的时间与重试次数,以保证错峰请求。在平时,能够适当减小等待时间和重试次数。
对于系统关键性业务,若是屡次重试步成功,能够经过RetryListener进行监控与报警。
关于『动态调节重试策略 』下面提供一个参考实现:
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.WaitStrategy;
/** * 自定义等待策略:根据重试次数动态调节等待时间,第一次请求间隔1s,第二次间隔10s,第三次及之后都是20s。 * * * 在建立Retryer的时候经过withWaitStrategy将该等待策略生效便可。 * * RetryerBuilder.<Boolean>newBuilder() * .withWaitStrategy(new AlipayWaitStrategy()) * * 相似的效果也能够经过自定义 BlockStrategy 来实现,你能够写一下试试。 * */
public class AlipayWaitStrategy implements WaitStrategy {
@Override
public long computeSleepTime(Attempt failedAttempt) {
long number = failedAttempt.getAttemptNumber();
if (number==1){
return 1*1000;
}
if (number==2){
return 10*1000;
}
return 20*1000;
}
}
复制代码