分布式事务是企业集成中的一个技术难点,也是每个分布式系统架构中都会涉及到的一个东西,特别是在这几年愈来愈火的微服务架构中,几乎能够说是没法避免,本文就围绕单机事务,分布式事务以及分布式事务的处理方式来展开。java
事务提供一种“要么什么都不作,要么作全套(All or Nothing)”的机制,她有ACID四大特性mysql
以mysql的InnoDB存储引擎为例,来了解单机事务是如何保证ACID特性的。 git
单机事务是经过将操做限制在一个会话内经过数据库自己的锁以及日志来实现ACID,那么分布式环境下该如何保证ACID特性那?github
X/Open DTP(X/Open Distributed Transaction Processing Reference Model) 是X/Open 这个组织定义的一套分布式事务的标准,也就是了定义了规范和API接口,由各个厂商进行具体的实现。 X/Open DTP 定义了三个组件: AP,TM,RM
web
其中在DTP定义了如下几个概念spring
若是一个事务管理器管理着多个资源管理器,DTP是经过两阶段提交协议来控制全局事务和分支事务。sql
TCC(Try-Confirm-Cancel)分布式事务模型相对于 XA 等传统模型,其特征在于它不依赖资源管理器(RM)对分布式事务的支持,而是经过对业务逻辑的分解来实现分布式事务。
数据库
3PC的CanCommit阶段其实和2PC的准备阶段很像。协调者向参与者发送commit请求,参与者若是能够提交就返回Yes响应,不然返回No响应。
事务询问:协调者向参与者发送CanCommit请求。询问是否能够执行事务提交操做。而后开始等待参与者的响应
响应反馈:参与者接到CanCommit请求以后,正常状况下,若是其自身认为能够顺利执行事务,则返回Yes响应,并进入预备状态;不然反馈No。apache
执行事务预提交
发送预提交请求:协调者向全部参与者节点发出 preCommit 的请求,并进入 prepared 状态。
事务预提交:参与者受到 preCommit 请求后,会执行事务操做,对应 2PC 准备阶段中的 “执行事务”,也会 Undo 和 Redo 信息记录到事务日志中。
各参与者响应反馈:若是参与者成功执行了事务,就反馈 ACK 响应,同时等待指令:提交(commit) 或终止(abort)编程
中断事务
发送中断请求:协调者向全部参与者节点发出 abort 请求 。
中断事务:参与者若是收到 abort 请求或者超时了,都会中断事务。
执行提交
发送提交请求:协调者接收到各参与者发送的ACK响应,那么他将从预提交状态进入到提交状态。并向全部参与者发送 doCommit 请求。
事务提交:参与者接收到 doCommit 请求以后,执行正式的事务提交。并在完成事务提交以后释放全部事务资源。
响应反馈:事务提交完以后,向协调者发送 ACK 响应。
完成事务:协调者接收到全部参与者的 ACK 响应以后,完成事务。
中断事务
协调者没有接收到参与者发送的 ACK 响应(多是接受者发送的不是ACK响应,也可能响应超时),那么就会执行中断事务。
发送中断请求:协调者向全部参与者发送 abort 请求。
事务回滚:参与者接收到 abort 请求以后,利用其在阶段二记录的 undo 信息来执行事务的回滚操做,并在完成回滚以后释放全部的事务资源。
反馈结果:参与者完成事务回滚以后,向协调者发送 ACK 消息。
中断事务:协调者接收到参与者反馈的 ACK 消息以后,完成事务的中断。
Saga的组成:
saga的执行顺序有两种:
Saga定义了两种恢复策略:
Saga的注意事项
Saga架构
仅在同一个事务上下文中须要协调多种资源(即数据库,以及消息主题或队列)时,才有必要使用 X/Open XA 接口。数据库接入XA须要使用XA版的数据库驱动,消息队列要实现XA须要实现javax.transaction.xa.XAResource接口。
代码以下:
public class UserService {
@Autowired
private UserDao userDao;
@Autowired
private LogDao logDao;
@Transactional
public void save(User user){
userDao.save(user);
logDao.save(user);
throw new RuntimeException();
}
}
@Resource
public class UserDao {
@Resource(name="jdbcTemplateA")
private JdbcTemplate jdbcTemplate;
public void save(User user){
jdbcTemplate.update("insert into user(name,age) values(?,?)",user.getName(),user.getAge());
}
}
@Repository
public class LogDao {
@Resource(name="jdbcTemplateB")
private JdbcTemplate jdbcTemplate;
public void save(User user){
jdbcTemplate.update("insert into log(name,age) values(?,?)",user.getName(),user.getAge());
}
}
复制代码
配置:
<bean id="jotm" class="org.objectweb.jotm.Current" />
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="jotm" />
</bean>
<tx:annotation-driven transaction-manager="transactionManager"/>
<!-- 配置数据源 -->
<bean id="dataSourceA" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
<property name="dataSource">
<bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
<property name="transactionManager" ref="jotm" />
<property name="driverName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8" />
</bean>
</property>
<property name="user" value="xxx" />
<property name="password" value="xxx" />
</bean>
<!-- 配置数据源 -->
<bean id="dataSourceB" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
<property name="dataSource">
<bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
<property name="transactionManager" ref="jotm" />
<property name="driverName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/test2?useUnicode=true&characterEncoding=utf-8" />
</bean>
</property>
<property name="user" value="xxx" />
<property name="password" value="xxx" />
</bean>
<bean id="jdbcTemplateA" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSourceA" />
</bean>
<bean id="jdbcTemplateB" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSourceB" />
</bean>
复制代码
使用到的JAR包:
compile 'org.ow2.jotm:jotm-core:2.3.1-M1'
compile 'org.ow2.jotm:jotm-datasource:2.3.1-M1'
compile 'com.experlog:xapool:1.5.0'
复制代码
事务配置: 咱们知道分布式事务中须要一个事务管理器即接口javax.transaction.TransactionManager、面向开发人员的javax.transaction.UserTransaction。对于jotm来讲,他们的实现类都是Current
public class Current implements UserTransaction, TransactionManager
咱们若是想使用分布式事务的同时,又想使用Spring带给咱们的@Transactional便利,就须要配置一个JtaTransactionManager,而该JtaTransactionManager是须要一个userTransaction实例的,因此用到了上面的Current,以下配置:
<bean id="jotm" class="org.objectweb.jotm.Current" />
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="jotm" />
</bean>
<tx:annotation-driven transaction-manager="transactionManager"/>
复制代码
同时上述StandardXADataSource是须要一个TransactionManager实例的,因此上述StandardXADataSource配置把jotm加了进去.
执行过程:
代码同上,配置为:
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="atomikosUserTransaction" />
</bean>
<tx:annotation-driven transaction-manager="springTransactionManager"/>
<!-- 配置数据源 -->
<bean id="dataSourceC" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="XA1DBMS" />
<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="xaProperties">
<props>
<prop key="URL">jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8</prop>
<prop key="user">xxx</prop>
<prop key="password">xxx</prop>
</props>
</property>
<property name="poolSize" value="3" />
<property name="minPoolSize" value="3" />
<property name="maxPoolSize" value="5" />
</bean>
<!-- 配置数据源 -->
<bean id="dataSourceD" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="XA2DBMS" />
<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="xaProperties">
<props>
<prop key="URL">jdbc:mysql://localhost:3306/test2?useUnicode=true&characterEncoding=utf-8</prop>
<prop key="user">xxx</prop>
<prop key="password">xxx</prop>
</props>
</property>
<property name="poolSize" value="3" />
<property name="minPoolSize" value="3" />
<property name="maxPoolSize" value="5" />
</bean>
<bean id="jdbcTemplateC" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSourceC" />
</bean>
<bean id="jdbcTemplateD" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSourceD" />
</bean>
复制代码
事务配置:
咱们知道分布式事务中须要一个事务管理器即接口javax.transaction.TransactionManager、面向开发人员的javax.transaction.UserTransaction。对于Atomikos来讲分别对应以下:
<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="userTransaction" />
</bean>
<tx:annotation-driven transaction-manager="springTransactionManager"/>
复制代码
能够对比下jotm的案例配置jotm的分布式事务配置。能够看到jotm中使用的xapool中的StandardXADataSource是须要一个transactionManager的,而Atomikos使用的AtomikosNonXADataSourceBean则不须要。咱们知道,StandardXADataSource中有了transactionManager就能够获取当前线程的事务,同时把XAResource加入进当前事务中去,而AtomikosNonXADataSourceBean却没有,它是怎么把XAResource加入进当前线程绑定的事务呢?这时候就须要能够经过静态方法随时获取当前线程绑定的事务。 使用到的JAR包:
compile 'com.atomikos:transactions-jdbc:4.0.0M4'
复制代码
以订单子系统和支付子系统为例,以下图:
begin tx;
count = update account set amount = amount - ${cash} where uid = ${uid} and amount >= amount if (count <= 0) return false update payment_record set status = paid where trade_id = ${tradeId}
commit;
复制代码
对于trade要执行的操做能够用伪代码表示以下:
begin tx;
count = update trade_record set status = paid where trade_id = ${trade_id} and status = unpaid if (count <= 0) return false do other things ... commit;
复制代码
可是对于这两段代码如何串起来是个问题,咱们增长一个事务表,即图中的tx_info,来记录成功完成的支付事务,tx_info中须要有能够标示被支付系统处理状态的字段,为了和支付信息一致,须要放入事务中,代码以下:
begin tx;
count = update account set amount = amount - ${cash} where uid = ${uid} and amount >= amount if (count <= 0) return false update payment_record set status = paid where trade_id = ${tradeId}
insert into tx_info values(${trade_id},${amount}...) commit;
复制代码
支付系统边界到此为止,接下来就是订单系统轮询访问tx_info,拉取已经支付成功的订单信息,对每一条信息都执行trade系统的逻辑,伪代码以下:
foreach trade_id in tx_info
do trade_tx
save tx_info.id to some store
复制代码
事无延迟取决于时间程序轮询间隔,这样咱们作到了一致性,最终订单都会在支付以后的最大时间间隔内完成状态迁移。
固然,这里也能够采用支付系统经过RPC方式同步通知订单系统的方式来实现,处理状态经过tx_info中的字段来表示。
另外,交易系统每次拉取数据的起点以及消费记录须要记录下来,这样才能不遗漏不重复地执行,因此须要增长一张表用于排重,即上图中的tx_duplication。可是每次对tx_duplication表的插入要在trade_tx的事务中完成,伪代码以下:
begin tx;
c = insert ignore tx_duplication values($trade_id...) if (c <= 0) return false count = update trade_record set status = paid where trade_id = ${trade_id} and status = unpaid if (count <= 0) return false do other things ... commit;
复制代码
另外,tx_duplication表中trade_id表上必须有惟一键,这个算是结合以前的幂等篇来保证trade_tx的操做是幂等的。
在上面的方案中,tx_info表所起到的做用就是队列做用,记录一个系统的表更,做为通知给须要感知的系统的事件。而时间程序去拉取只是系统去获取感兴趣事件的一个方式,而对应交易系统的本地事务只是对应消费事件的一个过程。在这样的描述下,这些功能就是一个MQ——消息中间件。以下图
sendPrepare();
isCommit = local_tx()
if (isCommit) sendCommit()
else sendRollback()
复制代码
在作本地事务以前,先向MQ发送一个prepare消息,而后执行本地事务,本地事务提交成功的话,向MQ发送一个commit消息,不然发送一个abort消息,取消以前的消息。MQ只会在收到commit确认才会将消息投递出去,因此这样的形式能够保证在一切正常的状况下,本地事务和MQ能够达到一致性。
可是分布式存在异常状况,网络超时,机器宕机等等,好比当系统执行了local_tx()成功以后,还没来得及将commit消息发送给MQ,或者说发送出去了,网络超时了等等缘由,MQ没有收到commit,即commit消息丢失了,那么MQ就不会把prepare消息投递出去。若是这个没法保证的话,那么这个方案是不可行的。针对这种状况,须要一个第三方异常校验模块来对MQ中在必定时间段内没有commit/abort 的消息和发消息的系统进行检查,确认该消息是否应该投递出去或者丢弃,获得系统的确认以后,MQ会作投递仍是丢弃,这样就彻底保证了MQ和发消息的系统的一致性,从而保证了接收消息系统的一致性。
这个方案要求MQ的系统可用性必须很是高,至少要超过使用MQ的系统(推荐rocketmq,kafka都支持发送预备消息和业务回查),这样才能保证依赖他的系统能稳定运行。
项目地址:github.com/apache/serv… Saga处理场景是要求相关的子事务提供事务处理函数同时也提供补偿函数。Saga协调器alpha会根据事务的执行状况向omega发送相关的指令,肯定是否向前重试或者向后恢复。
成功场景下,每一个事务都会有开始和有对应的结束事件。
异常场景下,omega会向alpha上报中断事件,而后alpha会向该全局事务的其它已完成的子事务发送补偿指令,确保最终全部的子事务要么都成功,要么都回滚。
超时场景下,已超时的事件会被alpha的按期扫描器检测出来,与此同时,该超时事务对应的全局事务也会被中断。
假设要租车、预订酒店知足分布式事务。
租车服务
@Service
class CarBookingService {
private Map<Integer, CarBooking> bookings = new ConcurrentHashMap<>();
@Compensable(compensationMethod = "cancel")
void order(CarBooking booking) {
booking.confirm();
bookings.put(booking.getId(), booking);
}
void cancel(CarBooking booking) {
Integer id = booking.getId();
if (bookings.containsKey(id)) {
bookings.get(id).cancel();
}
}
Collection<CarBooking> getAllBookings() {
return bookings.values();
}
void clearAllBookings() {
bookings.clear();
}
}
复制代码
酒店预订
@Service
class HotelBookingService {
private Map<Integer, HotelBooking> bookings = new ConcurrentHashMap<>();
@Compensable(compensationMethod = "cancel")
void order(HotelBooking booking) {
if (booking.getAmount() > 2) {
throw new IllegalArgumentException("can not order the rooms large than two");
}
booking.confirm();
bookings.put(booking.getId(), booking);
}
void cancel(HotelBooking booking) {
Integer id = booking.getId();
if (bookings.containsKey(id)) {
bookings.get(id).cancel();
}
}
Collection<HotelBooking> getAllBookings() {
return bookings.values();
}
void clearAllBookings() {
bookings.clear();
}
}
复制代码
主服务
@RestController
public class BookingController {
@Value("${car.service.address:http://car.servicecomb.io:8080}")
private String carServiceUrl;
@Value("${hotel.service.address:http://hotel.servicecomb.io:8080}")
private String hotelServiceUrl;
@Autowired
private RestTemplate template;
@SagaStart
@PostMapping("/booking/{name}/{rooms}/{cars}")
public String order(@PathVariable String name, @PathVariable Integer rooms, @PathVariable Integer cars) {
template.postForEntity(
carServiceUrl + "/order/{name}/{cars}",
null, String.class, name, cars);
postCarBooking();
template.postForEntity(
hotelServiceUrl + "/order/{name}/{rooms}",
null, String.class, name, rooms);
postBooking();
return name + " booking " + rooms + " rooms and " + cars + " cars OK";
}
// This method is used by the byteman to inject exception here
private void postCarBooking() {
}
// This method is used by the byteman to inject the faults such as the timeout or the crash
private void postBooking() {
}
}
复制代码
执行流程
项目地址https://github.com/QNJR-GROUP/EasyTransaction[对比tcc-transaction,Hmily,ByteTCC来讲EasyTransaction性能最好,压测未发现错误], 固然你也可使用上面提到的SAGA项目,也是支持TCC协议的。下面咱们举个例子来看TCC是如何处理业务逻辑的。
eg:订单支付
try阶段
confirm阶段
cancel阶段
基本概念 | 优势 | 缺点 |
---|---|---|
本地事务。事务由资源管理器(如DBMS)本地管理 | 严格的ACID | 不具有分布事务处理能力 |
全局事务(DTP模型) TX协议:应用或应用服务器与事务管理器的接口 XA协议:全局事务管理器与资源管理器的接口 |
严格的ACID | 效率很是低 |
JTA:面向应用、应用服务器与资源管理器的高层事务接口 JTS:JTA事务管理器的实现标准,向上支持JTA,向下经过CORBA OTS实现跨事务域的互操做性 EJB |
简单一致的编程模型 跨域分布处理的ACID保证 |
DTP模型自己的局限 缺乏充分公开的大规模、高可用、密集事务应用的成功案例 |
基于MQ | 消息数据独立存储、独立伸缩 下降业务系统与消息系统间的耦合 |
一次消息发送须要两次请求 业务处理服务需实现消息状态回查接口 |
二阶段提交 | 原理简单,实现方便 | 同步阻塞:在二阶段提交的过程当中,全部的节点都在等待其余节点的响应,没法进行其余操做。这种同步阻塞极大的限制了分布式系统的性能。 单点问题:协调者在整个二阶段提交过程当中很重要,若是协调者在提交阶段出现问题,那么整个流程将没法运转。更重要的是,其余参与者将会处于一直锁定事务资源的状态中,而没法继续完成事务操做。 数据不一致:假设当协调者向全部的参与者发送commit请求以后,发生了局部网络异常,或者是协调者在还没有发送完全部 commit请求以前自身发生了崩溃,致使最终只有部分参与者收到了commit请求。这将致使严重的数据不一致问题。 容错性很差:若是在二阶段提交的提交询问阶段中,参与者出现故障,致使协调者始终没法获取到全部参与者的确认信息,这时协调者只能依靠其自身的超时机制,判断是否须要中断事务。显然,这种策略过于保守。换句话说,二阶段提交协议没有设计较为完善的容错机制,任意一个节点是失败都会致使整个事务的失败。 |
TCC | 相对于二阶段提交,三阶段提交主要解决的单点故障问题,并减小了阻塞的时间。由于一旦参与者没法及时收到来自协调者的信息以后,他会默认执行 commit。而不会一直持有事务资源并处于阻塞状态。 | 三阶段提交也会致使数据一致性问题。因为网络缘由,协调者发送的 abort 响应没有及时被参与者接收到,那么参与者在等待超时以后执行了 commit 操做。这样就和其余接到 abort 命令并执行回滚的参与者之间存在数据不一致的状况。 |
SAGA | 简单业务使用TCC须要修改原来业务逻辑,saga只须要添加一个补偿动做 因为没有预留动做因此不用担忧资源释放的问题异常处理简单 |
因为没有预留动做致使补偿处理麻烦 |
业务各有各的不一样,有些业务能容忍短时间不一致,有些业务的操做能够幂等,不管什么样的分布式事务解决方案都有其优缺点,没有一个银弹可以适配全部。所以,业务须要什么样的解决方案,还须要结合自身的业务需求、业务特色、技术架构以及各解决方案的特性,综合分析,才能找到最适合的方案。