本文目录:java
后端工具和环境node
在开发个人开源项目 prex 时,加入工做流,解决工做流用户与当前系统用户同步问题时,涉及到远程调用操做两个数据库所产生的事务问题,好比系统用户在增长用户同步工做流用户时,系统用户添加成功,工做流用户没有添加成功,则形成数据不一致问题,本地事务没法回滚,那么则使用分布式事务解决方案。
开源项目:gitee.com/kaiyuantuan…mysql
指一次大的操做由不一样的小操做组成的,这些小的操做分布在不一样的服务器上,分布式事务须要保证这些小操做要么所有成功,要么所有失败。从本质上来讲,分布式事务就是为了保证不一样数据库的数据一致性。
通俗一点说就是单体应用被拆分红微服务应用,原来的一个模块被拆分红三个独立的应用,分别使用独立的数据源,业务操做须要调用三个服务来完成。git
分布式事务做为微服务应用中的大难题,在现有的解决方案中,我的认为 Seata
是目前最轻量的解决方案github
Seata
是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata
将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。redis
两阶段提交协议的演变:spring
以一个示例来讲明:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操做,m 的初始值 1000。sql
tx1 先开始,开启本地事务,拿到本地锁,更新操做 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操做 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 须要重试等待 全局锁 。数据库
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。后端
若是 tx1 的二阶段全局回滚,则 tx1 须要从新获取该数据的本地锁,进行反向补偿的更新操做,实现分支的回滚。
此时,若是 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
由于整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,因此不会发生 脏写 的问题。
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
若是应用在特定场景下,必须要求全局的 读已提交 ,目前 Seata 的方式是经过 SELECT FOR UPDATE 语句的代理。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,若是 全局锁 被其余事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程当中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于整体性能上的考虑,Seata 目前的方案并无对全部 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
以一个示例来讲明整个 AT 分支的工做过程。
业务表:product
Field | Type | Key |
---|---|---|
id | bigint(20) | PRI |
name | varchar(100) | |
since | varchar(100) |
AT 分支事务的业务逻辑:
update product set name = 'GTS' where name = 'TXC';
过程:
select id, name, since from product where name = 'TXC';
复制代码
获得前镜像:
id name since 1 TXC 2014
select id, name, since from product where id = 1`;
复制代码
获得后镜像:
id name since 1 GTS 2014
{
"branchId": 641789253,
"undoItems": [{
"afterImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "GTS"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"beforeImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "TXC"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"sqlType": "UPDATE"
}],
"xid": "xid:xxx"
}
复制代码
update product set name = 'TXC' where id = 1;
复制代码
UNDO_LOG Table:不一样数据库在类型上会略有差异。
以 MySQL 为例:
Field | Type |
---|---|
branch_id | bigint PK |
xid | varchar(100) |
context | varchar(128) |
rollback_info | longblob |
log_status | tinyint |
log_created | datetime |
log_modified | datetime |
-- 注意此处0.7.0+ 增长字段 context
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
复制代码
回顾总览中的描述:一个分布式的全局事务,总体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要知足 两阶段提交 的模型要求,即须要每一个分支事务都具有本身的:
根据两阶段行为模式的不一样,咱们将分支事务划分为 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.
AT 模式(参考连接 TBD)基于 支持本地 ACID 事务 的 关系型数据库:
相应的,TCC 模式,不依赖于底层数据资源的事务支持:
Saga 模式是 SEATA 提供的长事务解决方案,在 Saga 模式中,业务流程中每一个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
理论基础:Hector & Kenneth 发表论⽂ Sagas (1987)
Nacos
做为注册中心,Nacos 的安装及使用能够参考seata-server
,这里下载的是 seata-server-0.9.0.zip,下载地址:github.com/seata/seata…seata安装包
快速获取百度云下载连接解压完成后咱们获得了几个文件夹
seata server
全部的配置都在 conf 文件夹内,该文件夹内有两个文件咱们必需要详细介绍下。
seata server
默认使用 file(文件方式)进行存储事务日志、事务运行信息,咱们能够经过-m db 脚本参数的形式来指定,目前仅支持 file、db 这两种方式。
修改 conf 目录下的 file.conf 配置文件,主要修改自定义事务组名称,事务日志存储模式及数据库链接信息
transport {
...省略
}
service {
#vgroup->rgroup
vgroup_mapping.prex_tx_group = "default" #修改事务组名称为:prex_tx_group,和客户端自定义的名称对应
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
## transaction log store
store {
## store mode: file、db
mode = "db" #修改此处将事务信息存储到db数据库中
## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "druid"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/seat" #修改数据库链接地址
user = "root" #修改数据库用户名
password = "root" #修改数据库密码
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
复制代码
说明:
registry.conf
配置文件,指明注册中心为 nacos,及修改 nacos 链接信息便可;registry {
# file 、nacos 、eureka、redis、zk、consul、etcd三、sofa
type = "nacos"
nacos {
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
... 省略
}
}
复制代码
启动 seata server 的脚本位于 bin 文件内,Linux/Mac
环境使用 seata-server.sh 脚本启动,Windows 环境使用 seata-server.bat 脚本启动。
Linux/Mac
启动方式示例以下所示:
nohup sh seata-server.sh -p 8091 -h 127.0.0.1 -m db &> seata.log &
复制代码
经过 nohup 命令让 seata server 在系统后台运行。
脚本参数:
当咱们看到-Server started 时并未发现其余错误信息,咱们的 seata server 已经启动成功
让咱们从一个微服务示例开始 用户购买商品的业务逻辑。整个业务逻辑由 3 个微服务提供支持:
建立业务数据库
db-order:存储订单的数据库
db-storage:存储库存的数据库
db-account:存储帐户信息的数据库
order 订单表:
DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
`id` int(20) NOT NULL AUTO_INCREMENT COMMENT '主键Id',
`user_id` int(20) DEFAULT NULL COMMENT '用户Id',
`pay_money` decimal(11,0) DEFAULT NULL COMMENT '付款金额',
`product_id` int(20) DEFAULT NULL COMMENT '商品Id',
`status` int(11) DEFAULT NULL COMMENT '状态',
`count` int(11) DEFAULT NULL COMMENT '商品数量',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC COMMENT='订单表';
SET FOREIGN_KEY_CHECKS = 1;
复制代码
product 商品表:
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
`id` int(20) NOT NULL COMMENT '主键',
`product_id` int(11) DEFAULT NULL COMMENT '商品Id',
`price` decimal(11,0) DEFAULT NULL COMMENT '价格',
`count` int(11) DEFAULT NULL COMMENT '库存数量',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC COMMENT='仓储服务';
-- ----------------------------
-- Records of product
-- ----------------------------
BEGIN;
INSERT INTO `product` VALUES (1, 1, 50, 100);
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
复制代码
account 帐户表:
DROP TABLE IF EXISTS `account`;
CREATE TABLE `account` (
`id` int(20) NOT NULL AUTO_INCREMENT COMMENT '主键Id',
`user_id` int(20) DEFAULT NULL COMMENT '用户Id',
`balance` decimal(11,0) DEFAULT NULL COMMENT '余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC;
-- ----------------------------
-- Records of account
-- ----------------------------
BEGIN;
INSERT INTO `account` VALUES (1, 1, 100);
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
复制代码
须要在每一个数据库中建立日志回滚表,建表 sql 在 seata-server 的/conf/db_undo_log.sql 中。
三个服务,一个订单服务,一个仓储服务,一个帐户服务。当用户下单时,会在订单服务中建立一个订单,而后经过远程调用库存服务来扣减下单商品的库存,再经过远程调用帐户服务来扣减用户帐户里面的余额,最后在订单服务中修改订单状态为已完成。该操做跨越三个数据库,有两次远程调用,很明显会有分布式事务问题
复制代码
nacos-seata-account-server 帐户服务
nacos-seata-order-server 订单服务
nacos-seata-storage-server 仓储服务
对 nacos-seata-account-server、nacos-seata-order-server 和 nacos-seata-storage-server 三个 seata 的客户端进行配置,它们配置大体相同,咱们下面以 nacos-seata-account-server 的配置为例;
修改 application.yml 文件,自定义事务组的名称
spring:
cloud:
alibaba:
seata:
tx-service-group: prex_tx_group #自定义事务组名称须要与seata-server中的对应
复制代码
service {
#vgroup->rgroup
vgroup_mapping.prex_tx_group = "default" #修改自定义事务组名称
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
disableGlobalTransaction = false
}
复制代码
添加并修改 registry.conf 配置文件,主要是将注册中心改成 nacos
registry {
# file 、nacos 、eureka、redis、zk
type = "nacos" #修改成nacos
nacos {
serverAddr = "localhost:8848" #修改成nacos的链接地址
namespace = ""
cluster = "default"
}
}
复制代码
代码只展现核心代码 具体代码文章尾部连接
@EnableDiscoveryClient
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@MapperScan("com.xd.example.seata.mapper")
public class NacosSeataAccountServerApplication {
public static void main(String[] args) {
SpringApplication.run(NacosSeataAccountServerApplication.class, args);
}
}
复制代码
MyBatisPlusConfig:
/**
* @Classname MyBatisPlusConfig
* @Description 配置MybatisPlus使用Seata对数据源进行代理
* @Author Created by Lihaodong (alias:小东啊) im.lihaodong@gmail.com
* @Date 2019-11-25 11:21
* @Version 1.0
*/
@Configuration
public class MyBatisPlusConfig {
@Value("${mybatis-plus.mapper-locations}")
private String mapperLocations;
/**
* @param sqlSessionFactory SqlSessionFactory
* @return SqlSessionTemplate
*/
@Bean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
/**
* 从配置文件获取属性构造datasource,注意前缀,这里用的是druid,根据本身状况配置,
* 原生datasource前缀取"spring.datasource"
*
* @return
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hikari")
public DataSource hikariDataSource() {
return new HikariDataSource();
}
/**
* 构造datasource代理对象,替换原来的datasource
*
* @param hikariDataSource
* @return
*/
@Primary
@Bean("dataSource")
public DataSourceProxy dataSourceProxy(DataSource hikariDataSource) {
return new DataSourceProxy(hikariDataSource);
}
@Bean(name = "sqlSessionFactory")
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
bean.setDataSource(dataSourceProxy);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
bean.setMapperLocations(resolver.getResources(mapperLocations));
SqlSessionFactory factory = null;
try {
factory = bean.getObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
return factory;
}
/**
* MP 自带分页插件
*
* @return
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor page = new PaginationInterceptor();
page.setDialectType("mysql");
return page;
}
}
复制代码
package com.xd.example.seata.service.impl;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xd.example.seata.domain.Order;
import com.xd.example.seata.mapper.OrderMapper;
import com.xd.example.seata.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.xd.example.seata.service.RemoteAccountService;
import com.xd.example.seata.service.RemoteStorageService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* <p>
* 订单表 服务实现类
* </p>
*
* @author lihaodong
* @since 2019-11-25
*/
@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {
@Autowired
private RemoteStorageService remoteStorageService;
@Autowired
private RemoteAccountService remoteAccountService;
@GlobalTransactional(rollbackFor = Exception.class)
@Override
public void createOrder(Order order) {
log.info("下单开始,用户:{},商品:{},数量:{},金额:{}", order.getUserId(), order.getProductId(), order.getCount(), order.getPayMoney());
//建立订单
order.setStatus(0);
boolean save = save(order);
log.info("保存订单{}", save ? "成功" : "失败");
log.info("当前 XID: {}", RootContext.getXID());
//远程调用库存服务扣减库存
log.info("扣减库存开始");
remoteStorageService.decrease(order.getProductId(), order.getCount());
log.info("扣减库存结束");
//远程调用帐户服务扣减余额
log.info("扣减余额开始");
remoteAccountService.decrease(order.getUserId(), order.getPayMoney());
log.info("扣减余额结束");
//修改订单状态为已完成
log.info("修改订单状态开始");
update(Wrappers.<Order>lambdaUpdate().set(Order::getStatus, 1).eq(Order::getUserId, order.getUserId()));
log.info("修改订单状态结束");
log.info("下单结束");
}
}
复制代码
分别运行 nacos-seata-order-server、nacos-seata-storage-server 和 nacos-seata-account-server 三个服务
查询数据库初始数据信息
打开浏览器/Postman 调用接口进行下单操做:http://localhost:8081/order/create?userId=1&productId=1&count=1&payMoney=50
结果:
仓储服务:
帐户服务:
package com.xd.example.seata.service.impl;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xd.example.seata.domain.Account;
import com.xd.example.seata.mapper.AccountMapper;
import com.xd.example.seata.service.IAccountService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.Optional;
/**
* <p>
* 服务实现类
* </p>
*
* @author lihaodong
* @since 2019-11-25
*/
@Slf4j
@Service
public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements IAccountService {
@Override
public boolean reduceBalance(Integer userId, BigDecimal balance) throws Exception {
log.info("当前 XID: {}", RootContext.getXID());
checkBalance(userId, balance);
log.info("开始扣减用户 {} 余额", userId);
//模拟超时异常
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer record = baseMapper.reduceBalance(userId, balance);
log.info("结束扣减用户 {} 余额结果:{}", userId, record > 0 ? "操做成功" : "扣减余额失败");
return record > 0;
}
private void checkBalance(Integer userId, BigDecimal price) throws Exception {
log.info("检查用户 {} 余额", userId);
Optional<Account> account = Optional.ofNullable(baseMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getUserId, userId)));
if (account.isPresent()) {
BigDecimal balance = account.get().getBalance();
if (balance.compareTo(price) == -1) {
log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);
throw new Exception("余额不足");
}
}
}
}
复制代码
修改完会重启帐户服务,再次发送请求
能够看到订单正常,扣除库存正常,帐户服务读取超时异常
发现下单后数据库数据并无任何改变
咱们在 seata-order-service 中注释掉@GlobalTransactional 来看看会发生什么
// @GlobalTransactional(name = "prex-create-order",rollbackFor = Exception.class)
@Override
public void createOrder(Order order) {
log.info("当前 XID: {}", RootContext.getXID());
log.info("下单开始,用户:{},商品:{},数量:{},金额:{}", order.getUserId(), order.getProductId(), order.getCount(), order.getPayMoney());
//建立订单
order.setStatus(0);
boolean save = save(order);
log.info("保存订单{}", save ? "成功" : "失败");
... 省略代码
}
复制代码
保存重启订单服务,再次请求接口 因为 nacos-seata-account-server 的超时会致使当库存和帐户金额扣减后订单状态并无设置为已经完成
下一篇更新
下一篇更新
参考资料:
seata.io/zh-cn/