摘要: 原创出处 http://www.iocoder.cn/TCC-Transaction/transaction-recovery/ 「芋道源码」欢迎转载,保留摘要,谢谢!git
本文主要基于 TCC-Transaction 1.2.3.3 正式版github
🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:spring
- RocketMQ / MyCAT / Sharding-JDBC 全部源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将获得认真回复。甚至不知道如何读源码也能够请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
- 认真的源码交流微信群。
本文分享 TCC 恢复。主要涉及以下二个 package 路径下的类:数据库
org.mengyun.tcctransaction.recover
org.mengyun.tcctransaction.spring.recover
:
本文涉及到的类关系以下图( 打开大图 ):服务器
在《TCC-Transaction 源码分析 —— 事务存储器》中,事务信息被持久化到外部的存储器中。事务存储是事务恢复的基础。经过读取外部存储器中的异常事务,定时任务会按照必定频率对事务进行重试,直到事务完成或超过最大重试次数。微信
你行好事会由于获得赞扬而愉悦
同理,开源项目贡献者会由于 Star 而更加有动力
为 TCC-Transaction 点赞!传送门架构
ps:笔者假设你已经阅读过《tcc-transaction 官方文档 —— 使用指南1.2.x》。并发
org.mengyun.tcctransaction.recover.RecoverConfig
,事务恢复配置接口,实现代码以下:app
public interface RecoverConfig {
/** * @return 最大重试次数 */
int getMaxRetryCount();
/** * @return 恢复间隔时间,单位:秒 */
int getRecoverDuration();
/** * @return cron 表达式 */
String getCronExpression();
/** * @return 延迟取消异常集合 */
Set<Class<? extends Exception>> getDelayCancelExceptions();
/** * 设置延迟取消异常集合 * * @param delayRecoverExceptions 延迟取消异常集合 */
void setDelayCancelExceptions(Set<Class<? extends Exception>> delayRecoverExceptions);
}
复制代码
#getMaxRetryCount()
,单个事务恢复最大重试次数。超过最大重试次数后,目前仅打出错误日志,下文会看到实现。#getRecoverDuration()
,单个事务恢复重试的间隔时间,单位:秒。#getCronExpression()
,定时任务 cron 表达式。#getDelayCancelExceptions()
,延迟取消异常集合。org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig
,默认事务恢复配置实现,实现代码以下:框架
public class DefaultRecoverConfig implements RecoverConfig {
public static final RecoverConfig INSTANCE = new DefaultRecoverConfig();
/** * 最大重试次数 */
private int maxRetryCount = 30;
/** * 恢复间隔时间,单位:秒 */
private int recoverDuration = 120;
/** * cron 表达式 */
private String cronExpression = "0 */1 * * * ?";
/** * 延迟取消异常集合 */
private Set<Class<? extends Exception>> delayCancelExceptions = new HashSet<Class<? extends Exception>>();
public DefaultRecoverConfig() {
delayCancelExceptions.add(OptimisticLockException.class);
delayCancelExceptions.add(SocketTimeoutException.class);
}
@Override
public void setDelayCancelExceptions(Set<Class<? extends Exception>> delayCancelExceptions) {
this.delayCancelExceptions.addAll(delayCancelExceptions);
}
}
复制代码
maxRetryCount
,单个事务恢复最大重试次数 为 30。recoverDuration
,单个事务恢复重试的间隔时间为 120 秒。cronExpression
,定时任务 cron 表达式为 "0 */1 * * * ?"
,每分钟执行一次。若是你但愿定时任务执行的更频繁,能够修改 cron 表达式,例如 0/30 * * * * ?
,每 30 秒执行一次。delayCancelExceptions
,延迟取消异常集合。在 DefaultRecoverConfig 构造方法里,预先添加了 OptimisticLockException / SocketTimeoutException 。
org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob
,事务恢复定时任务,基于 Quartz 实现调度,不断不断不断执行事务恢复。实现代码以下:
public class RecoverScheduledJob {
private TransactionRecovery transactionRecovery;
private TransactionConfigurator transactionConfigurator;
private Scheduler scheduler;
public void init() {
try {
// Quartz JobDetail
MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
jobDetail.setTargetObject(transactionRecovery);
jobDetail.setTargetMethod("startRecover");
jobDetail.setName("transactionRecoveryJob");
jobDetail.setConcurrent(false); // 禁止并发
jobDetail.afterPropertiesSet();
// Quartz CronTriggerFactoryBean
CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
cronTrigger.setBeanName("transactionRecoveryCronTrigger");
cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
cronTrigger.setJobDetail(jobDetail.getObject());
cronTrigger.afterPropertiesSet();
// 启动任务调度
scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
// 启动 Quartz Scheduler
scheduler.start();
} catch (Exception e) {
throw new SystemException(e);
}
}
}
复制代码
MethodInvokingJobDetailFactoryBean#setConcurrent(false)
方法,禁用任务并发执行。MethodInvokingJobDetailFactoryBean#setTargetObject(...)
+ MethodInvokingJobDetailFactoryBean#setTargetMethod(...)
方法,设置任务调用 TransactionRecovery#startRecover(...)
方法执行。若是应用集群部署,会不会相同事务被多个定时任务同时重试?
答案是不会,事务在重试时会乐观锁更新,同时只有一个应用节点能更新成功。
官方解释:多机部署下,全部机器都宕机,从异常中恢复时,全部的机器岂不是均可以查询到全部的须要恢复的服务?
固然极端状况下,Socket 调用超时时间大于事务重试间隔,第一个节点在重试某个事务,一直未执行完成,第二个节点已经能够重试。
ps:建议,Socket 调用超时时间小于事务重试间隔。
是否认时任务和应用服务器解耦?
蚂蚁金服的分布式事务服务 DTS 采用 client-server 模式:
FROM 《蚂蚁金融云 DTS 文档》
分布式事务服务 (Distributed Transaction Service, DTS) 是一个分布式事务框架,用来保障在大规模分布式环境下事务的最终一致性。DTS 从架构上分为 xts-client 和 xts-server 两部分,前者是一个嵌入客户端应用的 JAR 包,主要负责事务数据的写入和处理;后者是一个独立的系统,主要负责异常事务的恢复。
org.mengyun.tcctransaction.recover.TransactionRecovery
,异常事务恢复,实现主体代码以下:
public class TransactionRecovery {
/** * 启动恢复事务逻辑 */
public void startRecover() {
// 加载异常事务集合
List<Transaction> transactions = loadErrorTransactions();
// 恢复异常事务集合
recoverErrorTransactions(transactions);
}
}
复制代码
调用 #loadErrorTransactions()
方法,加载异常事务集合。实现代码以下:
private List<Transaction> loadErrorTransactions() {
TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
}
复制代码
RecoverConfig#getRecoverDuration()
)。这里有一点要注意,已完成的事务会从事务存储器删除。调用 #recoverErrorTransactions(...)
方法,恢复异常事务集合。实现代码以下:
private void recoverErrorTransactions(List<Transaction> transactions) {
for (Transaction transaction : transactions) {
// 超过最大重试次数
if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));
continue;
}
// 分支事务超过最大可重试时间
if (transaction.getTransactionType().equals(TransactionType.BRANCH)
&& (transaction.getCreateTime().getTime() +
transactionConfigurator.getRecoverConfig().getMaxRetryCount() *
transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000
> System.currentTimeMillis())) {
continue;
}
// Confirm / Cancel
try {
// 增长重试次数
transaction.addRetriedCount();
// Confirm
if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
transaction.changeStatus(TransactionStatus.CONFIRMING);
transactionConfigurator.getTransactionRepository().update(transaction);
transaction.commit();
transactionConfigurator.getTransactionRepository().delete(transaction);
// Cancel
} else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)
|| transaction.getTransactionType().equals(TransactionType.ROOT)) { // 处理延迟取消的状况
transaction.changeStatus(TransactionStatus.CANCELLING);
transactionConfigurator.getTransactionRepository().update(transaction);
transaction.rollback();
transactionConfigurator.getTransactionRepository().delete(transaction);
}
} catch (Throwable throwable) {
if (throwable instanceof OptimisticLockException
|| ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {
logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
} else {
logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
}
}
}
}
复制代码
TransactionManager#commit()
相似。TransactionManager#rollback()
相似。这里加判断的事务类型为根事务,用于处理延迟回滚异常的事务的回滚。在写本文的过程当中,无心中翻到蚂蚁云的文档,分享给看到此处的真爱们。
真爱们,请猛击《AntCloudPayPublic》跳转。
胖友,分享一个朋友圈可好?