seata是阿里巴巴研发的一套开源分布式事务框架,提供了AT、TCC、SAGA 和 XA 几种事务模式。本文以精品课项目组的物流后台服务为例,介绍seata框架落地的过程,遇到的问题以及解决方案。java
做者/ 邓新伟mysql
编辑/ 网易有道redis
有道精品课教务系统是基于springcloud的分布式集群服务。在实际业务中,存在许多分布式事务场景。然而传统的事务框架是没法实现全局事务的。长期以来,咱们的分布式场景的一致性,每每指的是放弃强一致性,保证最终一致性。spring
咱们从调研中发现,seata框架既能够知足业务需求,灵活兼容多种事务模式,又能够实现数据强一致性。sql
本文以物流业务为例,记录了在实际业务中落地seata框架落地的过程当中遇到的一些问题以及解决方案,供你们学习讨论~欢迎你们在留言区讨论交流数据库
seata框架分为3个组件:segmentfault
维护全局和分支事务的状态,驱动全局事务提交或回滚。api
定义全局事务的范围:开始全局事务、提交或回滚全局事务。架构
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚oracle
在官网下载 seata 服务端,解压后执行bin/seata-server.sh便可启动。
seata-server 有2个配置文件:registry.conf 与 file.conf。而 registry.conf 文件决定了 seata-server 使用的注册中心配置和配置信息获取方式。
咱们使用 consul 作注册中心,所以须要在registry.conf文件中,须要修改如下配置:
registry { #file 、nacos 、eureka、redis、zk、consul、etcd三、sofa type = "consul" ## 这里注册中心填consul loadBalance = "RandomLoadBalance" loadBalanceVirtualNodes = 10 ... ... consul { cluster = "seata-server" serverAddr = "***注册中心地址***" #这里的dc指的是datacenter,若consul为多数据源配置须要在请求中加入dc参数。 #dc与namespace并不是是seata框架自带的,文章后面将会进一步解释 dc="bj-th" namespace="seata-courseop" } ... ... } config { # file、nacos 、apollo、zk、consul、etcd3 ## 若是启动时从注册中心获取基础配置信息,填consul ## 不然从file.conf文件中获取 type = "consul" consul { serverAddr = "127.0.0.1:8500" } ... ... }
其中须要注意的是,若是须要高可用部署,seata获取配置信息的方式就必须是注册中心,此时file.conf就没用了。
(固然,须要事先把file.conf文件中的配置信息迁移到consul中)
store { ## store mode: file、db、redis mode = "db" ... ... ## database store property ## 若是使用数据库模式,须要配置数据库链接设置 db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" driverClassName = "com.mysql.jdbc.Driver" url = "jdbc:mysql://***线上数据库地址***/seata" user = "******" password = "******" minConn = 5 maxConn = 100 ## 这里的三张表须要提早在数据库建好 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 maxWait = 5000 } ... ... } service { #vgroup->rgroup vgroupMapping.tx-seata="seata-server" default.grouplist="127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1" }
其中,global_table, branch_table, lock_table三张表须要提早在数据库中建好。
每一个使用seata框架的服务都须要引入seata组件
dependencies { api 'com.alibaba:druid-spring-boot-starter:1.1.10' api 'mysql:mysql-connector-java:6.0.6' api('com.alibaba.cloud:spring-cloud-alibaba-seata:2.1.0.RELEASE') { exclude group:'io.seata', module:'seata-all' } api 'com.ecwid.consul:consul-api:1.4.5' api 'io.seata:seata-all:1.4.0' }
每一个服务都一样须要配置file.conf与registry.conf文件,放在resource目录下。registry.conf与server的保持一致。在file.conf文件中,除了db配置外,还须要进行client参数的配置:
client { rm { asyncCommitBufferLimit = 10000 lock { retryInterval = 10 retryTimes = 30 retryPolicyBranchRollbackOnConflict = true } reportRetryCount = 5 tableMetaCheckEnable = false reportSuccessEnable = false } tm { commitRetryCount = 5 rollbackRetryCount = 5 } undo { dataValidation = true logSerialization = "jackson" ## 这个undo_log也须要提早在mysql中建立 logTable = "undo_log" } log { exceptionRate = 100 } }
在application.yml文件中添加seata配置:
spring: cloud: seata: ## 注意tx-seata须要与服务端和客户端的配置文件保持一致 tx-service-group: tx-seata
另外,还须要替换项目的数据源,
@Primary @Bean("dataSource") public DataSource druidDataSource(){ DruidDataSource druidDataSource = new DruidDataSource(); druidDataSource.setUrl(url); druidDataSource.setUsername(username); druidDataSource.setPassword(password); druidDataSource.setDriverClassName(driverClassName); return new DataSourceProxy(druidDataSource); }
至此,client端的配置也已经完成了。
一个分布式的全局事务,总体是两阶段提交的模型。
全局事务是由若干分支事务组成的,
分支事务要知足两阶段提交的模型要求,即须要每一个分支事务都具有本身的:
根据两阶段行为模式的不一样,咱们将分支事务划分为 Automatic (Branch) Transaction Mode 和 TCC (Branch) Transaction Mode.
AT 模式基于支持本地ACID事务的关系型数据库:
直接在须要添加全局事务的方法中加上注解@GlobalTransactional
@SneakyThrows @GlobalTransactional @Transactional(rollbackFor = Exception.class) public void buy(int id, int itemId){ // 先生成订单 Order order = orderFeignDao.create(id, itemId); // 根据订单扣减帐户余额 accountFeignDao.draw(id, order.amount); }
注意:同@Transactional同样,@GlobalTransactional若要生效也要知足:
TCC 模式是支持把自定义的分支事务归入到全局事务的管理中。
首先编写一个TCC服务接口:
@LocalTCC public interface BusinessAction { @TwoPhaseBusinessAction(name = "doBusiness", commitMethod = "commit", rollbackMethod = "rollback") boolean doBusiness(BusinessActionContext businessActionContext, @BusinessActionContextParameter(paramName = "message") String msg); boolean commit(BusinessActionContext businessActionContext); boolean rollback(BusinessActionContext businessActionContext); }
其中,BusinessActionContext为全局事务上下文,能够今后对象中获取全局事务相关信息(若是是发起全局事务方,传入null后自动生成),而后实现该接口:
@Slf4j @Service public class BusinessActionImpl implements BusinessAction { @Transactional(rollbackFor = Exception.class) @Override public boolean doBusiness(BusinessActionContext businessActionContext, String msg) { log.info("准备do business:{}",msg); return true; } @Transactional(rollbackFor = Exception.class) @Override public boolean commit(BusinessActionContext businessActionContext) { log.info("business已经commit"); return true; } @Transactional(rollbackFor = Exception.class) @Override public boolean rollback(BusinessActionContext businessActionContext) { log.info("business已经rollback"); return true; } }
最后,开启全局事务方法同AT模式。
@SneakyThrows @GlobalTransactional public void doBusiness(BusinessActionContext context, String msg){ accountFeignDao.draw(3, new BigDecimal(100)); businessAction.doBusiness(context, msg); }
在部署seata项目时经常会遇到这样的问题:在本地调试时一切正常,可是当试图部署到线上时,老是在clinet端提示注册TC端失败。
seata服务的高可用部署只支持注册中心模式。所以,咱们须要想办法将file.conf文件以键值对的形式存到consul中。
遗憾的是,consul并无显式支持namespace,咱们只能在put请求中用“/”为分隔符起到相似的效果。固然,seata框架也没有考虑到这一点。因此咱们须要修改源码中的Configuration接口与RegistryProvider接口的consul实现类,增长namespace属性
TC在想mysql插入日志数据时,偶尔会报:
Caused by: java.sql.SQLException: Incorrect string value:
application_data字段其实就是对业务数据的记录。官方给出的建表语句是这样的:
CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8;
显然,VARCHAR(2000)的大小是不合适的, utf8的格式也是不合适的。因此咱们须要修改seata关于数据源链接的部分代码:
// connectionInitSql设置 protected Set<String> getConnectionInitSqls(){ Set<String> set = new HashSet<>(); String connectionInitSqls = CONFIG.getConfig(ConfigurationKeys.STORE_DB_CONNECTION_INIT_SQLS); if(StringUtils.isNotEmpty(connectionInitSqls)) { String[] strs = connectionInitSqls.split(","); for(String s:strs){ set.add(s); } } // 默认支持utf8mb4 set.add("set names utf8mb4"); return set; }
seata基于java的spi机制提供了自定义实现接口的功能,咱们只须要在本身的服务中,根据seata的接口写好本身的实现类便可。
SPI(Service Provider Interface)是JDK内置的服务发现机制,用在不一样模块间经过接口调用服务,避免对具体服务服务接口具体实现类的耦合。好比JDBC的数据库驱动模块,不一样数据库链接驱动接口相同但实现类不一样,在使用SPI机制之前调用驱动代码须要直接在类里采用Class.forName(具体实现类全名)的方式调用,这样调用方依赖了具体的驱动实现,在替换驱动实现时要修改代码。
以ConsulRegistryProvider为例:
ConsulRegistryServiceImpl
// 增长DC和namespace private static String NAMESPACE; private static String DC; private ConsulConfiguration() { Config registryCongig = ConfigFactory.parseResources("registry.conf"); NAMESPACE = registryCongig.getString("config.consul.namespace"); DC = CommonSeataConfiguration.getDatacenter(); consulNotifierExecutor = new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("consul-config-executor", THREAD_POOL_NUM)); } ... ... // 同时在getHealthyServices中,删除请求参数wait&index /** * get healthy services * * @param service * @return */ private Response<List<HealthService>> getHealthyServices(String service, long index, long watchTimeout) { return getConsulClient().getHealthServices(service, HealthServicesRequest.newBuilder() .setTag(SERVICE_TAG) .setDatacenter(DC) .setPassing(true) .build()); }
ConsulRegistryProvider 注意order要大于seata包中的默认值1,seata类加载器会优先加载order更大的实现类
@LoadLevel(name = "Consul" ,order = 2) public class ConsulRegistryProvider implements RegistryProvider { @Override public RegistryService provide() { return ConsulRegistryServiceImpl.getInstance(); } }
而后在META-INF 的services目录下添加:io.seata.discovery.registry.RegistryProvider
com.youdao.ke.courseop.common.seata.ConsulRegistryProvider
这样就能够替换seata包中的实现了。
对于这些自定义实现类,以及一些公共client配置,咱们能够统一封装到一个工具包下:
这样,其余项目只须要引入这个工具包,就能够无需繁琐的配置,直接使用了。
gradle引入common包:
api 'com.youdao.ke.courseop.common:common-seata:0.0.+'
以一个物流场景为例:
业务架构:
业务背景:logistics 执行领用单新增,在 elasticsearch 中更新数据,同时经过 rpc 调用 logistics-k3c 的金蝶出库方法,生成金蝶单据,如图2所示
问题:若是elasticsearch单据更新出现异常,金蝶单据将没法回滚,形成数据不一致的问题。
在部署完seata线上服务后,只须要在logistics与logistics-k3c中分别引入common-seata工具包
logistics服务:
// 使用全局事务注解开启全局事务 @GlobalTransactional @Transactional(rollbackFor = Exception.class) public void Scm经过(StaffOutStockDoc staffOutStock, String body) throws Exception { ... 一些业务处理... // 构建金蝶单据请求 K3cApi.StaffoutstockReq req = new K3cApi.StaffoutstockReq(); req.materialNums = materialNums; req.staffOutStockId = staffOutStock.id; ... 一些业务处理 ... // 调用logistics-k3c-api金蝶出库 k3cApi.staffoutstockAuditPass(req); staffOutStock.status = 待发货; staffOutStock.scmAuditTime = new Date(); staffOutStock.updateTime = new Date(); staffOutStock.historyPush("scm经过"); // 更新对象后存入elasticsearch es.set(staffOutStock); }
logistics-k3c:
因为咱们新增单据接口是调用金蝶的服务,因此这里使用TCC模式构建事务接口
首先建立StaffoutstockCreateAction接口
@LocalTCC public interface StaffoutstockCreateAction { @TwoPhaseBusinessAction(name = "staffoutstockCreate") boolean create(BusinessActionContext businessActionContext, @BusinessActionContextParameter(paramName = "staffOutStock") StaffOutStock staffOutStock, @BusinessActionContextParameter(paramName = "materialNum") List<Triple<Integer, Integer, Integer>> materialNum); boolean commit(BusinessActionContext businessActionContext); boolean rollback(BusinessActionContext businessActionContext); }
接口实现StaffoutstockCreateActionImpl
@Slf4j @Service public class StaffoutstockCreateActionImpl implements StaffoutstockCreateAction { @Autowired private K3cAction4Staffoutstock k3cAction4Staffoutstock; @SneakyThrows @Transactional(rollbackFor = Exception.class) @Override public boolean create(BusinessActionContext businessActionContext, StaffOutStock staffOutStock, List<Triple<Integer, Integer, Integer>> materialNum) { //金蝶单据新增 k3cAction4Staffoutstock.staffoutstockAuditPass(staffOutStock, materialNum); return true; } @SneakyThrows @Transactional(rollbackFor = Exception.class) @Override public boolean commit(BusinessActionContext businessActionContext) { Map<String, Object> context = businessActionContext.getActionContext(); JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock"); // 若是尝试新增成功,commit不作任何事 StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class); log.info("staffoutstock {} commit successfully!", staffOutStock.id); return true; } @SneakyThrows @Transactional(rollbackFor = Exception.class) @Override public boolean rollback(BusinessActionContext businessActionContext) { Map<String, Object> context = businessActionContext.getActionContext(); JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock"); StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class); // 这里调用金蝶单据删除接口进行回滚 k3cAction4Staffoutstock.staffoutstockRollback(staffOutStock); log.info("staffoutstock {} rollback successfully!", staffOutStock.id); return true; } }
封装为业务方法
/** * 项目组领用&报废的审核经过:新增其余出库单 * 该方法使用seata-TCC方案实现全局事务 * @param staffOutStock * @param materialNum */ @Transactional public void staffoutstockAuditPassWithTranscation(StaffOutStock staffOutStock, List<Triple<Integer, Integer, Integer>> materialNum){ staffoutstockCreateAction.create(null, staffOutStock, materialNum); }
k3c API实现类
@SneakyThrows @Override public void staffoutstockAuditPass(StaffoutstockReq req) { ... 一些业务处理方法 ... //这里调用了封装好的事务方法 k3cAction4Staffoutstock.staffoutstockAuditPassWithTranscation(staffOutStock, triples); }
这样,一个基于 TCC 的全局事务链路就创建起来了。
当全局事务执行成功时,咱们能够在 server 中看到打印的日志(如图3):
若是全局事务执行失败,会进行回滚,此时会执行接口中的rollback,调用金蝶接口删除生成的单据,如图4。
本文以seata框架的部署与使用为主线,记录了seata 框架运用的一些关键步骤与技术细节,并针对项目落地时遇到的一些的技术问题提供了解决方案。
在后续的推文中,咱们还将继续以 seata 框架的源码解析为主线,向你们介绍 seata 实现分布式事务的核心原理与技术细节。
-END-