前面已经说了JMS和RocketMQ一些概念和安装,下面使用SpringBoot来亲身操做一下.web
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
@Component public class PayProducer { /** * 生产组,生产者必须在生产组内 */ private String producerGroup = "pay_group"; /** * 端口 */ private String nameServer = "39.106.214.179:9876"; private DefaultMQProducer producer; public PayProducer() { producer = new DefaultMQProducer(producerGroup); // 指定nameServer地址,多个地址之间以 ; 隔开 producer.setNamesrvAddr(nameServer); start(); } public DefaultMQProducer getProducer() { return producer; } /** * 对象在使用以前必须调用一次,而且只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 通常在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown() { producer.shutdown(); } }
@RestController public class PayController { @Autowired private PayProducer payProducer; /** * topic,消息依赖于topic */ private static final String topic = "pay_test_topic"; @RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { // 建立消息 主题 二级分类 消息内容好的字节数组 Message message = new Message(topic, "taga", ("hello rocketMQ " + text).getBytes()); SendResult send = payProducer.getProducer().send(message); System.out.println(send); return new HashMap<>(); } }
MQClientException: No route info of this topic, TopicTest1 这个的缘由就是Broker禁止自动建立Topic且用户没有经过手动方式建立Topic, 或者是broker与Nameserver网络不通 解决: 使用手动建立Topic,在RocketMQ控制台的主题中建立就好,最主要的是指定topic name,以下图 出现建立不了的状况往下看 若是还出现这个问题,请关闭防火墙
此次说下上面可能建立不了的问题,前面说了安装开放安全组,此次就是由于rocketMQ虚拟的端口问题,须要开放10909,也就是说ECS最终开放的端口号: 8080,10911,9876,10909spring
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout 这个问题是阿里云服务器存在多个网卡,rocketMQ会根据当前网卡选择一个IP使用,咱们须要制定一个IP: 路径是: /usr/local/software/rocketmq/distribution/target/apache-rocketmq vim ./conf/broker.conf 添加配置: brokerIP1=公网IP 从新启动: nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf & tail -f nohup.out
https://blog.csdn.net/qq_14853889/article/details/81053145 https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E https://www.jianshu.com/p/bfd6d849f156 https://blog.csdn.net/wangmx1993328/article/details/81588217
在前一个项目的基础上,将公共内容提取出来,建立一个JsmConfig的类,来声明公共内容:
```
public class JmsConfig {apache
/** * 端口 */ public static final String NAME_SERVER = "39.106.214.179:9876"; /** * topic,消息依赖于topic */ public static final String TOPIC = "pay_test_topic";}
生产者内容变为
```
@Component
public class PayProducer {vim
/** * 生产组,生产者必须在生产组内 */ private String producerGroup = "pay_producer_group"; private DefaultMQProducer producer; public PayProducer() { producer = new DefaultMQProducer(producerGroup); // 指定nameServer地址,多个地址之间以 ; 隔开 producer.setNamesrvAddr(JmsConfig.NAME_SERVER); start(); } public DefaultMQProducer getProducer() { return producer; } /** * 对象在使用以前必须调用一次,而且只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 通常在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown() { producer.shutdown(); }}
建立消费者
```
@Component
public class PayConsumer {api
private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); // 设置消费地点,从最后一个进行消费(其实就是消费策略) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 订阅主题的哪些标签 consumer.subscribe(JmsConfig.TOPIC, "*"); // 注册监听器 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try { // 获取Message Message msg = msgs.get(0); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); String topic = msg.getTopic(); String body = new String(msg.getBody(), "utf-8"); // 标签 String tags = msg.getTags(); String keys = msg.getKeys(); System.out.println("topic=" + topic + ", tags=" + tags + ",keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); System.out.println("Consumer Listener"); }
}
```数组
Controller的变化:
```
@RestController
public class PayController {安全
@Autowired private PayProducer payProducer; @RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { // 建立消息 主题 二级分类 消息内容好的字节数组 Message message = new Message(JmsConfig.TOPIC, "taga", ("hello rocketMQ " + text).getBytes()); SendResult send = payProducer.getProducer().send(message); System.out.println(send); return new HashMap<>(); }
}
```服务器
梳理一下整个流程,生产者存在于生产组,因此生产组很重要,建立生产者须要指定生产组.消费者同理,建立消费者也须要指定消费组. 而且两者都须要指定NameServer. 有了生产者就要发送消息,也就是Message,建立Message须要指定Topic,二级分类和消息体等信息. 那消费者如何获取呢? 无非就是绑定Topic和二级分类就能够,这就是整个流程. 中间少说了消息的存放,消息是在broker中,这个至关于仓库,因此就是生产者生产消息到Broker,Consumer从Broker中获取消息进行消费.网络