SpringBoot整合RabbitMQ消息队列-学习笔记

SpringBoot整合RabbitMQ消息队列-学习笔记

2018年08月30日 14:50:50 Calon Mo 阅读数 3672html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处连接和本声明。java

本文连接:http://www.javashuo.com/article/p-pqjrwton-r.htmlgit

前言

本篇文章主要用于记录我的学习RabbitMQ的过程,记录下来方便往后查看,若有错误的地方,还望指正。github

本篇文章比较详细地记录本人在学习过程当中的每个步骤,比较适合对RabbitMQ不熟的同窗学习,跟着本文操做一遍,就能够大概知道RabbitMQ的基础知识了。web

准备阶段

首先把RabbitMQ环境安装好,下面再详细介绍RabbitMQ各个知识点和如何使用。spring

因为是基于Centos7的操做系统安装RabbitMQ-3.7.7。json

为了方便操做,先把防火墙干掉,生产环境固然不能这么干,我的学习随意,如下是相关命令:centos

centos7关闭并禁止防火墙启动命令:浏览器

 
  1. systemctl stop firewalldspringboot

  2. systemctl disable firewalld

RabbitMQ安装

这里介绍一种比较简单的安装方法-依赖安装,不用单独安装erlang等依赖。

首先到RabbitMQ官网下载:http://www.rabbitmq.com/download.html,

选择合适你的操做系统版本,本人的操做系统是Centos7.5,因此选择RHEL/CentOS 7.x这个。

把下载好的rabbitmq-server-3.7.7-1.el7.noarch.rpm放到/home目录,因为RabbitMQ-3.7.7须要安装比较新的erlang-v19.3以上,而yum上并无这么高的版本,因此须要在/etc/yum.repos.d/目录下建立文件rabbitmq-erlang.repo,命令以下:

 
  1. cd /etc/yum.repos.d/

  2. touch rabbitmq-erlang.repo

编辑rabbitmq-erlang.repo命令以下:

vi rabbitmq-erlang.repo

添加如下内容到rabbitmq-erlang.repo:

 
  1. [rabbitmq-erlang]

  2. name=rabbitmq-erlang

  3. baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/21/el/7/

  4. gpgcheck=1

  5. gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc

  6. repo_gpgcheck=0

  7. enabled=1

上面baseurl是指向erlang-v21版本的连接。

 

cd到/home目录,执行安装RabbitMQ的命令:

yum install -y rabbitmq-server-3.7.7-1.el7.noarch.rpm

这个过程会下载安装依赖的erlang等依赖,等待安装完成,会出现下面的界面,则说明RabbitMQ就已经安装完成了。

 
  1. Running transaction check

  2. Running transaction test

  3. Transaction test succeeded

  4. Running transaction

  5. 正在安装 : erlang-21.0.5-1.el7.centos.x86_64 1/3

  6. 正在安装 : socat-1.7.3.2-2.el7.x86_64 2/3

  7. 正在安装 : rabbitmq-server-3.7.7-1.el7.noarch 3/3

  8. 验证中 : socat-1.7.3.2-2.el7.x86_64 1/3

  9. 验证中 : rabbitmq-server-3.7.7-1.el7.noarch 2/3

  10. 验证中 : erlang-21.0.5-1.el7.centos.x86_64 3/3

  11.  
  12. 已安装:

  13. rabbitmq-server.noarch 0:3.7.7-1.el7

  14.  
  15. 做为依赖被安装:

  16. erlang.x86_64 0:21.0.5-1.el7.centos socat.x86_64 0:1.7.3.2-2.el7

  17.  
  18. 完毕!

RabbitMQ设置

启动RabbitMQ服务:

service rabbitmq-server start

刚安装好的RabbitMQ是尚未用户的,也不能访问RabbitMQ的web管理后台,接下来先添加一个叫root的用户:

 
  1. rabbitmqctl add_user root root 

  2. rabbitmqctl set_user_tags root administrator

  3. rabbitmqctl set_permissions -p / root "." "." ".*"

  4.  
  5.  
  6. #更多命令查看:rabbitmqctl --help

启用web访问权限:

rabbitmq-plugins enable rabbitmq_management

重启RabbitMQ服务:

service rabbitmq-server restart

而后在浏览器输入:http://ip:15672/ ,这时能够看到RabbitMQ管理页面了,输入刚刚添加的帐号root,密码root便可进入。

登陆进去后界面以下:

RabbitMQ是基于Virtual Host来进行权限控制的,如今为咱们刚刚添加的root用户添加一个Virtual Host,在RabbitMQ的web管理后台,根据下图进行添加一个virtual host,添加成功后默认分配给root用户了。

 

RabbitMQ简介

    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种语言平台的客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

一般咱们谈到消息队列, 会有三个概念: 消息生产者(Provider)、队列(Queue)、消息消费者(Consumer),RabbitMQ 在这个基本概念上, 多作了一层抽象, 在消息生产者和队列之间, 加入了交换器 (Exchange)。这样消息生产者和队列就没有直接联系, 变成消息生产者把消息发送给交换器, 交换器根据调度策略再把消息发送给队列。

  1. 左侧P表明消息生产者,也就是往RabbitMQ发消息的程序。
  2. 中间便是RabbitMQ,其中包括交换机(Exchange)和队列(Queue)。
  3. 右侧C表明消费者,也就是往RabbitMQ拿消息的程序。

其中比较重要的概念有:虚拟主机(Virtual Host)、交换机(Exchange)、队列(Queue)、绑定(Binding)。

虚拟主机(Virtual Hosts)

        在上面已经说明如何为一个用户建立一个Virtual Host,一个虚拟主机持有一组交换机、队列和绑定。在RabbitMQ当中,用户只能在虚拟主机这个粒度上进行权限的控制。 若是须要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别建立一个虚拟主机。每个RabbitMQ服务器都有一个默认的虚拟主机“/”。

交换机(Exchange)

        交换机的功能主要是接收消息而且根据转发策略转发到对应的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误,这个ack模式后面再详细讨论。交换机有四种类型:Direct, topic, Headers and Fanout

队列(Queue)

        队列用于存放消息的载体,通常是和交换机进行绑定,交换机根据转发策略把消息转发到队列里。

绑定(Binding)

        也就是交换机须要和队列相绑定,这其中如上图所示,是多对多的关系。

 

交换机类型介绍

    Direct Exchange:

    

        direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个binding_key, 消息的routing_key与binding_key匹配时, 才会被交换器投送到绑定的队列中去.

    Topic:

        转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

  1. 路由键必须是一串字符,用句号(.)隔开,好比说 topic.message,或者 topic.message.detail 等。
  2. 路由模式必须包含一个星号(*),主要用于匹配路由键指定位置的一个单词,好比说,一个路由模式是这样:topic.*,那么就只能匹配路由键是:topic.message、topic.other等,第一个单词是 topic,第二个单词能够是任意一个单词。 井号(#)就表示一个或者多个单词,例如一个匹配模式是topic.#,那么能够匹配到例如:topic.message、topic.message.detail等,以topic.开头的路由键均可以匹配到。

    Fanout:

        Fanout类型相似于消息广播,无论路由键或者是路由模式,会把消息发给绑定给它的所有队列,若是配置了routing_key会被忽略。

    Headers:

        设置header attribute参数类型的交换机

 

项目简介

    本文是基于Springboot-1.5.15整合RabbitMQ来进行讲解,在真实工做中,生产者和消费者通常是在不一样的项目里,各自负责不一样的职责,这里为了模拟真实环境,建立两个不一样的项目进行演示。建立两个maven项目,消息生产者mq-rabbit-provider和消息消费者mq-rabbit-consumer,两个项目的pom.xml文件添加相同依赖:

 
  1. <dependency>

  2. <groupId>org.springframework.boot</groupId>

  3. <artifactId>spring-boot-starter-amqp</artifactId>

  4. </dependency>

  5. <dependency>

  6. <groupId>org.springframework.boot</groupId>

  7. <artifactId>spring-boot-starter-web</artifactId>

  8. </dependency>

mq-rabbit-provider项目的application.properties内容以下:

 
  1. server.port=8080

  2. spring.application.name=springboot-rabbitmq-provider

  3.  
  4. spring.rabbitmq.host=10.211.55.3

  5. spring.rabbitmq.port=5672

  6. spring.rabbitmq.username=root

  7. spring.rabbitmq.password=root

  8. #RabbitMQ的虚拟host

  9. spring.rabbitmq.virtual-host=CalonHost

mq-rabbit-consumer项目的application.properties内容以下:

 
  1. server.port=9090

  2. spring.application.name=springboot-rabbitmq-consumer

  3.  
  4. spring.rabbitmq.host=10.211.55.3

  5. spring.rabbitmq.port=5672

  6. spring.rabbitmq.username=root

  7. spring.rabbitmq.password=root

  8. #RabbitMQ的虚拟host

  9. spring.rabbitmq.virtual-host=CalonHost

这里只是端口和应用名不一样,其余都同样。

 

接下来分别介绍Direct、Topic、Fanout等3种不一样交换机的使用例子。

Direct Exchange

    在mq-rabbit-provider项目建一个配置类DirectRabbitConfig.java,配置交换机、队列、BindingKey=CalonDirectRouting的绑定关系,代码以下:

 
  1. @Configuration

  2. public class DirectRabbitConfig {

  3.  
  4. //队列

  5. @Bean

  6. public Queue CalonDirectQueue() {

  7. return new Queue("CalonDirectQueue",true);

  8. }

  9.  
  10. //Direct交换机

  11. @Bean

  12. DirectExchange CalonDirectExchange() {

  13. return new DirectExchange("CalonDirectExchange");

  14. }

  15.  
  16. //绑定

  17. @Bean

  18. Binding bindingDirect() {

  19. return BindingBuilder.bind(CalonDirectQueue()).to(CalonDirectExchange()).with("CalonDirectRouting");

  20. }

  21. }

    建立一个实体类User.java,这里说明一下,该实体类是消息的主体,因此必须实现Serializable接口,不然在消息消费者项目读取消息时会报错,代码以下:

 
  1. package mq.rabbit.entity;

  2.  
  3. import java.io.Serializable;

  4.  
  5. public class User implements Serializable{

  6.  
  7. private static final long serialVersionUID = 1L;

  8.  
  9. private String id;

  10. private String username;

  11. private String password;

  12. private String type;

  13.  
  14. public String getId() {

  15. return id;

  16. }

  17. public void setId(String id) {

  18. this.id = id;

  19. }

  20. public String getUsername() {

  21. return username;

  22. }

  23. public void setUsername(String username) {

  24. this.username = username;

  25. }

  26. public String getPassword() {

  27. return password;

  28. }

  29. public void setPassword(String password) {

  30. this.password = password;

  31. }

  32.  
  33. public String getType() {

  34. return type;

  35. }

  36. public void setType(String type) {

  37. this.type = type;

  38. }

  39. public User() {

  40. super();

  41. }

  42. public User(String id, String username, String password, String type) {

  43. super();

  44. this.id = id;

  45. this.username = username;

  46. this.password = password;

  47. this.type = type;

  48. }

  49. }

下面建立一个Controller,利用http请求进行调试,CalonDirectExchange是上面配置的交换机标识,CalonDirectRouting就是上面绑定好的queue名字,因为上面已经配置好交换机和队列的绑定关系,这两个组合就能够知道消息最终是发送到队列CalonDirectQueue里面去了,Controller类的代码以下:

 
  1. @Controller

  2. public class SendController {

  3.  
  4. @Autowired

  5. private RabbitTemplate template;

  6.  
  7. @GetMapping("/sendDirect")

  8. private @ResponseBody String sendDirect(String message) throws Exception {

  9. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendDirect");

  10. template.convertAndSend("CalonDirectExchange", "CalonDirectRouting", user);

  11. return "OK,sendDirect:" + message;

  12. }

  13. }

启动mq-rabbit-provider项目,在浏览器输入:

http://localhost:8080/sendDirect?message=123

再去RabbitMQ的web管理后台查看,你会发如今Queue里找到刚刚添加的那个队列,后面的数字就是消息数量有变化,说明消息已经存储进去了:

    把mq-rabbit-provider项目里的User类和DirectRabbitConfig类复制到mq-rabbit-consumer项目,User类用于读取消息时接收消息对象,DirectRabbitConfig能够不复制,可是若是RabbitMQ里尚未被监听的队列时会报错,复制过来是为了让RabbitMQ里尚未被监听的队列时自动建立该队列,防止报错。

建立队列监听类DirectReceiver.java,代码以下:

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6.  
  7. import mq.rabbit.entity.User;

  8.  
  9. @Component

  10. @RabbitListener(queues = "CalonDirectQueue")//CalonDirectQueue为队列名称

  11. public class DirectReceiver {

  12.  
  13. @RabbitHandler

  14. public void process(User user) {

  15. System.out.println("DirectReceiver消费者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  16. }

  17.  
  18. }

启动mq-rabbit-consumer项目,就会收到以前发送到CalonDirectQueue队列的消息了,继续调用上面的请求/sendDirect,消息消费者会继续收到消息。

 

Topic Exchange

在mq-rabbit-provider项目建一个配置类TopicRabbitConfig.java,配置交换机、队列、BindingKey的绑定关系,代码以下:

 
  1. package mq.rabbit.config;

  2.  
  3. import org.springframework.amqp.core.Binding;

  4. import org.springframework.amqp.core.BindingBuilder;

  5. import org.springframework.amqp.core.Queue;

  6. import org.springframework.amqp.core.TopicExchange;

  7. import org.springframework.context.annotation.Bean;

  8. import org.springframework.context.annotation.Configuration;

  9.  
  10. /**

  11. * Topic Exchange类型交换机

  12. * @author calon

  13. *

  14. */

  15. @Configuration

  16. public class TopicRabbitConfig {

  17.  
  18. public final static String first = "topic.first";

  19. public final static String second = "topic.second";

  20.  
  21. @Bean

  22. public Queue firstQueue() {

  23. return new Queue(TopicRabbitConfig.first);

  24. }

  25.  
  26. @Bean

  27. public Queue secondQueue() {

  28. return new Queue(TopicRabbitConfig.second);

  29. }

  30.  
  31. @Bean

  32. TopicExchange exchange() {

  33. return new TopicExchange("topicExchange");

  34. }

  35.  
  36. //绑定topic.first队列到routingKey为topic.first,只有topic.first的routingKey消息才发送到此队列

  37. @Bean

  38. Binding bindingExchangeMessage() {

  39. return BindingBuilder.bind(firstQueue()).to(exchange()).with(first);

  40. }

  41.  
  42. //绑定topic.second队列到topic.#,凡是topic.开头的routingKey消息都发送到此队列

  43. @Bean

  44. Binding bindingExchangeMessage2() {

  45. return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");

  46. }

  47.  
  48. }

Topic Exchange类型的交换机是基于模糊匹配规则,因此这里建立两个Queue,分别绑定到两个BindingKey:topic.first和topic.#,用来测试消息进到哪一个队列里。

在SendController类里添加两个request,代码以下:

 
  1. @Controller

  2. public class SendController {

  3.  
  4. @Autowired

  5. private RabbitTemplate template;

  6.  
  7. @GetMapping("/sendTopicFirst")

  8. private @ResponseBody String sendTopicFirst(String message) {

  9. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendTopicFirst");

  10. template.convertAndSend("topicExchange", "topic.first", user);

  11. return "OK,sendTopicFirst:" + message;

  12. }

  13.  
  14. @GetMapping("/sendTopicSecond")

  15. private @ResponseBody String sendTopicSecond(String message) {

  16. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendTopicSecond");

  17. template.convertAndSend("topicExchange", "topic.second", user);

  18. return "OK,sendTopicSecond:" + message;

  19. }

  20. }

当咱们调用/sendTopicFirst请求时,交换机为topicExchange,routingKey为topic.first,按照上面bindingKey的配置,能够匹配到topic.first和topic.#规则,对应的队列是topic.first和topic.second,因此一条消息进到两个队列里。

当调用/sendTopicSecond请求时,交换机为topicExchange,routingKey为topic.second,匹配到topic.#规则,对应的队列是topic.second,因此消息进到topic.second队列里,除了#匹配规则,你们能够自行试试星号(*)这个匹配规则,*符号是匹配一个单词的。

把mq-rabbit-provider项目里的TopicRabbitConfig类复制到mq-rabbit-consumer项目,分别建立TopicFirstReceiver和TopicSecondReceiver消息监听类,代码以下:

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6. import mq.rabbit.entity.User;

  7.  
  8. @Component

  9. @RabbitListener(queues = "topic.first")

  10. public class TopicFirstReceiver {

  11.  
  12. @RabbitHandler

  13. public void process(User user) {

  14. System.out.println("TopicFirstReceiver消费者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  15. }

  16. }

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6. import mq.rabbit.entity.User;

  7.  
  8. @Component

  9. @RabbitListener(queues = "topic.second")

  10. public class TopicSecondReceiver {

  11.  
  12. @RabbitHandler

  13. public void process(User user) {

  14. System.out.println("TopicSecondReceiver消费者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  15. }

  16.  
  17. }

启动mq-rabbit-consumer项目,会发现分别接收到各自监听的队列的消息。

 

Fanout Exchang

    在mq-rabbit-provider项目建一个配置类FanoutRabbitConfig.java,配置交换机、队列的绑定关系,代码以下:    

 
  1. package mq.rabbit.config;

  2.  
  3. import org.springframework.amqp.core.Binding;

  4. import org.springframework.amqp.core.BindingBuilder;

  5. import org.springframework.amqp.core.FanoutExchange;

  6. import org.springframework.amqp.core.Queue;

  7. import org.springframework.context.annotation.Bean;

  8. import org.springframework.context.annotation.Configuration;

  9.  
  10. @Configuration

  11. public class FanoutRabbitConfig {

  12.  
  13. @Bean

  14. public Queue AMessage() {

  15. return new Queue("fanout.A");

  16. }

  17.  
  18. @Bean

  19. public Queue BMessage() {

  20. return new Queue("fanout.B");

  21. }

  22.  
  23. @Bean

  24. public Queue CMessage() {

  25. return new Queue("fanout.C");

  26. }

  27.  
  28. @Bean

  29. FanoutExchange fanoutExchange() {

  30. return new FanoutExchange("fanoutExchange");

  31. }

  32.  
  33. @Bean

  34. Binding bindingExchangeA() {

  35. return BindingBuilder.bind(AMessage()).to(fanoutExchange());

  36. }

  37.  
  38. @Bean

  39. Binding bindingExchangeB() {

  40. return BindingBuilder.bind(BMessage()).to(fanoutExchange());

  41. }

  42.  
  43. @Bean

  44. Binding bindingExchangeC() {

  45. return BindingBuilder.bind(CMessage()).to(fanoutExchange());

  46. }

  47. }

这里建立三个队列fanout.A、fanout.B、fanout.C,都绑定到FanoutExchange交换机fanoutExchange上。

在SendController类添加一个请求/sendFanout,代码以下:

 
  1. package mq.rabbit.controller;

  2.  
  3. import java.util.UUID;

  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.stereotype.Controller;

  7. import org.springframework.web.bind.annotation.GetMapping;

  8. import org.springframework.web.bind.annotation.PathVariable;

  9. import org.springframework.web.bind.annotation.ResponseBody;

  10. import com.fasterxml.jackson.databind.ObjectMapper;

  11. import mq.rabbit.entity.User;

  12.  
  13. @Controller

  14. public class SendController {

  15.  
  16. @Autowired

  17. private RabbitTemplate template;

  18.  
  19. @GetMapping("/sendFanout")

  20. private @ResponseBody String sendFanout(String message) {

  21. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendFanout");

  22. template.convertAndSend("fanoutExchange", null, user);

  23. return "OK,sendFanout:" + message;

  24. }

  25.  
  26. }

当调用/sendFanout请求时,在RabbitMQ的web管理界面看到三个队列fanout.A、fanout.B、fanout.C都有一条消息,在Fanout交换机里,若是有设置BindingKey,Fanout交换机会忽略已设置的BindingKey,把消息发送到绑定该交换机的全部队列里。

 

把mq-rabbit-provider项目里的FanoutRabbitConfig类复制到mq-rabbit-consumer项目,分别建立FanoutReceiverA、FanoutReceiverB和FanoutReceiverC类,代码以下:

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6. import mq.rabbit.entity.User;

  7.  
  8. @Component

  9. @RabbitListener(queues = "fanout.A")

  10. public class FanoutReceiverA {

  11.  
  12. @RabbitHandler

  13. public void process(User user) {

  14. System.out.println("FanoutReceiverA消费者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  15. }

  16.  
  17. }

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6. import mq.rabbit.entity.User;

  7.  
  8. @Component

  9. @RabbitListener(queues = "fanout.B")

  10. public class FanoutReceiverB {

  11.  
  12. @RabbitHandler

  13. public void process(User user) {

  14. System.out.println("FanoutReceiverB消费者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  15. }

  16.  
  17. }

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6. import mq.rabbit.entity.User;

  7.  
  8. @Component

  9. @RabbitListener(queues = "fanout.C")

  10. public class FanoutReceiverC {

  11.  
  12. @RabbitHandler

  13. public void process(User user) {

  14. System.out.println("FanoutReceiverC消费者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  15. }

  16.  
  17. }

上面也能够在一个类里写3个方法来进行对队列的监听,不一样的地方在于把@RabbitListener移到方法上便可。

启动mq-rabbit-consumer,便可收到队列的消息。

 

RabbitMQ消息的确认机制

    在使用RabbitMQ的时候,咱们能够经过消息持久化操做来解决由于服务器的异常奔溃致使的消息丢失,除此以外咱们还会遇到一个问题,当消息的生产者在将消息发送出去以后,消息到底有没有正确到达服务器?若是不进行特殊配置的话,默认状况下发布消息是不会返回任何信息给生产者的,也就是生产者是不知道消息有没有正确到达消息服务器,同理,消息消费者在接收消息后,若是在执行业务逻辑过程出现异常崩溃等状况,会致使消息丢失,因此咱们须要对消息的发送和消费进行确认,确保消息可以被正确的存储和消费。RabbitMQ为咱们提供了两种方式:一、事务机制;二、确认机制。下面介绍消息确认机制。

 

生产者消息确认机制:

先把例子跑起来,下面再作详细介绍。在mq-rabbit-provider项目的application.properties文件添加如下属性:

 
  1. #确认消息已发送到交换机(Exchange)

  2. spring.rabbitmq.publisher-confirms=true

  3. #确认消息已发送到队列(Queue)

  4. spring.rabbitmq.publisher-returns=true

在mq-rabbit-provider项目建立配置类RabbitConfig.java,代码以下:

 
  1. package mq.rabbit.config;

  2.  
  3. import org.springframework.amqp.core.Message;

  4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;

  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;

  6. import org.springframework.amqp.rabbit.support.CorrelationData;

  7. import org.springframework.context.annotation.Bean;

  8. import org.springframework.context.annotation.Configuration;

  9.  
  10. @Configuration

  11. public class RabbitConfig {

  12.  
  13. @Bean

  14. public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){

  15. RabbitTemplate rabbitTemplate = new RabbitTemplate();

  16. rabbitTemplate.setConnectionFactory(connectionFactory);

  17. rabbitTemplate.setMandatory(true);//必须设置为true,才能让下面的ReturnCallback函数生效

  18.  
  19. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

  20. @Override

  21. public void confirm(CorrelationData correlationData, boolean ack, String cause) {

  22. System.out.println("=======ConfirmCallback=========");

  23. System.out.println("correlationData = " + correlationData);

  24. System.out.println("ack = " + ack);

  25. System.out.println("cause = " + cause);

  26. System.out.println("=======ConfirmCallback=========");

  27. }

  28. });

  29.  
  30. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

  31. @Override

  32. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

  33. System.out.println("--------------ReturnCallback----------------");

  34. System.out.println("message = " + message);

  35. System.out.println("replyCode = " + replyCode);

  36. System.out.println("replyText = " + replyText);

  37. System.out.println("exchange = " + exchange);

  38. System.out.println("routingKey = " + routingKey);

  39. System.out.println("--------------ReturnCallback----------------");

  40. }

  41. });

  42.  
  43. return rabbitTemplate;

  44. }

  45.  
  46. }

RabbitMQ生产者是依赖两个回调函数来实现确认的,分别是ConfirmCallback和ConfirmCallback,如上面的代码。按如下4种状况进行回调:

一、消息找不到交换机(Exchange)时回调ConfirmCallback,返回ack=false,代码以下:

 
  1. =======ConfirmCallback=========

  2. correlationData = null

  3. ack = false

  4. cause = channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)

  5. =======ConfirmCallback=========

  6. 2018-08-30 09:59:37.892 ERROR 55704 --- [0.211.55.3:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)

二、消息找到交换机(Exchange)但找不到队列(Queue)时回调ConfirmCallback和ReturnCallback,返回ack=true,replyCode = 312,replyText = NO_ROUTE,代码以下:

 
  1. --------------ReturnCallback----------------

  2. message = (Body:'[B@bf8af5b(byte[179])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/x-java-serialized-object, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=null, receivedExchange=null, receivedRoutingKey=null, receivedDelay=null, deliveryTag=0, messageCount=null, consumerTag=null, consumerQueue=null])

  3. replyCode = 312

  4. replyText = NO_ROUTE

  5. exchange = CalonDirectExchange

  6. routingKey = CalonDirectRouting1

  7. --------------ReturnCallback----------------

  8. =======ConfirmCallback=========

  9. correlationData = null

  10. ack = true

  11. cause = null

  12. =======ConfirmCallback=========

三、消息既找不到交换机(Exchange)又找不到队列(Queue)时回调ConfirmCallback,返回ack=false,代码以下:

 
  1. =======ConfirmCallback=========

  2. correlationData = null

  3. ack = false

  4. cause = channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)

  5. =======ConfirmCallback=========

  6. 2018-08-30 10:03:22.204 ERROR 55704 --- [0.211.55.3:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)

四、消息成功发送回调ConfirmCallback,返回ack=true,代码以下:

 
  1. =======ConfirmCallback=========

  2. correlationData = null

  3. ack = true

  4. cause = null

  5. =======ConfirmCallback=========

根据上面4种状态,咱们能够在这两个回调函数里根据返回的状态进行业务方面的处理,好比业务回滚或者从新发送消息等,能够基于上面SendController类对其中一个请求进行测试,更改exchange和routingKey来测试一下这4种状态,这个就是生产消息的确认机制。

 

消费者消息确认机制:

    在mq-rabbit-consumer项目的DirectRabbitConfig配置类进行消息消费确认机制的配置,代码以下:

 
  1. package mq.rabbit.config;

  2.  
  3. import org.springframework.amqp.core.AcknowledgeMode;

  4. import org.springframework.amqp.core.Binding;

  5. import org.springframework.amqp.core.BindingBuilder;

  6. import org.springframework.amqp.core.DirectExchange;

  7. import org.springframework.amqp.core.Queue;

  8. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

  9. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

  10. import org.springframework.beans.factory.annotation.Autowired;

  11. import org.springframework.context.annotation.Bean;

  12. import org.springframework.context.annotation.Configuration;

  13. import mq.rabbit.receiver.DirectAckReceiver;

  14.  
  15. @Configuration

  16. public class DirectRabbitConfig {

  17.  
  18. @Bean

  19. public Queue CalonDirectQueue() {

  20. return new Queue("CalonDirectQueue",true);

  21. }

  22.  
  23. @Bean

  24. DirectExchange CalonDirectExchange() {

  25. return new DirectExchange("CalonDirectExchange");

  26. }

  27.  
  28. @Bean

  29. Binding bindingDirect() {

  30. return BindingBuilder.bind(CalonDirectQueue()).to(CalonDirectExchange()).with("CalonDirectRouting");

  31. }

  32.  
  33. @Autowired

  34. private CachingConnectionFactory connectionFactory;

  35. @Autowired

  36. private DirectAckReceiver directAckReceiver;//消息接收处理类

  37.  
  38. @Bean

  39. public SimpleMessageListenerContainer simpleMessageListenerContainer() {

  40. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

  41. container.setConcurrentConsumers(1);

  42. container.setMaxConcurrentConsumers(1);

  43. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改成手动确认消息

  44.  
  45. container.setQueues(CalonDirectQueue());

  46. container.setMessageListener(directAckReceiver);

  47. return container;

  48. }

  49.  
  50. }

在mq-rabbit-consumer项目新建消息监听类DirectAckReceiver.java,用于处理消息的确认操做,代码以下:

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.core.Message;

  4. import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.stereotype.Component;

  7. import com.fasterxml.jackson.databind.ObjectMapper;

  8. import com.rabbitmq.client.Channel;

  9. import mq.rabbit.entity.User;

  10.  
  11. @Component

  12. public class DirectAckReceiver implements ChannelAwareMessageListener {

  13.  
  14. @Autowired

  15. private ObjectMapper objectMapper;

  16.  
  17. @Override

  18. public void onMessage(Message message, Channel channel) throws Exception {

  19. long deliveryTag = message.getMessageProperties().getDeliveryTag();

  20. try {

  21. byte[] body = message.getBody();

  22. User user = objectMapper.readValue(body, User.class);

  23. System.out.println("DirectAckReceiver消费者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  24. channel.basicAck(deliveryTag, true);

  25. // channel.basicReject(deliveryTag, true);//为true会从新放回队列

  26. } catch (Exception e) {

  27. channel.basicReject(deliveryTag, false);

  28. e.printStackTrace();

  29. }

  30. }

  31.  
  32. }

消息接收确认模式有如下3种:

  • AcknowledgeMode.NONE:不确认
  • AcknowledgeMode.AUTO:自动确认
  • AcknowledgeMode.MANUAL:手动确认

默认状况下是自动确认,若是消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就至关于丢失了消息,在实际应用中,咱们但愿每条消息都可以被正确消费而不是出现丢失的状况,上面代码是开启手动确认模式,下面看看手动确认都有哪几种方式:

  • 成功确认:void basicAck(long deliveryTag, boolean multiple) throws IOException;

            deliveryTag:该消息的index

            multiple:是否批量. true:将一次性ack全部小于deliveryTag的消息。

        消费者成功处理后,调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。

  • 失败确认:void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

            deliveryTag:该消息的index。

            multiple:是否批量. true:将一次性拒绝全部小于deliveryTag的消息。

            requeue:是否从新入队列。

  • 拒绝确认:void basicReject(long deliveryTag, boolean requeue) throws IOException;

            deliveryTag:该消息的index。

            requeue:被拒绝的是否从新入队列。

            channel.basicNack 与 channel.basicReject 的区别在于basicNack能够批量拒绝多条消息,而basicReject一次只能拒绝一条消息。

这里要注意一点的是,不管如何,必须对消息进行确认操做,若是不调用相关函数进行确认,则RabbitMQ会认为该程序处理能力弱,不会再发送消息到该监听程序。

还有一个问题,在启用消息手动确认模式后,发送消息的实体须要转成json字符串发送,接收消息时再把json转回对象,不然出错,也许是我还没找到直接发送实体的方法,还望指正。        

    RabbitMQ的基础知识就已经介绍完了,若有错误,还望留意指正,谢谢。

 

本文例子代码地址:

https://github.com/calonmo/mq-rabbit-provider.git

https://github.com/calonmo/mq-rabbit-consumer.git

相关文章
相关标签/搜索