导读java
在以前的文章中咱们介绍了如何基于RocketMQ搭建生产级消息集群,以及2PC、3PC和TCC等与分布式事务相关的基本概念(没有读过的读者详见👇推荐阅读)。在这篇文章中咱们将介绍RocketMQ的事务消息相关的内容,并经过一些实践和你们一块儿来探索下事务消息如何解决分布式系统中的分布式事务问题。git
事务消息原理github
事务消息特性能够看做是两阶段协议的消息实现方式,用以确保在以消息中间件解耦的分布式系统中本地事务的执行和消息的发送,能够以原子的方式进行。spring
举个例子,以某互联网公司的用户余额充值为例,由于有充返活动(充值100元赠送20元),优惠比较大,用户Joe禁不住诱惑用支付宝向本身的余额帐户充值了100元,支付成功后Joe的余额帐户有了120元钱。数据库
而该公司的关于用户余额充值的系统设计是这样的:编程
在这个设计流程中,该公司经过自建支付系统完成用户Joe的支付宝扣款操做,成功后须要更新支付流水的状态,由于用户的余额帐户系统与支付系统之间经过MQ解耦了,因此支付系统在完成支付流水状态更新后须要经过发送MQ消息到消息中间件服务,而后用户余额系统做为消费者经过消息消费的方式完成用户余额的增长操做。springboot
这里有个问题:“支付系统如何确保这笔余额充值消息必定会成功发送到MQ,而且用户余额系统必定能处理成功呢”?若是支付系统在完成支付订单状态更新后,MQ消息发送失败或者用户余额系统消息处理失败的话,都会致使Joe支付扣款成功,而本身的余额帐户却没到帐的状况发生。服务器
为了解决这个问题,按照目前的系统设计是须要“支付系统-MQ服务-用户余额系统”三者的处理知足数据的一致性要求。例如,若是支付系统感知到消息发送失败后还能够进行从新投递,从而确保支付系统与用户余额数据的最终一致性。架构
而上述问题就是事务消息要解决的问题,在具体了解RocketMQ提供的事务消息机制以前,咱们先来看下在RocketMQ的早期版本不支持事务消息,或者由于历史缘由选择的消息中间件自己就不支持事务消息的状况下,一些大公司是怎么解决这个问题的?并发
早期为了实现基于MQ异步调用的多个服务间,业务逻辑执行要么一块儿成功、要么一块儿失败,具有事务特色,一般会采用可靠消息最终一致性方案,来实现分布式事务。仍是以Joe充值这件事来举例,可靠消息方案实现过程以下:
在可靠消息最终一致性方案中,为了实现分布式事务,须要确保上游服务本地事务的处理与MQ消息的投递具备原子性,也就是说上游服务本地事务处理成功后要确保消息必定要成功投递到MQ服务,不然消息就不该该被投递到MQ服务;一样,被成功投递到MQ服务的消息,也必定要被下游服务成功处理,不然就须要从新投递MQ消息。
为了实现双向的原子性,可靠消息服务须要对消息进行状态标记,与此同时还须要对消息进行状态检查,从而实现从新投递及消息状态的最终一致性。核心流程说明以下:
一、上游服务(支付系统)如何确保完成自身支付成功状态更新后消息100%的可以投递到下游服务(用户余额系统)指定的Topic中?
在这个流程中上游服务在进行本地数据库事务操做前,会先发送一个状态为“待确认”的消息至可靠消息服务,而不是直接将消息投递到MQ服务的指定Topic。可靠消息服务此时会将该消息记录到自身服务的消息数据库中(消息状态为->待确认),完成后可靠消息服务会回调上游服务表示收到了消息,大家能够进行本地事务的操做了。
以后上游服务就会开启本地数据库事务执行业务逻辑操做,这里支付系统就会将该笔支付订单状态更新为“已成功”。(注意,这里只是举个示例场景,在真正的实践中通常是不会把支付订单自己的状态与业务端回调放在一个事务流程中的,关于这部分的详细说明咱们在下面的场景说明中再讨论)。
若是上游服务本地数据库事务执行成功,则继续向可靠消息服务发送消息确认消息,此时可靠消息服务就会正式将消息投递到MQ服务,而且同时更新消息数据库中的消息状态为“已发送”。(注意,这里可靠消息服务更新消息状态与投递消息至MQ也必须是在一个原子操做中,即消息投递成功则必定要将消息状态更新为“已发送”,因此在编程的细节中,可靠消息服务通常会先更新消息状态,而后再进行消息投递,这样即便消息投递失败,也能够对消息状态进行回滚->“待确认”,相反若是先进行消息投递再更新消息状态,可能就很差控制了)。
相反,若是上游本地数据库事务执行失败,则须要向可靠消息服务发送消息删除消息,可靠消息服务此时就会将消息删除,这样就意味着事务在上游消息投递过程当中就被回滚了,而流程也就此结束了,此时上游服务能够须要经过业务逻辑的设计进行重发,这个就再也不分布式事务的讨论范畴了。
说到这里,你们可能会有疑问了!由于在上述描述中,即便上游服务本地数据库事务执行成功了,可是在发送确认消息至可靠消息服务的过程当中,以及可靠消息服务在投递消息至MQ服务的过程当中,仍是会存在失败的风险,这样的话仍是会致使支付服务更新了状态,可是用户余额系统连消息都没有收到的状况发生?
实际上,实现数据一致性是一个复杂的活。在这个方案中可靠消息服务做为基础性的服务除了执行正常的逻辑外,还得处理复杂的异常场景。在实现过程当中可靠消息服务须要启动相应的后台线程,不断轮训消息的状态,这里会轮训消息状态为“待确认”的消息,并判断该消息的状态的持续时间是否超过了规定的时间,若是超过规定时间的消息还处于“待确认”的状态,就会触发上游服务状态询问机制。
可靠消息服务就会调用上游服务提供的相关借口,询问这笔消息的处理状况,若是这笔消息在上游服务处理成功,则后台线程就会继续触发上图中的步骤5,更新消息状态为“已发送”并投递消息至MQ服务;反之若是这笔消息上游服务处理失败,可靠消息服务则会进行消息删除。经过这样以上机制就确保了“上游服务本地事务成功处理+消息成功投递”处于一个原子操做了。
二、下游服务(用户余额系统)如何确保对MQ服务Topic消息的消费100%都能处理成功?
在1的过程当中,确保了上游服务逻辑处理与MQ消息的投递具有原子性,那么当消息被成功投递到了MQ服务的指定Topic后,下游服务如何才能确保消息的消费必定能被成功处理呢?
在正常的流程中,下游服务等待消费Topic的消息并进行自身本地数据库事务的处理,若是处理成功则会主动通知可靠消息服务,可靠消息服务此时就会将消息的状态更新为“已完成”;反之,处理失败下游服务就没法再主动向可靠消息服务发送通知消息了。
此时,与消息投递过程当中的异常逻辑同样,可靠消息服务也会启动相应的后台线程,轮询一直处于“已发送”状态的消息,判断状态持续时间是否超过了规定时间,若是超时,可靠消息服务就会再次向MQ服务投递此消息,从而确保消息能被再次消费处理。(注意,也可能出现下游服务处理成功,可是通知消息发送失败的状况,因此为了确保幂等,下游服务也须要在业务逻辑上作好相应的防重处理)。
RocketMQ事务消息机制
在👆面第2小节的内容中,咱们演示了一个自编写的中间服务+MQ来实现事务消息的示例。可是在现实的工做场景中,开发和维护一套可靠消息服务是一件很耗费资源和成本的事情,实际上,RocketMQ的最新版本(4.3.0+)中已经实现了可靠消息服务的全部功能,而且在保证高并发、高可用、高性能方面作了更为优秀的架构实现。
从设计逻辑上看RocketMQ所支持的分布式事务特性与上节中阐述的可靠消息服务基本上是一致的。只是RocketMQ在实现上相比较于可靠消息服务而言作了更为复杂的设计,而且由于自然与MQ服务自己紧密结合,因此在高可用、可靠性、性能等方面直接继承了MQ服务自己的架构优点。
下面咱们就结合流程并经过示例代码的分析来和你们一块儿理解下利用RocketMQ是如何实现分布式事务操做的?
在应用场景中分布式服务经过MQ通讯的过程当中,发送消息的一方咱们称之为Producer,接收消费消息的一方咱们称之为Consumer。若是Producer自身业务逻辑本地事务执行成功与否但愿和消息的发送保持一个原子性(也就是说若是Producer本地事务执行成功,那么这笔消息就必定要被成功的发送到RocketMQ服务的指定Topic,而且Consumer必定要被消费成功;反之,若是Producer本地事务执行失败,那么这笔消息就应该被RocketMQ服务器丢弃)的话,RocketMQ是怎么作的呢?
一、Producer选择使用RockerMQ提供的事务消息方法向RocketMQ服务发送事务消息(设置消息属性TRAN_MSG=TRUE);
二、RocketMQ服务端在收到消息后会判断消息的属性是否为事务消息,若是是普通消息就直接Push给Consumer;若是是事务消息就会对该消息进行特殊处理设置事务ID,并暂时设置该消息对Consumer不可见,以后向Producer返回Pre消息发送状态(SEND_OK)。
三、以后Producer就会开始执行本地事务逻辑,并设置本地事务处理状态后向RocketMQ服务器发送该事务消息的确认/回滚消息(COMMIT_MESSAGE/ROLLBACK_MESSAGE)。
四、RocketMQ服务器根据该笔事务消息的本地事务执行状态决定是否将消息Push给Consumer仍是删除该消息。
五、以后Consumer就会消费该消息,执行Consumer的本地事务逻辑,若是执行成功则向RocketMQ返回“CONSUME_SUCCESS”;反之出现异常则须要返回“RECONSUME_LATER”,以便RocketMQ再次Push该消息,这一点在实际编程中须要控制好。
正常状况下以上就是RocketMQ事务消息的基本运行流程了,可是从异常状况考虑,理论上也是存在Producer迟迟不发送确认或回滚消息的状况。与可靠消息服务同样,RocketMQ服务端也会设置后台线程去扫描消息状态,以后会调用Producer的本地checkLocalTransaction函数获取本地事务状态后继续进行第3步操做。
相信看到这里,你们对于RocketMQ的分布式事务消息的理解应该有了一个相对清晰的概念了,那么在代码中如何编写呢?
在开发中使用RocketMQ的分布式事务消息Consumer的代码不须要有什么特别的变化与普通消息Consumer代码一致就能够。
Consumer示例代码:
public static void main(String[] args) throws InterruptedException, MQClientException { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_PAY_ACCOUNT"); // Specify name server addresses. consumer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876"); // Subscribe one more more topics to consume. consumer.subscribe("PAY_ACCOUNT", "*"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { System.out.println(new String(messageExt.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); }
主要的改变是在Producer代码,咱们须要额外编写一个实现执行本地事务逻辑,以及检查本地事务状态的类。示例代码以下:
public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
Producer示例代码:
public class TransactionProducerTest { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("CID_PAY_ACCOUNT"); producer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; try { Map<String, String> paramMap = new HashMap<>(); paramMap.put("type", "6"); paramMap.put("bizOrderId", "15414012438257823"); paramMap.put("payOrderId", "15414012438257823"); paramMap.put("amount", "10"); paramMap.put("userId", "200001"); paramMap.put("tradeType", "charge"); paramMap.put("financeStatus", "0");//财务状态,应收 paramMap.put("channel", "a");//余额 paramMap.put("tradeTime", "20190101202022"); paramMap.put("nonce_str", "xkdkskskdksk"); //拼凑消息体 Message msg = new Message("PAY_ACCOUNT", "pre",paramMap.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } Thread.sleep(10*1000); producer.shutdown(); } }
与非事务消息直接调用RocketMQ Client的send方法不一样,事务消息发送须要设置事务监听器类,并调用sendMessageInTransaction方法,而这个方法的具体逻辑也就是上述流程中描述的那样,具体你们能够看下。
以上代码只是示例代码,在实际的项目中咱们是须要进行一些封装设计的,以便与项目上下文环境集成。例如对于Springboot项目,咱们通常会编写一个stater工程进行集成。你们感兴趣能够关注下个人github项目,后面我会以真实的项目场景作一些集成示范。
https://github.com/qiaojiang2/springboot-starter
场景说明
目前RocketMQ消息中间件的使用场景比较普遍,对于须要经过MQ进行异步解耦的分布式应用系统来讲,RocketMQ无疑是一个不错的技术选择。接下来,咱们就以对数据一致性要求很是高的分布式支付系统为例,来看看基于RocketMQ的事务消息适用于哪些特定场景,从而实现支付系统数据的高度一致性。
事实上,支付系统的数据一致性是一个复杂的问题,缘由在于支付流程的各个环节都存在异步的不肯定性,例如支付系统须要跟第三方渠道进行交互,不一样的支付渠道交互流程存在差别,而且有异步支付结果回调的状况。
除此之外,支付系统内部自己又是由多个不一样子系统组成,除核心支付系统外,还有帐务系统、商户通知系统等等,而核心支付系统自己也会被拆分为多个不一样的服务模块,如风控、路由等用以实现不一样的功能逻辑。某些场景咱们没法经过分布式事务来实现数据一致性,只能经过额外的业务补偿手段,如二次轮训、支付对帐等来实现数据最终一致性。
综上所述,支付系统是一个复杂的系统,要彻底实现数据的一致性单靠某一种手段是没法实现的,大部分状况下咱们能够经过额外的业务补偿逻辑来实现数据最终一致性,只是这样补偿逻辑须要以更多的业务开发逻辑为代价,而且在时效性上会存在延迟的问题。
举个例子,支付核心系统支付成功后会更新本身的订单状态为支付成功,整个核心交易流程是一个比较实时同步的场景,若是出现数据不一致,会有额外的补偿逻辑如二次支付订单状态轮询、T+1日对帐等用以确保支付状态数据的最终一致性。可是除了核心支付外,支付成功的结果是须要通知到支付帐务系统、以及业务端系统,而为了确保性能,通常后续的通知就不会与主流程同样设计成实时同步,而是经过MQ异步解耦发送消息给独立的“通知响应模块”,而“通知响应模块”此时就能够经过分布式事务消息来与支付帐户系统、业务端等系统实现数据一致性,从而减小须要补偿手段处理的范围,提升系统的数据一致性等级和灵敏度。