RocketMQ与MYSQL事务消息整合

一、基础理论知识篇“两阶段提交”若是你了解能够跳过这段,固然若是你想深刻了解你能够购买相关书籍或去搜索相关资料阅读

  两阶段提交分为 正常提交和异常提交或异常回滚java

       上面是正常提交的示意图,协调者发起预提交请求,参与者回复成功以后协调者再次发起commit请求,统一提交事物。事物结束。mysql

       若是这两阶段提交过程中有任何一个请求出现异常就会回滚,以下流程:git

       异常请求包括预提交 返回预提交的应答,commit请求 等任何一个失败都会致使整个事物回滚。github

  二阶段提交的问题 
    二阶段提交”还有一个很严重的问题就是若是commit过程中失败了 就致使了所有事物失败,代价很大,简单粗暴的处理方式

         还有一个问题是若是 commit过程当中网络出现问题 commit没有被整个事物的参与者之一或者多个收到,这个时候就会出现数据不一致现象。spring

 
  可能你们会提到 协调者是谁,参与者又是谁那?

               这里简单说下本身的理解sql

         若是在你的应用程序中你是经过 begin等相关操做语句开始的,好比 你使用了spring的@Transactional注解等,mongodb

         那协调者就是你的“应用程序”,参与者就是 mysql或其余支持事物的数据库系统数据库

         若是你就直接向mysql发送了一条sql语句mysql是自行提交的,那协调者和参与者都是mysql数据库本身apache

二、这里说下mysql对所谓的“重复数据”提供的相关sql或关键字。

       unique 惟一主键约束api

              在sql事物中和应用程序中均可以捕获这个错误码或异常,能够做为幂等判断的一个依据。

       upset 操做,发现惟一主键冲忽然后更新相关数据,mongodb有直接使用的sql方法语句

              示例:insert into tablename(column1) values(123) on duplicate key update column1 =column1 +123

        ignore 忽略操做对于多余的操做直接忽略

              insert ignore into tablename(column1)  values(123)

 

  基础篇说完不少内容若是想深刻了解能够本身找资料处理。下面是华丽分割线


三、在咱们原有的认知里有一个方案就差那么一点点就能够大面积使用的。

       咱们以前可能想过怎样既能发送mq又能写数据库,下面这个方案会分接近咱们的愿望。

       咱们听从以下步骤进行代码处理:

       一、开启数据库事物执行相关sql

       二、发送MQ消息

       三、提交数据库事物

       (注意:以上每一步都是在上一步执行成功以后在执行下一步的)

       根据步骤我画出了下面的流程图

 其实这个流程是有一个漏洞的,若是我把上面的流程图改造为下面的二阶段提交的示意图就会很明显的看出来

        不知道你们有么有发现问题,是否是 各类提交和回滚操做都是针对的数据库,而不是MQ。commit数据库事物出现异常就会形成数据不一致现象。

        其实也不用在想有没有其余的流程方案能解决分布式双写问题,只要存在多写问题就存在数据不一致问题的现象,

        因此就出现了3pc Paxos 等协议来解决分布式事物/一致性的问题。

 

        下面咱们开始介绍怎么使用mysql和RocketMQ来实现事物问题

         华丽分割线


四、RocketMQ事物消息的过程

       一、发送MQ的事物消息

       二、事物消息发送成功后会同步触发对应执行本地接口来执行针对mysql数据库的操做

       三、若是有未commit的消息,RocketMQ 的 broker会定时间隔时间来回查数据库事物是否已经提交完成

五、结合RocketMQ的事物消息与Mysql数据库事物的实现思想

  若是上面的二阶段提交你已经理解了,你会发现我这里设计的流程(上面图的流程)有点不太同样的地方

        什么地方那?

        MQ事物消息回滚的时候是由于mysql数据库事物没有提交成功而致使的,也就是说若是mysql数据库事务成功了MQ的事务消息是必定要成功的

        不然就会出现事物不一致的现象。

        假如发送MQ的prepare消息成功了,执行mysql事物的操做也成功了,可是恰恰返回给MQ的commit消息丢失了,那这个时候数据库消息并不会回滚。

  因此就有了回查本地事物消息是否成功的操做,来对MQ的消息作个补偿动做实现数据一致性

 

        理解了二阶段提交以及RocketMQ的事物实现以后你就能够本身设计事物相关操做的执行顺序了

        (这里的流程设计以及包括个人代码实现是以个人理解作出的最佳实践)

 六、RocketMQ与Mysql事物结合注意事项

       一、若是应用程序承担协调者的工做就尽可能晚开启事物和尽可能早的提交数据库事物,事物中的sql对数据竞争多的sql尽可能靠后

            由于执行数据库事物会有各类锁操做,减小锁的生命周期,数据库是稀缺资源,你们能省则省

       二、数据库事物最好设置超时时间,超时以后自动解除,最好不超过1分钟

       三、MQ默认1分钟以后回查一次已发送message但未commit的消息,最多回查15次,以后会执行回滚操做

       四、应用程序必定要作好幂等处理(能够参考上面mysql相关语句实现幂等接口)

       五、网络不要太差,不然会形成大量的重试,重试就会影响消息的及时性

       六、适用场景

                    单次请求数量小

                    每次请求会有数据产生,而不是查询产生的数据(好比 insert操做叫生产数据,select操做不生产数据)

                    下游能够接受必定的延迟(这里有两个因素,有应用程序自己和Broker,这里指broker)

                    下游服务或系统以接收到的消息为依据作相应的操做

                     MQ消息做为主要信息传递的工具

 

         下面说下具体代码实现

         华丽分割线


 

七、实战代码解析

       首先附上源码地址 https://github.com/zygfengyuwuzu/springboot-rocketmq-example

       下面将针对关键代码进行讲解

       首先介绍一下代码目录

 

 

         了解了上面的代码目录下面说下代码的执行流程


    首先看事物消息生产者的实例对象建立
package rocketmq_example.mqandmysqltraction.producer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * 生产者和消费者测试的时候记得注掉一中的一个以避免观察不出效果
 * 
 */
@Component
public class TransactionProducer {
 static Logger logger = LoggerFactory.getLogger(TransactionProducer.class);

 public DefaultMQProducer producer = null;
 
 @Autowired
 TransactionListener transactionListenerImp;

 @PostConstruct
 private void init() throws MQClientException {
  logger.info("MQ事物生产者初始化开始--------------------------------------------------");
  TransactionMQProducer transactionProducer = new TransactionMQProducer("mytestgroup");
  // Producer 组名, 多个 Producer 若是属于一 个应用,发送一样的消息,则应该将它们 归为同一组
  //transactionProducer.setProducerGroup("mytestgroup");
  // Name Server 地址列表
  transactionProducer.setNamesrvAddr("10.10.6.71:9876;10.10.6.72:9876");
  // 超时时间  这里必定要大于数据库事物执行的超时时间
  transactionProducer.setSendMsgTimeout(90000);
  //这个线程池做用就是  mqbroker端回调信息的本地处理线程池
  ExecutorService executorService = new ThreadPoolExecutor(1, 5, 100, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
     @Override
     public Thread newThread(Runnable r) {
      Thread thread = new Thread(r);
      thread.setName("client-transaction-msg-check-thread");
      return thread;
     }
    });
  transactionProducer.setExecutorService(executorService);
  transactionProducer.setTransactionListener(transactionListenerImp);
  producer = transactionProducer;
  producer.start();
  logger.info("MQ事物生产者初始化结束--------------------------------------------------");
 }
 public SendResult send(Message me) throws Exception {
  return producer.send(me);
 }
 /**
  * 发送普通消息
  * @param Topic
  * @param Tags
  * @param body
  * @return
  * @throws Exception
  */
 public SendResult send(String Topic, String Tags, String body) throws Exception {
  Message me = new Message();
  // 标示
  me.setTopic(Topic);
  // 标签
  me.setTags(Tags);
  // 内容
  me.setBody(body.getBytes(RemotingHelper.DEFAULT_CHARSET));
  return producer.send(me);
 }
 /**
  * 发送普通消息
  * @param Topic
  * @param Tags
  * @param key
  * @param body
  * @return
  * @throws Exception
  */
 public SendResult send(String Topic, String Tags, String key, String body) throws Exception {
  try {
   Message me = new Message(Topic, Tags, key, 0, body.getBytes(RemotingHelper.DEFAULT_CHARSET), true);
   return producer.send(me);
  } catch (Exception e) {
   logger.error("发送MQ信息异常Topic{},Tags{},key{},body{}", Topic, Tags, key, body);
   throw e;
  }
 }
 @PreDestroy
 public void Destroy() {
  producer.shutdown();
 }
}

  上面的代码咱们接收到请求传输过来的数据以后,首先作了MQ消息对象的建立,建立成功以后直接发送MQ事物消息

  事物消息发送成功以后会调用上面设置的接口实现类的TransactionListenerImpl.executeLocalTransaction()这个方法。

  接口实现的方法代码以下:

package rocketmq_example.mqandmysqltraction.producer;

import java.util.List;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import rocketmq_example.mqandmysqltraction.MyTableModel;
import rocketmq_example.mqandmysqltraction.MytableService;

/**
 * 把数据库事物嵌套在mq事物当中不能显示抛出异常
 * 
 * 
 * 
 * 
 * @author zyg
 *
 */
@Component
public class TransactionListenerImpl implements TransactionListener {

 static Logger logger = LoggerFactory.getLogger(TransactionListenerImpl.class);

 @Autowired
 MytableService mytableService;

 /**
  * 必定要设置执行sql时间,尽可能不要超时
  * 
  */
 @Override
 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  logger.info("开始执行本地数据库事物  transactionid:{}", msg.getTransactionId());
  LocalTransactionState lts = LocalTransactionState.UNKNOW;
  @SuppressWarnings("unchecked")
  List<MyTableModel> mytablelist = (List<MyTableModel>) arg;
  try {
   long start=System.currentTimeMillis();
   //数据库事物执行时间不要超过mq回查时间 默认15分钟
   mytableService.execMytableinsert2(mytablelist, msg.getTransactionId());
   logger.info("执行数据库事物耗时:{}",System.currentTimeMillis()-start);
   lts = LocalTransactionState.COMMIT_MESSAGE;
  } catch (Exception e) {
   logger.error("数据库事务异常", e);
   lts = LocalTransactionState.ROLLBACK_MESSAGE;
  }
  logger.info("结束执行本地数据库事物  transactionid:{} 返回:{}", msg.getTransactionId(),lts);
  return lts;
 }

 /**
  * 去数据库查询看看是否存在已经成功发送预提交数据而没有commit成功的mq信息
  * 每分钟1次默认15次
  * 
  * 这里能够作个计数 让MQ重试5次/5分钟就回滚减轻MQ回查的压力
  * 
  */
 @Override
 public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  if (mytableService.existMyTableModelByMsgid(msg.getTransactionId())) {
   logger.info("查询到已提交事物 transactionid:{}",msg.getTransactionId());
   return LocalTransactionState.COMMIT_MESSAGE;
  } else {
   logger.info("未查到已提交事物 transactionid:{}",msg.getTransactionId());
   return LocalTransactionState.UNKNOW;
  }

 }

}

     上面代码有两个方法,这里说下两个方法的做用和执行时间

             executeLocalTransaction这个方法是发送完 事物消息 以后同步被调用到的方法,用来执行本地事物操做

             executeLocalTransaction方法有两个参数,第一个是发送成功以后的message消息,在这个方法中包含事物ID其实就是msgid

             第二个参数是object类型的是从dataapi传过来,

             个人代码中没作任何处理直接传递过来了而后直接转化传递给了service层进行事物处理

             这个executeLocalTransaction方法里面为何要直接返回commit或rollback,

             目的是尽可能快的告诉MQ个人数据库事务执行成功了,

             尽快将half消息转为正常消息,已备消费者消费到作业务处理。

             这里彻底能够直接返回unknow,等待broker回查来实现commit操做的。可是这样作对回查消息broker形成必定的压力。

      上面代码的第二个方法是提供给broker回调执行的,进行检查本地事务是否成功执行的操做,发起方是broker

             这里面咱们接收到broker的回查请求以后直接去数据库查询是否存在broker提供的事务ID的数据

             若是存在返回commit标识,若是不存在返回unknow标识以等待下一次再来回查

      到此咱们的一个事务操做就算完成了


    另外你们能够直接查看service层的实现代码,就不一一解释了
package rocketmq_example.mqandmysqltraction;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class MytableService {
 static Logger logger = LoggerFactory.getLogger(MytableService.class);

 @Autowired
 IMytableMapper mytable;

 @Autowired
 ObjectMapper objMapper;

 /**
  * 这里能够显示提交事物 返回boolean 一条一条插入只是为了展示事物的特性 获取全部异常 处理你的业务逻辑等等
  * 
  * @param mytablemodels
  * @return
  */
 @Transactional(rollbackFor = Exception.class, timeout = 60000)
 public List<Integer> execMytableinsert2(List<MyTableModel> mytablemodels, String msgid) {

  // logger.info("开始执行数据库事物");
  List<Integer> result = new ArrayList<Integer>();
  for (MyTableModel myTableModel : mytablemodels) {
   // 插入数据库
   myTableModel.setMsgid(msgid);
   mytable.insertmytable(myTableModel);
   result.add(myTableModel.getId());
  }
  // logger.info("结束执行数据库事物");
  return result;
 }

 public boolean existMyTableModelById(Integer id) {
  MyTableModel myTableModel = mytable.selectMyTableModelById(id);
  if (myTableModel != null && null != myTableModel.getId()) {
   return true;
  }
  return false;
 }

 /**
  * 查询是否存在已经发送过的msgid消息
  * 
  * @param msgid
  * @return
  */
 public boolean existMyTableModelByMsgid(String msgid) {
  int count = mytable.selectMyTableModelByMsgid(msgid);
  if (count > 0) {
   return true;
  }
  return false;
 }

 public void insetmsg(MyTableModel mytablemodel) {
  try {
   mytable.insertmsgrecord(mytablemodel);

  } catch (org.springframework.dao.DuplicateKeyException e) {
   logger.error("主键冲突异常被捕获",e);
  }
 }
}

很是感谢你能看到这里!!!看到这里相信你已经对本篇博客的内容有所了解了!若是有什么问题或者想不通的地方欢迎评论区进行讨论。

若是有不正确的地方恳请指正

 

相关文章
相关标签/搜索