普通事务管理的过程html
外部(全局)事务-JTAjava
JTA事务管理的过程mysql
使用应用服务器web
不使用应用服务器(通常使用的是Atomikos)spring
XA与JTAsql
XA规范的JAVA实现-JTA数据库
上图中JTA是事务管理器在Java中的实现,它的全称为Java Transaction API.XAResource是Java中对Resource规范的实现。服务器
JTA网络
咱们来看一下TransactionManager接口mybatis
public interface TransactionManager { /** * 开启一个事务 * */ public void begin() throws NotSupportedException, SystemException; /** * 提交一个事务 * */ public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException; /** * 获取事务状态 * */ public int getStatus() throws SystemException; /** * 开启一个事务 * */ public Transaction getTransaction() throws SystemException; /** * 继续挂起的事务 */ public void resume(Transaction tobj) throws InvalidTransactionException, IllegalStateException, SystemException; /** * 回滚 * */ public void rollback() throws IllegalStateException, SecurityException, SystemException; /** * 设置回滚只读 * */ public void setRollbackOnly() throws IllegalStateException, SystemException; /** * 设置事务的超时时间 * */ public void setTransactionTimeout(int seconds) throws SystemException; /** * 挂起一个事务 * */ public Transaction suspend() throws SystemException; }
XAResource接口
public interface XAResource { int TMENDRSCAN = 8388608; int TMFAIL = 536870912; int TMJOIN = 2097152; int TMNOFLAGS = 0; int TMONEPHASE = 1073741824; int TMRESUME = 134217728; int TMSTARTRSCAN = 16777216; int TMSUCCESS = 67108864; int TMSUSPEND = 33554432; int XA_RDONLY = 3; int XA_OK = 0; //控制某个id的事务进行第几阶段的提交 void commit(Xid var1, boolean var2) throws XAException; void end(Xid var1, int var2) throws XAException; void forget(Xid var1) throws XAException; //获取事务的超时时间 int getTransactionTimeout() throws XAException; //是否在同一个ResourceManager里面呢 boolean isSameRM(XAResource var1) throws XAException; //准备一个全局的事务 int prepare(Xid var1) throws XAException; //恢复一个全局事务 Xid[] recover(int var1) throws XAException; void rollback(Xid var1) throws XAException; boolean setTransactionTimeout(int var1) throws XAException; void start(Xid var1, int var2) throws XAException; }
XID接口
public interface Xid { int MAXGTRIDSIZE = 64; int MAXBQUALSIZE = 64; int getFormatId(); byte[] getGlobalTransactionId(); byte[] getBranchQualifier(); }
JTA事务管理的弊端
如今咱们用一个样例来讲明JTA事务管理,咱们先在不一样的数据库中添加两张表
pom
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>8.0.11</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.29</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> </dependencies>
配置文件
logging: level: root: info com.guanjiann: debug file: logs/${spring.application.name}.log server: port: 8080 spring: application: name: Twocommit datasource: test1: driver-class-name: com.mysql.cj.jdbc.Driver jdbcurl: jdbc:mysql://xxx.xxx.xxx.xxx:3306/cloud_resource?useSSL=FALSE&serverTimezone=GMT%2B8 username: root password: abcd123 type: com.alibaba.druid.pool.DruidDataSource filters: stat maxActive: 20 initialSize: 1 maxWait: 60000 minIdle: 1 timeBetweenEvictionRunsMillis: 60000 minEvictableIdleTimeMillis: 300000 validationQuery: select 'x' testWhileIdle: true testOnBorrow: false testOnReturn: false poolPreparedStatements: true maxOpenPreparedStatements: 20 minPoolSize: 3 maxPoolSize: 25 maxLifetime: 20000 borrowConnectionTimeout: 30 loginTimeout: 30 maintenanceInterval: 60 maxIdleTime: 60 test2: driver-class-name: com.mysql.cj.jdbc.Driver jdbcurl: jdbc:mysql://xxx.xxx.xxx.xxx:3306/cloud_resource_base?useSSL=FALSE&serverTimezone=GMT%2B8 username: root password: abcd123 type: com.alibaba.druid.pool.DruidDataSource filters: stat maxActive: 20 initialSize: 1 maxWait: 60000 minIdle: 1 timeBetweenEvictionRunsMillis: 60000 minEvictableIdleTimeMillis: 300000 validationQuery: select 'x' testWhileIdle: true testOnBorrow: false testOnReturn: false poolPreparedStatements: true maxOpenPreparedStatements: 20 minPoolSize: 3 maxPoolSize: 25 maxLifetime: 20000 borrowConnectionTimeout: 30 loginTimeout: 30 maintenanceInterval: 60 maxIdleTime: 60 mybatis: type-aliases-package: com.guanjian.twocommit.domain mapper-locations: classpath:/mybatis-mappers/* configuration: mapUnderscoreToCamelCase: true
SpringBoot启动类
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}, scanBasePackages = {"com.guanjian.twocommit"}) @EnableTransactionManagement @EnableConfigurationProperties(value = {DBConfig1.class, DBConfig2.class}) public class TwocommitApplication extends SpringBootServletInitializer { @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(TwocommitApplication.class); } public static void main(String[] args) { SpringApplication.run(TwocommitApplication.class, args); } }
配置文件读取类
@Data @ConfigurationProperties(prefix = "spring.datasource.test1") public class DBConfig1 { private String jdbcurl; private String username; private String password; private int minPoolSize; private int maxPoolSize; private int maxLifetime; private int borrowConnectionTimeout; private int loginTimeout; private int maintenanceInterval; private int maxIdleTime; private String testQuery; }
@Data @ConfigurationProperties(prefix = "spring.datasource.test2") public class DBConfig2 { private String jdbcurl; private String username; private String password; private int minPoolSize; private int maxPoolSize; private int maxLifetime; private int borrowConnectionTimeout; private int loginTimeout; private int maintenanceInterval; private int maxIdleTime; private String testQuery; }
Mybatis整合atomikos全局事务管理类
@Configuration @MapperScan(basePackages = "com.guanjian.twocommit.dao", sqlSessionTemplateRef = "test1SqlSessionTemplate") public class MyBatisConfig1 { @Bean(name = "test1DataSource") //test1DataSource public DataSource testDataSource(DBConfig1 testConfig) throws SQLException { MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); //mysqlXaDataSource.setUrl(testConfig.getUrl()); mysqlXaDataSource.setUrl(testConfig.getJdbcurl()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); mysqlXaDataSource.setPassword(testConfig.getPassword()); mysqlXaDataSource.setUser(testConfig.getUsername()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); // 将本地事务注册到创 Atomikos全局事务 AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName("test1DataSource"); xaDataSource.setMinPoolSize(testConfig.getMinPoolSize()); xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize()); xaDataSource.setMaxLifetime(testConfig.getMaxLifetime()); xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout()); xaDataSource.setLoginTimeout(testConfig.getLoginTimeout()); xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval()); xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime()); xaDataSource.setTestQuery(testConfig.getTestQuery()); return xaDataSource; } @Bean(name = "test1SqlSessionFactory") public SqlSessionFactory testSqlSessionFactory(@Qualifier("test1DataSource") DataSource dataSource) throws Exception { SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); return bean.getObject(); } @Bean(name = "test1SqlSessionTemplate") public SqlSessionTemplate testSqlSessionTemplate( @Qualifier("test1SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception { return new SqlSessionTemplate(sqlSessionFactory); } }
@Configuration @MapperScan(basePackages = "com.guanjian.twocommit.dao2", sqlSessionTemplateRef = "test2SqlSessionTemplate") public class MyBatisConfig2 { @Bean(name = "test2DataSource") //test1DataSource public DataSource testDataSource(DBConfig2 testConfig) throws SQLException { MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); //mysqlXaDataSource.setUrl(testConfig.getUrl()); mysqlXaDataSource.setUrl(testConfig.getJdbcurl()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); mysqlXaDataSource.setPassword(testConfig.getPassword()); mysqlXaDataSource.setUser(testConfig.getUsername()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); // 将本地事务注册到创 Atomikos全局事务 AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName("test2DataSource"); xaDataSource.setMinPoolSize(testConfig.getMinPoolSize()); xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize()); xaDataSource.setMaxLifetime(testConfig.getMaxLifetime()); xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout()); xaDataSource.setLoginTimeout(testConfig.getLoginTimeout()); xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval()); xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime()); xaDataSource.setTestQuery(testConfig.getTestQuery()); return xaDataSource; } @Bean(name = "test2SqlSessionFactory") public SqlSessionFactory testSqlSessionFactory(@Qualifier("test2DataSource") DataSource dataSource) throws Exception { SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); return bean.getObject(); } @Bean(name = "test2SqlSessionTemplate") public SqlSessionTemplate testSqlSessionTemplate( @Qualifier("test2SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception { return new SqlSessionTemplate(sqlSessionFactory); } }
实体类
@Data public class User { private int id; private String name; }
@Data public class Account { private int id; private String name; }
@Data public class UADTO { private User user; private Account account; }
dao接口
public interface UserDao { @Select("select * from user where name=#{name}") User findUserByName(String name); @Options(useGeneratedKeys = true, keyProperty = "id") @Insert("insert into user (name) values (#{name})") int addUser(User user); }
public interface AccountDao { @Select("select * from account where name=#{name}") Account findAccountByName(String name); @Options(useGeneratedKeys = true, keyProperty = "id") @Insert("insert into account (name) values (#{name})") int addAccount(Account account); }
service接口
public interface UADTOService { int addUserAndAccount(UADTO uadto); }
打标签的全局事务实现类
@Service public class UADTOSerciceImpl implements UADTOService { @Autowired private UserDao userDao; @Autowired private AccountDao accountDao; @Override @Transactional public int addUserAndAccount(UADTO uadto) { userDao.addUser(uadto.getUser()); accountDao.addAccount(uadto.getAccount()); return 1; } }
代码事务实现类
@Service @Primary public class UADTOServiceXAImpl implements UADTOService { @Autowired private UserDao userDao; @Autowired private AccountDao accountDao; @Autowired private PlatformTransactionManager transactionManager; @Override public int addUserAndAccount(UADTO uadto) { DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); def.setTimeout(15); TransactionStatus status = transactionManager.getTransaction(def); try { userDao.addUser(uadto.getUser()); accountDao.addAccount(uadto.getAccount()); transactionManager.commit(status); return 1; }catch (Exception e) { transactionManager.rollback(status); throw e; } } }
以上这两个实现类的全局事务是等价的,不管是标签仍是代码,其实都使用到了JTA的分布式事务的二阶段提交。
Controller
@RestController @Slf4j public class AllController { @Autowired private UADTOService uadtoService; @PostMapping("/addall") public int addUserAndAccount(@RequestBody UADTO uadto) { log.info(uadto.toString()); return uadtoService.addUserAndAccount(uadto); } }
这里的二阶段提交事务,对不一样数据库的表插入要么是同时成功,要么是同时失败的。
TCC模式实现思路
每一个须要实现事务的接口,都须要3个接口,分别是:
比方说在上图的下单的操做中,Order服务接收了下单的请求,扣费的时候,Order服务去调用User服务的时候,它去调用User服务的tryCharge()方法。调用完以后就把这一次的调用注册到协调器里面,当咱们的下单服务的所有事务完成以后,协调器服务会调用confirmCharge()方法,去完成这个扣费的操做。若是在下单的过程当中,出了任何的错误,协调器服务会帮咱们去调用User服务的cancelCharge()方法,去把扣费操做去取消。
TCC模式协调器的功能
TCC模式实现分布式事务
如今咱们以一个实际业务场景来加以说明
TCC 实现阶段一:Try
在上图的预处理中,那个订单服务先把本身的状态修改成:OrderStatus.UPDATING。这个状态是个没有任何含义的这么一个状态,表明有人正在修改这个状态罢了。库存服务别直接扣减库存,而是冻结掉库存。举个例子,原本你的库存数量是 100,你别直接 100 - 2 = 98,扣减这个库存!你能够把可销售的库存:100 - 2 = 98,设置为 98 没问题,而后在一个单独的冻结库存的字段里,设置一个 2。也就是说,有 2 个库存是给冻结了。
积分服务也是同理,别直接给用户增长会员积分。你能够先在积分表里的一个预增长积分字段加入积分。好比:用户积分本来是 1190,如今要增长 10 个积分,别直接 1190 + 10 = 1200 个积分!你能够保持积分为 1190 不变,在一个预增长字段里,好比说 prepare_add_credit 字段,设置一个 10,表示有 10 个积分准备增长。
仓储服务也是同理啊,你能够先建立一个销售出库单,可是这个销售出库单的状态是“UNKNOWN”。也就是说,刚刚建立这个销售出库单,此时还不肯定它的状态是什么。
这个操做,通常都是锁定某个资源,设置一个预备类的状态,冻结部分数据,等等,大概都是这类操做。
TCC 实现阶段二:Confirm
而后就分红两种状况了,第一种状况是比较理想的,那就是各个服务执行本身的那个 Try 操做,都执行成功了!此时,TCC 分布式事务框架会控制进入 TCC 下一个阶段,第一个 C 阶段,也就是 Confirm 阶段。为了实现这个阶段,你须要在各个服务里再加入一些代码。好比说,订单服务里,你能够加入一个 Confirm 的逻辑,就是正式把订单的状态设置为“已支付”了。
库存服务也是相似的,将以前冻结库存字段的 2 个库存扣掉变为 0。这样的话,可销售库存以前就已经变为 98 了,如今冻结的 2 个库存也没了,那就正式完成了库存的扣减。
积分服务也是相似的,就是将预增长字段的 10 个积分扣掉,而后加入实际的会员积分字段中,从 1190 变为 1120。
仓储服务也是相似,将销售出库单的状态正式修改成“已建立”,能够供仓储管理人员查看和使用,而不是停留在以前的中间状态“UNKNOWN”了。
上面各类服务的 Confirm 的逻辑都实现好了,一旦订单服务里面的 TCC 分布式事务框架感知到各个服务的 Try 阶段都成功了之后,就会执行各个服务的 Confirm 逻辑。
TCC 实现阶段三:Cancel
在 Try 阶段,好比积分服务吧,它执行出错了,此时会怎么样?那订单服务内的 TCC 事务框架是能够感知到的,而后它会决定对整个 TCC 分布式事务进行回滚。也就是说,会执行各个服务的第二个 C 阶段,Cancel 阶段。一样,为了实现这个 Cancel 阶段,各个服务还得加一些代码。
首先订单服务,就是能够将订单的状态设置为“CANCELED”,也就是这个订单的状态是已取消。
库存服务也是同理,就是将冻结库存扣减掉 2,加回到可销售库存里去,98 + 2 = 100。
积分服务也须要将预增长积分字段的 10 个积分扣减掉。
仓储服务也须要将销售出库单的状态修改成“CANCELED”设置为已取消。
而后这个时候,订单服务的 TCC 分布式事务框架只要感知到了任何一个服务的 Try 逻辑失败了,就会跟各个服务内的 TCC 分布式事务框架进行通讯,而后调用各个服务的 Cancel 逻辑。
总结与思考
先来 Try 一下,不要把业务逻辑完成,先试试看,看各个服务能不能基本正常运转,能不能先冻结我须要的资源。
若是 Try 都 OK,也就是说,底层的数据库、Redis、Elasticsearch、MQ 都是能够写入数据的,而且你保留好了须要使用的一些资源(好比冻结了一部分库存)。
接着,再执行各个服务的 Confirm 逻辑,基本上 Confirm 就能够很大几率保证一个分布式事务的完成了。
那若是 Try 阶段某个服务就失败了,好比说底层的数据库挂了,或者 Redis 挂了,等等。
此时就自动执行各个服务的 Cancel 逻辑,把以前的 Try 逻辑都回滚,全部服务都不要执行任何设计的业务逻辑。保证你们要么一块儿成功,要么一块儿失败。
等一等,你有没有想到一个问题?若是有一些意外的状况发生了,好比说订单服务忽然挂了,而后再次重启,TCC 分布式事务框架是如何保证以前没执行完的分布式事务继续执行的呢?
因此,TCC 事务框架都是要记录一些分布式事务的活动日志的,能够在磁盘上的日志文件里记录,也能够在数据库里记录。保存下来分布式事务运行的各个阶段和状态。
问题还没完,万一某个服务的 Cancel 或者 Confirm 逻辑执行一直失败怎么办呢?
那也很简单,TCC 事务框架会经过活动日志记录各个服务的状态。举个例子,好比发现某个服务的 Cancel 或者 Confirm 一直没成功,会不停的重试调用它的 Cancel 或者 Confirm 逻辑,务必要它成功!
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
有关于Seata的说明能够参考https://seata.io/zh-cn/docs/overview/what-is-seata.html