前几天,在家里研究了下阿里巴巴开源的分布式事务中间件Seata
,并记录了一下过程。java
SpringBoot+Dubbo+Seata分布式事务实战git
不过光有实战不行,咱多少也得把原理搞搞清楚,否则出了问题不知咋解决岂不是很尴尬。github
首先,设想一个传统的单体应用,经过 3 个 模块,在同一个数据源上更新数据来完成一项业务。spring
很天然的,整个业务过程的数据一致性由本地事务来保证。sql
随着业务需求和架构的变化,单体应用被拆分为微服务。原来的3个模块被拆分为3个独立的服务,分别使用独立的数据。数据库
业务过程将经过RPC的服务调用来完成。springboot
那么这个时候,每个服务内部的数据一致性仍由本地事务来保证。bash
而整个业务层面的全局数据一致性和完整性要如何保障呢?这就是微服务架构下面临的,典型的分布式事务需求。服务器
Seata
把一个分布式事务理解成一个包含了若干 分支事务 的 全局事务 。全局事务 的职责是协调其下管辖的 分支事务 达成一致,要么一块儿成功提交,要么一块儿失败回滚。此外,一般 分支事务 自己就是一个知足 ACID 的 本地事务。markdown
Seata
定义了3个组件来协议分布式事务的处理过程。
一个典型的分布式事务过程:
Seata有4种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。<后两种实现还在官方计划版本中>
咱们的示例项目中,所用到的就是AT模式。在 AT 模式下,用户只需关注本身的“业务 SQL”,用户的 “业务 SQL” 做为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操做。
在一阶段,Seata 会拦截“业务 SQL”,首先解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,而后执行“业务 SQL”更新业务数据,在业务数据更新以后,再将其保存成“after image”,最后生成行锁。以上操做所有在一个数据库事务内完成,这样保证了一阶段操做的原子性。
二阶段若是是提交的话,由于“业务 SQL”在一阶段已经提交至数据库,因此Seata 框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理便可。
二阶段若是是回滚的话,Seata 就须要回滚一阶段已经执行的“业务 SQL”,还原业务数据。回滚方式即是用“before image”还原业务数据。
下面咱们从源码中来看看这整个流程是怎么串起来的。
为了方便看源码,首先就得把调试环境搞起,方便Debug。
Seata 源码:github.com/seata/seata 。
目前的版本是0.7.0-SNAPSHOT
,而后经过mvn install
将项目打包到本地。
咱们的SpringBoot+Seata
测试项目就能够引入这个依赖。
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
复制代码
为啥要这样干呢?由于Seata
不一样组件之间的通讯都是Netty
来完成的,在调试的时候,每每会由于超时而断开链接。
引入了本地版本,咱们就能够把心跳检测时间加长或者索性去掉,随便搞~
找到io.seata.server.Server
,直接运行main
方法,就启动了Seata服务,so easy~
咱们上面说Seata
定义了三个组件,其中有一个叫TC的事务协调器,就是指这个服务端。
咱们看看它具体干了些啥。
public class Server { public static void main(String[] args) throws IOException { //初始化参数解析器 ParameterParser parameterParser = new ParameterParser(args); //初始化RpcServer ,设置服务器参数 RpcServer rpcServer = new RpcServer(WORKING_THREADS); rpcServer.setHost(parameterParser.getHost()); rpcServer.setListenPort(parameterParser.getPort()); UUIDGenerator.init(1); //从文件或者数据库中加载Session SessionHolder.init(parameterParser.getStoreMode()); //初始化默认的协调器 DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer); coordinator.init(); rpcServer.setHandler(coordinator); //注册钩子程序 清理协调器相关资源 ShutdownHook.getInstance().addDisposable(coordinator); //127.0.0.1 and 0.0.0.0 are not valid here. if (NetUtil.isValidIp(parameterParser.getHost(), false)) { XID.setIpAddress(parameterParser.getHost()); } else { XID.setIpAddress(NetUtil.getLocalIp()); } XID.setPort(rpcServer.getListenPort()); //启动RPC服务 rpcServer.init(); System.exit(0); } } 复制代码
这里的RpcServer
是经过Netty实现的一个RPC服务端,用来接收并处理TM和RM的消息。本文的重点不在服务端,因此咱们先有一个大体的印象便可。
在项目中,咱们配置了SeataConfiguration
,其中的重点是配置全局事务扫描器和数据源代理。因此,咱们先来看看为啥要配置它们,它们具体又作了什么事。
@Bean public GlobalTransactionScanner globalTransactionScanner() { return new GlobalTransactionScanner("springboot-order", "my_test_tx_group"); } 复制代码
按照规矩,咱们看一个类,先看它的结构。好比它是谁的儿子,从哪里来,欲往何处去?
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,DisposableBean {
}
复制代码
这里咱们看到它是AbstractAutoProxyCreator
的子类,又实现了InitializingBean
接口。
这俩哥们都是Spring你们族的成员,一个用于Spring AOP生成代理;一个用于调用Bean的初始化方法。
Bean的初始化方法有三种方式,按照前后顺序是,@PostConstruct、afterPropertiesSet、init-method
。
在这里,它的初始化方法中,主要就干了三件事。
private void initClient() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... "); } //init TM 初始化事务管理器 TMClient.init(applicationId, txServiceGroup); //init RM 初始化资源管理器 RMClient.init(applicationId, txServiceGroup); //注册钩子程序,用于TM、RM的资源清理 registerSpringShutdownHook(); } 复制代码
到目前为止,Seata定义的三个组件都已经浮出水面了。
TMClient.init
主要是初始化事务管理器的客户端,创建与RPC服务端的链接,同时向事务协调器注册。
RMClient.init
也是同样过程,初始化资源管理器,创建与RPC服务端的链接,同时向事务协调器注册。
同时,它们都是经过定时任务来完成链接的,因此断线以后能够自动重连。
timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { clientChannelManager.reconnect(getTransactionServiceGroup()); } }, 5, 5, TimeUnit.SECONDS); 复制代码
最后,注册钩子程序,用于清理这两个组件中的资源。
它其实是一个Bean的后置处理器,在Bean初始化以后,调用postProcessAfterInitialization
方法。
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) { if (bean != null) { Object cacheKey = this.getCacheKey(bean.getClass(), beanName); if (this.earlyProxyReferences.remove(cacheKey) != bean) { return this.wrapIfNecessary(bean, beanName, cacheKey); } } return bean; } 复制代码
而后在GlobalTransactionScanner.wrapIfNecessary()
里它干了些什么呢?
就是检查Bean的方法上是否包含GlobalTransactional
和GlobalLock
注解,而后生成代理类。
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey){ if (disableGlobalTransaction) { return bean; } //已经生成了代理,直接返回 if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null; //检查是否是TCC的代理 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); } else { Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); //判断类方法上是否包含GlobalTransactional注解和GlobalLock注解 if (!existsAnnotation(new Class[] {serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } //建立拦截器 if (interceptor == null) { interceptor = new GlobalTransactionalInterceptor(failureHandlerHook); } } //若是不是AOP代理,则建立代理;若是是代理,则将拦截器加入到Advisor if (!AopUtils.isAopProxy(bean)) { bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); for (Advisor avr : advisor) { advised.addAdvisor(0, avr); } } PROXYED_SET.add(beanName); return bean; } 复制代码
至此,咱们已经肯定了一件事。咱们ServiceImpl
实现类上带有GlobalTransactional
注解的方法,会生成一个代理类。
在调用方法时,实际会调用的就是代理类的拦截器方法invoke()
。
public class GlobalTransactionalInterceptor implements MethodInterceptor { @Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { //获取目标类 Class<?> targetClass = AopUtils.getTargetClass(methodInvocation.getThis()); //获取调用的方法 Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); //获取方法上的注解 final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class); //处理全局事务 if (globalTransactionalAnnotation != null) { return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); } else if (globalLockAnnotation != null) { return handleGlobalLock(methodInvocation); } else { return methodInvocation.proceed(); } } } 复制代码
能够看到,这里是开始处理全局事务的地方。这里咱们先不深究,接着往下看。
除了上面建立方法的代理,还要建立数据源的代理;而后把这个代理对象设置到SqlSessionFactory
。
@Bean public DataSourceProxy dataSourceProxy(DataSource dataSource) { return new DataSourceProxy(dataSource); } @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSourceProxy); sqlSessionFactoryBean.setTransactionFactory(new JdbcTransactionFactory()); return sqlSessionFactoryBean.getObject(); } 复制代码
这里的重点是建立了DataSourceProxy
,并把它设置到Mybatis
中的SqlSessionFactory
。
咱们知道,在Mybatis
执行方法的时候,最终要建立PreparedStatement
对象,而后执行ps.execute()
返回SQL结果。
这里有两点咱们须要注意:
PreparedStatement
对象是从Connection
对象建立而来的,也许咱们都写过:
PreparedStatement pstmt = conn.prepareStatement(insert ........)
复制代码
Connection
又是从哪里来的呢?这个咱们没必要迟疑,固然从数据源中才能拿到一个链接。
不过咱们已经把数据源DataSource
对象已经被替换成了Seata
中的DataSourceProxy
对象。
因此,Connection和PreparedStatement
在建立的时候,都被搞成了Seata
中的代理对象。
不信你看嘛:
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource { public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy(this, targetConnection); } } 复制代码
而后调用AbstractDataSourceProxy
来建立PreparedStatement
。
public abstract class AbstractConnectionProxy implements Connection { @Override public PreparedStatement prepareStatement(String sql) throws SQLException { PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql); return new PreparedStatementProxy(this, targetPreparedStatement, sql); } } 复制代码
看到这里,咱们应该明白一件事。
在执行ps.execute()
的时候,则会调用到PreparedStatementProxy.execute()
。
理清了配置文件后面的逻辑,也许就掌握了它的脉络,再看代码的时候,能够知道从哪里下手。
上面已经说到,ServiceImpl
已是一个代理类,因此咱们直接看GlobalTransactionalInterceptor.invoke()
。
它会调用到TransactionalTemplate.execute()
,TransactionalTemplate
是业务逻辑和全局事务的模板。
public class TransactionalTemplate { public Object execute(TransactionalExecutor business) throws Throwable { // 1. 建立一个全局事务 GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // 1.1 获取事务的属性 好比超时时间、事务名称 TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } try { // 2. 开始事务 beginTransaction(txInfo, tx); Object rs = null; try { // 执行业务逻辑 rs = business.execute(); } catch (Throwable ex) { // 3.回滚 completeTransactionAfterThrowing(txInfo,tx,ex); throw ex; } // 4. 提交 commitTransaction(tx); return rs; } finally { //5. 清理资源 triggerAfterCompletion(); cleanUp(); } } } 复制代码
这里的代码很清晰,事务的流程也一目了然。
下面咱们看看具体它是怎么作的。
从客户端的角度来看,开启事务就是告诉服务器说:我要开启一个全局事务了,请事务协调器TC先生分配一个全局事务ID给我。
TC先生会根据应用名称、事务分组、事务名称等建立全局Session,并生成一个全局事务XID。
而后客户端记录当前的事务状态为Begin
,并将XID绑定到当前线程。
开启事务以后,开始执行咱们本身的业务逻辑。这就涉及到了数据库操做,上面咱们说到Seata
已经将PreparedStatement
对象作了代理。因此在执行的时候将会调用到PreparedStatementProxy.execute()
。
public class PreparedStatementProxy{ public boolean execute() throws SQLException { return ExecuteTemplate.execute(this, new StatementCallback<Boolean, PreparedStatement>() { @Override public Boolean execute(PreparedStatement statement, Object... args) throws SQLException { return statement.execute(); } }); } } 复制代码
在这里它会先根据SQL的类型生成不一样的执行器。好比是一个INSERT INTO
语句,那么就是InsertExecutor
执行器。
而后判断是否是自动提交的,执行相应方法。那么接着看executeAutoCommitFalse()
public abstract class AbstractDMLBaseExecutor{ protected T executeAutoCommitFalse(Object[] args) throws Throwable { TableRecords beforeImage = beforeImage(); T result = statementCallback.execute(statementProxy.getTargetStatement(), args); TableRecords afterImage = afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); return result; } } 复制代码
这里就是AT模式一阶段所作的事,拦截业务SQL,在数据保存前将其保存为beforeImage
;而后执行业务SQL,在数据更新后再将其保存为afterImage
。这些操做所有在一个本地事务中完成,保证了一阶段操做的原子性。
咱们以INSERT INTO
为例,看看它是怎么作的。
因为是新增操做,因此在执行以前,这条记录尚未,beforeImage只是一个空表记录。
执行原有的SQL语句,好比INSERT INTO ORDER(ID,NAME)VALUE(?,?)
它要作的事就是,把刚刚添加的那条记录从数据库中再查出来。
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException { //查找主键ID的值 List<Object> pkValues = containsPK() ? getPkValuesByColumn() : getPkValuesByAuto(); //根据主键ID查找记录 TableRecords afterImage = getTableRecords(pkValues); return afterImage; } 复制代码
而后将beforeImage
和afterImage
构建成UndoLog
对象,保存到数据库。重要的是,这些操做都是在同一个本地事务中进行的。咱们看它的sqlList也能看出来。
最后,咱们看一下UndoLog
在数据库中的记录是长这样的:
{ "@class": "io.seata.rm.datasource.undo.BranchUndoLog", "xid": "192.168.216.1:8091:2016493467", "branchId": 2016493468, "sqlUndoLogs": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.undo.SQLUndoLog", "sqlType": "INSERT", "tableName": "t_order", "beforeImage": { "@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords", "tableName": "t_order", "rows": ["java.util.ArrayList", []] }, "afterImage": { "@class": "io.seata.rm.datasource.sql.struct.TableRecords", "tableName": "t_order", "rows": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.sql.struct.Row", "fields": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PrimaryKey", "type": 4, "value": 116 }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "order_no", "keyType": "NULL", "type": 12, "value": "c233d8fb-5e71-4fc1-bc95-6f3d86312db6" }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "user_id", "keyType": "NULL", "type": 12, "value": "200548" }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "commodity_code", "keyType": "NULL", "type": 12, "value": "HYD5620" }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "count", "keyType": "NULL", "type": 4, "value": 10 }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "amount", "keyType": "NULL", "type": 8, "value": 5000.0 }]] }]] } }]] } 复制代码
若是执行业务没有异常,就进入二阶段提交。客户端向服务器发送Commit事件,同时将XID解绑。
服务器端回复确认提交后,客户端将本地UndoLog数据清除。
这里重要在AsyncWorker.init()
方法,它会启动一个定时任务来执行doBranchCommits
,来清除Log数据。
若是发生异常,则进行二阶段回滚。
先经过xid和branchId 找到UnDoLog这条记录,而后在解析里面的数据生成反向SQL,将刚才的执行结果给撤销。
这块代码较长,你们自行参考UndoLogManager.undo()
和AbstractUndoExecutor.executeOn()
方法。
只有一个事务管理器TM才会开启全局事务,那么其余服务参与者是如何自动归入到全局事务中去的呢?
首先,Seata
给Dubbo搞了个Filter过滤器叫作TransactionPropagationFilter
。
它会在Dubbo RPC
上下文中设置XID,这样在其余服务中也能够获取这个XID。
而后,咱们知道,Seata已经代理了PreparedStatement
。在执行数据操做的时候,就有个判断。
if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) { //若是不包含XID,就执行原始方法 return statementCallback.execute(statementProxy.getTargetStatement(), args); } 复制代码
这里的意思就是,若是当前线程不包含XID,就执行原始方法;若是包含呢,就继续往下执行事务方法。
本文大概阐述了Seata TA模式下,客户端的工做原理。还有一部分Seata服务端的逻辑,本文并无深刻涉及。
缘由在于笔者尚未彻底的吃透这部份内容,没办法通俗的写出来,等之后再补~
如若文中有不许确的地方,也但愿朋友们不吝赐教,谢谢。