RocketMq灰皮书(三)------MQ使用

RocketMq灰皮书(三)------MQ使用

在使用MQ以前,咱们回顾一下前两篇博文的内容.java

  1. 咱们大体了解了RocketMQ的四个概念,分别是:Producer,Consumer,MessageBroker
  2. 咱们在本地的Windows10系统上,部署了RocketMQ和其后台系统

在本篇博文中,咱们会使用使用SpringBoot构建两个微服务,一个做为生产者,一个做为消费者,经过RocketMQ传递消息,了解在Java中使用RocketMQ的方法.git


一. SpringBoot整合RocketMQ收发消息

在灰皮书第一篇文章中,我画了下面这个图:github

image-20210209115705604

如今咱们本地的RocketMQ也部署起来了,接下来咱们建立两个微服务经过MQ来收发消息,实现基本的流程.spring


1. 微服务构建

首先咱们建立两个基于SpringBoot的微服务,分别是:apache

  • rocketmq-consumer消息消费者
  • rocketmq-producer消息生产者

image-20210218093723919

两个服务里面,rocketmq-consumer的端口号是2001,rocketmq-producer的端口号是2002springboot


2. 微服务启动测试

分别在两个微服务写两个测试方法,启动测试:app

rocketmq-consumeride

@RestController
public class ConsumerController {

    @GetMapping("/consumer")
    public String index() {
        return "rocketmq-consumer";
    }

}

rocketmq-producerspring-boot

@RestController
public class ProducerController {

    @GetMapping("/producer")
    public String index() {
        return "rocketmq-producer";
    }

}

启动测试,两个接口都成功访问.微服务

根据咱们最上面的图,服务A发送消息到服务B,在这里,咱们用rocketmq-producer来发送消息,消息发送到rocketmq之后,由服务Brocketmq-consumer消费消息.


3. 生产者发送消息

使用rocketmq发送消息有不少种方式,由于咱们使用的是SpringBoot,这里直接使用官方提供的rocketmq-spring-boot-starter包来开发

github上有个项目:RocketMQ-Spring

它就是RocketMq官方提供的整合了SpringBoot的rocketmq工具包,git地址以下:https://github.com/apache/rocketmq-spring

固然,你也可使用原生的rocketmq-client包,在官方的示例中,使用的就是这种方式,具体能够查看官方文档,下面咱们直接使用rocketmq-spring-boot-starter来发送消息.

咱们能够看到有不少的版本能够用:

image-20210218100739239

这里咱们使用2.0.3这个版本吧,具体的官方细节能够查看https://github.com/apache/rocketmq-spring/blob/release-2.0.3/README_zh_CN.md

3.1发送String消息

首先是pom坐标:

<!--add dependency in pom.xml-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>

而后再rocketmq-producer的配置文件中配置rocketmq的name-servergroup

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=producer

rocketmq-spring-boot-starter中提供了一个RocketMQTemplate来方便咱们发送消息,咱们能够直接注入这个类来使用.

RocketMQTemplatesend方法和convertAndSend方法,均可以用来发送消息,区别是,前者的方法入参是rocketmq规定的Message类型,然后者能够发送对象,而且帮咱们转换,源码以下:

/**
	 * Send a message to the given destination.
	 * @param destination the target destination
	 * @param message the message to send
	 */
	void send(D destination, Message<?> message) throws MessagingException;

	/**
	 * Convert the given Object to serialized form, possibly using a
	 * {@link org.springframework.messaging.converter.MessageConverter},
	 * wrap it as a message and send it to a default destination.
	 * @param payload the Object to use as payload
	 */
	void convertAndSend(Object payload) throws MessagingException;

下面咱们直接发送消息到mq

@Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/producer")
    public String index() {
        rocketMQTemplate.convertAndSend("test-topic", "消息发送成功!");
        return "rocketmq-producer";
    }

convertAndSend方法有两个参数,第一个参数是消息要发送到的topic,也就是目的地,第二个参数就是消息自己,至于topic究竟是什么,这个咱们后面详细来讲,咱们只须要知道,咱们的消息发送到了rocketmq的一个叫作test-topic的地方便可.

而且,因为咱们在灰皮书第二章的时候,启动mq的时候,指定了autoCreateTopicEnable=true,也就是说,咱们使用RocketMQTemplate发送的消息,就算topic以前不存在,rocket也会帮咱们建立好.

编码完成,重启项目,咱们只要访问http://localhost:2002/producer就会发送消息到mq,咱们能够经过rocketmq-console查看咱们发送的消息

能够看到mq自动为咱们建立了topic:

image-20210218113616038

在message页签,能够查看到咱们刚才发送的消息:

image-20210218113722908

详细的消息内容:

image-20210218113741161

3.2发送对象

在上面的例子中,咱们直接发送字符串到MQ,通常来讲,咱们发送的消息体是一个java对象,在这里也是能够的,咱们改造一下代码:

@GetMapping("/producer")
    public String index() {
        rocketMQTemplate.convertAndSend("test-topic", new User("张三", 20));
        return "rocketmq-producer";
    }

    @Data
    class User implements Serializable {

        private static final long serialVersionUID = -3486413003967431764L;
        private String name;
        private Integer age;



        User() {}

        User(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

这样咱们发送了一个User对象到RocketMQ中,咱们再去rocketmq-console查看:

image-20210218145923688

能够看到,消息成功发送到了mq中,须要注意的是,这里咱们发送的对象要实现Serializable接口,否则会抛异常.

那么咱们发送的消息的内容是怎么序列化的呢?

RocketMQ的消息体都是以byte[]方式存储的,若是内容体是java.lang.String类型时,统一按照UTF-8编码转成byte[];若是消息内容不是String类型的,则采用jackson-databind序列化成JSON格式的字符串后,再统一按照UTF-8编码转换成byte[]

以上释义源于RocketMQ官方文档,因此说,有问题多看看官方文档能很大程度上解决咱们的疑惑!


4. 消费消息

好了,咱们的消息发送成功了,接下来咱们在rocketmq-consumer应用中消费以前发送出来的消息.

在开发以前咱们先想一下: 消息的生产者随着用户的请求,不断的往MQ中发送消息,那么消费者在消费消息的时候,是怎么知道它要取哪一条消息呢?

咱们以前的文章中提到过一个topic,生产者在发送消息的时候,会指定一个topic,消息会发送到某个topic下,那么天然而然的,消费者在获取消息的时候,也是须要知道它要从哪一个topic里面去获取消息的.

而获取消息,则是经过监听器来完成的.

建立一个监听器:

@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer")
@Slf4j
public class Consumerlistener implements RocketMQListener<User> {



    @Override
    public void onMessage(User message) {
        log.info("收到消息 : {}", message);
    }


}

@RocketMQMessageListener注解中咱们指定了2个参数:

  • topic 指定监听器要监听的topic,监听器运行之后,会一直监听该topic下的消息
  • consumerGroup 指定当前消费者是数据哪一个消费组,这个概念咱们后面会详细说

其次,咱们自定义的监听器还要实现RocketMQListener<T>接口,该接口的泛型类型就是咱们生产者发送消息的消息类型,以前咱们发送的是User对象,所以这里也是User对象

实现RocketMQListener接口的onMessage方法,方法的入参就是咱们发送出来的消息,在这个方法中咱们能够进行本身的业务处理.

启动服务rocketmq-consumer,能够看到正常消费到了消息:

image-20210218155436627


以上,咱们成功的在咱们的微服务中使用RocketMQ进行了消息的发送和消费.

不只仅是简单的消息,RocketMQ还支持更高级的功能,好比事务消息消息轨迹等,这些高级特效咱们会下后面的进阶文章中详细讲解.

结语:

在本篇博文中,咱们使用RocketMQ官方提供的pom包进行了消息的发送和接收,也成功的在rocketmq-console中查看到了消息.

在这个工程中,咱们接触了不少新的概念:

  • topic
  • consumerGroup

以上这些概念,以及前面篇文章中遗留下来的概念,咱们将在下一篇文章中详细介绍.

我的公众号<橙耘自留地>日前已经开通,后续博主发布的文章都会一并更新到公众号,若有须要,自行查阅.

关于橙耘自留地,是我我的聚焦互联网技术栈学习分享的一个平台,创立之初是由于目前业内各类技术课程资料层次不齐,褒贬不一,有时候一门课花费高价买入,其实内含草包,有时偶尔低价得之,却又大有干货.所以我会根据你们的意见和评价,选择不一样的技术栈去学习,一为提高我本身的技术,二为你们梳理出质量比较好的课程,以做参考.同时,相关的学习心得也会一并更新到博客和公众号.

qrcode_for_gh_c1462e34b232_344

相关文章
相关标签/搜索