- 本文主要整理一下与 RabbitMQ 相关的一些基本概念,了解这些概念,是使用好 RabbitMQ 的基础;
- 并使用直连型交换机作了一个demo,经过实例去更好的理解这些概念。文末提供有 demo 下载地址。
备注:想对 AMQP 有深刻了解能够点这里查看。git
经过下面这张图片,咱们能够对 RabbitMQ 的模型有一个大概的了解:github
须要了解的基本概念主要以下:web
test1
,则只会转发转发 test1,不会转发 test2。Topic:模式匹配,Exchange 会把消息发送到一个或者多个知足通配符规则的 routing-key 的 Queue。spring
*
号表示匹配一个 word(好比:知足 a.*.c
的 routing-key 有 a.test1.c
);#
号匹配多个 word 和路径,路径之间经过 . 隔开(好比:知足 a.#.c
的 routing-key 有 a.test1.test2.c
);一会儿看到这么多概念,可能会有点发懵。下面咱们经过一个很简单的 demo 去深刻理解一下这些概念。安全
建立一个 SpringBoot 项目,在 pom.xml
文件里添加依赖:服务器
<!--mq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
# rabbitmq spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
添加一个 RabbitMQ 的配置类 RabbitConfig:微信
/** * RabbitMQ配置 * @公众号 全栈在路上 * @GitHub https://github.com/liuyongfei1 * @author lyf * @date 2020-05-17 17:20 */ Configuration public class RabbitConfig { /** * 声明队列 * 参数说明: * durable 是否持久化,默认是false(持久化队列则数据会被存储在磁盘上,当消息代理重启时数据不会丢失;暂存队列只对当前链接有效) * exclusive 默认是false,只能被当前建立的链接使用,并且当链接关闭后队列即被删除。此参考优先级高于durable * autoDelete 默认是false,是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 * 通常设置一下队列的持久化就好,其他两个就是默认false * @return Queue */ @Bean Queue myQueue() { return new Queue(QueueConstants.QUEUE_NAME, true); } /** * 设置交换机,类型为 direct * @return DirectExchange */ @Bean DirectExchange myExchange() { return new DirectExchange(QueueConstants.QUEUE_EXCHANGE_NAME, true, false); } /** * 绑定:将交换机和队列绑定,并设置路由匹配键 * @return Binding */ @Bean Binding queueBinding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with(QueueConstants.QUEUE_ROUTING_KEY_NAME); } }
/** * 消息生产端 * @公众号 全栈在路上 * @GitHub https://github.com/liuyongfei1 * @author lyf * @date 2020-05-17 18:30 */ @RestController public class ProducerController { /** * RabbitTemplate提供了发送/接收消息的方法 */ @Autowired RabbitTemplate rabbitTemplate; /** * 发送消息(交换机类型为 Direct) * @return */ @GetMapping("/sendDirectMessage") public String sendDirectMessage() { // 生成消息的惟一id String msgId = UUID.randomUUID().toString(); String messageData = "hello,this is rabbitmq demo message"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 定义要发送的消息对象 Map<String,Object> messageObj = new HashMap<>(); messageObj.put("msgId",msgId); messageObj.put("messageData",messageData); messageObj.put("createTime",createTime); rabbitTemplate.convertAndSend(QueueConstants.QUEUE_EXCHANGE_NAME,QueueConstants.QUEUE_ROUTING_KEY_NAME, messageObj); return "message send ok"; } }
代码保存后,启动服务,使用 Postman 请求生产消息接口:app
Postman 请求成功,咱们打开 RabbitMQ 的管理界面:dom
能够看到有一个名为 demo1_queue
的队列,说明咱们测试的消息已经推送到RabbitMq 服务器上面了:spring-boot
Total 为 1
Ready 为 1
/** * 消息消费端 * @公众号 全栈在路上 * @GitHub https://github.com/liuyongfei1 * @author lyf * @date 2020-05-17 18:00 */ @Component @RabbitListener(queues = {QueueConstants.QUEUE_NAME}) public class ConsumerController { @RabbitHandler public void handler(Map message) throws IOException { System.out.println("收到消息:" + message.toString()); } }
保存代码,重启服务,在 Idea 的终端窗口,能够看到以前生产的那条消息已经被被消费了:
因为当前队列就 1 条消息 且已经被成功消费掉了,再次访问 RabbitMQ 管理界面,会发现 Ready 和 Total 已经更新为 0。