以前学过ActiveMQ可是并发量不是很大点我直达,因此又学阿里开源的RocketMQ,听说队列能够堆积亿级别。下面是网上找的消息队列对比图,仅供参考html
点我直达java
地址:https://downloads.apache.org/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip linux
百度云盘:git
连接: https://pan.baidu.com/s/1luq_MwxSn8k_bugrnQSJWg 密码: varj
export JAVA_HOME=/opt/soft/jdk1.8.0_202
export PATH=$JAVA_HOME/bin:$PATH
export CLASPATH=.:$JAVA_home/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
export MAVEN_HOME=/opt/soft/apache-maven-3.6.3
export PATH=$PATH:$MAVEN_HOME/bin
nohup sh bin/mqnamesrv &
找到runserver.sh 修改JAVA_OPT
vim /bin/runserver.sh配置
nohup sh bin/mqbroker -n localhost:9876 &
语法:nohup sh bin/mqbroker -n NameServer服务ip地址
找到runbroker.sh 修改JAVA_OPT
vim /bin/runbroker.sh配置
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
开2个控制台,链接通一台linuxgithub
NameServer默认端口号:9876;broker默认端口号:10911web
点我直达面试
百度云盘redis
连接: https://pan.baidu.com/s/1mdEGkq-JBTy1wtNmFPkmDg 密码: v6bq
进入:/opt/soft/rocketmq-externals-master/rocketmq-console
编译: mvn clean package -Dmaven.test.skip=true
修改appliccation.properties的rocketmq.config.namesrvAddr算法
编译打包spring
进入target目录,启动java -jar
守护进程启动: nohup java -jar rocketmq-console-ng-2.0.0.jar &
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.ybchen</groupId> <artifactId>ybchen-mq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ybchen-mq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--注意: 这里的版本,要和部署在服务器上的版本号一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.7.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
package com.ybchen.ybchenmq.jms; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component public class PayProducer { /** * 生产者所属的组 */ private String producerGroup = "pay_group"; /** * MQ的地址,注意需开放端口号或者关闭防火墙 */ private String nameServerAddr = "192.168.199.100:9876"; private DefaultMQProducer producer; public PayProducer() { producer = new DefaultMQProducer(producerGroup); //指定NameServer地址,多个地址以;隔开 //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876") producer.setNamesrvAddr(nameServerAddr); start(); } /** * 获取生产者 * @return */ public DefaultMQProducer getProducer() { return this.producer; } /** * 开启,对象在使用以前必需要调用一次,只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 关闭,通常在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown() { this.producer.shutdown(); } }
package com.ybchen.ybchenmq.controller; import com.ybchen.ybchenmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @ClassName:PayController * @Description:支付 * @Author:chenyb * @Date:2020/10/18 2:47 下午 * @Versiion:1.0 */ @RestController @RequestMapping("/api/v1") public class PayController { @Autowired private PayProducer payProducer; private static final String TOPIC = "ybchen_pay_topic"; /** * 支付回调 * * @param text * @return */ @RequestMapping("pay_cb") public Object callback(String text) { /** * String topic:话题 * String tags:二级分类 * byte[] body:body消息字节数组 */ Message message = new Message(TOPIC,"tag_a",("hello ybchen ==>"+text).getBytes()); try { SendResult send = payProducer.getProducer().send(message); System.out.println("send------>"+send); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return "ok"; } }
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
缘由:阿里云存在多网卡,rocketmq会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,极可能会有问题,好比,机器上有两个ip,一个公网ip,一个私网ip,所以须要配置broker.conf指定当前公网的ip,而后重启broker
修改配置:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/broker.conf
新增这个配置:brokerIP1=xxx.xxx.xxx.xxx
启动命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
MQClientException: No route info of this topic, TopicTest1
缘由:Broker 紧追自动建立Topic,且用户没有经过手工方式建立此Topic,或者broker和Nameserver网络不通
解决:
经过sh bin/mqbroker -m 查看配置
autoCreateTopicEnable=true 则自动建立Topic
Centos 7 关闭防火墙:systemctl stop firewalld
控制台查看不了数据,提示链接10909错误
缘由:Rocket默认开启了VIP通道,VPI通道端口号为10911-2=10909
解决:阿里云安全组添加一个端口:10909
没法自动建立topic:客户端版本要和服务端版本保持一致
服务器上装的是4.7.1 引入依赖项时 <!--注意: 这里的版本,要和部署在服务器上的版本号一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.7.1</version> </dependency>
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.ybchen</groupId> <artifactId>ybchen-mq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ybchen-mq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--注意: 这里的版本,要和部署在服务器上的版本号一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.7.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
package com.ybchen.ybchenmqconsumer.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; /** * @ClassName:PayConsumer * @Description:消费者 * @Author:chenyb * @Date:2020/10/18 4:13 下午 * @Versiion:1.0 */ @Component public class PayConsumer { /** * 生产者所属的组 */ private String producerGroup = "pay_consumer_group"; /** * MQ的地址,注意需开放端口号或者关闭防火墙 */ private String nameServerAddr = "192.168.199.100:9876"; /** * 订阅主题 */ private String topic = "ybchen_pay_topic"; private DefaultMQPushConsumer consumer; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(producerGroup); //指定NameServer地址,多个地址以;隔开 //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876") consumer.setNamesrvAddr(nameServerAddr); //设置消费地点,从最后一个开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //订阅主题,监听主题下的那些标签 consumer.subscribe(topic, "*"); //注解一个监听器 //lambda方式 // consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> { // try { // Message message = msg.get(0); // System.out.printf("%s Receive New Messages: %s %n", // Thread.currentThread().getName(), new String(msg.get(0).getBody())); // //主题 // String topic = message.getTopic(); // //消息内容 // String body = null; // body = new String(message.getBody(), "utf-8"); // //二级分类 // String tags = message.getTags(); // //键 // String keys = message.getKeys(); // System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // } catch (UnsupportedEncodingException e) { // e.printStackTrace(); // return ConsumeConcurrentlyStatus.RECONSUME_LATER; // } // }); //通常方式 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { Message message = list.get(0); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(list.get(0).getBody(),"utf-8")); //主题 String topic = message.getTopic(); //消息内容 String body = null; body = new String(message.getBody(), "utf-8"); //二级分类 String tags = message.getTags(); //键 String keys = message.getKeys(); System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start .........."); } }
server.port=8081
本地开发测试,配置简单,同步刷盘消息一条都不会丢
不可靠,若是宕机,会致使服务不可用
同步双写消息不丢失,异步复制存在少许丢失你,主节点宕机,从节点能够对外提供消息的消费,可是不支持写入
主备有短暂消息延迟,毫秒级,目前不支持自动切换,须要脚本或者其余程序进行检测而后中止broker,重启让从节点成为主节点
配置简单,能够靠配置RAID磁盘阵列保证消息可靠,异步刷盘丢失少许消息
master宕机期间,未被消费的消息在机器恢复以前不可消息,实时性会受到影响
磁盘损坏,消息丢失的很是小,消息实时性不会受影响,Master宕机后,消费者仍然能够从Slave消费
主备有短暂消息延迟,毫秒级,若是Master宕机,磁盘损坏状况,会丢失你少许消息
同步双写方式,主备都写成功,才向应用返回成功,服务可用性与数据可用性很是高
性能比异步复制模式略低,主宕机后,备机不能自动切换为主机
准备2台机器,ip地址分别为:192.168.199.100;192.168.199.101;
环境:RocketMQ4.7.1+jdk8+Maven+Centos 7
启动两个机器的nameserver
路径:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1
启动:nohup sh bin/mqnamesrc &
主节点
进入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async
编辑并修改以下:vim broker-a.properties
namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
brokerClusterName=YbChenCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
启动:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a.properties &
从节点
进入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async
编辑并修改以下:vim broker-a-s.properties
namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
brokerClusterName=YbChenCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
启动:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a-s.properties &
使用192.168.199.100这台服务器,修改配置
192.168.199.100这台服务器
进入:/opt/soft/rocketmq-externals-master/rocketmq-console/src/main/resources
修改配置文件:vim application.properties
rocketmq.config.namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
编译
切换到:/opt/soft/rocketmq-externals-master/rocketmq-console
打包:
mvn clean
mvn install -Dmaven.test.skip=true
启动
进入:/opt/soft/rocketmq-externals-master/rocketmq-console/target
守护进程方式启动:nohup java -jar rocketmq-console-ng-2.0.0.jar &
模拟主挂了,可是从还能够被消费,此时不能写入,等主重启后,能够继续写入(数据不会被重复消费),如下内容是连续的
好了,到目前为止,主从已经搭建完成了。
Broker分为Master和Slave,一个Master能够对应多个Slave,但一个Slave只能对应一个Master,Master与Slave经过相同的Broker Name来匹配,不一样的Broker id来定义时Master仍是Slave
Broker向全部的NameServer节点创建长链接,定时注册Topic和发送元数据信息
NameServer定时扫描(默认2分钟)全部存活Broker的链接,若是超过期间没响应,则断开链接(心跳检测),可是Consumer客户端不能感知,Consumer定时(30秒)从NameServer获取topic的最新信息,因此broker不可用时,Consumer最多须要30秒才能发现
只有Master才能进行写入操做,Slave不容许写入只能同步,同步策略取决于Master配置
客户端消费能够从Master和Slave消费,默认消费者都从Master消费,若是在Master挂了以后,客户端从NameServer中感知Broker宕机,就会从Slave消费,感知非实时,存在必定的滞后性,Slave不能保证Master的100%都同步过来,会有少许的消息丢失。一旦Master恢复,未同步过去的消息会被最终消费掉。
若是Consumer实例的数量比Message Queue的总数量还多的话,多出来的Consumer实例将没法分到Queue,也就没法消费到消息,也就没法起到分摊负载的做用,因此须要控制让Queue的总数量大于Consumer的数量。
生产者设置重试次数,并设置惟一的key(通常惟一标识符)
设置广播方式
好比12306付完钱💰后,异步出票,对性能要求高,能够支持更高的并发,回调成功后触发相应的业务(onSuccess)
onSuccess:由于是异步方式,这里能够记录日志啥的
onException:补偿机制,根据实际状况使用,看是否进行重试
主要作日志收集,适用于对性能要求高,但可靠性并不高的场景。
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
生产消息使用MessageQueueSelector投递到Topic下指定的Queue
默认topic下的queue数量是4,能够配置
支持同步,异步发送指定的MessageQueue
选择的queue数量必须小于配置的,不然会出错
若是队列中某个产品,流量暴增,随机分配的话,会致使整个Topic都不能使用,指定到队列的话,若是这个队列坏了,其余队列不影响使用。
发送结果=SEND_OK,msg=SendResult [sendStatus=SEND_OK, msgId=AC1068013E3F18B4AAC276723EAC0000, offsetMsgId=C0A8C76400002A9F000000000009B536, messageQueue=MessageQueue [topic=ybchen_pay_topic, brokerName=broker-a, queueId=0], queueOffset=1]
发送结果=SEND_OK,msg=SendResult [sendStatus=SEND_OK, msgId=AC1068013E3F18B4AAC27672BCD50001, offsetMsgId=C0A8C76400002A9F000000000009B602, messageQueue=MessageQueue [topic=ybchen_pay_topic, brokerName=broker-a, queueId=0], queueOffset=2]
发送结果=SEND_OK,msg=SendResult [sendStatus=SEND_OK, msgId=AC1068013E3F18B4AAC27672CAA20002, offsetMsgId=C0A8C76400002A9F000000000009B6CF, messageQueue=MessageQueue [topic=ybchen_pay_topic, brokerName=broker-a, queueId=0], queueOffset=3]
能够看到打印出来的,queueId=0
生产者端代码修改
@Autowired private PayProducer payProducer; private static final String TOPIC = "ybchen_pay_topic"; /** * 支付回调 * * @param text * @return */ @RequestMapping("pay_cb") public Object callback(String text) { /** * String topic:话题 * String tags:二级分类 * byte[] body:body消息字节数组 */ Message message = new Message(TOPIC, "tag_a", text.getBytes()); //生产者使用MessageQueueSelector投递到Topic下指定的Queue,arg只能小于等于4 // try { // SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() { // @Override // public MessageQueue select(List<MessageQueue> list, Message message, Object o) { // int queueNum=Integer.parseInt(o.toString()); // return list.get(queueNum); // } // }, 0); // System.out.printf("发送结果=%s,msg=%s",sendResult.getSendStatus(),sendResult); // } catch (MQClientException e) { // e.printStackTrace(); // } catch (RemotingException e) { // e.printStackTrace(); // } catch (MQBrokerException e) { // e.printStackTrace(); // } catch (InterruptedException e) { // e.printStackTrace(); // } //异步发送到指定的queue try { payProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { int queueNum = Integer.parseInt(o.toString()); return list.get(queueNum); } }, 3, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("发送结果=%s,msg=%s", sendResult.getSendStatus(), sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h //message.setDelayTimeLevel(2); // try { // SendResult send = payProducer.getProducer().send(message); // System.out.println("send------>"+send); // } catch (MQClientException e) { // e.printStackTrace(); // } catch (RemotingException e) { // e.printStackTrace(); // } catch (MQBrokerException e) { // e.printStackTrace(); // } catch (InterruptedException e) { // e.printStackTrace(); // } //异步发送 // try { // payProducer.getProducer().send(message, new SendCallback() { // @Override // public void onSuccess(SendResult sendResult) { // System.out.printf("发送结果=%s,msg=%s",sendResult.getSendStatus(),sendResult); // } // // @Override // public void onException(Throwable e) { // e.printStackTrace(); // //补偿机制,根据实际状况使用,看是否进行重试 // } // }); // } catch (MQClientException e) { // e.printStackTrace(); // } catch (RemotingException e) { // e.printStackTrace(); // } catch (InterruptedException e) { // e.printStackTrace(); // } return "ok"; }
顺序消息能够应用到电商和证券系统,订单系统。
消息的生产和消费顺序一致
topic下面所有消息都要有序(不多用)
只要保证一组消息被顺序消费便可(RocketMQ中使用)
对于指定的一个Topic,客户端按照必定的前后顺序发送消息
对于指定的一个Topic,按照必定的前后顺序接收消息,即先发送的消息必定先会被客户端接收到
建立ProductOrder.java
package com.ybchen.ybchenmq.entity; import java.io.Serializable; import java.util.ArrayList; import java.util.List; /** * @ClassName:ProductOrder * @Description:订单 * @Author:chenyb * @Date:2020/10/25 12:56 下午 * @Versiion:1.0 */ public class ProductOrder implements Serializable { /** * 订单id */ private long orderIdl; /** * 订单操做类型 */ private String type; public long getOrderIdl() { return orderIdl; } public void setOrderIdl(long orderIdl) { this.orderIdl = orderIdl; } public String getType() { return type; } public void setType(String type) { this.type = type; } public ProductOrder() { } public ProductOrder(long orderIdl, String type) { this.orderIdl = orderIdl; this.type = type; } @Override public String toString() { return "ProductOrder{" + "orderIdl=" + orderIdl + ", type='" + type + '\'' + '}'; } /** * 模拟批量建立实体类 * @return */ public static List<ProductOrder> getOrderList(){ List<ProductOrder> list=new ArrayList<>(); list.add(new ProductOrder(111L,"建立订单")); list.add(new ProductOrder(222L,"建立订单")); list.add(new ProductOrder(333L,"建立订单")); list.add(new ProductOrder(111L,"支付订单")); list.add(new ProductOrder(222L,"支付订单")); list.add(new ProductOrder(111L,"完成订单")); list.add(new ProductOrder(222L,"完成订单")); list.add(new ProductOrder(333L,"支付订单")); list.add(new ProductOrder(333L,"完成订单")); return list; } }
控制层:PayController.java
@Autowired private PayProducer payProducer; private static final String TOPIC = "ybchen_pay_topic"; private static final String TOPIC_ORDER = "ybchen_pay_order_topic"; @RequestMapping("pay_order") public Object payOrder() throws Exception{ //获取订单号 List<ProductOrder> list=ProductOrder.getOrderList(); for (int i = 0; i < list.size(); i++) { ProductOrder order=list.get(i); Message message=new Message(TOPIC_ORDER, "", order.getOrderIdl()+"", order.toString().getBytes()); //发送,同一个订单id进入同一个队列中 SendResult sendResult =payProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) { Long id=(Long)arg; long index=id%mqs.size(); return mqs.get((int) index); } },order.getOrderIdl()); //打印输出结果 System.out.printf("发送结果=%s,sendResult=%s,orderId=%s,type=%s\n", sendResult.getSendStatus(), sendResult.toString(), order.getOrderIdl(), order.getType()); } return "ok"; }
package com.ybchen.ybchenmqconsumer.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; import java.util.List; /** * @ClassName:PayOrderConsumer * @Description:消费者-订单 * @Author:chenyb * @Date:2020/10/18 4:13 下午 * @Versiion:1.0 */ @Component public class PayOrderConsumer { /** * 生产者所属的组 */ private String producerGroup = "pay_order_consumer_group"; /** * MQ的地址,注意需开放端口号或者关闭防火墙 */ private String nameServerAddr = "192.168.199.100:9876;192.168.199.101:9876"; /** * 订阅主题,订单 */ private static final String TOPIC_ORDER = "ybchen_pay_order_topic"; private DefaultMQPushConsumer consumer; public PayOrderConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(producerGroup); //指定NameServer地址,多个地址以;隔开 //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876") consumer.setNamesrvAddr(nameServerAddr); //设置消费地点,从最后一个开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //订阅主题,监听主题下的那些标签 consumer.subscribe(TOPIC_ORDER, "*"); //默认是集群方式,广播方式不支持重试 consumer.setMessageModel(MessageModel.CLUSTERING); //注解一个监听器 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { MessageExt msg=list.get(0); System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(), new String(msg.getBody())); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("consumer order start .........."); } }
能够看到消费的时候,有点慢,由于我本地安装了2个虚拟机作一主一从,消费的顺序是正确的,都是按照:建立订单、支付订单、完成订单
2020-10-25 13:52:31.822 INFO 1473 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet' 2020-10-25 13:52:31.822 INFO 1473 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet' 2020-10-25 13:52:31.825 INFO 1473 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 3 ms 发送结果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D46F0000, offsetMsgId=C0A8C76400002A9F000000000009C8B2, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=3], queueOffset=6],orderId=111,type=建立订单 发送结果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4930001, offsetMsgId=C0A8C76400002A9F000000000009C9A5, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=2], queueOffset=6],orderId=222,type=建立订单 发送结果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4A90002, offsetMsgId=C0A8C76400002A9F000000000009CA98, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=1], queueOffset=6],orderId=333,type=建立订单 发送结果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4C00003, offsetMsgId=C0A8C76400002A9F000000000009CB8B, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=3], queueOffset=7],orderId=111,type=支付订单 发送结果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4CC0004, offsetMsgId=C0A8C76400002A9F000000000009CC7E, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=2], queueOffset=7],orderId=222,type=支付订单 发送结果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4D00005, offsetMsgId=C0A8C76400002A9F000000000009CD71, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=3], queueOffset=8],orderId=111,type=完成订单 发送结果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4D30006, offsetMsgId=C0A8C76400002A9F000000000009CE64, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=2], queueOffset=8],orderId=222,type=完成订单 发送结果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4DE0007, offsetMsgId=C0A8C76400002A9F000000000009CF57, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=1], queueOffset=7],orderId=333,type=支付订单 发送结果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4F80008, offsetMsgId=C0A8C76400002A9F000000000009D04A, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=1], queueOffset=8],orderId=333,type=完成订单
ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderIdl=333, type='建立订单'} ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderIdl=333, type='支付订单'} ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderIdl=333, type='完成订单'} ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderIdl=222, type='建立订单'} ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderIdl=222, type='支付订单'} ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderIdl=222, type='完成订单'} ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderIdl=111, type='建立订单'} ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderIdl=111, type='支付订单'} ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderIdl=111, type='完成订单'}
消费者会平均分配queue的数量,消费者数量小于等于4!!!
本地在线模拟,一个生产者、3个消费者场景,看看消费的顺序,内容较长,被分割3块
逻辑队列,默认存储位置:/root/store/consumequeue
真正存储消息文件的,默认存储位置:/root/store/commitlog
RocketMQ不保证消息不重复,若是业务保证严格的不能重复消费,须要本身去业务端去重
指定某个字段惟一值
利用Redis的特性分布式锁,下面是我以前的代码,待改造
package com.cyb.redis.utils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class jedisUtils { private static String ip = "192.168.31.200"; private static int port = 6379; private static JedisPool pool; static { pool = new JedisPool(ip, port); } public static Jedis getJedis() { return pool.getResource(); } public static boolean getLock(String lockKey, String requestId, int timeout) { //获取jedis对象,负责和远程redis服务器进行链接 Jedis je=getJedis(); //参数3:NX和XX //参数4:EX和PX String result = je.set(lockKey, requestId, "NX", "EX", timeout); if (result=="ok") { return true; } return false; } public static synchronized boolean getLock2(String lockKey, String requestId, int timeout) { //获取jedis对象,负责和远程redis服务器进行链接 Jedis je=getJedis(); //参数3:NX和XX //参数4:EX和PX Long result = je.setnx(lockKey, requestId); if (result==1) { je.expire(lockKey, timeout); //设置有效期 return true; } return false; } }
利用Redis的incr特性,若是大于0说明消费过了(须要设置过时时间)
连接: https://pan.baidu.com/s/1Q8iL0lH-bdFEycYGq61hQg 密码: rww2
连接: https://pan.baidu.com/s/1dkE7sAs9E4TjwDQ38Pv4_A 密码: mkjm