SpringBoot整合RocketMQ

上篇博客讲解了服务器集群部署RocketMQ 博客地址:RocketMQ(2)---Docker部署RocketMQ集群html

这篇在上篇搭建好的基础上,将SpringBoot整合RocketMQ实现生产消费。git

GitHub地址https://github.com/yudiandemingzi/spring-boot-studygithub

1、搭建步骤

先说下技术大体架构spring

SpringBoot2.1.6 + Maven3.5.4 + rocketmq4.3.0 + JDK1.8 +Lombok(插件)

一、添加rocketmq包

<!--注意: 这里的版本,要和部署在服务器上的版本号一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>

二、JmsConfig(配置类)

链接RocketMQ服务器配置类,这里为了方便直接写成常量。apache

/** * @Description: 安装实际开发这里的信息 都是应该写在配置里,来读取,这里为了方便因此写成常量 */
public class JmsConfig { /** * Name Server 地址,由于是集群部署 因此有多个用 分号 隔开 */
    public static final String NAME_SERVER = "127.12.15.6:9876;127.12.15.6:9877"; /** * 主题名称 主题通常是服务器设置好 而不能在代码里去新建topic( 若是没有建立好,生产者往该主题发送消息 会报找不到topic错误) */
    public static final String TOPIC = "topic_family"; }

三、Producer (生产者)

@Slf4j @Component public class Producer { private String producerGroup = "test_producer"; private DefaultMQProducer producer; public Producer(){ //示例生产者
        producer = new DefaultMQProducer(producerGroup); //不开启vip通道 开通口端口会减2
        producer.setVipChannelEnabled(false); //绑定name server
 producer.setNamesrvAddr(JmsConfig.NAME_SERVER); start(); } /** * 对象在使用以前必需要调用一次,只能初始化一次 */
    public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public DefaultMQProducer getProducer(){ return this.producer; } /** * 通常在应用上下文,使用上下文监听器,进行关闭 */
    public void shutdown(){ this.producer.shutdown(); } }

四、Consumer (消费者)

@Slf4j @Component public class Consumer { /** * 消费者实体对象 */
    private DefaultMQPushConsumer consumer; /** * 消费者组 */
    public static final String CONSUMER_GROUP = "test_consumer"; /** * 经过构造函数 实例化对象 */
    public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //订阅主题和 标签( * 表明全部标签)下信息
        consumer.subscribe(JmsConfig.TOPIC, "*"); // //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // msgs中只收集同一个topic,同一个tag,而且key相同的message // 会把不一样的消息分别放置到不一样的队列中
            try { for (Message msg : msgs) { //消费者获取消息 这里只输出 不作后面逻辑处理
                    String body = new String(msg.getBody(), "utf-8"); log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("消费者 启动成功======="); } }

大体就是这边简单,下面就是测试。服务器

 

2、测试

先写个测试接口进行测试。架构

一、Controller

@Slf4j @RestController public class Controller { @Autowired private Producer producer; private List<String> mesList; /** * 初始化消息 */
    public Controller() { mesList = new ArrayList<>(); mesList.add("小小"); mesList.add("爸爸"); mesList.add("妈妈"); mesList.add("爷爷"); mesList.add("奶奶"); } @RequestMapping("/text/rocketmq") public Object callback() throws Exception { //总共发送五次消息
        for (String s : mesList) { //建立生产信息
            Message message = new Message(JmsConfig.TOPIC, "testtag", ("小小一家人的称谓:" + s).getBytes()); //发送
            SendResult sendResult = producer.getProducer().send(message); log.info("输出生产者信息={}",sendResult); } return "成功"; } }

二、测试结果

 

 

很明显生产发送消息已经成功,二消费者也成功接收了消息!app

另外咱们再来看下RocketMQ控制台是否也有消费记录分布式

 

 

很明显在控制台这边也会有消费记录!函数

总结这边只是简单的整合,后面会经过RocketMQ实现分布式事务,能够用于线上实际环境中,到时候会深刻讲解下源码。

 

转载于:http://www.javashuo.com/article/p-nfawozci-eh.html

相关文章
相关标签/搜索