一直说写有关最新技术的文章,但前面彷佛都有点偏了,只能说算主流技术,今天这个主题,我以为应该名副其实。分布式微服务的深水区并非单个微服务的设计,而是服务间的数据一致性问题!解决了这个问题,才算是把分布式正式收编了!但分布式事务解决方案并无统一的标准,只能说根据业务特色来适配,有实时的,非实时的,同步或异步的,以前已经实现了异步MQ的分布式事务方案,今天来看看Seata方案,自19年初才推出,还几易其名,目前还不算特别完善,但其光环太耀眼,做为一名IT人,仍是有必要来瞧一瞧的。单说Seata,就有AT、TCC、Saga和XA模式,看来是盘大菜。java
**工具:**node
Idea201902/JDK11/Gradle5.6.2/Mysql8.0.11/Lombok0.26/Postman7.5.0/SpringBoot2.1.9/Nacos1.1.3/Seata0.8.1/SeataServer0.8.1/Dubbo2.7.3linux
**难度:**
新手--战士--老兵--大师git
**目标:**github
1.多模块微服务Dubbo框架整合Seata实现分布式事务的AT模式spring
2.使用Seata实现订单模块与其余模块的关联型事务的TCC模式
***sql
**步骤:**数据库
**为了更好的遇到各类问题,同时保持时效性,我尽可能使用最新的软件版本。代码地址:其中的day17,https://github.com/xiexiaobiao/dubbo-project.git**apache
文中图片有些显示不全,是图片很大,我担忧缩放会看不清,因此部分显示不全的,能够下载图片再看。api
1.先照搬来点背景材料,分布式事务典型场景以下图,一个business主事务发起多个分支事务,并须要保证一致的commit或rollback:
Seata框架,有三个模块,分别是
分布式事务流程:
I. TM 请求TC 发起一个全局事务,同时TC生成一个 XID做为全局事务ID.
II. XID将分发给事务调用链上的全部微服务.
III. RM响应全局事务XID向TC注册本地分支事务.
IV. TM向TC发出提交或回滚全局事务XID的请求.
V. TC响应全局事务XID,驱动全部分支事务提交或 回滚本地分支事务.
其中 TM 和 RM 是做为 Seata 的客户端与业务系统集成在一块儿,TC 做为 Seata 的服务端独立部署。
再说seata的AT模式:AT 模式是一种无侵入的分布式事务解决方案。在 AT 模式下,用户只需关注本身的“业务 SQL”,用户的 “业务 SQL” 做为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操做。
2.为了单一化技术点,我直接新建一个gradle项目,以官方例子为原型作抽取制做,模拟电商业务,总体架构为多模块微服务Dubbo框架,创建5个module,common为公共模块,account为用户帐户处理,order为订单处理,storage为库存处理,business为业务处理,总体的处理逻辑为第一图。
3.在build.gradle中引入依赖,强烈建议边写代码边逐步引入,好比使用到druid才加入druid的依赖,这样才能知道每一个依赖的做用和用法。
4.建表,项目文件中已有SQL.script,几个业务模块的对应的表,比较简单,略。重点关注下undo_log,此表为MQ存储事务执行先后的日志表,为**AT模式所必须**,用于事务提交和回滚,其中最关键字段即xid(全局事务ID)和branch_id(分支事务ID)。另外,我将各模块DB独立,是为了模拟分布式DB环境。
5.使用common模块的mbg快速生成各模块的Entity、Service、Impl、Mapper、Dao和Controller,可参考往期文章《》。注意每次生成时,需修改配置。
6.common模块:放公共的对象,如全局Enum,Exception,Dto等,还有Dubbo的接口。
7.storage模块:`com.biao.mall.storage.conf.SeataAutoConfig`进行Seata配置:
@Configuration public class SeataAutoConfig { private DataSourceProperties dataSourceProperties; @Autowired public SeataAutoConfig(DataSourceProperties dataSourceProperties){ this.dataSourceProperties = dataSourceProperties; } /** * init durid datasource * @Return: druidDataSource datasource instance */ @Bean @Primary public DruidDataSource druidDataSource(){ DruidDataSource druidDataSource = new DruidDataSource(); druidDataSource.setUrl(dataSourceProperties.getUrl()); druidDataSource.setUsername(dataSourceProperties.getUsername()); druidDataSource.setPassword(dataSourceProperties.getPassword()); druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName()); druidDataSource.setInitialSize(0); druidDataSource.setMaxActive(180); druidDataSource.setMaxWait(60000); druidDataSource.setMinIdle(0); druidDataSource.setValidationQuery("Select 1 from DUAL"); druidDataSource.setTestOnBorrow(false); druidDataSource.setTestOnReturn(false); druidDataSource.setTestWhileIdle(true); druidDataSource.setTimeBetweenEvictionRunsMillis(60000); druidDataSource.setMinEvictableIdleTimeMillis(25200000); druidDataSource.setRemoveAbandoned(true); druidDataSource.setRemoveAbandonedTimeout(1800); druidDataSource.setLogAbandoned(true); return druidDataSource; } /** * init datasource proxy * @Param: druidDataSource datasource bean instance * @Return: DataSourceProxy datasource proxy */ @Bean public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){ return new DataSourceProxy(druidDataSource); } /** * init mybatis sqlSessionFactory * @Param: dataSourceProxy datasource proxy * @Return: DataSourceProxy datasource proxy */ @Bean public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(dataSourceProxy); factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver() .getResources("classpath:/mapper/*Mapper.xml")); factoryBean.setTransactionFactory(new JdbcTransactionFactory()); return factoryBean.getObject(); } /** * init global transaction scanner * @Return: GlobalTransactionScanner */ @Bean public GlobalTransactionScanner globalTransactionScanner(){ return new GlobalTransactionScanner("${spring.application.name}", "my_test_tx_group"); } }
com.biao.mall.storage.dubbo.StorageDubboServiceImpl
:Dubbo微服务storage的具体实现,@Service注解为com.apache.dubbo.config.annotation.Service
,将该服务注册到注册中心,本项目注册中心使用Nacos,不是ZK。
@Service(version = "1.0.0",protocol = "${dubbo.protocol.id}", application = "${dubbo.application.id}",registry = "${dubbo.registry.id}") public class StorageDubboServiceImpl implements StorageDubboService { @Autowired private ProductService productService; @Override public ObjectResponse decreaseStorage(CommodityDTO commodityDTO) { System.out.println("全局事务id :" + RootContext.getXID()); return productService.decreaseStorage(commodityDTO); } }
另外注意, `com.biao.mall.storage.impl.ProductServiceImpl`中,这里的本地方法,并不须要@Transactional注解。
8.account模块和order模块和storage模块相似,只是order模块中`com.biao.mall.order.impl.OrdersServiceImpl`多了一个经过@Reference调用account服务的注解,其余,略。
9.business模块:SeataAutoConfig中因无本地事务,只需一个GlobalTransactionScanner,BusinessServiceImpl中:
@Service public class BusinessServiceImpl implements BusinessService { @Reference(version = "1.0.0") private StorageDubboService storageDubboService; @Reference(version = "1.0.0") private OrderDubboService orderDubboService; private boolean flag; @Override @GlobalTransactional(timeoutMills = 30000,name = "dubbo-seata-at-springboot") public ObjectResponse handleBusiness(BusinessDTO businessDTO) { System.out.println("开始全局事务,XID = " + RootContext.getXID()); ObjectResponse<Object> objectResponse = new ObjectResponse<>(); //1,减库存 CommodityDTO commodityDTO = new CommodityDTO(); commodityDTO.setCommodityCode(businessDTO.getCommodityCode()); commodityDTO.setCount(businessDTO.getCount()); ObjectResponse storageResponse = storageDubboService.decreaseStorage(commodityDTO); //2,建立订单 OrderDTO orderDTO = new OrderDTO(); orderDTO.setUserId(businessDTO.getUserId()); orderDTO.setCommodityCode(businessDTO.getCommodityCode()); orderDTO.setOrderCount(businessDTO.getCount()); orderDTO.setOrderAmount(businessDTO.getAmount()); ObjectResponse<OrderDTO> response = orderDubboService.createOrder(orderDTO); //打开注释测试事务发生异常后,全局回滚功能 // if (!flag) { // throw new RuntimeException("测试抛异常后,分布式事务回滚!"); // } if (storageResponse.getStatus() != 200 || response.getStatus() != 200) { throw new DefaultException(RspStatusEnum.FAIL); } objectResponse.setStatus(RspStatusEnum.SUCCESS.getCode()); objectResponse.setMessage(RspStatusEnum.SUCCESS.getMessage()); objectResponse.setData(response.getData()); return objectResponse; } }
10.写个BusinessController的方法,用于测试:
@PostMapping("/buy") ObjectResponse handleBusiness(@RequestBody BusinessDTO businessDTO){ LOGGER.info("请求参数:{}",businessDTO.toString()); return businessService.handleBusiness(businessDTO); }
11.下载安装TC ,即 Seata 的服务端,须要独立部署运行,下载地址:https://github.com/seata/seata/releases,解压,支持window和linux下直接启动运行,以下linux命令,运行参数将指定port、host和imageFile的存储方式:
sh seata-server.sh -p 8091 -h 127.0.0.1 -m file
12.测试,按顺序启动:Nacos-->SeataServer-->account-->order-->storage-->business ,启动后的效果。
Nacos注册的服务信息,注意Dubbo是区分provider和consumer的,这是不一样于SpringCloud的地方,因此同一服务不一样身份就有两个了:
能够看到各RM向TC注册的信息:
Postman提交至Controller:
提交运行后,一阶段更新DB,二阶段只需释放锁:
数据库状况:
13.回滚测试:将`com.biao.mall.business.service.BusinessServiceImpl`中回滚测试代码注释去掉!手动抛出异常,再次Postman提交,可见:
- 数据库信息不变,贴图,略;
14.测试undo_log表用途:
`com.biao.mall.business.service.BusinessServiceImpl`加个断点:
其余模块正常启动,postman提交:
看undo_log表,这里只是个临时的数据,二阶段后会删除:
***
复盘记:
1.Seata只能支持RPC模式的事务,对MQ模式的分布式事务不能实施,比较好的搭配是Dubbo+Seata。
2.启动应用向SeataServer注册,不必定能一次成功,有时要尝试屡次,可见稳定性通常!
3.依赖冲突问题:报错提示:`Class path contains multiple SLF4J bindings`,因其来自于如下两个jar,
`logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class`
,`slf4j-nop-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class`
因为logback是主流,不排除,直接去掉`slf4j-nop`依赖,问题解决!
4.报错:`NoSuchMethodError:org.yaml.snakeyaml.nodes.ScalarNode.getScalarStyle`,**特别注意**这种状况不少时候也是依赖冲突,而不是缺乏类,处理方法:
a.先百度,须要加入snakeyaml依赖,结果仍是报错,
b.再先全局搜索,双击shift键,查找`ScalarNode`类,发现出如今两个地方,估计冲突了,
c.在Idea中使用依赖分析命令,`order`为module名,`snakeyaml`为依赖名:
`gradle :order:dependencyInsight --dependency snakeyaml`
发现有多方引入的状况,结果是dubbo自己也使用了snakeyaml,直接在dubbo依赖中使用exclude语法排除,问题解决!
5.报错:`NoSuchBeanDefinitionException: No qualifying bean of type 'com.biao.mall.order.dao.OrdersDao' available`:
表面上看是Mapper类无Bean实例,肯定加了@Mapper和@Repository注解,仍是错误!想到既然是缺乏注入的Bean,多是缺乏mybatis-plus依赖致使,添加`mybatis-plus-boot-starter`,问题解决!
6.报错:`io.seata.common.exception.NotSupportYetException: not support register type: null`,需添加 registry.conf 和 file.conf。
7.seata server安装和启动方法:
https://github.com/seata/seata/wiki/Quick-Start
8.报错:`com.alibaba.nacos.api.exception.NacosException: java.lang.ClassNotFoundException`,添加Nacos相关依赖 dubbo-registry-nacos/spring-context-support/nacos-api/nacos-client。
9.dubbo的service是明显区分consumer和provider的,若是使用Nacos作注册中心,能够经过detail查看其服务角色,还有其提供的方法。
10.`com.biao.mall.storage.conf.SeataAutoConfig`中设置Mapper路径,需使用`getResources("classpath:/mapper/*Mapper.xml"))`;不可以使用`getResources("${mybatis.mapper-locations}"))`配置方式,
会告警:`Property 'mapperLocations' was specified but matching resources are not found`,最后致使Mapper文件没法加载,Dao方法读取失败,应用运行会异常,我估计是Bean加载顺序问题,但没有验证,sorry。
11.本文参考文章地址:https://www.sofastack.tech/blog/seata-distributed-transaction-deep-dive/
***
推荐阅读: