消息队列对比参照表:java
RocketMQ vs. ActiveMQ vs. Kafka:node
参考至:git
环境要求:github
一、下载RocketMQ的二进制包,我这里使用的是4.5.1版本,下载地址以下:web
http://rocketmq.apache.org/release_notes/release-notes-4.5.1/spring
使用wget命令下载:数据库
[root@study-01 ~]# cd /usr/local/src [root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
二、解压下载好的压缩包,并移动到合适的目录下:apache
[root@study-01 /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip [root@study-01 /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1
注:若没有安装unzip命令则使用以下命令安装:
yum install -y unzip编程
三、进入rocketmq的根目录并查看是否包含以下目录及文件:json
[root@study-01 /usr/local/src]# cd /usr/local/rocketmq-4.5.1 [root@study-01 /usr/local/rocketmq-4.5.1]# ls benchmark bin conf lib LICENSE NOTICE README.md
四、没问题后,使用以下命令启动Name Server:
[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv & [1] 2448 [root@study-01 /usr/local/rocketmq-4.5.1]#
五、查看默认的9876端口是否被监听,以验证Name Server是否启动成功:
[root@study-01 /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java tcp6 0 0 :::9876 :::* LISTEN 2454/java [root@study-01 /usr/local/rocketmq-4.5.1]#
六、启动Broker:
[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 & [2] 2485 [root@study-01 /usr/local/rocketmq-4.5.1]#
七、验证Broker是否启动成功,若是启动成功,能看到相似以下的日志::
[root@study-01 /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success" 2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876 [root@study-01 /usr/local/rocketmq-4.5.1]#
若想中止Name Server和Broker,则依次执行如下两条命令便可:
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker The mqbroker(2492) is running... Send shutdown request to mqbroker(2492) OK # 输出该信息说明中止成功 [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv The mqnamesrv(2454) is running... Send shutdown request to mqnamesrv(2454) OK # 输出该信息说明中止成功 [2]+ 退出 143 nohup sh bin/mqbroker -n localhost:9876 [root@study-01 /usr/local/rocketmq-4.5.1]#
一、验证生产消息正常,执行以下命令:
[root@study-01 /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876 [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
正常的状况下,会看到一堆的相似于以下的输出,这是生产消息后成功的result:
SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]
二、验证消费消息正常,执行以下命令:
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
正常的状况下,会看到一堆的相似于以下的输出,这是消费的消息内容:
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId='null'}]]
RocketMQ官方提供了一个基于Spring Boot开发的可视化控制台,能够方便咱们查看RocketMQ的运行状况以及提高运维效率。因此本小节将介绍一下如何搭建搭建RocketMQ的控制台,因为咱们使用的RocketMQ版本是4.5.1,因此须要对控制台的源码进行一些改动以适配RocketMQ的4.5.1版本。
一、首先须要下载源码,有两种方式,一是使用git克隆代码仓库,二是直接下载rocketmq-externals的zip包,我这里使用git方式,执行以下命令:
git clone https://github.com/apache/rocketmq-externals.git
二、修改控制台代码,使用IDE打开rocketmq-console
项目,以下图所示:
2.一、修改项目中的application.properties
配置文件,我这里主要是修改了监听端口和Name Server的链接地址,至于其余配置项有须要的话可按照说明自行修改:
# console的监听端口,默认是8080 server.port=8011 # Name Server的链接地址;非必须,能够在启动了console后,在控制台导航栏 - 运维 - NameSvrAddrList一栏设置 rocketmq.config.namesrvAddr=192.168.190.129:9876
2.二、修改依赖,因为console项目默认使用的rocketmq版本是4.4.0,与咱们这里使用的是4.5.1不彻底兼容,因此须要修改一下依赖版本,找到这一行:
<rocketmq.version>4.4.0</rocketmq.version>
修改成:
<rocketmq.version>4.5.1</rocketmq.version>
2.三、修改代码,因为修改了rocketmq的版本,会致使org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic
方法编译报错,因此须要改动一下此处代码 ,将:
@Override public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null); ...
修改成:
@Override public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) { RPCHook rpcHook = null; DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); ...
三、打包构建并启动,打开idea的terminal,执行以下命令:
# 在rocketmq-console目录下执行 mvn clean package -DskipTests # 进入jar包存放目录 cd target # 启动rocketmq console java -jar rocketmq-console-ng-1.0.1.jar
四、使用浏览器访问控制台,我这里因为修改了端口,因此访问地址是:http://localhost:8011
,正常的状况下能看到以下界面:
不习惯英文的话能够在右上角切换语言:
因为控制台是可视化界面而且支持中文,这里就不过多介绍了,能够参考官方的控制台使用说明文档:
我这里将基本的术语与概念简单总结成了思惟导图:
官方文档:
在以上小节搭建完RocketMQ以后,咱们来使用Spring的消息编程模型,编写一个简单的示例。首先须要在项目中添加相关依赖以下:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
在配置文件中添加rocketmq相关的配置以下:
rocketmq: name-server: 192.168.190.129:9876 producer: # 小坑:必须指定group group: test-group
编写生产者的代码,这里以Controller作示例,具体代码以下:
package com.zj.node.contentcenter.controller.content; import lombok.Data; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * 生产者 * * @author 01 * @date 2019-08-03 **/ @RestController @RequiredArgsConstructor public class TestProducerController { /** * 用于发送消息到 RocketMQ 的api */ private final RocketMQTemplate rocketMQTemplate; @GetMapping("/test-rocketmq/sendMsg") public String testSendMsg() { String topic = "test-topic"; // 发送消息 rocketMQTemplate.convertAndSend(topic, MyMessage.getInstance()); return "send message success"; } } @Data class MyMessage { private Integer id; private String name; private String status; private Date createTime; static MyMessage getInstance() { MyMessage message = new Message(); message.id = 1; message.name = "×××"; message.status = "default"; message.createTime = new Date(); return message; } }
编写完成后,启动项目,访问该接口:
消息发送成功后,能够到RocketMQ的控制台中进行查看:
消息体能够在消息详情中查看,以下:
从生产者的代码来看,能够说是十分的简单了,只须要使用一个RocketMQTemplate就能够实现将对象转换成消息体并发送消息。实际上除了RocketMQ外,其余的MQ也有对应的Template,以下:
在消费者项目中,也须要添加rocketmq的依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
一样须要配置Name Server的链接地址:
rocketmq: name-server: 192.168.190.129:9876
编写消费者的代码,具体代码以下:
package com.zj.node.usercenter.rocketmq; import com.alibaba.fastjson.JSON; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.util.Date; /** * 消费者监听器 * * @author 01 * @date 2019-08-03 **/ @Slf4j @Component // topic须要和生产者的topic一致,consumerGroup属性是必须指定的,内容能够随意 @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group") public class TestConsumerListener implements RocketMQListener<MyMessage> { /** * 监听到消息的时候就会调用该方法 * * @param message 消息体 */ @Override public void onMessage(MyMessage message) { log.info("从test-topic中监听到消息"); log.info(JSON.toJSONString(message)); } } /** * 消息体结构须要一致 */ @Data class MyMessage { private Integer id; private String name; private String status; private Date createTime; }
编写完成后启动项目,因为以前咱们已经往队列里发送了消息,因此此时消费者项目一启动,就能够监听到消息并消费,控制台就会输出以下日志:
众所周知RocketMQ是支持事务消息的,这也是不少人选择使用RocketMQ做为消息中间件的一大缘由,也是RocketMQ的一大特定。RocketMQ事务消息的流程以下图所示:
因为原图是英文的,因此进行了一个大体的翻译。以下:
简单剖析一下流程:
一、生产者向MQ Server发送半消息,半消息是一种特殊的消息,这种消息会被存储到MQ Server里,可是会标记为暂时不能投递的状态,因此此时消费者不会消费该消息
二、当半消息发送成功后,生产者就会去执行本地事务
三、生产者根据本地事务的执行结果,向MQ Server发送commit或rollback消息进行二次确认。若是MQ Server接收到的是commit则会将半消息标记为可投递状态,那么消费者就能够进行消费。反之,MQ Server接收到的是rollback则会将半消息丢弃掉,消费者就没法进行消费
四、若MQ Server未接收到二次确认的消息或生产者暂停了本地事务的执行,MQ Server则会定时(默认1分钟)向生产者发送回查消息,检查生产者的本地事务状态。而后生产者会根据回查的本地事务执行结果向MQ Server再次发送commit或rollback消息
概念术语:
消息三态:
要想实现RocketMQ事务消息的话,须要按照流程图编写一些代码。在开始编码以前,先在数据库中建立一张RocketMQ的事务日志表,用做于本地事务回查的依据,表结构以下:
而后再建一张表,做为事务方法操做的数据表,表结构以下:
接着开始写代码,首先定义一个service,里面有带有事务注解的方法以及发送事务消息的方法。具体代码以下:
package com.zj.node.contentcenter.service.test; import com.zj.node.contentcenter.dao.content.NoticeMapper; import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper; import com.zj.node.contentcenter.domain.entity.content.Notice; import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog; import lombok.Data; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Date; import java.util.UUID; /** * @author 01 * @date 2019-08-08 **/ @Service @RequiredArgsConstructor public class TestProducerService { private final RocketMQTemplate rocketMQTemplate; private final NoticeMapper noticeMapper; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; public String testSendMsg(Notice notice) { // topic String topic = "test-topic"; // 生产者所在的事务组 String txProducerGroup = "tx-test-producer-group"; // 生产事务id String transactionId = UUID.randomUUID().toString(); // 发送半消息 rocketMQTemplate.sendMessageInTransaction( txProducerGroup, topic, // 消息体 MessageBuilder.withPayload("事务消息") // header是消息的头部分,能够用做传参 .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .setHeader("notice_id", notice.getId()) .build(), // 传递到executeLocalTransaction的参数 notice); return "send message success"; } @Transactional(rollbackFor = Exception.class) public void updateNotice(Integer noticeId, Notice notice) { Notice newNotice = new Notice(); newNotice.setId(noticeId); newNotice.setContent(notice.getContent()); noticeMapper.updateByPrimaryKeySelective(newNotice); } @Transactional(rollbackFor = Exception.class) public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) { updateNotice(noticeId, notice); // 写入事务日志 rocketmqTransactionLogMapper.insertSelective( RocketmqTransactionLog.builder() .transactionId(transactionId) .log("updateNotice") .build() ); } }
实现一个本地事务监听器,用于执行事务方法及提供本地事务状态的回查方法。具体代码以下:
package com.zj.node.contentcenter.rocketmq; import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper; import com.zj.node.contentcenter.domain.entity.content.Notice; import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog; import com.zj.node.contentcenter.service.test.TestProducerService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; /** * 本地事务监听器 * * @author 01 * @date 2019-08-08 **/ @Slf4j @RequiredArgsConstructor // 这里的txProducerGroup须要与sendMessageInTransaction里设置的一致 @RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group") public class TestTransactionListener implements RocketMQLocalTransactionListener { private final TestProducerService service; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; /** * 用于执行本地事务的方法 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); log.info("执行本地事务方法. 事务id: {}", transactionId); // header里拿出来的都是String类型 Integer noticeId = Integer.parseInt((String) headers.get("notice_id")); try { // 执行带有事务注解的方法 service.updateNoticeWithRocketMQLog(noticeId, (Notice) arg, transactionId); // 正常执行,向MQ Server发送commit消息 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事务方法发生异常,消息将被回滚", e); // 发生异常向MQ Server发送rollback消息 return RocketMQLocalTransactionState.ROLLBACK; } } /** * 用于回查本地事务的执行结果 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); log.warn("回查本地事务状态. 事务id: {}", transactionId); // 按事务id查询日志数据 RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne( RocketmqTransactionLog.builder() .transactionId(transactionId) .build() ); // 若是能按事务id查询出来数据表示本地事务执行成功,没有数据则表示本地事务执行失败 if (transactionLog == null) { log.warn("本地事务执行失败,事务日志不存在,消息将被回滚. 事务id: {}", transactionId); return RocketMQLocalTransactionState.ROLLBACK; } return RocketMQLocalTransactionState.COMMIT; } }
简单说明一下这些方法的执行流程:
首先调用
TestProducerService.testSendMsg
向MQ Server发送半消息,从代码也能够看到该方法里不会执行本地事务方法。当MQ Server接收半消息成功后,会告诉生产者接收成功,接着就会执行本地事务监听器里的executeLocalTransaction
方法,该方法里会调用TestProducerService
里带有事务注解的方法updateNoticeWithRocketMQLog
,并在事务方法执行完毕后返回本地事务状态给MQ Server。若executeLocalTransaction
方法返回的事务状态是UNKNOWN
或者该方法出于某种缘由没有被执行完毕,那么MQ Server就接收不到二次确认消息,默认会在一分钟后向生产者发送回查消息,生产者接收到回查消息的话就会执行本地事务监听器里的checkLocalTransaction
方法,经过事务日志记录表的数据来确认该事务状态并返回。
因为rocketmq有本身内部的日志体系,因此默认不会使用Slf4j。体现到executeLocalTransaction
方法的话,就是若是该方法的执行过程当中抛出了异常的话,异常信息不会被打印到控制台,而是输出到rocketmq_client.log日志文件中。相关源码:org.apache.rocketmq.client.log.ClientLogger
若是但愿rocketmq的日志输出到控制台的话,须要在启动类的main方法中增长以下代码:
// 让rocketmq使用slf4j日志 System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");