在使用MQ以前,咱们回顾一下前两篇博文的内容.java
RocketMQ
的四个概念,分别是:Producer
,Consumer
,Message
和Broker
RocketMQ
和其后台系统在本篇博文中,咱们会使用使用SpringBoot构建两个微服务,一个做为生产者,一个做为消费者,经过RocketMQ
传递消息,了解在Java
中使用RocketMQ的方法.git
在灰皮书第一篇文章中,我画了下面这个图:github
如今咱们本地的RocketMQ
也部署起来了,接下来咱们建立两个微服务经过MQ来收发消息,实现基本的流程.spring
首先咱们建立两个基于SpringBoot
的微服务,分别是:apache
rocketmq-consumer
消息消费者rocketmq-producer
消息生产者两个服务里面,rocketmq-consumer
的端口号是2001,rocketmq-producer
的端口号是2002springboot
分别在两个微服务写两个测试方法,启动测试:app
rocketmq-consumeride
@RestController public class ConsumerController { @GetMapping("/consumer") public String index() { return "rocketmq-consumer"; } }
rocketmq-producerspring-boot
@RestController public class ProducerController { @GetMapping("/producer") public String index() { return "rocketmq-producer"; } }
启动测试,两个接口都成功访问.微服务
根据咱们最上面的图,服务A发送消息到服务B,在这里,咱们用rocketmq-producer
来发送消息,消息发送到rocketmq
之后,由服务Brocketmq-consumer
消费消息.
使用rocketmq发送消息有不少种方式,由于咱们使用的是SpringBoot
,这里直接使用官方提供的rocketmq-spring-boot-starter
包来开发
在github
上有个项目:RocketMQ-Spring
它就是RocketMq官方提供的整合了SpringBoot
的rocketmq工具包,git地址以下:https://github.com/apache/rocketmq-spring
固然,你也可使用原生的rocketmq-client
包,在官方的示例中,使用的就是这种方式,具体能够查看官方文档,下面咱们直接使用rocketmq-spring-boot-starter
来发送消息.
咱们能够看到有不少的版本能够用:
这里咱们使用2.0.3
这个版本吧,具体的官方细节能够查看https://github.com/apache/rocketmq-spring/blob/release-2.0.3/README_zh_CN.md
首先是pom坐标:
<!--add dependency in pom.xml--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
而后再rocketmq-producer
的配置文件中配置rocketmq的name-server
和group
## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=producer
rocketmq-spring-boot-starter
中提供了一个RocketMQTemplate
来方便咱们发送消息,咱们能够直接注入这个类来使用.
RocketMQTemplate
有send
方法和convertAndSend
方法,均可以用来发送消息,区别是,前者的方法入参是rocketmq
规定的Message
类型,然后者能够发送对象,而且帮咱们转换,源码以下:
/** * Send a message to the given destination. * @param destination the target destination * @param message the message to send */ void send(D destination, Message<?> message) throws MessagingException; /** * Convert the given Object to serialized form, possibly using a * {@link org.springframework.messaging.converter.MessageConverter}, * wrap it as a message and send it to a default destination. * @param payload the Object to use as payload */ void convertAndSend(Object payload) throws MessagingException;
下面咱们直接发送消息到mq
@Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/producer") public String index() { rocketMQTemplate.convertAndSend("test-topic", "消息发送成功!"); return "rocketmq-producer"; }
convertAndSend
方法有两个参数,第一个参数是消息要发送到的topic
,也就是目的地,第二个参数就是消息自己,至于topic究竟是什么,这个咱们后面详细来讲,咱们只须要知道,咱们的消息发送到了rocketmq
的一个叫作test-topic的地方便可.
而且,因为咱们在灰皮书第二章的时候,启动mq的时候,指定了autoCreateTopicEnable=true
,也就是说,咱们使用RocketMQTemplate
发送的消息,就算topic以前不存在,rocket也会帮咱们建立好.
编码完成,重启项目,咱们只要访问http://localhost:2002/producer
就会发送消息到mq,咱们能够经过rocketmq-console
查看咱们发送的消息
能够看到mq自动为咱们建立了topic:
在message页签,能够查看到咱们刚才发送的消息:
详细的消息内容:
在上面的例子中,咱们直接发送字符串到MQ,通常来讲,咱们发送的消息体是一个java对象,在这里也是能够的,咱们改造一下代码:
@GetMapping("/producer") public String index() { rocketMQTemplate.convertAndSend("test-topic", new User("张三", 20)); return "rocketmq-producer"; } @Data class User implements Serializable { private static final long serialVersionUID = -3486413003967431764L; private String name; private Integer age; User() {} User(String name, Integer age) { this.name = name; this.age = age; } }
这样咱们发送了一个User对象到RocketMQ
中,咱们再去rocketmq-console
查看:
能够看到,消息成功发送到了mq中,须要注意的是,这里咱们发送的对象要实现Serializable
接口,否则会抛异常.
那么咱们发送的消息的内容是怎么序列化的呢?
RocketMQ的消息体都是以
byte[]
方式存储的,若是内容体是java.lang.String
类型时,统一按照UTF-8
编码转成byte[]
;若是消息内容不是String类型的,则采用jackson-databind
序列化成JSON格式的字符串后,再统一按照UTF-8
编码转换成byte[]
以上释义源于RocketMQ
官方文档,因此说,有问题多看看官方文档能很大程度上解决咱们的疑惑!
好了,咱们的消息发送成功了,接下来咱们在rocketmq-consumer
应用中消费以前发送出来的消息.
在开发以前咱们先想一下: 消息的生产者随着用户的请求,不断的往MQ中发送消息,那么消费者在消费消息的时候,是怎么知道它要取哪一条消息呢?
咱们以前的文章中提到过一个topic
,生产者在发送消息的时候,会指定一个topic,消息会发送到某个topic下,那么天然而然的,消费者在获取消息的时候,也是须要知道它要从哪一个topic
里面去获取消息的.
而获取消息,则是经过监听器
来完成的.
建立一个监听器:
@Component @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer") @Slf4j public class Consumerlistener implements RocketMQListener<User> { @Override public void onMessage(User message) { log.info("收到消息 : {}", message); } }
@RocketMQMessageListener
注解中咱们指定了2个参数:
其次,咱们自定义的监听器还要实现RocketMQListener<T>
接口,该接口的泛型类型就是咱们生产者发送消息的消息类型,以前咱们发送的是User
对象,所以这里也是User
对象
实现RocketMQListener
接口的onMessage
方法,方法的入参就是咱们发送出来的消息,在这个方法中咱们能够进行本身的业务处理.
启动服务rocketmq-consumer
,能够看到正常消费到了消息:
以上,咱们成功的在咱们的微服务中使用RocketMQ
进行了消息的发送和消费.
不只仅是简单的消息,RocketMQ
还支持更高级的功能,好比事务消息
、消息轨迹
等,这些高级特效咱们会下后面的进阶文章中详细讲解.
在本篇博文中,咱们使用RocketMQ
官方提供的pom包进行了消息的发送和接收,也成功的在rocketmq-console
中查看到了消息.
在这个工程中,咱们接触了不少新的概念:
以上这些概念,以及前面篇文章中遗留下来的概念,咱们将在下一篇文章中详细介绍.
我的公众号<橙耘自留地>日前已经开通,后续博主发布的文章都会一并更新到公众号,若有须要,自行查阅.
关于橙耘自留地,是我我的聚焦互联网技术栈学习分享的一个平台,创立之初是由于目前业内各类技术课程资料层次不齐,褒贬不一,有时候一门课花费高价买入,其实内含草包,有时偶尔低价得之,却又大有干货.所以我会根据你们的意见和评价,选择不一样的技术栈去学习,一为提高我本身的技术,二为你们梳理出质量比较好的课程,以做参考.同时,相关的学习心得也会一并更新到博客和公众号.