最近在重构项目中,须要兼容多数据源,故此实现下多数据源事务。java
此次重构项目中,为了支持后续庞大的数据量接入,更迭了数据库,可是为了要兼容老版本,也不能直接拿掉老的数据库。因此就有了兼容多数据源的需求,尤为是要保证事务。git
其实这个需求就是要实现分布式事务,可是咱们的这个场景是在一个服务内,因此能够利用AOP来轻量的实现这个需求,如果多个服务的话,就须要实现一个管理器。github
用过spring的都知道,咱们通常都是使用@Transactional
注解,可是这个注解在多数据源下,只能支持指定的数据源(不指定就是默认的)。spring
因此咱们新建个自定义注解:sql
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransactional {
String[] values() default "";
}
复制代码
再定义一个切面:数据库
/**
* 多数据源事务
*
* @author 7le
*/
@Slf4j
@Aspect
@Order(-7)
@Component
public class MultiTransactionalAspect {
@Autowired
private ApplicationContext applicationContext;
@Around(value = "@annotation(multiTransactional)")
public Object around(ProceedingJoinPoint pjp, MultiTransactional multiTransactional) throws Throwable {
Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack = new Stack<>();
Stack<TransactionStatus> transactionStatusStack = new Stack<>();
try {
if (!openTransaction(dataSourceTransactionManagerStack, transactionStatusStack, multiTransactional)) {
return null;
}
Object ret = pjp.proceed();
commit(dataSourceTransactionManagerStack, transactionStatusStack);
return ret;
} catch (Throwable e) {
rollback(dataSourceTransactionManagerStack, transactionStatusStack);
log.error(String.format(
"MultiTransactionalAspect catch exception class: %s method: %s detail:", pjp.getTarget().getClass().getSimpleName(),
pjp.getSignature().getName()), e);
throw e;
}
}
private boolean openTransaction(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
Stack<TransactionStatus> transactionStatusStack, MultiTransactional multiTransactional) {
String[] transactionMangerNames = multiTransactional.values();
if (ArrayUtils.isEmpty(multiTransactional.values())) {
return false;
}
for (String beanName : transactionMangerNames) {
DataSourceTransactionManager dataSourceTransactionManager = (DataSourceTransactionManager) applicationContext
.getBean(beanName);
TransactionStatus transactionStatus = dataSourceTransactionManager
.getTransaction(new DefaultTransactionDefinition());
transactionStatusStack.push(transactionStatus);
dataSourceTransactionManagerStack.push(dataSourceTransactionManager);
}
return true;
}
private void commit(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
Stack<TransactionStatus> transactionStatusStack) {
while (!dataSourceTransactionManagerStack.isEmpty()) {
dataSourceTransactionManagerStack.pop().commit(transactionStatusStack.pop());
}
}
private void rollback(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
Stack<TransactionStatus> transactionStatusStack) {
while (!dataSourceTransactionManagerStack.isEmpty()) {
dataSourceTransactionManagerStack.pop().rollback(transactionStatusStack.pop());
}
}
}
复制代码
这样就大功告成,只须要在须要的方法上,加上@MultiTransactional({"xxxx","xxxxx"})
bash
实现的代码在 springcloud-gateway架构
在使用的时候,须要注意一些细节,要加上@EnableTransactionManagement
。app
以及@MultiTransactional({"xxxx","xxxxx"})
是AOP的方式,那就意味着是动态代理的,那下面的方式就会失效:分布式
上面也提到过这个方案比较轻量,也是针对一些对数据一致性要求不高的场景,由于会存在数据不一致的可能。
咱们用伪代码来描述下,假设2个数据源
begin1
begin2
sql1
sql2
commit1
commit2
复制代码
这种方案是能够实如今sql1 sql2之间的异常回滚。若是出现commit1提交成功,commit2提交失败(或者超时)这种状况,就会形成数据不一致,虽然这种状况几率很低,但也是一个隐患。
这个实现很相似于2PC,都会有在一个参与者执行任务提交后,另外一个参与者出现异常而致使数据不一致的问题。
其实通常状况下,系统的需求只是要达到最终一致性,那就能够考虑使用TCC,对每一个事物进行Try,若是执行没有问题,再执行Confirm,若是执行过程当中出了问题,则执行操做的逆操Cancel(自动化补偿手段)。
可是TCC对每一个事务都须要Try,再执行Confirm,略微显得臃肿,根据不一样的业务场景能够有更好的方案(好比补偿模式,按期校对模式之类),具体的能够看分布式服务化系统一致性的“最佳实干”
抽了点时间,本身实现了一个分布式事务的中间件,在一些对数据一致性要求高的场景可使用。shine-mq 相应的设计思路在 分布式事务:基于可靠消息服务
李艳鹏. 著. 分布式服务架构:原理、设计与实战[M].