自从小王玩起了微服务,发现微服务果真很强大,好处真是太多,心中暗喜,然而,却也遇到了分布式中最棘手的问题:分布式事务。小王遍访各路神仙,也无个完美开源解决方案,固然,也有些实际可行的手法,虽不算完美,但也可拿来研究一番,那今天咱们也来讲说分布式事务。前端
分布式事务的起源,即因各服务是独立的,各自使用独立的DB,那本地事务能够保证事务式执行,但其余服务上关联的事务呢?以前Dubbo学习系列之六(微服务架构实战)项目中铺垫的最大bug在于:若是订单付款中异常,本地订单数据将会自动回滚,然而库存服务和物流服务,收到通讯消息后,就执行了,并无同步回滚!这就是典型的分布式事务。linux
分布式事务的解决思路有二:其一,强一致性方案,2PC(两阶段提交)和TCC(try-confirm-cancel)都是对全部相关的事务作同步协调处理,先尝试准备资源,锁住数据,若是均可执行,才都开始执行事务,执行后再确认,若是有一个事务失败,将所有回滚,这个方案缺点太多,执行效率低,回滚代价高,锁住资源多,对各业务侵入严重,必须人工进行各类事件编码,除了如金融类强调一致性的场景,并不推荐使用。其二,弱一致性(最终一致性)方案,基于可靠性MQ的异步,根据BASE理论以及CAP理论,即造成最终一致性。本地事务先执行,而后发送消息至其余服务,并确保消费者服务执行相关事务。举例:只要订单付款成功,那么就必定会减掉用户的帐户金额。下面就动手实现这个例子吧!git
准备:github
Idea201902/JDK11/ZK3.5.5/Gradle5.4.1/RabbitMQ3.7.13/Mysql8.0.11/Lombok0.26/Erlang21.2/postman7.5.0/Redis3.2/Rocket4.5.2sql
难度:新手--战士--老兵--大师数据库
目标:1.模拟商城系统,订单付款后,经过RocketMQ消息机制实现分布式事务方法 2.使用RabbitMQ延迟队列实现订单过时取消apache
步骤:编程
1.项目架构及代码基础设施,见往期文章,总体不变,只作增量,若有重大修改,会加以说明。网络
2.Rocket的安装见文章最后部分后记第2点。架构
3.RocketMQ事务实现逻辑:“两段提交,回查确认”,首先msg发给Rocket,但msg并没马上发送给下游消费者,而是等待发起者的事务执行结果,若是执行无误,则Rocket才将消息发送到下游,若是事务执行失败,则消息返回,下游无感知。
4.先实现第二个目标--使用RabbitMQ延迟队列实现订单过时取消,先看一个总体逻辑图:
exchangeA-->queueC是“订单到物流线”,exchangeA-->queueA-->exchangeB-->queueB是“过时订单取消线”。
5.订单到物流线参考往期文章--Dubbo学习系列之六(微服务架构实战),有微调,过时订单取消线,看几个核心点:
(队列属性设置见前面文章)com.biao.mall.business.impl.DubboOrderServiceImpl中saveOrder(orderBO)方法中的最后添加消息的逻辑,即将订单放入过时订单取消线,注意全部订单都会放入,而后在取消处理逻辑中判断是否取消,由于不能选择性从queue取出特定消息:
再定义消息消费者,处理订单消息:com.biao.mall.business.service.DLXMsgConsumer
再定义一个订单取消逻辑:
com.biao.mall.business.impl.DubboOrderServiceImpl中的cancelOrder(String orderId)方法:
在这里判断订单是否须要取消,已经付款的直接返回。须要取消的而后更新订单状态,更新库存:
6.测试流程:启动Redis-->ZK-->Stock-->Business-->Logistics
模拟一个订单:
数据库会生成一个订单:
队列中会生成一个消息:
一段时间过时后,自动进入DLX处理,DB中能够看到最后订单被取消,置为expired,这个过程当中同时也可观察到dubbo_stock表,锁定库存数变化又回退的过程。
7.再来看第一个逻辑实现过程,这个稍微复杂一点:现改变下分析顺序,咱们按实际逻辑处理顺序来:
controller前端传入后,进入com.biao.mall.business.impl.DubboOrderServiceImpl这个核心逻辑来处理:
这里使用了线程池,充分利用硬件,保证效率。注意这里必须使用事务型消息生产者:TransactionMQProducer,其绑定了一个事务监听器:TransactionListenerImpl,这个是我认为是事务MQ的核心,监听器能够根据处理事务结果决定下一步处理,能够看到msg/orderId/this做为实参传入,为啥这样设计?有点怪异的感受。看看传入后的地方或许有答案:com.biao.mall.business.impl.DubboOrderServiceImpl
这里我特地构造了一个用于传参的构造函数,对应上一图的实参,答案就是第一个方法executeLocalTransaction是用于处理本地事务的,即业务逻辑要放这里!并且特别注意,我使用的是orderService中构造一个独立的payOrderTrans方法,实际上是上一版本中payOrder方法的切割,而后在这里引入使用,且这里的arg不是对应形参Object arg!!执行正常即返回对应的状态常量。
第二个方法checkLocalTransaction是用于检查本地事务执行的结果,即对第一个方法的跟踪,并提供回调,最终结果返回到DubboOrderServiceImpl中。补充说明下三个状态,事务执行结果就是这三个之一:
UNKNOW 时,程序会自动不断check状态,默认15次(可指定)以后报异常,COMMIT表示执行完成,向下游发送消息,ROLLBACK即回滚,下游无感知;我注释了三种状态的check次数。
7.再回到DubboOrderServiceImpl,其中的transactionMQProducer经过com.biao.mall.common.conf.RocketTransMqProducerConf注入,
注意这里必须使用事务型Producer:
8.同理,消息consumer注入在com.biao.mall.common.conf.RocketMqConsumerConf,注意选择的是Push型消费者,而后加入了一个监听器(里面包含真正的消费业务处理逻辑),最后是设置订阅的主题和标签。
9.再进一步,看这个的消费者监听器实现com.biao.mall.common.component.RocketConsumeMsgListener,这里有个核心方法consumeMessage,处理消息,业务逻辑我暂时作个消息展现,后续再实现,要特别注意这个的幂等处理,因消息可能会重复消费。我亲测,若是开另一个客户端作消费,就会收到重复消息,分布式多实例下,非广播消息是竞争消费,广播型消息全消费,因此幂等必需要处理!
10.回到DubboOrderServiceImpl中,rocketMQService对象实现,两个方法,一个发消息,一个收消息:
11.为了简化并完备逻辑,我直接在DubboOrderServiceImpl中作消息消费处理。即//扣款 注释下。
12. 测试:启动ZK-->Redis-->Rocket-->stock-->business-->logistics
postman先生成一个未付款订单:
模拟付款:
控制台的输出:rocketMQ接收,并模拟扣款处理:
最后订单数据,paid为1,完成!其余表数据变化就不展现了。
13.代码结构集体照:
14.项目代码地址:其中的day11
https://github.com/xiexiaobiao/dubbo-project.git
1.MQ在微服务中至关之重要,因此出场比较多,这里我简要的作了个几款主流MQ的比较,之因此我这里同时使用了Rabbit和Rocket,折腾一把,是为了实际的体会与学习使用之目的,后续文章确定会加入kafka。
2.Rocket的安装要点:A.下载bin-release.zip版本,B.要配置ROCKETMQ_HOME环境变量 C.若是Windows+JDK11环境,要修改runserver.cmd/runbroker.cmd文件,因JDK兼容性问题,以下命令
会提示xxx命令没法识别,修改时直接去掉-XX xxx便可,详细可参考网络资源 D.Rocket的网页管理工具,需本身打包生成jar,项目是https://github.com/apache/rocketmq-externals.git中的console子项目,具体打包步骤参考官网介绍,很详细,但JDK11时要额外添加依赖包,
3.RocketMQ4.5.2+JDK11在window下目前兼容性不好,官方提供的启动脚本不能正常启动,因启动脚本是根据JDK1.8开发,修改启动脚本,能启动RocketMQ后,console管理工具启动也没法链接RocketMQ,从新安装JDK1.8,一切正常!整了一天未果,但我比较钻尖,就是要使用JDK11,最后只能曲线救国,在linux上安装RocketMQ和console,成功!生成console的jar时,如下警告,能够忽略,我尝试添加netty最新的依赖,能够消除该警告。
开启producer,遇到这个错误:
关闭linux防火墙,或者开相应的端口!linux下安装RocketMQ方法在此不表,稍微复杂,但值得操做。
4.Windows下启动Rocket遇到的错误示例(JDK11):
Windows下启动Rocket正常的状态(JDK1.8):
linux下正常启动(JDK11),也要修改启动脚本,参考个人另外一篇:
linux下JDK11和RocketMQ安装,启动命令及成功结果以下:
5.linux下安装RocketMQ,详细见个人另外一篇:linux下JDK11和RocketMQ安装
6.编程就是“思考十分钟,编写一分钟”,不少设计思惟很重要,好比此项目中的“过时订单取消线”,若是看官说,没付款且过时的消息才进入DLX,这个回答就是错误的!由于queue没法跳跃式pop消息。再好比,MQ事务中各业务逻辑的位置是有约定的,就要作转换设计,业务逻辑如何切割整合,各方的调用等。
往期文章导航:
Dubbo学习系列之四(MybaitsPlus+Druid使用)
欢迎关注个人 公众号,各类技术分析使用实战: