深度剖析如何实现事务消息

这是一篇从去年写到今年的文章,但愿你们会喜欢spring

1.背景

分布式事务一直是一个老生常谈的一个话题,在个人公众号下面下面已经写过不少篇分布式事务相关的文章了,可是依旧没有将其彻底剖析。在以前的文章中我也屡次提到咱们可使用消息队列来实现咱们的分布式事务,可是大多都是一笔带过,不少读者都对这一块产生了不少疑问,但愿读完这篇文章能让你理解如何用消息队列实现分布式事务。数据库

固然首先要回顾一下咱们的一些基本概念:网络

CAP

CAP定理,又被叫做布鲁尔定理。对于设计分布式系统来讲(不只仅是分布式事务)的架构师来讲,CAP就是你的入门理论。架构

  • C (一致性):对某个指定的客户端来讲,读操做能返回最新的写操做。对于数据分布在不一样节点上的数据上来讲,若是在某个节点更新了数据,那么在其余节点若是都能读取到这个最新的数据,那么就称为强一致,若是有某个节点没有读取到,那就是分布式不一致。
  • A (可用性):非故障的节点在合理的时间内返回合理的响应(不是错误和超时的响应)。可用性的两个关键一个是合理的时间,一个是合理的响应。合理的时间指的是请求不能无限被阻塞,应该在合理的时间给出返回。合理的响应指的是系统应该明确返回结果而且结果是正确的,这里的正确指的是好比应该返回50,而不是返回40。
  • P (分区容错性):当出现网络分区后,系统可以继续工做。打个比方,这里个集群有多台机器,有台机器网络出现了问题,可是这个集群仍然能够正常工做。

熟悉CAP的人都知道,三者不能共有,若是感兴趣能够搜索CAP的证实,在分布式系统中,网络没法100%可靠,分区实际上是一个必然现象,若是咱们选择了CA而放弃了P,那么当发生分区现象时,为了保证一致性,这个时候必须拒绝请求,可是A又不容许,因此分布式系统理论上不可能选择CA架构,只能选择CP或者AP架构。分布式

对于CP来讲,放弃可用性,追求一致性和分区容错性,咱们的zookeeper其实就是追求的强一致。线程

对于AP来讲,放弃一致性(这里说的一致性是强一致性),追求分区容错性和可用性,这是不少分布式系统设计时的选择,后面的BASE也是根据AP来扩展。设计

顺便一提,CAP理论中是忽略网络延迟,也就是当事务提交时,从节点A复制到节点B,可是在现实中这个是明显不可能的,因此总会有必定的时间是不一致。同时CAP中选择两个,好比你选择了CP,并非叫你放弃A。由于P出现的几率实在是过小了,大部分的时间你仍然须要保证CA。就算分区出现了你也要为后来的A作准备,好比经过一些日志的手段,是其余机器回复至可用。3d

BASE

BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。是对CAP中AP的一个扩展日志

基本可用:分布式系统在出现故障时,容许损失部分可用功能,保证核心功能可用。 软状态:容许系统中存在中间状态,这个状态不影响系统可用性,这里指的是CAP中的不一致。 最终一致:最终一致是指通过一段时间后,全部节点数据都将会达到一致。code

BASE解决了CAP中理论没有网络延迟,在BASE中用软状态和最终一致,保证了延迟后的一致性。BASE和 ACID 是相反的,它彻底不一样于ACID的强一致性模型,而是经过牺牲强一致性来得到可用性,并容许数据在一段时间内是不一致的,但最终达到一致状态。

事务消息

咱们的全部事务消息均可以看做是BASE模型的实现。在业界中有事务消息功能比较有表明性的就是阿里开源的RocketMQ和去哪儿开源的QMQ,他们两个消息队列都实现了事务消息功能,可是实现的方式却各有不一样,接下来也会分别剖析这两个消息队列是如何实现事务消息。

2. RocketMQ-事务消息

RocketMQ事务消息究竟是怎么一回事呢?

基本流程以下: 第一阶段Prepared消息,会拿到消息的地址。 第二阶段执行本地事务。 第三阶段经过第一阶段拿到的地址去访问消息,并修改状态。消息接受者就能使用这个消息。 若是确认消息失败,在RocketMq Broker中提供了定时扫描没有更新状态的消息,若是有消息没有获得确认,会向消息发送者发送消息,来判断是否提交,在rocketmq中是以listener的形式给发送者,用来处理。

若是确认消息失败,在RocketMq Broker中提供了定时扫描没有更新状态的消息,若是有消息没有获得确认,会向消息发送者发送消息,来判断是否提交,在rocketmq中是以listener的形式给发送者,用来处理。

若是消费超时,则须要一直重试,消息接收端须要保证幂等。若是消息消费失败,这个就须要人工进行处理,由于这个几率较低,若是为了这种小几率时间而设计这个复杂的流程反而得不偿失

这个图你们想必再其余地方已经看见过不少次了,不少时候从看这个图只能只知其一;不知其二,那接下来看看代码是如何实现的吧。

2.1 使用事务消息

在RocketMQ的事务消息中有个很重要的监听器叫TransactionListener,咱们须要实现他

其中有两个方法:

  • executeLocalTransaction:顾名思义执行咱们的本地事务方法,通常来讲咱们的本地事务方法是由上层的业务顺序推动调用,可是在rocketMQ的事务消息中是须要由Listener来进行驱动,若是要使用RocketMQ的事务消息须要对咱们的业务进行必定的改造。而且这里还须要注意的是,咱们在事务中还须要保存消息的事务ID和当前事务的对应关系。

  • checkLocalTransaction:根据咱们以前的事务ID来检查咱们的本地事务状态,这里的状态有三种: 事务消息共有三种状态,提交状态、回滚状态、中间状态:

    • TransactionStatus.CommitTransaction: 提交事务,它容许消费者消费此消息。
    • TransactionStatus.RollbackTransaction: 回滚事务,它表明该消息将被删除,不容许被消费。
    • TransactionStatus.Unknown: 中间状态,它表明须要检查消息队列来肯定状态。返回这个状态的时候RocketMQ会进行重试检查,为了防止频繁检查,默认将单个消息的检查次数限制为15 次。

对于咱们的消息发送有以下代码:

咱们发如今代码中咱们将咱们以前的listener以及一个线程池来和咱们的producer进行绑定,这里线程池的做用是咱们checkLocalTransaction所使用的线程池。

2.2 实现原理

2.2.1 客户端

这里的代码比较简单,主要分下面几个步骤

  • Step 1: 先发送消息至Broker.
  • Step 2: 根据发送的结果,判断是否执行本地事务,若是发送成功,则执行本地事务。
  • Step 3: 记录本地事务状态,这里的状态也就是上面咱们所讲的提交事务,回滚事务,中间状态三个状态。
  • Step 4: 结束事务,根据本地事务状态决定是提交或者回滚。

对于checkLocalTransaction: 在RocketMQ中会接收RocketMQ-Broker发送的CHECK_TRANSACTION_STATE请求,来执行检查本地事务状态。

2.3.1 服务端

在Broker上会对事务消息进行特殊判断:

若是是事务消息那么就须要走prepareMessage这个逻辑,prepareMessage这个逻辑以下:

主要是将当前消息的topic替换成RMQ_SYS_TRANS_HALF_TOPIC。咱们的一阶段发送半消息到这里就完成了,接下来就是Broker处理咱们事务的commit或者rollback: 图中红色方框表示咱们的核心步骤,对于commit的一共有三步:

  • 获取须要commit的半消息
  • 将消息发送到原来的topic
  • 删除半消息

对于rollback一共有两步:

  • 获取须要rollback的半消息
  • 删除半消息

对于获取消息这个比较简单,经过记录的offset直接查询就好,对于将消息发送到原来的topic逻辑基本上能够复用,这里要重点讨论的是如何删除半消息,咱们都知道RocketMQ是顺序写入,咱们不可能去真正的删除消息,那么就只能依靠一些其余的途径,咱们能够想到消息消费了以后,只要offset不重置,这个消息就不会再被消费,那么其实就实现了删除的功能。RocketMQ也是经过这样的思路,本身实现了一个消费者,去消费RMQ_SYS_TRANS_HALF_TOPIC这个Topic,若是消息须要删除的话消费了以后就不须要作其余操做,若是不须要删除的话,消费了以后又会从新投递。

那其实核心就在于怎么去记录半消息是否应该删除呢?对于这个问题RocketMQ采用了新的TopicRMQ_SYS_TRANS_OP_HALF_TOPIC来保存半消息是否删除,其实在上面的删除半消息的流程中其实也是对RMQ_SYS_TRANS_OP_HALF_TOPIC投递了一个op_message,而后由后台任务去进行操做。

整个流程原理图以下面所示:

  • Step1: 发送事务消息,这里也叫作halfMessage,会将Topic替换为HalfMessage的Topic。
  • Step2: 发送commit或者rollback,若是是commit这里会查询出以前的消息,而后将消息复原成原Topic,而且发送一个OpMessage用于记录当前消息能够删除。若是是rollback这里会直接发送一个OpMessage删除。
  • Step3: 在Broker有个处理事务消息的定时任务,定时对比halfMessage和OpMessage,若是有OpMessage且状态为删除,那么该条消息一定commit或者rollback,因此就能够删除这条消息。
  • Step4: 若是事务超时(默认是6s),尚未opMessage,那么颇有可能commit信息丢了,这里会去反查咱们的Producer本地事务状态。
  • Step5: 根据查询出来的信息作Step2。

2.3 小结

上面已经讲了如何使用RocketMQ的事务消息和实现原理,想必你们已经对RocketMQ事务消息有本身的认识了。可是RocketMQ的事务消息目前在个人一些业务实战中是历来没有使用过的,主要缘由有几个方面:

  • 改形成本大,好比一个下单的操做,建立订单的本地事务通常来讲是同步进行的,建立以后会获取到订单ID,可是在RocketMQ中这个本地事务变成了在Listener里面的操做了,那么就不能经过返回参数来进行,只能经过一些其余方法来完成这个业务逻辑,好比ThreadLocal等等。
  • 须要记录TransactionId和本地事务状态的关系
  • 只支持单个事务消息,若是我建立订单须要发送10种消息,若是都想保持事务一致,那么RocketMQ是不支持的。

综上所述,RocketMQ的事务消息在我看来的确属于比较鸡肋,很难去适应于老业务。那么怎么去接下来说一下QMQ的事务消息的解决方案,看看这种方案可否解决咱们所说的这种问题呢?

3. QMQ事务消息

QMQ的事务消息没有RocketMQ那么的复杂,对于消息中间件的自己改造是很小的,其依赖了数据库自身的本地事务,好比一个建立订单,须要发送两种消息,分别是A和B,那么有以下的伪代码:

begin transaction;
createOrder();
commit transaction;

sendMessageA();
snedMessageB();

这个时候咱们发现消息A和消息B都在事务以外,其一致性得不到保证,那么其实咱们发送消息的时候不必定要真正的和消息中间件打交道,咱们能够作一个本地的存储,保存咱们的消息:

begin transaction;
createOrder();
saveMessageA();
saveMessageB();
commit transaction;
// 发送消息
sendMessageA();
snedMessageB();

能够看见其实咱们只是增长两个保存消息的操做,那么咱们是如何保证一致性呢,若是发送MessageA的时候挂了,那么咱们就能够经过定时任务去拉去咱们数据库中保存的并无发送的消息,而后再次进行发送。

其实这种方法一样的能够扩展至其余的消息队列,由于对于消息中间件自己是没有入侵的,若是RocketMQ或者Kafka也想使用这种方法来保证事务消息,也是能够的。

咱们来看看这种方法可否解决RocketMQ事务消息带来的问题呢?

  • 改形成本,只须要改造一次Client,在QMQ中重写了spring的TransactionSynchronization,能够直接把代码简化成以下面所示:
begin transaction;
createOrder();
sendMessageA();
snedMessageB();
commit transaction;

这里的send其实内部逻辑是saveMessage,在commit以后会自动进行发送,而且后台有定时任务会补偿发送。

  • 不须要额外作transactionId和message的绑定
  • 支持发送多个事务消息

RocketMQ事务消息带来的问题基本能够解决,可是其一样也有缺点,由于其引入了额外的数据库写,若是事务消息较多,那么就会多出不少写数据库的操做,对于响应时间比较敏感的服务须要仔细考虑

4.总结

介绍了两种事务消息,对于我我的而言,QMQ实现的方案能更加适应于大多数业务。可是这里要注意事务消息并非全部的分布式一致性都能使用,事务消息使用的场景只能是发出这个消息就能表明这个操做成功的场景,什么意思呢?举个例子,好比咱们支付的时候会扣积分,扣券等等,若是我发一个扣积分的消息能表明必定成功吗?这个确定是不行的,由于用户的积分可能不够,就会致使扣除失败。若是是发送一个赠送积分的消息那么就能够表明成功,由于赠送积分是属于加法,并无太多的限制。

若是发现事务消息不能很好的知足的知足业务场景,那么你就能够考虑其余的一些事务策略,好比TCC,saga等,这些在我以前的文章都有讲述。

若是你们以为这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O:

相关文章
相关标签/搜索