SpringBoot+Rocketmq

@PostConstruct:用于标注在依赖注入完成之后需要执行的方法上,以执行任何初始化。此方法必须在将类放入服务之前调用。
@PreDestroy:在开发中,如果我们要在关闭 Spring 容器时释放一些资源,就在这个类中写一个被 @PreDestroy 注解的方法。今天就因为这个浪费了好长时间:MQ 的生产者启动之后没有被销毁,导致我用 IDEA 结束程序之后端口号依然被占用,每次再启动都要杀进程。

先把最简单的代码贴出来,只有最基本的发送接收功能

pom.xml
<dependencies>
        <!-- Web starter: supplies the org.springframework.web.bind.annotation
             classes (@RestController, @RequestMapping) used by TestController.
             No <version> is given; presumably it is managed by a Spring Boot
             parent POM that is not shown here. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Test starter, limited to the test scope. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-common -->
        <!-- org.apache.rocketmq.common.* types used below (Message, MessageExt,
             ConsumeFromWhere). Keep its version equal to rocketmq-client's. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.3.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <!-- org.apache.rocketmq.client.* types used below (DefaultMQProducer,
             DefaultMQPushConsumer, SendResult). -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <!-- NOTE(review): none of the classes shown in this document import
             fastjson; confirm it is actually needed before keeping it. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

application.properties

apache.rocketmq.consumer.PushConsumer=PushConsumer
apache.rocketmq.producer.producerGroup=Producer
apache.rocketmq.namesrvAddr=localhost:9876

TestController.java

package com.rmqspringtest.demo;

import com.rmqspringtest.demo.producer.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP entry point of the demo: forwards the {@code msg} request parameter
 * to {@link ProducerService} and returns whatever the producer reports.
 */
@RestController
public class TestController {

    /** Topic every pushed message is published to. */
    private static final String TOPIC = "test1";

    /** Tag attached to every pushed message. */
    private static final String TAG = "push";

    @Autowired
    private ProducerService producer;

    /**
     * Handles {@code /push}: publishes {@code msg} under the fixed topic and tag.
     *
     * @param msg message text bound from the request
     * @return the string returned by {@link ProducerService#send}
     */
    @RequestMapping("/push")
    public String pushMsg(String msg) {
        final String sendReport = producer.send(TOPIC, TAG, msg);
        return sendReport;
    }
}

ConsumerService.java

package com.rmqspringtest.demo.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class ConsumerService { @Value("${apache.rocketmq.consumer.PushConsumer}") private String consumerGroup; @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQPushConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(namesrvAddr); try { consumer.subscribe("test1", "push"); // 若是是第一次启动,从队列头部开始消费 // 若是不是第一次启动,从上次消费的位置继续消费
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> { try { for (MessageExt messageExt : list) { String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("[Consumer] msgID(" + messageExt.getMsgId() + ") msgBody : " + messageBody); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("[Consumer 已启动]"); } catch (Exception e) { e.printStackTrace(); } } }

ProducerService.java

package com.rmqspringtest.demo.producer; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @Service public class ProducerService { @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; private DefaultMQProducer producer; @PostConstruct public void initProducer() { producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(namesrvAddr); producer.setRetryTimesWhenSendFailed(3); try { producer.start(); System.out.println("[Producer 已启动]"); } catch (Exception e) { e.printStackTrace(); } } public String send(String topic, String tags, String msg) { SendResult result = null; try { Message message = new Message(topic, tags, msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); result = producer.send(message); System.out.println("[Producer] msgID(" + result.getMsgId() + ") " + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } return "{\"MsgId\":\"" + result.getMsgId() + "\"}"; } @PreDestroy public void shutDownProducer() { if (producer != null) { producer.shutdown(); } } }

cmd中执行命令开启服务:

start mqnamesrv

start mqbroker -n 127.0.0.1:9876

发送请求:127.0.0.1:8080/push?msg=hello

ok

 

 

 

 

RocketMQTemplate 这玩意 看一下
相关文章
相关标签/搜索
本站公众号
   欢迎关注本站公众号,获取更多信息