微服务倡导将复杂的单体应用拆分为若干个功能简单、松耦合的服务,而于此同时就会引入多个服务之间的分布式事务的问题。git
众所周知,数据库能实现本地事务,也就是说在同一个数据库中,能够保证事务的原子性,就是所有成功或者失败,上篇文章也写过简单的多数据源事务的解决方案(相似2PC)github
但如今的系统每每采用微服务架构,业务系统拥有独立的数据库,所以就出现了跨多个数据库的事务需求,这种事务即为“分布式事务”。redis
针对这样的问题通常经常使用的方案有:数据库
基于可靠消息服务的分布式事务,我本身实现了一个基于rabbitmq的分布式事务中间件,shine-mq架构
一开始原本是想用来封装mq的操做方便使用,后续迭代增长了分布式事务的功能。下面就来介绍下这个中间件:并发
setPublisherConfirms(true)
以及实现setConfirmCallback
的回调来实现消息中间件持久化应答服务A。这以后对于服务A来讲就能够删除以前的ready记录和去处理其余任务了。上述是整个流程,服务A完成任务A后,到任务B执行完成之间,会存在必定的时间差。在这个时间差内,整个系统处于数据不一致的状态,但这短暂的不一致性是能够接受的,由于通过短暂的时间后,系统又能够保持数据一致性,知足BASE理论。运维
BASE理论:异步
上面的是一个比较理想的流程,可是真正的环境会有不少突发状况,好比任务A处理失败,那么须要进入回滚流程分布式
由于任务A的异常对于服务A是能够直接捕获的,回滚异常后删除prepare记录,服务A删除以后即可以认为回滚已经完成,即可以去作其余的事情。微服务
而该消息没有投递到消息中间件,则服务B没有影响。此时系统又处于一致性状态,由于任务A和任务B都没有执行。
Coordinator提供了接口能够本身来实现,我默认实现的方式是用redis。若要使用其余方式能够自行实现接口。
上图表现的是发送ready记录的时候,失败了。这时候对于服务A是会收到异常或者收不到应答,这时候能够直接将以前的任务A进行回滚,任务A在回滚的时候会触发删除ready的操做。一样若是异常是发生在发送prepare的状况下,这时候服务A还没执行任务也不会有影响。
分析完服务A,Coordinator和消息中间件之间的一些状况后,如今分析下消息中间件和服务B之间的一些特殊状况。
当消息成功发布到消息中间件以后,服务A就能够作本身的事情去了,消息中间件会保证消息能成功投递到服务B。这个就是消息中间件在消息投递状况下的可靠性保证,具体流程是消息中间件向下游系统投递完消息后便进入阻塞等待状态,下游系统便当即进行任务的处理,任务处理完成后便向消息中间件返回应答。消息中间件收到确认应答后便认为该事务处理完毕!若是消息在投递过程当中丢失,或消息的确认应答在返回途中丢失,那么消息中间件在等待确认应答超时以后就会从新投递,直到下游消费者返回消费成功响应为止。
这之间能够设置消息重试的次数和时间间隔,若是一直失败这时候就会用到死信队列。具体看下图:
当消息一直没法被正常消费,超过设置的重试阈值就会投递到死信队列,死信队列的exchange和routeKey默认是@DistributedTrans
中设置的值。
经过消费死信队列的消息来处理这种异常状况(能够设置短信或邮箱提醒,人工介入),这里暂时不实现服务A的回滚,由于让服务A事先提供回滚接口,这无疑增长了额外的开发成本,业务系统的复杂度也将提升。对于一个业务系统的设计目标是,在保证性能的前提下,最大限度地下降系统复杂度,从而可以下降系统的运维成本。
最后整理一下整个中间件的设计思路
上面已经分析了一些异常状况,对于下游服务和消息中间件的原子性,咱们能够经过消息中间件投递的可靠性来保证(就是ACK模式,失败或未收到应答进行重试)。 那么咱们要实现分布式事务,剩下的就是要保证上游服务执行的任务和向消息中间件投递消息这2个操做的原子性。
这时候通常就会有两种方案,同步和异步通讯。经过以前的时序图,很显然上游系统和消息中间件之间采用的是异步通讯,也就是说当上游服务提交完消息后即可以去作别的事情,接下来提交、回滚就彻底交给消息中间件来完成,而且彻底信任消息中间件,认为它必定能正确地完成事务的提交或回滚。这主要是为了提升系统并发度,另外业务系统直接和用户打交道,用户体验尤其重要,所以这种异步通讯方式可以极大程度地下降用户等待时间。
Rabbitmq其实有提供事务机制,使用txSelect(), txCommit()以及txRollback()来实现,经过测试抓包发现,Tx.Commit后直接发送Tx.Commit-Ok,二者之间的时间间隔会比较长,简单的测试能到300ms,这是很耗时的。因此我没有用这种方式,而是引入一个Coordinator(协调者)来实现。
另外还有一个比较关键的daemon(守护线程),是处理在Coordinator一些错误超时的记录(相似Rocketmq的超时询问机制)。因此服务A除了实现正常的业务流程外,还需提供一个事务询问的接口,供Coordinator调用,来保障服务A在执行任务出现宕机的状况。当有prepare超时就会触发访问这个回查接口,该接口会返回三种结果:
而超时的ready的消息,就直接捞起发送到消息中间件,由于只要是ready消息持久化到协调者,那就说明服务A的任务已经完成。
这样就能保证上游服务和消息中间件的原子性了(具体能够看分布式事务:消息可靠发送),再经过消息中间件可靠的投递结合下游服务,就完成了分布式事务。
若是对你有帮助,那就帮忙点个星星把 ^.^
github地址:github.com/7le/shine-m…