消息队列之事务消息,RocketMQ 和 Kafka是如何作的?

每一个时代,都不会亏待会学习的人。java

你们好,我是 yes。web

今天咱们来谈一谈消息队列的事务消息,一提及事务相信你们都不陌生,脑海里蹦出来的就是 ACID。数据库

一般咱们理解的事务就是为了一些更新操做要么都成功,要么都失败,不会有中间状态的产生,而 ACID 是一个严格的事务实现的定义,不过在单体系统时候通常都不会严格的遵循 ACID 的约束来实现事务,更别说分布式系统了。微信

分布式系统每每只能妥协到最终一致性,保证数据最终的完整性和一致性,主要缘由就是实力不容许...由于可用性为王。并发

并且要保证彻底版的事务实现代价很大,你想一想要维护这么多系统的数据,不容许有中间状态数据能够被读取,全部的操做必须不可分割,这意味着一个事务的执行是阻塞的,资源是被长时间锁定的。异步

在高并发状况下资源被长时间的占用,就是致命的伤害,举一个有味道的例子,如厕高峰期,好了懂得都懂。编辑器

对了, ACID 是什么还不太清楚的同窗,赶忙去查一查,这里我就不展开说了。分布式

分布式事务

那说到分布式事务,常见的有 2PC、TCC 和事务消息,这篇文章重点就是事务消息,不过 2PC 和 TCC 我稍微提一下。高并发

2PC

2PC 就是二阶段提交,分别有协调者和参与者两个角色,二阶段分别是准备阶段和提交阶段。源码分析

准备阶段就是协调者向各参与者发送准备命令,这个阶段参与者除了事务的提交啥都作了,而提交阶段就是协调者看看各个参与者准备阶段都 o 不 ok,若是有 ok 那么就向各个参与者发送提交命令,若是有一个不 ok 那么就发送回滚命令。

这里的重点就是 2PC 只适用于数据库层面的事务,什么意思呢?就是你想在数据库里面写一条数据同时又要上传一张图片,这两个操做 2PC 没法保证两个操做知足事务的约束。

并且 2PC 是一种强一致性的分布式事务,它是同步阻塞的,即在接收到提交或回滚命令以前,全部参与者都是互相等待,特别是执行完准备阶段的时候,此时的资源都是锁定的状态,假若有一个参与者卡了好久,其余参与者都得等它,产生长时间资源锁定状态下的阻塞

整体而言效率低,而且存在单点故障问题,协调者是就是那个单点,而且在极端条件下存在数据不一致的风险,例如某个参与者未收到提交命令,此时宕机了,恢复以后数据是回滚的,而其余参与者其实都已经执行了提交事务的命令了。

TCC

TCC 能保证业务层面的事务,也就是说它不只仅是数据库层面,上面的上传图片这种操做它也能作。

TCC 分为三个阶段 try - confirm - cancel,简单的说就是每一个业务都须要有这三个方法,先都执行 try 方法,这一阶段不会作真正的业务操做,只是先占个坑,什么意思呢?好比打算加 10 个积分,那先在预添加字段加上这 10 积分,这个时候用户帐上的积分实际上是没有增长的。

而后若是都 try 成功了那么就执行 confirm 方法,你们都来作真正的业务操做,若是有一个 try 失败了那么你们都执行 cancel 操做,来撤回刚才的修改。

能够看到 TCC 其实对业务的耦合性很大,由于业务上须要作必定的改造才能完成这三个方法,这其实就是 TCC 的缺点,而且 confirm 和 cancel 操做要注意幂等,由于到执行这两步的时候没有退路,是务必要完成的,所以须要有重试机制,因此须要保证方法幂等。

事务消息

事务消息就是今天文章的主角了,它主要是适用于异步更新的场景,而且对数据实时性要求不高的地方

它的目的是为了解决消息生产者与消息消费者的数据一致性问题。

好比你点外卖,咱们先选了炸鸡加入购物车,又选了瓶可乐,而后下单,付完款这个流程就结束了。

而购物车里面的数据就很适合用消息通知异步删除,由于通常而言咱们下完单不会再去点开这个店家的菜单,并且就算点开了购物车里还有这些菜品也没有关系,影响不大。

咱们但愿的就是下单成功以后购物车的菜品最终会被删除,因此要点就是下单和发消息这两个步骤要么都成功要么都失败

RocketMQ 事务消息

咱们先来看一下 RocketMQ 是如何实现事务消息的。

RocketMQ 的事务消息也能够被认为是一个两阶段提交,简单的说就是在事务开始的时候会先发送一个半消息给 Broker。

半消息的意思就是这个消息此时对 Consumer 是不可见的,并且也不是存在真正要发送的队列中,而是一个特殊队列。

发送完半消息以后再执行本地事务,再根据本地事务的执行结果来决定是向 Broker 发送提交消息,仍是发送回滚消息。

此时有人说这一步发送提交或者回滚消息失败了怎么办?

影响不大,Broker 会定时的向 Producer 来反查这个事务是否成功,具体的就是 Producer 须要暴露一个接口,经过这个接口 Broker 能够得知事务到底有没有执行成功,没成功就返回未知,由于有可能事务还在执行,会进行屡次查询。

若是成功那么就将半消息恢复到正常要发送的队列中,这样消费者就能够消费这条消息了。

咱们再来简单的看下如何使用,我根据官网示例代码简化了下。

能够看到使用起来仍是很简便直观的,无非就是多加个反查事务结果的方法,而后把本地事务执行的过程写在 TransationListener 里面。

至此 RocketMQ 事务消息大体的流程已经清晰了,咱们画一张总体的流程图来过一遍,其实到第四步这个消息要么就是正常的消息,要么就是抛弃什么都不存在,此时这个事务消息已经结束它的生命周期了。

RocketMQ 事务消息源码分析

而后咱们再从源码的角度来看看究竟是怎么作的,首先咱们看下sendMessageInTransaction 方法,方法有点长,不过没有关系结构仍是很清晰的。

流程也就是咱们上面分析的,将消息塞入一些属性,标明此时这个消息仍是半消息,而后发送至 Broker,而后执行本地事务,而后将本地事务的执行状态发送给 Broker ,咱们如今再来看下 Broker 究竟是怎么处理这个消息的

在 Broker 的 SendMessageProcessor#sendMessage 中会处理这个半消息请求,由于今天主要分析的是事务消息,因此其余流程不作分析,我大体的说一下原理。

简单的说就是 sendMessage 中查到接受来的消息的属性里面MessageConst.PROPERTY_TRANSACTION_PREPARED 是 true ,那么能够得知这个消息是事务消息,而后再判断一下这条消息是否超过最大消费次数,是否要延迟,Broker 是否接受事务消息等操做后,将这条消息真正的 topic 和队列存入属性中,而后重置消息的 topic 为RMQ_SYS_TRANS_HALF_TOPIC,而且队列是 0 的队列中,使得消费者没法读取这个消息。

以上就是总体处理半消息的流程,咱们来看一下源码。

就是来了波狸猫换太子,其实延时消息也是这么实现的,最终将换了皮的消息入盘。

Broker 处理提交或者回滚消息的处理方法是 EndTransactionProcessor#processRequest,咱们来看一看它作了什么操做。

能够看到,若是是提交事务就是把皮再换回来写入真正的topic所属的队列中,供消费者消费,若是是回滚则是将半消息记录到一个 half_op 主题下,到时候后台服务扫描半消息的时候就依据其来判断这个消息已经处理过了。

那个后台服务就是 TransactionalMessageCheckService 服务,它会定时的扫描半消息队列,去请求反查接口看看事务成功了没,具体执行的就是TransactionalMessageServiceImpl#check 方法。

我大体说一下流程,这一步骤其实涉及到的代码不少,我就不贴代码了,有兴趣的同窗自行了解。不过我相信用语言也是能说清楚的。

首先取半消息 topic 即RMQ_SYS_TRANS_HALF_TOPIC下的全部队列,若是还记得上面内容的话,就知道半消息写入的队列是 id 是 0 的这个队列,而后取出这个队列对应的 half_op 主题下的队列,即 RMQ_SYS_TRANS_OP_HALF_TOPIC 主题下的队列。

这个 half_op 主要是为了记录这个事务消息已经被处理过,也就是说已经得知此事务消息是提交的仍是回滚的消息会被记录在 half_op 中。

而后调用 fillOpRemoveMap 方法,从 half_op 取一批已经处理过的消息来去重,将那些没有记录在 half_op 里面的半消息调用 putBackHalfMsgQueue 又写入了 commitlog 中,而后发送事务反查请求,这个反查请求也是 oneWay,即不会等待响应。固然此时的半消息队列的消费 offset 也会推动。

而后producer中的 ClientRemotingProcessor#processRequest 会处理这个请求,会把任务扔到 TransactionMQProducer 的线程池中进行,最终会调用上面咱们发消息时候定义的 checkLocalTransactionState 方法,而后将事务状态发送给 Broker,也是用 oneWay 的方式。

看到这里相信你们会有一些疑问,好比为何要有个 half_op ,为何半消息处理了还要再写入 commitlog 中别急听我一一道来。

首先 RocketMQ 的设计就是顺序追加写入,因此说不会更改已经入盘的消息,那事务消息又须要更新反查的次数,超过必定反查失败就断定事务回滚。

所以每一次要反查的时候就将之前的半消息再入盘一次,而且往前推动消费进度。而 half_op 又会记录每一次反查的结果,不管是提交仍是回滚都会记录,所以下一次还循环处处理此半消息的时候,能够从 half_op 得知此事务已经结束了,所以就被过滤掉不须要处理了。

若是获得的反查的结果是 UNKNOW,那 half_op 中也不会记录此结果,所以还能再次反查,而且更新反查次数。

到如今整个流程已经清晰了,我再画个图总结一下 Broker 的事务处理流程。

Kafka 事务消息

Kafka 的事务消息和 RocketMQ 的事务消息又不同了,RocketMQ 解决的是本地事务的执行和发消息这两个动做知足事务的约束。

而 Kafka 事务消息则是用在一次事务中须要发送多个消息的状况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败,就像下面代码所演示的。

Kafka 的事务基本上是配合其幂等机制来实现 Exactly Once 语义的,因此说 Kafka 的事务消息不是咱们想的那种事务消息,RocketMQ 的才是。

讲到这我就想扯一下了,说到这个 Exactly Once 其实不太清楚的同窗很容易会误解。

咱们知道消息可靠性有三种,分别是最多一次、刚好一次、最少一次,以前在消息队列连环问的文章我已经提到了基本上咱们都是用最少一次而后配合消费者端的幂等来实现刚好一次。

消息刚好被消费一次固然咱们全部人追求的,可是以前文章我已经从各方面已经分析过了,基本上难以达到。

而 Kafka 竟说它能实现 Exactly Once?这么牛啤吗?这实际上是 Kafka 的一个噱头,你要说他错,他还真没错,你要说他对可是他实现的 Exactly Once 不是你心中想的那个 Exactly Once。

它的刚好一次只能存在一种场景,就是从 Kafka 做为消息源,而后作了一番操做以后,再写入 Kafka 中

那他是如何实现刚好一次的?就是经过幂等,和咱们在业务上实现的同样经过一个惟一 Id, 而后记录下来,若是已经记录过了就不写入,这样来保证刚好一次。

因此说 Kafka 实现的是在特定场景下的刚好一次,不是咱们所想的利用 Kafka 来发送消息,那么这条消息只会恰巧被消费一次

这其实和 Redis 说他实现事务了同样,也不是咱们心想的事务。

因此开源软件说啥啥特性开发出来了,咱们一味的相信,所以其每每都是残血的或者在特殊的场景下才能知足,不要被误导了,不能相信表面上的描述,还得详细的看看文档或者源码。

不过从另外一个角度看也无可厚非,做为一个开源软件确定是想更多的人用,我也没说谎呀,我文档上写的很清楚的,这标题也没骗人吧?

确实,好比你点进震惊xxxx标题的文章,人家也没骗你啥,他本身确实震惊的呢。

再回来谈 Kafka 的事务消息,因此说这个事务消息不是咱们想要的那个事务消息,其实不是今天的主题了,不过我仍是简单的说一下。

Kafka 的事务有事务协调者角色,事务协调者其实就是 Broker 的一部分。

在开始事务的时候,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中,而后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不同,Kafka 会像对待正常消息同样处理这些事务消息,由消费端来过滤这个消息

而后发送完毕以后生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,若是是提交那么会先执行预提交,即把事务的状态置为预提交而后写入事务日志,而后再向全部事务有关的分区写入一条相似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,能够把消息放出来了。

最后协调者会向事务日志中再记一条事务结束信息,至此 Kafka 事务就完成了,我拿 confluent.io 上的图来总结一下这个流程。

最后

至此咱们已经知道了 RocketMQ 和 Kakfa 的事务消息全流程,能够看到 RocketMQ 的事务消息才是咱们想要的,固然你要是用的流式计算那么 Kakfa 的事务消息也是你想要的。

须要贴代码的文章其实很难受,这贴的多很差,贴的少又怕不清晰,真的难,若是以为文章不错记得点个在看哟。


我是 yes,从一点点到亿点点,咱们下篇见

本文分享自微信公众号 - yes的练级攻略(yes_java)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。