RocketMQ事务消息实战


RocketMQ事务消息阅读目录指引:java

RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想
RocketMQ源码分析之RocketMQ事务消息实现原理上篇
RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ事务消息实战web


   咱们以一个订单流转流程来举例,例如订单子系统建立订单,须要将订单数据下发到其余子系统(与第三方系统对接)这个场景,咱们一般会将两个系统进行解耦,不直接使用服务调用的方式进行交互。其业务实现步骤一般为:
   一、A系统建立订单并入库。
   二、发送消息到MQ。
   三、MQ消费者消费消息,发送远程RPC服务调用,完成订单数据的同步。
   一、方案一
这里写图片描述
   方案弊端:
   一、若是消息发送成功,在提交事务的时候JVM忽然挂掉,事务没有成功提交,致使两个系统之间数据不一致。
   二、因为消息是在事务提交以前提交,发送的消息内容是订单实体的内容,会形成在消费端进行消费时若是须要去验证订单是否存在时可能出现订单不存在。
   三、消息发送能够考虑异步发送。
   方案二:
   因为存在上述问题,在MQ不支持事务消息的前提条件下,能够采用下面的方式进行优化。
这里写图片描述
   而后在控制器层,使用异步发送,将消息发送,并在消息发送成功后,更新待发送状态为已发送。
   而后经过定时任务,扫描待发送,结合建立时间的记录(小于当前时间5分钟的消息待发送记录),进行消息发送。
   方案弊端:
   一、消息有可能重复发送,但在消费端能够经过惟一业务编号来进行去重设计。
   二、实现过于复杂,为了不 极端状况下的消息丢失,须要使用定时任务。
   方案三:基于RocketMQ4.3版本事务消息
这里写图片描述
   额外须要实现事务会查监听器:TransactionListener,其实例代码:数据库

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;

@SuppressWarnings("unused")
public class OrderTransactionListenerImpl implements TransactionListener {
	
	private ConcurrentHashMap<String, Integer> countHashMap = new ConcurrentHashMap<>();
	
	private final static int MAX_COUNT = 5;


	@Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 
    	String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务惟一ID。
    	// 将bizUniNo入库,表名:t_message_transaction,表结构  bizUniNo(主键),业务类型。
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = 0;
        // 从数据库查查询t_message_transaction表,若是该表中存在记录,则提交,
        String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务惟一ID。
        // 而后t_message_transaction 表,是否存在bizUniNo,若是存在,则返回COMMIT_MESSAGE,
        // 不存在,则记录查询次数,未超过次数,返回UNKNOW,超过次数,返回ROLLBACK_MESSAGE
        
        if(query(bizUniNo) > 0 ) {
        	return LocalTransactionState.COMMIT_MESSAGE;
        }
        
        return rollBackOrUnown(bizUniNo);
    }
    
    public int query(String bizUniNo) {
    	return 1; //select count(1) from t_message_transaction a where a.biz_uni_no=#{bizUniNo}
    }
    
    public LocalTransactionState rollBackOrUnown(String bizUniNo) {
    	Integer num = countHashMap.get(bizUniNo);
    	
    	if(num != null &&  ++num > MAX_COUNT) {
    		countHashMap.remove(bizUniNo);
    		return LocalTransactionState.ROLLBACK_MESSAGE;
    	}
    	
    	if(num == null) {
    		num = new Integer(1);
    	}
    	
    	countHashMap.put(bizUniNo, num);
    	return LocalTransactionState.UNKNOW;
    	
    }
  
}

   TransactionListener 实现要点:
   executeLocalTransaction:
   该方法,主要是设置本地事务状态,该方法与业务方代码在一个事务中,例如OrderServer#createMap中,只要本地事务提交成功,该方法也会提交成功。
   故在这里,主要是t_message_transaction添加一条记录,在事务会查时,若是存在记录,就认为是该消息须要提交。
   checkLocalTransaction:
   该方法主要是告知RocketMQ消息是否须要提交仍是回滚,若是本地事务表(t_message_transaction)存在记录,则认为提交,若是不存在,能够设置会查次数,若是指定次数内仍是未查到消息,则回滚,不然返回未知,rocketmq会按必定的频率回查事务,固然回查次数也有限制,默认为5次,可配置。apache


更多文章请关注微信公众号:
在这里插入图片描述
广告:博文做者的新书《RocketMQ技术内幕》已上市
在这里插入图片描述
   《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎全部的配置参数。本书获得了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度承认并做序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
使人意外的是,新书一上市,7折优惠。服务器