本篇实战所使用Spring有关版本:java
SpringBoot:2.1.7.RELEASEmysql
Spring Cloud:Greenwich.SR2git
Spring CLoud Alibaba:2.1.0.RELEASEgithub
在构建微服务的过程当中,不论是使用什么框架、组件来构建,都绕不开一个问题,跨服务的业务操做如何保持数据一致性。web
首先,设想一个传统的单体应用,不管多少内部调用,最后终归是在同一个数据库上进行操做来完成一贯业务操做,如图:redis
随着业务量的发展,业务需求和架构发生了巨大的变化,总体架构由原来的单体应用逐渐拆分红为了微服务,原来的3个服务被从一个单体架构上拆开了,成为了3个独立的服务,分别使用独立的数据源,也不在以前共享同一个数据源了,具体的业务将由三个服务的调用来完成,如图:算法
此时,每个服务的内部数据一致性仍然有本地事务来保证。可是面对整个业务流程上的事务应该如何保证呢?这就是在微服务架构下面临的挑战,如何保证在微服务中的数据一致性。spring
所谓的 XA 方案,即两阶段提交,有一个事务管理器的概念,负责协调多个数据库(资源管理器)的事务,事务管理器先问问各个数据库你准备好了吗?若是每一个数据库都回复 ok,那么就正式提交事务,在各个数据库上执行操做;若是任何其中一个数据库回答不 ok,那么就回滚事务。sql
分布式系统的一个难点是如何保证架构下多个节点在进行事务性操做的时候保持一致性。为实现这个目的,二阶段提交算法的成立基于如下假设:数据库
TCC的全称是:Try、Confirm、Cancel。
这种方案说实话几乎不多人使用,可是也有使用的场景。由于这个事务回滚其实是严重依赖于你本身写代码来回滚和补偿了,会形成补偿代码巨大。
TCC的理论有点抽象,下面咱们借助一个帐务拆分这个实际业务场景对TCC事务的流程作一个描述,但愿对理解TCC有所帮助。
业务流程:分别位于三个不一样分库的账户A、B、C,A和B一块儿向C转账共80元:
Try:尝试执行业务。
完成全部业务检查(一致性):检查A、B、C的账户状态是否正常,账户A的余额是否很多于30元,账户B的余额是否很多于50元。
预留必须业务资源(准隔离性):账户A的冻结金额增长30元,账户B的冻结金额增长50元,这样就保证不会出现其余并发进程扣减了这两个账户的余额而致使在后续的真正转账操做过程当中,账户A和B的可用余额不够的状况。
Confirm:确认执行业务。
真正执行业务:若是Try阶段账户A、B、C状态正常,且账户A、B余额够用,则执行账户A给帐户C转帐30元、账户B给帐户C转帐50元的转账操做。
不作任何业务检查:这时已经不须要作业务检查,Try阶段已经完成了业务检查。
只使用Try阶段预留的业务资源:只须要使用Try阶段账户A和账户B冻结的金额便可。
Cancel:取消执行业务。
释放Try阶段预留的业务资源:若是Try阶段部分红功,好比账户A的余额够用,且冻结相应金额成功,账户B的余额不够而冻结失败,则须要对账户A作Cancel操做,将账户A被冻结的金额解冻掉。
Seata 的方案其实一个 XA 两阶段提交的改进版,具体区别以下:
架构的层面:
XA 方案的 RM 其实是在数据库层,RM 本质上就是数据库自身(经过提供支持 XA 的驱动程序来供应用使用)。
而 Seata 的 RM 是以二方包的形式做为中间件层部署在应用程序这一侧的,不依赖与数据库自己对协议的支持,固然也不须要数据库支持 XA 协议。这点对于微服务化的架构来讲是很是重要的:应用层不须要为本地事务和分布式事务两类不一样场景来适配两套不一样的数据库驱动。
这个设计,剥离了分布式事务方案对数据库在 协议支持 上的要求。
两阶段提交:
不管 Phase2 的决议是 commit 仍是 rollback,事务性资源的锁都要保持到 Phase2 完成才释放。
设想一个正常运行的业务,大几率是 90% 以上的事务最终应该是成功提交的,咱们是否能够在 Phase1 就将本地事务提交呢?这样 90% 以上的状况下,能够省去 Phase2 持锁的时间,总体提升效率。
这个设计,极大地减小了分支事务对资源(数据和链接)的锁定时间,给总体并发和吞吐的提高提供了基础。
在本节,咱们将经过一个实战案例来具体介绍Seata的使用方式,咱们将模拟一个简单的用户购买商品下单场景,建立3个子工程,分别是 order-server (下单服务)、storage-server(库存服务)和 pay-server (支付服务),具体流程图如图:
在本次实战中,咱们使用Nacos作为服务中心和配置中心,Nacos部署请参考本书的第十一章,这里再也不赘述。
接下来咱们须要部署Seata的Server端,下载地址为:https://github.com/seata/seata/releases ,建议选择最新版本下载,目前笔者看到的最新版本为 v0.8.0 ,下载 seata-server-0.8.0.tar.gz 解压后,打开 conf 文件夹,咱们需对其中的一些配置作出修改。
registry {
type = "nacos"
nacos {
serverAddr = "192.168.0.128"
namespace = "public"
cluster = "default"
}
}
config {
type = "nacos"
nacos {
serverAddr = "192.168.0.128"
namespace = "public"
cluster = "default"
}
}复制代码
这里咱们选择使用Nacos做为服务中心和配置中心,这里作出对应的配置,同时能够看到Seata的注册服务支持:file 、nacos 、eureka、redis、zk、consul、etcd三、sofa等方式,配置支持:file、nacos 、apollo、zk、consul、etcd3等方式。
这里咱们须要其中配置的数据库相关配置,具体以下:
## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://192.168.0.128:3306/seata"
user = "root"
password = "123456"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}复制代码
这里数据库默认是使用mysql,须要配置对应的数据库链接、用户名和密码等。
service.vgroup_mapping.spring-cloud-pay-server=default
service.vgroup_mapping.spring-cloud-order-server=default
service.vgroup_mapping.spring-cloud-storage-server=default复制代码
这里的语法为:service.vgroup_mapping.${your-service-gruop}=default
,中间的${your-service-gruop}
为本身定义的服务组名称,这里须要咱们在程序的配置文件中配置,笔者这里直接使用程序的spring.application.name
。
须要在刚才配置的数据库中执行数据初始脚本 db_store.sql ,这个是全局事务控制的表,须要提早初始化。
这里咱们只是作演示,理论上上面三个业务服务应该分属不一样的数据库,这里咱们只是在同一台数据库下面建立三个 Schema ,分别为 dbaccount 、 dborder 和 db_storage ,具体如图:
由于咱们是使用的Nacos做为配置中心,因此这里须要先执行脚原本初始化Nacos的相关配置,命令以下:
cd conf
sh nacos-config.sh 192.168.0.128复制代码
执行成功后能够打开Nacos的控制台,在配置列表中,能够看到初始化了不少 Group 为 SEATA_GROUP 的配置,如图:
初始化成功后,可使用下面的命令启动Seata的Server端:
cd bin
sh seata-server.sh -p 8091 -m file复制代码
启动后在 Nacos 的服务列表下面能够看到一个名为 serverAddr 的服务
到这里,咱们的环境准备工做就作完了,接下来开始代码实战。
因为本示例代码偏多,这里仅介绍核心代码和一些须要注意的代码,其他代码各位读者能够访问本书配套的代码仓库获取。
子工程common用来放置一些公共类,主要包含视图 VO 类和响应类 OperationResponse.java。
代码清单:Alibaba/seata-nacos-jpa/pom.xml
***
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Cloud Nacos Service Discovery -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- Spring Cloud Nacos Config -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- Spring Cloud Seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>复制代码
说明:本示例是使用 JPA 做为数据库访问 ORM 层, Mysql 做为数据库,需引入 JPA 和 Mysql 相关依赖, spring-cloud-alibaba-dependencies
的版本是 2.1.0.RELEASE , 其中有关Seata的组件版本为 v0.7.1 ,虽然和服务端版本不符,经简单测试,未发现问题。
Seata 是经过代理数据源实现事务分支,因此须要配置 io.seata.rm.datasource.DataSourceProxy 的 Bean,且是 @Primary默认的数据源,不然事务不会回滚,没法实现分布式事务,数据源配置类DataSourceProxyConfig.java以下:
代码清单:Alibaba/seata-nacos-jpa/order-server/src/main/java/com/springcloud/orderserver/config/DataSourceProxyConfig.java***
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}复制代码
咱们在order-server服务中开始整个业务流程,须要在这里的方法上增长全局事务的注解@GlobalTransactional
,具体代码以下:
代码清单:Alibaba/seata-nacos-jpa/order-server/src/main/java/com/springcloud/orderserver/service/impl/OrderServiceImpl.java***
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private RestTemplate restTemplate;
@Autowired
private OrderDao orderDao;
private final String STORAGE_SERVICE_HOST = "http://spring-cloud-storage-server/storage";
private final String PAY_SERVICE_HOST = "http://spring-cloud-pay-server/pay";
@Override
@GlobalTransactional
public OperationResponse placeOrder(PlaceOrderRequestVO placeOrderRequestVO) {
Integer amount = 1;
Integer price = placeOrderRequestVO.getPrice();
Order order = Order.builder()
.userId(placeOrderRequestVO.getUserId())
.productId(placeOrderRequestVO.getProductId())
.status(OrderStatus.INIT)
.payAmount(price)
.build();
order = orderDao.save(order);
log.info("保存订单{}", order.getId() != null ? "成功" : "失败");
log.info("当前 XID: {}", RootContext.getXID());
// 扣减库存
log.info("开始扣减库存");
ReduceStockRequestVO reduceStockRequestVO = ReduceStockRequestVO.builder()
.productId(placeOrderRequestVO.getProductId())
.amount(amount)
.build();
String storageReduceUrl = String.format("%s/reduceStock", STORAGE_SERVICE_HOST);
OperationResponse storageOperationResponse = restTemplate.postForObject(storageReduceUrl, reduceStockRequestVO, OperationResponse.class);
log.info("扣减库存结果:{}", storageOperationResponse);
// 扣减余额
log.info("开始扣减余额");
ReduceBalanceRequestVO reduceBalanceRequestVO = ReduceBalanceRequestVO.builder()
.userId(placeOrderRequestVO.getUserId())
.price(price)
.build();
String reduceBalanceUrl = String.format("%s/reduceBalance", PAY_SERVICE_HOST);
OperationResponse balanceOperationResponse = restTemplate.postForObject(reduceBalanceUrl, reduceBalanceRequestVO, OperationResponse.class);
log.info("扣减余额结果:{}", balanceOperationResponse);
Integer updateOrderRecord = orderDao.updateOrder(order.getId(), OrderStatus.SUCCESS);
log.info("更新订单:{} {}", order.getId(), updateOrderRecord > 0 ? "成功" : "失败");
return OperationResponse.builder()
.success(balanceOperationResponse.isSuccess() && storageOperationResponse.isSuccess())
.build();
}
}复制代码
其次,咱们须要在另外两个服务的方法中增长注解@Transactional
,表示开启事务。
这里的远端服务调用是经过 RestTemplate
,须要在工程启动时将 RestTemplate
注入 Spring 容器中管理。
工程中需在 resources 目录下增长有关Seata的配置文件 registry.conf ,以下:
代码清单:Alibaba/seata-nacos-jpa/order-server/src/main/resources/registry.conf***
registry {
type = "nacos"
nacos {
serverAddr = "192.168.0.128"
namespace = "public"
cluster = "default"
}
}
config {
type = "nacos"
nacos {
serverAddr = "192.168.0.128"
namespace = "public"
cluster = "default"
}
}复制代码
在 bootstrap.yml 中的配置以下:
代码清单:Alibaba/seata-nacos-jpa/order-server/src/main/resources/bootstrap.yml
***
spring:
application:
name: spring-cloud-order-server
cloud:
nacos:
# nacos config
config:
server-addr: 192.168.0.128
namespace: public
group: SEATA_GROUP
# nacos discovery
discovery:
server-addr: 192.168.0.128
namespace: public
enabled: true
alibaba:
seata:
tx-service-group: ${spring.application.name}复制代码
service.vgroup_mapping.${your-service-gruop}=default
中间的 ${your-service-gruop}
。这两处配置请务必一致,不然在启动工程后会一直报错 no available server to connect
。 数据库初始脚本位于:Alibaba/seata-nacos-jpa/sql ,请分别在三个不一样的 Schema 中执行。
测试工具咱们选择使用 PostMan ,启动三个服务,顺序无关 order-server、pay-server 和 storage-server 。
使用 PostMan 发送测试请求,如图:
数据库初始化余额为 10 ,这里每次下单将会消耗 5 ,咱们能够正常下单两次,第三次应该下单失败,而且回滚 db_order 中的数据。数据库中数据如图:
咱们进行第三次下单操做,如图:
这里看到直接报错500,查看数据库 db_order 中的数据,如图:
能够看到,这里的数据并未增长,咱们看下子工程_rder-server的控制台打印:
日志已通过简化处理
Hibernate: insert into orders (pay_amount, product_id, status, user_id) values (?, ?, ?, ?)
c.s.b.c.service.impl.OrderServiceImpl : 保存订单成功
c.s.b.c.service.impl.OrderServiceImpl : 当前 XID: 192.168.0.102:8091:2021674307
c.s.b.c.service.impl.OrderServiceImpl : 开始扣减库存
c.s.b.c.service.impl.OrderServiceImpl : 扣减库存结果:OperationResponse(success=true, message=操做成功, data=null)
c.s.b.c.service.impl.OrderServiceImpl : 开始扣减余额
i.s.core.rpc.netty.RmMessageListener : onMessage:xid=192.168.0.102:8091:2021674307,branchId=2021674308,branchType=AT,resourceId=jdbc:mysql://192.168.0.128:3306/db_order,applicationData=null
io.seata.rm.AbstractRMHandler : Branch Rollbacking: 192.168.0.102:8091:2021674307 2021674308 jdbc:mysql://192.168.0.128:3306/db_order
i.s.rm.datasource.undo.UndoLogManager : xid 192.168.0.102:8091:2021674307 branch 2021674308, undo_log deleted with GlobalFinished
io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
i.seata.tm.api.DefaultGlobalTransaction : [192.168.0.102:8091:2021674307] rollback status:Rollbacked复制代码
从日志中没有能够清楚的看到,在服务order-server是先执行了订单写入操做,而且调用扣减库存的接口,经过查看storage-server的日志也能够发现,同样是先执行了库存修改操做,直到扣减余额的时候发现余额不足,开始对 xid 为 192.168.0.102:8091:2021674307
执行回滚操做,而且这个操做是全局回滚。
目前在 Seata v0.8.0 的版本中,Server端还没有支持集群部署,不建议应用于生产环境,而且开源团队计划在 v1.0.0 版本的时候可使用与生产环境,各位读者能够持续关注这个开源项目。
参考资料:Seata官方文档