学习TCC分布式事务的知识是使用了GIT上的一个开源项目,以前有简单的看过一些,有了一个大概的了解,可是随着时间的‘清洗’,又开始变得‘浑浊不清’了,此次索性把这份源码从头看了下,又把流程推演了好几回,以为已经懂了,想把理解的东西经过博客写出来。在这个过程当中,再次梳理一遍,加深理解,同时也能够发现一些理解误差和错误的地方。
GIT: https://github.com/1755728288...java
在分布式系统中,为了保证执行指令的正确性,引入了事务的概念。通常意义上,事务主要突出的是ACID,即原子性,一致性,隔离性和持久性。而在分布式事务中,最主要的原子性的保证,要保证一个业务操做在其分布式系统中的全部指令所有执行或不执行。由于各个指令的操做耗时以及顺序,因此原子性都对应了必定的时间窗口,好比单机系统下,这个时间窗口很是短,并且其原子性也由数据库事务来保证。而在分布式系统中,原子性就依赖于具体的应用设计了,主要的依据是业务上容许的时间窗口的长短。目前通常来讲,若容许的时间窗口较短,就用TCC,若容许的时间窗口较长,则使用最终一致性。最终一致性一般使用消息机制来设计,其核心是消息的安全送达与消费。git
TCC能够说是一种设计模式,将传统单机系统中的大事务进行拆分,经过小事务来拼装,并协调总体的事务提交和回滚。按字面意思理解,TCC主要分Try,Confirm,Cancel三个阶段,Try预留资源,Confirm使用预留资源执行业务操做,Cancel则是释放资源。貌似简单,可是在实际的业务场景中,每每会困惑咱们在这三个阶段分别要作什么,这个是业务设计的一部份内容了,在此不展开叙述。不过要注意的一点时,任何一个阶段的结果都是可见的,好比一个库存子服务的入库方法,在try阶段就直接加到库存上去了,回滚的时候发现刚加上的库存已经有买家下单准备出库了,那就GG了。github
先放个图,这是项目的组件,上面绿色区域是框架的子模块,下面黄色的是样例模块。不要被黄色区域吓到,TCC的代码很少,主要是tcc-transaction-core.spring
组件名 | 说明 |
---|---|
tcc-transaction-core | 【重要】核心代码,主要的切面和job |
tcc-transaction-api | 注解和常量 |
tcc-transaction-spring | spring使用的扩展 |
tcc-transaction-dubbo | dubbo使用时的扩展 |
tcc-transaction-server | tcc事务管理控制台 |
tcc-transaction-unit-test | 单元测试 |
tcc-transaction-tutorial-sample | 订单场景示例工程,分dubbo和spring版本 |
因此重点只有一个模块,就是tcc-transaction-core,代码量很少,主要分红下面的模块数据库
模块名 | 说明 |
---|---|
interceptor | 【重要】2个切面,一个是根据事务状态进行tcc阶段方法的选择,一个是组织事务的参与者,用XID将各个参与者绑起来,以便事务的提交和回滚 |
recover | 恢复在同步调用失败事务的定时处理 |
repository | 事务对象持久DAO对象,提供了多种选择:缓存,文件系统,jdbc,zookeeper |
context | 传递事务上下文的方法类,通常会在远程方法调用的切面中,将上下文加入到参数列表中 |
common | 方法枚举和事务枚举 |
serializer | 事务对象的序列化器,使用了kyto序列化工具 |
support | 实现了工厂类,管理TCC的类的实例化 |
utils | 工具类 |
最重要的是interceptor,是主要业务逻辑所在。设计模式
题外话,有人初看了一遍TCC,拿来和数据库事务来比较,会说‘TCC不就是业务补偿么,没什么难点啊’,貌似有道理,其实否则。还记得数据库事务的ACID吗?由于数据库提供了事务特性,ACID由数据库保证,应用只须要一个简单的@Transactional注解就都搞定了。而分布式事务,就把ACID的特性从数据库里面拿了出来,由应用程序来保证。固然ACID也有了和以前不同的含义。api
特性 | 数据库 | 分布式事务 |
---|---|---|
A[Atomicity] | 全部数据库更新一块儿提交和回滚,数据库经过日志来实现 | 经过协调各个事务的参与方,经过框架来处理异常 |
C[Consistncy] | 经过数据库约束来实现,好比非空,惟一,外键等 | |
I[Isolation] | 各个数据库实现方式不同,主要分读已提交,读未提交等 | TCC经过try阶段锁定资源来实现隔离,即业务资源的隔离 |
D[Durability] | 数据库实现 | 数据库实现 |
以项目中的例子来讲明整个TCC的处理流程,这个例子是相似电商下单的场景,在支付的时候能够选择红包或本金,好比一个笔记本4000,能够选择使用3000的本金和1000的红包来支付,有三个服务:订单服务(order),本金服务(capital)和红包服务(redPacket)。主调方是Order,被调方Capital和RedPacket.缓存
下面是绞尽脑汁画的调用流程图,为了图的简便直观,省去了cancel的处理,只有try和confirm的调用(cancel的处理和confirm基本一致)。红线是try的调用,蓝线是confirm的调用。粗红线是开始,粗蓝线是结束。
在try阶段的makePayment,方法调用了两个子事务的api方法,其余的方法的调用均是全局事务切面决定的。业务方法只要按TCC框架的要求实现便可,其余的交给TCC框架。安全
咱们从TCC事务注解开始,先介绍几个基本的类。框架
/** 属性主要是事务传播属性,提交方法,回滚方法,上下文传递类,是否异步提交,是否异步回滚 propagation属性比较重要,值有required,support,mandatory和requireNEW,具体的含义和spring的事务传播相似。 **/ public @interface Compensable { public Propagation propagation() default Propagation.REQUIRED; public String confirmMethod() default ""; public String cancelMethod() default ""; public Class<? extends TransactionContextEditor> transactionContextEditor() default DefaultTransactionContextEditor.class; public boolean asyncConfirm() default false; public boolean asyncCancel() default false;
/** 事务状态,在TCC的不通阶段进行事务状态的变动。 **/ public enum TransactionStatus { TRYING(1), CONFIRMING(2), CANCELLING(3);
/** ROOT:事务启动方法,好比例子中order的makePayment方法 PROVIDER:事务协同方法,好比例子中redpacket的record方法 Normal:普通方法,好比API里面的record方法 **/ public enum MethodType { ROOT, PROVIDER, NORMAL; }
/** ROOT:主事务,通常MethodType为ROOT的启动 BRANCH:分支事务 **/ public enum TransactionType { ROOT(1), BRANCH(2);
下面咱们看一下事务持久的内容,能更好的帮助理解。每一个服务都会有一张事务表,会记录全部的ROOT和BRANCH事务,在事务完成以后,会自动删除。
TRANSACTION_ID | DOMAIN | GLOBAL_TX_ID | BRANCH_QUALIFIER | CONTENT | STATUS | TRANSACTION_TYPE | RETRIED_COUNT | VERSION |
---|---|---|---|---|---|---|---|---|
2 | ORDER | 事务编号001 | 分支标识A | 事务对象A | 2 | 1 | 0 | 11 |
2 | CAPITAL | 事务编号001 | 分支标识B | 事务对象B | 2 | 2 | 0 | 11 |
2 | REDPACKET | 事务编号001 | 分支标识C | 事务对象C | 2 | 2 | 0 | 11 |
domain区分了服务
global_tx_id 串起了全部的事务
transaction_type 1为主事务,2为分支事务
content 用于恢复事务,主事务的content中包含了参与者
代码部分的最后是两个切面和恢复的job,不进行特别的解释了,将个人理解放在代码的注释里面。
//CompensableTransactionInterceptor.java public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable { //获取所执行事务方法的设置属性 Method method = CompensableMethodUtils.getCompensableMethod(pjp); Compensable compensable = method.getAnnotation(Compensable.class); Propagation propagation = compensable.propagation(); TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()); boolean asyncConfirm = compensable.asyncConfirm(); boolean asyncCancel = compensable.asyncCancel(); //当前是否存在事务, boolean isTransactionActive = transactionManager.isTransactionActive(); /** * 判断方法类型 * MethodType一共有三种,Root,Provider,Normal. 主要经过propagation和isTransactionActive来肯定 */ MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext); switch (methodType) { case ROOT: //ROOT:(1)propagation为require_new (2)propataion为require,且以前没有有事务存在 //主事务处理方法 return rootMethodProceed(pjp, asyncConfirm, asyncCancel); case PROVIDER: //PROVIDER:(1) propation为require或者mandatory,且以前有事务 //从事务处理方法 return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel); default: //普通方法执行 return pjp.proceed(); } } private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable { Object returnValue = null; Transaction transaction = null; try { //开启新的主事务,使用新生成的xid,状态为trying,事务类型为ROOT transaction = transactionManager.begin(); try { returnValue = pjp.proceed(); } catch (Throwable tryingException) { if (!isDelayCancelException(tryingException)) { logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException); //异常回滚,将事务状态更新为CANCELLING transactionManager.rollback(asyncCancel); } throw tryingException; } //执行confirm方法,将事务状态更新为CONFIRMING。若是是异步,则TM会使用线程池异步执行,不然直接调用,会协调全部的参与者进行提交。并将事务记录删除 transactionManager.commit(asyncConfirm); } finally { //清理事务数据 transactionManager.cleanAfterCompletion(transaction); } return returnValue; } private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable { Transaction transaction = null; try { switch (TransactionStatus.valueOf(transactionContext.getStatus())) { case TRYING: //开启一个子事务,并调用TCC的try方法 transaction = transactionManager.propagationNewBegin(transactionContext); return pjp.proceed(); case CONFIRMING: //获取子事务TRY阶段的事务,并调用TCC的commit方法 try { transaction = transactionManager.propagationExistBegin(transactionContext); transactionManager.commit(asyncConfirm); } catch (NoExistedTransactionException excepton) { //the transaction has been commit,ignore it. } break; case CANCELLING: //获取子事务保存的事务数据,执行cancel方法 try { transaction = transactionManager.propagationExistBegin(transactionContext); transactionManager.rollback(asyncCancel); } catch (NoExistedTransactionException exception) { //the transaction has been rollback,ignore it. } break; } } finally { //清理事务数据 transactionManager.cleanAfterCompletion(transaction); } Method method = ((MethodSignature) (pjp.getSignature())).getMethod(); return ReflectionUtils.getNullValue(method.getReturnType()); } private boolean isDelayCancelException(Throwable throwable) { if (delayCancelExceptions != null) { for (Class delayCancelException : delayCancelExceptions) { Throwable rootCause = ExceptionUtils.getRootCause(throwable); if (delayCancelException.isAssignableFrom(throwable.getClass()) || (rootCause != null && delayCancelException.isAssignableFrom(rootCause.getClass()))) { return true; } } } return false; }
代码分析的比较少,这份代码仍是有不少值得称道的地方,工厂类,缓存,模板方法等设计模式的使用,下次能够从设计模式的角度来进行分析。除了TCC,最近项目中还涉及了安全消息,等弄清楚了再来一发。