1、RocketMQ简介
RocketMQ做为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。html
2、RocketMQ架构
如图所示为RocketMQ基本的部署结构,主要分为NameServer集群、Broker集群、Producer集群和Consumer集群四个部分。java
Broker在启动的时候会去向NameServer注册而且定时发送心跳,Producer在启动的时候会到NameServer上去拉取Topic所属的Broker具体地址,而后向具体的Broker发送消息git
一、NameServer
NameServer的做用是Broker的注册中心。github
每一个NameServer节点互相之间是独立的,没有任何信息交互,也就不存在任何的选主或者主从切换之类的问题,所以NameServer是很轻量级的。单个NameServer节点中存储了活跃的Broker列表(包括master和slave),这里活跃的定义是与NameServer保持有心跳。web
二、Topic、Tag、Queue、GroupName
Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 能够理解为是二级分类spring
1) Topic(话题)
Topic是生产者在发送消息和消费者在拉取消息的类别。Topic与生产者和消费者之间的关系很是松散。一个生产者能够发送不一样类型Topic的消息。消费者组能够订阅一个或多个主题,只要该组的实例保持其订阅一致便可。数据库
咱们能够理解为Topic是第一级消息类型,好比一个电商系统的消息能够分为:交易消息、物流消息等,一条消息必须有一个Topic。apache
2) Tag(标签)
意思就是子主题,为用户提供了额外的灵活性。有了标签,方便RocketMQ提供的查询功能。数组
能够理解为第二级消息类型,交易建立消息,交易完成消息..... 一条消息能够没有Tag服务器
3) Queue(队列)
一个topic下,能够设置多个queue(消息队列),默认4个队列。当咱们发送消息时,须要要指定该消息的topic。
RocketMQ会轮询该topic下的全部队列,将消息发送出去。
在 RocketMQ 中,全部消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每一个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100年内不会溢出,因此认为是长度无限。
也能够认为 Message Queue 是一个长度无限的数组,Offset 就是下标。
4) groupName(组名称)
RocketMQ中也有组的概念。表明具备相同角色的生产者组合或消费者组合,称为生产者组或消费者组。
做用是在集群HA的状况下,一个生产者down以后,本地事务回滚后,能够继续联系该组下的另一个生产者实例,不至于致使业务走不下去。在消费者组中,能够实现消息消费的负载均衡和消息容错目标。
有了GroupName,在集群下,动态扩展容量很方便。只须要在新加的机器中,配置相同的GroupName。启动后,就当即能加入到所在的群组中,参与消息生产或消费。
三、Broker-存放消息
Broker是具体提供业务的服务器,单个Broker节点与全部的NameServer节点保持长链接及心跳,定时(每隔30s)注册Topic信息到全部Name Server。Name Server定时(每隔10s)扫描全部存活broker的链接,若是Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的链接。底层的通讯和链接都是基于Netty实现的。
负载均衡:Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,会自动轮询当前全部可发送的broker ,尽可能平均分布到全部队列中,最终效果就是全部消息都平均落在每一个Broker上。
高可用:Broker中分master和slave两种角色,每一个master能够对应多个slave,但一个slave只能对应一个master,master和slave经过指定相同的Brokername组成,其中不一样的BrokerId==0 是master,非0是slave。
高可靠并发读写服务:master和slave之间的同步方式分为同步双写和异步复制,异步复制方式master和slave之间虽然会存在少许的延迟,但性能较同步双写方式要高出10%左右。
Topic、Broker、queue三者间的关系
四、Producer-生产消息
1) 与nameserver的关系
单个Producer和一台NameServer节点(随机选择)保持长链接,定时查询topic配置信息,若是该NameServer挂掉,生产者会自动链接下一个NameServer,直到有可用链接为止,并能自动重连。与NameServer之间没有心跳。
2) 与broker的关系
单个Producer和与其关联的全部broker保持长链接,并维持心跳。默认状况下消息发送采用轮询方式,会均匀发到对应Topic的全部queue中。
五、Consumer-消费消息
1) 与nameserver的关系
单个Consumer和一台NameServer保持长链接,定时查询topic配置信息,若是该NameServer挂掉,消费者会自动链接下一个NameServer,直到有可用链接为止,并能自动重连。与NameServer之间没有心跳。
2) 与broker的关系
单个Consumer和与其关联的全部broker保持长链接,并维持心跳,失去心跳后,则关闭链接,并向该消费者分组的全部消费者发出通知,分组内消费者从新分配队列继续消费。
5.1 消费者类型
- 1) pull consume Consumer 的一种,应用一般经过 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象马上回调 Listener 接口方法,相似于activemq的方式
- 2) push consume Consumer 的一种,应用一般主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制
5.2 消费模式
- 1) 集群模式
在默认状况下,就是集群消费,此时消息发出去后将只有一个消费者能获取消息。
- 2) 广播模式
广播消费,一条消息被多个Consumer消费。消息会发给Consume Group中的全部消费者进行消费。
3、RocketMQ的特性
一、消息顺序
消息的顺序指的是消息消费时,能按照发送的顺序来消费。
RocketMQ是经过将“相同ID的消息发送到同一个队列,而一个队列的消息只由一个消费者处理“来实现顺序消息
二、消息重复
1) 消息重复的缘由
消息领域有一个对消息投递的QoS(服务质量)定义,分为:最多一次(At most once)、至少一次(At least once)、仅一次( Exactly once)。
MQ产品都声称本身作到了At least once。既然是至少一次,就有可能发生消息重复。
有不少缘由致使,好比:网络缘由闪断,ACK返回失败等等故障,确认信息没有传送到消息队列,致使消息队列不知道本身已经消费过该消息了,再次将该消息分发给其余的消费者
不一样的消息队列发送的确认信息形式不一样:RocketMQ返回一个CONSUME_SUCCESS成功标志,RabbitMQ是发送一个ACK确认消息
2) 消息去重
- 1) 去重原则:使用业务端逻辑保持幂等性
幂等性:就是用户对于同一操做发起的一次请求或者屡次请求的结果是一致的,不会由于屡次点击而产生了反作用,数据库的结果都是惟一的,不可变的。
- 2) 只要保持幂等性,无论来多少条重复消息,最后处理的结果都同样,须要业务端来实现。
去重策略:保证每条消息都有惟一编号(好比惟一流水号),且保证消息处理成功与去重表的日志同时出现。
4、RocketMQ的应用场景
一、削峰填谷
好比如秒杀等大型活动时会带来较高的流量脉冲,若是没作相应的保护,将致使系统超负荷甚至崩溃。若是因限制太过致使请求大量失败而影响用户体验,能够利用MQ 超高性能的消息处理能力来解决。
二、异步解耦
经过上、下游业务系统的松耦合设计,好比:交易系统的下游子系统(如积分等)出现不可用甚至宕机,都不会影响到核心交易系统的正常运转。
三、顺序消息
FIFO原理相似,MQ提供的顺序消息即保证消息的先进先出,能够应用于交易系统中的订单建立、支付、退款等流程。
四、分布式事务消息
好比阿里的交易系统、支付红包等场景须要确保数据的最终一致性,须要引入 MQ 的分布式事务,既实现了系统之间的解耦,又能够保证最终的数据一致性。
5、RocketMQ集群部署方式
一、单Mater模式
优势:配置简单,方便部署
缺点:风险较大,一旦Broker重启或者宕机,会致使整个服务不可用
二、多Master模式
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
优势:配置简单,单个Master宕机重启对应用没有影响。消息不会丢失
缺点:单台机器宕机期间,这台机器上没有被消费的消息在恢复以前不可订阅,消息实时性会受到影响。
三、多Master多Slave模式(异步)
每一个Master配置一个Slave,采用异步复制方式,主备有短暂消息延迟
优势:由于Master 宕机后,消费者仍然能够从 Slave消费,此过程对应用透明。不须要人工干预。性能同多 Master 模式几乎同样。
缺点:Master宕机后,会丢失少许信息
四、多Master多Slave模式(同步)
每一个Master配置一个Slave,采用同步双写方式,只有主和备都写成功,才返回成功
优势:数据与服务都无单点, Master宕机状况下,消息无延迟,服务可用性与数据可用性都很是高
缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能
6、RocketMQ的消息类型
消息发送步骤:
消息消费步骤:
建立一个maven工程,导入依赖
<dependencies> <!--rocket--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> <dependency> <!--顺序消息中,模拟了一个消息集合,加入了lombok--> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.22</version> </dependency> </dependencies>
一、普通消息
<details> <summary>点击查看生产者代码</summary> ```java /** * 普通消息生产者 */ public class Producer {
public static void main(String[] args) throws Exception {
// 建立一个消息发送入口对象,主要用于消息发送,指定生产者组 DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); // 设置NameServe地址,若是是集群环境,用分号隔开 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动并建立消息发送组件 producer.start(); // topic的名字 String topic = "rocketDemo1"; // 标签名 String taget = "tag"; // 要发送的数据 String body = "hello,RocketMq"; Message message = new Message(topic,taget,body.getBytes()); // 发送消息 SendResult result = producer.send(message); System.out.println(result); // 关闭消息发送对象 producer.shutdown(); } }
</details> <details> <summary>点击查看消费者代码</summary> ```java /** * 普通消息消费者 */ public class Consumer { public static void main(String[] args) throws Exception { // 建立一个消费管理对象,并建立消费者组名字 DefaultMQPushConsumer consumerGroup = new DefaultMQPushConsumer("ConsumerGroup"); // 设置NameServer地址,若是是集群环境,用逗号分隔 consumerGroup.setNamesrvAddr("127.0.0.1:9876"); // 设置要读取的消息主题和标签 consumerGroup.subscribe("rocketDemo1", "*"); // 设置回调函数,处理消息 //注意:MessageListenerConcurrently -- 并行消费监听 consumerGroup.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { //读取消息记录 for (MessageExt messageExt : msgs) { //获取消息主题 String topic = messageExt.getTopic(); //获取消息标签 String tags = messageExt.getTags(); //获取消息体内容 String body = new String(messageExt.getBody(), "UTF-8"); System.out.println("topic:" + topic + ",tags:" + tags + ",body:" + body); } } catch (Exception e) { e.printStackTrace(); } //返回消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 运行消息消费对象 consumerGroup.start(); } }
</details> #### 二、顺序消息   消息有序指的是能够按照消息的发送顺序来消费。RocketMQ是经过将“相同ID的消息发送到同一个队列,而一个队列的消息只由一个消费者处理“来实现顺序消息 。 **如何保证顺序** - 1) 消息被发送时保持顺序:发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。 - 2) 消息被存储时保持和发送的顺序一致:存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A必定在B以前。 - 3) 消息被消费时保持和存储的顺序一致:消费保持和存储一致则要求消息A、B到达Consumer以后必须按照先A后B的顺序被处理。
<details> <summary>点击查看模拟消息代码</summary> ```java /** * 模拟消息 */ @Data @AllArgsConstructor @NoArgsConstructor public class Order {
private Long orderId; private String desc; public static List<Order> buildOrders(){ List<Order> list = new ArrayList<Order>(); Order order1001a = new Order(1001L,"建立"); Order order1004a = new Order(1004L,"建立"); Order order1006a = new Order(1006L,"建立"); Order order1009a = new Order(1009L,"建立"); list.add(order1001a); list.add(order1004a); list.add(order1006a); list.add(order1009a); Order order1001b = new Order(1001L,"付款"); Order order1004b = new Order(1004L,"付款"); Order order1006b = new Order(1006L,"付款"); Order order1009b = new Order(1009L,"付款"); list.add(order1001b); list.add(order1004b); list.add(order1006b); list.add(order1009b); Order order1001c = new Order(1001L,"完成"); Order order1006c = new Order(1006L,"完成"); list.add(order1001c); list.add(order1006c); return list; }
}
</details> <details> <summary>点击查看生产者代码</summary> ```java /** * Producer端确保消息顺序惟一要作的事情就是将消息路由到特定的队列, * 在RocketMQ中,经过MessageQueueSelector来实现分区的选择 */ public class ProducerOrder { //nameserver地址 private static String namesrvaddress="127.0.0.1:9876;"; public static void main(String[] args) throws Exception { //建立DefaultMQProducer DefaultMQProducer producer = new DefaultMQProducer("order_producer_name"); //设置namesrv地址 producer.setNamesrvAddr(namesrvaddress); //启动Producer producer.start(); List<Order> orderList = Order.buildOrders(); for (Order order : orderList) { String body = order.toString(); //建立消息 Message message = new Message("orderTopic","order",body.getBytes()); //发送消息 SendResult sendResult = producer.send( message, new MessageQueueSelector() { /** * * @param mqs topic中的队列集合 * @param msg 消息对象 * @param arg 业务参数 * @return */ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //参数是订单id号 Long orderId = (Long) arg; //肯定选择的队列的索引 long index = orderId % mqs.size(); return mqs.get((int) index); } }, order.getOrderId()); System.out.println("发送结果="+sendResult); } //关闭Producer producer.shutdown(); } }
</details> <details> <summary>点击查看消费者代码</summary> ```java /** * 消费者端实现MessageListenerOrderly介口监听消息来实现顺序消息 */ public class ConsumerOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); //从第一个开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("orderTopic","*"); //MessageListenerOrderly 顺序消费 consumer.registerMessageListener(new MessageListenerOrderly() { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { System.out.println("当前线程:"+Thread.currentThread().getName()+",接收消息:"+new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer started.%n"); }
}
</details> #### 三、延迟消息   RocketMQ 支持定时(延迟)消息,可是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。   延迟消息能够在生产者中直接设置,也能够在rocketmq的配置文件broker.conf中配置:messageDelayLevel=1s|5s|1m|2m|1h|2h...... <details> <summary>点击查看生产者代码</summary> ```java /** * 延迟消息 生产者 */ public class ProducerDelay { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("delay_producer"); //设置nameserver producer.setNamesrvAddr("127.0.0.1:9876"); //生产者开启 producer.start(); //建立消息对象 Message message = new Message("delayTopic","delay","hello world".getBytes()); //设置延迟时间级别 message.setDelayTimeLevel(2); //发送消息 SendResult sendResult = producer.send(message); System.out.println(sendResult); //生产者关闭 producer.shutdown(); } }
</details> <details> <summary>点击查看消费者代码</summary> ```java /** * 延迟消息 消费者 */ public class ConsumerDelay {
public static void main(String[] args) throws Exception { //建立消费者对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); //设置nameserver consumer.setNamesrvAddr("127.0.0.1:9876"); //设置主题和tag consumer.subscribe("delayTopic","*"); //注册消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消息ID:"+msg.getMsgId()+"发送时间:"+new Date(msg.getStoreTimestamp())+",延迟时间:"+(System.currentTimeMillis()-msg.getStoreTimestamp())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //开启消费者 consumer.start(); System.out.println("消费者启动"); }
}
</details> #### 四、批量发送消息 <details> <summary>点击查看生产者代码</summary> ```java /** * 批量 生产者 */ public class ProducerBatch { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("delay_producer"); //设置nameserver producer.setNamesrvAddr("127.0.0.1:9876"); //生产者开启 producer.start(); //建立消息对象 集合 String topic = "batchTopic"; String tag = "batch"; List<Message> messageList = new ArrayList<Message>(); Message message1 = new Message(topic,tag,"hello world1".getBytes()); Message message2 = new Message(topic,tag,"hello world2".getBytes()); Message message3 = new Message(topic,tag,"hello world3".getBytes()); messageList.add(message1); messageList.add(message2); messageList.add(message3); //发送消息 SendResult sendResult = producer.send(messageList); System.out.println(sendResult); //生产者关闭 producer.shutdown(); } }
</details> <details> <summary>点击查看消费者代码</summary> ```java /** * 批量消费者 */ public class ConsumerBatch { public static void main(String[] args) throws Exception { //建立消费者对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); //设置nameserver consumer.setNamesrvAddr("127.0.0.1:9876"); //设置主题和tag consumer.subscribe("batchTopic","*"); //注册消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消息ID:"+msg.getMsgId()); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //开启消费者 consumer.start(); System.out.println("消费者启动"); }
}
</details> #### 五、广播消息   rocketmq默认采用的是集群消费,咱们想要使用广播消费,只需在消费者中加入`consumer.setMessageModel(MessageModel.BROADCASTING)`这段配置,`MessageModel.CLUSTERING`为集群模式,是默认的; <details> <summary>点击查看生产者代码</summary> ```java /** * 生产者 */ public class ProducerBroadcast { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("delay_producer"); //设置nameserver producer.setNamesrvAddr("127.0.0.1:9876"); //生产者开启 producer.start(); //建立消息对象 集合 String topic = "broadcastTopic"; String tag = "broad"; List<Message> messageList = new ArrayList<Message>(); Message message1 = new Message(topic,tag,"hello world1".getBytes()); Message message2 = new Message(topic,tag,"hello world2".getBytes()); Message message3 = new Message(topic,tag,"hello world3".getBytes()); messageList.add(message1); messageList.add(message2); messageList.add(message3); //发送消息 SendResult sendResult = producer.send(messageList); System.out.println(sendResult); //生产者关闭 producer.shutdown(); } }
</details> <details> <summary>点击查看消费者1代码</summary> ```java /** * 消费者1 */ public class ConsumerBroadcast1 {
public static void main(String[] args) throws Exception { //建立消费者对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); //设置nameserver consumer.setNamesrvAddr("127.0.0.1:9876"); //设置主题和tag consumer.subscribe("broadcastTopic","*"); //设置消息模式 为 广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //注册消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消费者1:消息ID:"+msg.getMsgId()+",内容"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //开启消费者 consumer.start(); System.out.println("消费者1启动"); }
}
</details> <details> <summary>点击查看消费者2代码</summary> ```java /** * 消费者2 */ public class ConsumerBroadcast2 { public static void main(String[] args) throws Exception { //建立消费者对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); //设置nameserver consumer.setNamesrvAddr("127.0.0.1:9876"); //设置主题和tag consumer.subscribe("broadcastTopic","*"); //设置消息模式 为 广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //注册消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消费者2:消息ID:"+msg.getMsgId()+",内容"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //开启消费者 consumer.start(); System.out.println("消费者2启动"); } }
</details> ### 7、SpringBoot整合RocketMQ   建立一个maven工程,导入依赖 ```java <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.2.1.RELEASE</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>2.2.1.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.22</version> </dependency> </dependencies> ``` <details> <summary>点击查看模拟消息代码</summary> ```java /** * 模拟消息 */ @Data @AllArgsConstructor @NoArgsConstructor public class Order {
private Long orderId; private String desc; public static List<Order> buildOrders(){ List<Order> list = new ArrayList<Order>(); Order order1001a = new Order(1001L,"1001建立"); Order order1004a = new Order(1004L,"1004建立"); Order order1006a = new Order(1006L,"1006建立"); Order order1009a = new Order(1009L,"1009建立"); list.add(order1001a); list.add(order1004a); list.add(order1006a); list.add(order1009a); Order order1001b = new Order(1001L,"1001付款"); Order order1004b = new Order(1004L,"1004付款"); Order order1006b = new Order(1006L,"1006付款"); Order order1009b = new Order(1009L,"1009付款"); list.add(order1001b); list.add(order1004b); list.add(order1006b); list.add(order1009b); Order order1001c = new Order(1001L,"1001完成"); Order order1006c = new Order(1006L,"1006完成"); list.add(order1001c); list.add(order1006c); return list; }
}
</details> <details> <summary>点击查看消息生产者代码</summary> ```java @Slf4j @RunWith(SpringRunner.class) @SpringBootTest public class RocketMQTest { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 普通消息生产者 */ @Test public void testSend(){ rocketMQTemplate.convertAndSend("testTopic","这是测试消息!"); } /** * 延迟消息生产者 */ @Test public void testDelaySend(){ SendResult sendResult = rocketMQTemplate.syncSend("testTopic", new GenericMessage("这是延迟测试消息!"+new Date()), 10000, 4); log.info("sendResult=="+sendResult); } /** * 顺序消息 生产者 */ @Test public void testOrderlySend(){ List<Order> orderList = Order.buildOrders(); for (Order order : orderList) { //发送消息 rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //参数是订单id号 Long orderId = Long.valueOf((String)arg); //肯定选择的队列的索引 long index = orderId % mqs.size(); log.info("mqs is ::" + mqs.get((int) index)); return mqs.get((int) index); } }); SendResult sendOrderly = rocketMQTemplate.syncSendOrderly("testTopicOrderLy", new GenericMessage<String>(order.toString()), order.getOrderId().toString()); log.info("发送结果="+sendOrderly+",orderid :"+order.getOrderId()); } } }
</details> <details> <summary>点击查看普通|延迟消费者代码</summary> ```java /** * 普通、延迟消息 消费者代码 */ @Component @RocketMQMessageListener(consumerGroup = "myConsumer", topic = "testTopic") public class RocketConsumer implements RocketMQListener<String> {
public void onMessage(String message) { System.out.println("接收到消息:="+message); }
}
</details> <details> <summary>点击查看顺序消费者代码</summary> ```java /** * 顺序消息 ,消费者 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "myConsumerOrderly", topic = "testTopicOrderLy",consumeMode = ConsumeMode.ORDERLY) public class RocketConsumerOrderly implements RocketMQListener<String> { public void onMessage(String message) { log.info("当前线程:"+Thread.currentThread().getName()+",接收到消息:="+message); } }
</details> ### 8、RocketMQ的安装配置 ##### 一、配置系统环境变量;计算机/属性/高级系统设置/环境变量/系统变量,新建系统变量ROCKETMQ_HOME=RocketMQ安装路径  ##### 二、进入RocketMQ安装目录的bin目录下,右键用记事本打开修改runserver.cmd文件  ##### 三、修改runbroker.cmd文件  ##### 四、cmd进入到MQ/bin目录下启动 ```java 1.启动mqnamesrv.cmd start mqnamesrv.cmd ``` 成功的弹窗,此框勿关闭。  ```java 2.启动mqbroker.cmd start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true ``` 成功的弹窗,此框勿关闭。  注意:假如弹出提示框提示‘错误: 找不到或没法加载主类 xxxxxx’。打开runbroker.cmd,而后将‘%CLASSPATH%’加上英文双引号。保存并从新执行start语句。  ##### 五、下载RocketMQ的可视化插件 - 1) 下载地址: https://github.com/apache/rocketmq-externals/releases
-
2) 修改rocketmq-console\src\main\resources\application.properties,修改以下:
-
3) cmd窗口执行:mvn clean package -Dmaven.test.skip=true
-
4) jar包运行:java -jar rocketmq-console-ng-1.0.0.jar
-
5) 测试输入地址: http://127.0.0.1:8080/#/ops
原文出处:https://www.cnblogs.com/kaischoolmate/p/12147205.html