消息中间件 -- RabbitMQ

1、介绍

  RabbitMQ是由Erlang语言开发,基于AMQP(高级消息队列协议)协议实现的消息队列。spring

  补充:JMS,Java提供的一套消息服务API标准。服务器

  应用场景:并发

  • 任务异步处理。

  将不须要同步处理的而且耗时长的操做由消息队列通知消息接收方进行异步处理,提升了应用程序的响应时间。app

  • 应用程序解耦。

  MQ至关于一个中介,生产方经过MQ与消费方交互,它将应用程序进行解耦合。异步

  • 优势

  一、使用简单,功能强大maven

  二、基于AMQP协议分布式

  三、社区活跃,文档完善ide

  四、高并发性能好,得益于Erlang语言高并发

  五、SpringBoot默认集成RabbitMQ工具

2、快速入门

  • RabbitMQ的工做原理

  基本结构以下:

  

  组成部分说明:

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
  • Exchange:消息队列交换机,按必定的规则将消息路由转发到某个队列,对消息进行过滤
  • Queue:消息队列,存储消息队列,消息到达队列转发到指定的消费方
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息

  消息发布接收流程:

  — — — — — —发送消息— — — — — —

  1. 生产者和Broker创建TCP链接
  2. 生产者和Broker创建通道
  3. 生产者经过通道将消息发送给Broker,由Exchange将消息进行转发
  4. Exchange将消息转发到指定的Queue(队列)

  — — — — — —接收消息— — — — — —

  1. 消费者和Broker创建TCP链接
  2. 消费者和Broker创建通道
  3. 消费者监听指定的Queue(队列)
  4. 当有消息到达Queue时Broker默认将消息推送给消费者
  5. 消费者接收到消息
  • 关于安装

  RabbitMQ有Erlang语言开发,Erlang语言用于并发及分布式系统的开发,OTP(Open Telecon Platform)做为Erlang语言的一部分,包含了不少基于Erlang开发的中间件及工具库,安装RabbitMQ须要安装Erlang/OTP,并保持版本匹配。

  咱们后面的案例采用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。

  具体的安装过程这里就不赘述,网上搜索教程便可。

3、入门案例

  咱们首先用RabbitMQ官方提供的Java client测试,了解一下RabbitMQ的交互过程。

  • 建立maven工程

  

  • 在父工程rabbitmq-test的pom.xml文件中添加依赖
1 <dependency>
2     <groupId>com.rabbitmq</groupId>
3     <artifactId>amqp-client</artifactId>
4     <version>5.1.2</version>
5 </dependency>
  • 在生产者工程中添加生产者类Producer01
 1 public class Producer01 {  2     /**
 3  * 队列名称  4      */
 5     private static final String QUEUE = "hello rabbitmq";  6 
 7     public static void main(String[] args) {  8         Connection connection = null;  9         Channel channel = null; 10 
11         ConnectionFactory factory = new ConnectionFactory(); 12         factory.setHost("localhost"); 13         factory.setPort(5672); 14         factory.setUsername("guest"); 15         factory.setPassword("guest"); 16         // RabbitMQ默认虚拟机名称为"/",虚拟机至关于一个独立的MQ服务器
17         factory.setVirtualHost("/"); 18 
19         try { 20             // 建立于RabbitMQ服务的链接
21             connection = factory.newConnection(); 22             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
23             channel = connection.createChannel(); 24             /**
25  * 声明队列,若是RabbitMQ中没有此队列将自动建立 26  * param1:队列名称 27  * param2:是否持久化 28  * param3:队列是否独占此链接 29  * param4:队列再也不使用时自动删除此列 30  * param5:队列参数 31              */
32             channel.queueDeclare(QUEUE,true,false,false,null); 33             String message = "hello rabbit:" + System.currentTimeMillis(); 34 
35             /**
36  * 消息发布方法 37  * param1:Exchange的名称,若是没有指定,则使用Default Exchange 38  * 这里没有指定交换机,消息将发送给默认交换机,每一个队列也会绑定默认交换机,可是不能显示绑定或解除绑定 39  * 使用默认的交换机,routingkey等于队列名称 40  * param2:routingkey,消息的路由,适用于Exchange将消息转发到指定的消息队列 41  * param3:消息包含的属性 42  * param4:消息体 43              */
44             channel.basicPublish("",QUEUE,null,message.getBytes()); 45         } catch (IOException e) { 46  e.printStackTrace(); 47         } catch (TimeoutException e) { 48  e.printStackTrace(); 49         } finally { 50             if (channel != null){ 51                 try { 52  channel.close(); 53                 } catch (IOException e) { 54  e.printStackTrace(); 55                 } catch (TimeoutException e) { 56  e.printStackTrace(); 57  } 58  } 59             if (connection != null){ 60                 try { 61  connection.close(); 62                 } catch (IOException e) { 63  e.printStackTrace(); 64  } 65  } 66  } 67  } 68 }
Producer01
  • 在消费者工程中添加消费者类Consumer01
 1 public class Consumer01 {  2     /**
 3  * 队列名称  4      */
 5     private static final String QUEUE = "hello rabbitmq";  6 
 7     public static void main(String[] args) {  8         Connection connection = null;  9         Channel channel = null; 10 
11         ConnectionFactory factory = new ConnectionFactory(); 12         factory.setHost("localhost"); 13         factory.setPort(5672); 14 
15         try { 16             // 建立于RabbitMQ服务的链接
17             connection = factory.newConnection(); 18             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
19             channel = connection.createChannel(); 20             /**
21  * 声明队列,若是RabbitMQ中没有此队列将自动建立 22  * param1:队列名称 23  * param2:是否持久化 24  * param3:队列是否独占此链接 25  * param4:队列再也不使用时自动删除此列 26  * param5:队列参数 27              */
28             channel.queueDeclare(QUEUE,true,false,false,null); 29             // 定义消费方法
30             DefaultConsumer consumer = new DefaultConsumer(channel){ 31 
32                 /**
33  * 消费者接受消息调用此方法 34  * @param consumerTag 消费者标签,在channel,basicConsumer()去指定 35  * @param envelope 消息包含的的内容,能够从中获取消息id,消息routingkey,交换机, 36  * 消息和重传标志(收到消息失败后是否须要从新发送) 37  * @param properties 38  * @param body 39  * @throws IOException 40                  */
41  @Override 42                 public void handleDelivery(String consumerTag, 43  Envelope envelope, 44  AMQP.BasicProperties properties, 45                                            byte[] body) throws IOException { 46                     // 交换机
47                     String exchange = envelope.getExchange(); 48                     // routingkey
49                     String routingKey = envelope.getRoutingKey(); 50                     // 消息id
51                     long deliveryTag = envelope.getDeliveryTag(); 52                     // 消息内容
53                     String msg = new String(body, "utf-8"); 54                     System.out.println("receive message:" + msg); 55  } 56  }; 57             /**
58  * 监听队列String queue, boolean autoAck, Consumer callback 59  * param1:队列名称 60  * param2:是否自动回复,设置为true为表示消息接收到自动向mq回复收到了,mq接收到回复会删除消息 61  * 设置为false则须要手动回复 62  * param3:消费消息的方法,消费者接收到消息后调用此方法 63              */
64             channel.basicConsume(QUEUE, true, consumer); 65         } catch (IOException e) { 66  e.printStackTrace(); 67         } catch (TimeoutException e) { 68  e.printStackTrace(); 69  } 70  } 71 }
Consumer01

  启动消费者工程中的main方法进行监听,再启动生产者中的main方法。注:二者的启动顺序能够颠倒。控制台打印到接收到的消息以下:

  • 总结

  发送端操做流程:

  建立链接——>建立通道——>声明队列——>发送消息

  接收端:

  建立链接——>建立通道——>声明队列——>监听队列——>接收消息——>ack回复

4、工做模式

  RabbitMQ有如下几种工做模式:

  • Work queues模式
    •   
    • Work queues与入门程序比,多以消费端,两个消费端同时消费同一个队列中的消息

    • 消费场景:对于人物太重或者任务较多状况使用工做队列能够提升任务处理的速度
    • 测试:
      • 使用入门程序,启动多个消费者
      • 生产者发送多个消息
    • 结果:
      • 一条消息只会被一个消费者接收
      • rabbit采用轮询的方式将消息是平均发送给消费者
      • 消费者在处理完某条消息后,才会接收到下一条消息
  • Publish/Subscribe模式
    • 发布订阅模式

      • 每一个消费者监听本身的队列
      • 生产者将消息发送给broker,由交换机将消息转发到绑定次交换机的每一个队列,每一个绑定交换机的队列都将接收到消息
    • 案例:用户通知,信用卡月帐单,通知方式有短信,邮件等多种方式
    • 生产者
      • 声明Exchange_fanout_inform交换机
      • 声明两个队列而且绑定到此交换机,绑定时不须要指定routingkey
      • 在生产者工程中添加生产者类Producer02Publish
 1 public class Producer02Publish {  2     /**
 3  * 队列名称  4      */
 5     private static final String QUEUE_EMAIL_INFORM = "queue email inform";  6     private static final String QUEUE_SMS_INFORM = "queue sms inform";  7     /**
 8  * 声明EXCHANGE_FANOUT_INFORM交换机  9      */
10     private static final String EXCHANGE_FANOUT_INFORM = "exchange fanout inform"; 11 
12     public static void main(String[] args) { 13         Connection connection = null; 14         Channel channel = null; 15 
16         // 建立一个与MQ的链接
17         ConnectionFactory factory = new ConnectionFactory(); 18         factory.setHost("localhost"); 19         factory.setPort(5672); 20         factory.setUsername("guest"); 21         factory.setPassword("guest"); 22         // RabbitMQ默认虚拟机名称为"/",虚拟机至关于一个独立的MQ服务器
23         factory.setVirtualHost("/"); 24 
25         try { 26             // 建立于RabbitMQ服务的链接
27             connection = factory.newConnection(); 28             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
29             channel = connection.createChannel(); 30             /**
31  * 声明交换机 32  * param1:交换机名称 33  * param2:交换机类型,fanout、topic、direct、headers 34              */
35  channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); 36             /**
37  * 声明队列,若是RabbitMQ中没有此队列将自动建立 38  * param1:队列名称 39  * param2:是否持久化 40  * param3:队列是否独占此链接 41  * param4:队列再也不使用时自动删除此列 42  * param5:队列参数 43              */
44             channel.queueDeclare(QUEUE_EMAIL_INFORM,true,false,false,null); 45             channel.queueDeclare(QUEUE_SMS_INFORM,true,false,false,null); 46             /**
47  * 交换机和队列绑定 48  * param1:队列名称 49  * param2:交换机名称 50  * param3:路由key 51              */
52             channel.queueBind(QUEUE_EMAIL_INFORM,EXCHANGE_FANOUT_INFORM,""); 53             channel.queueBind(QUEUE_SMS_INFORM,EXCHANGE_FANOUT_INFORM,""); 54 
55             // 发送消息
56             for (int i = 0; i<10; i++){ 57                 String message = "inform to user:" + i; 58                 /**
59  * 消息发布方法 60  * param1:Exchange的名称,若是没有指定,则使用Default Exchange 61  * param2:routingkey,消息的路由,适用于Exchange将消息转发到指定的消息队列 62  * param3:消息包含的属性 63  * param4:消息体 64                  */
65                 channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes()); 66 
67  } 68         } catch (IOException e) { 69  e.printStackTrace(); 70         } catch (TimeoutException e) { 71  e.printStackTrace(); 72         } finally { 73             if (channel != null){ 74                 try { 75  channel.close(); 76                 } catch (IOException e) { 77  e.printStackTrace(); 78                 } catch (TimeoutException e) { 79  e.printStackTrace(); 80  } 81  } 82             if (connection != null){ 83                 try { 84  connection.close(); 85                 } catch (IOException e) { 86  e.printStackTrace(); 87  } 88  } 89  } 90  } 91 }
Producer02Publish
    • 接收邮件消费者
 1 public class Consumer02SubscribeEmail {  2     /**
 3  * 队列名称  4      */
 5     private static final String QUEUE_EMAIL_INFORM = "queue email inform";  6     /**
 7  * 声明EXCHANGE_FANOUT_INFORM交换机  8      */
 9     private static final String EXCHANGE_FANOUT_INFORM = "exchange fanout inform"; 10 
11     public static void main(String[] args) { 12         Connection connection = null; 13         Channel channel = null; 14 
15         // 建立一个与MQ的链接
16         ConnectionFactory factory = new ConnectionFactory(); 17         factory.setHost("localhost"); 18         factory.setPort(5672); 19         factory.setUsername("guest"); 20         factory.setPassword("guest"); 21         // rabbitmq默认虚拟机名称为“/”,虚拟机至关于一个独立的mq服务器
22         factory.setVirtualHost("/"); 23         try { 24             // 建立于RabbitMQ服务的链接
25             connection = factory.newConnection(); 26             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
27             channel = connection.createChannel(); 28             /**
29  * param1:交换机名称 30  * param2:交换机类型,fanout、topic、direct、headers 31              */
32  channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); 33             /**
34  * 声明队列,若是RabbitMQ中没有此队列将自动建立 35  * param1:队列名称 36  * param2:是否持久化 37  * param3:队列是否独占此链接 38  * param4:队列再也不使用时自动删除此列 39  * param5:队列参数 40              */
41             channel.queueDeclare(QUEUE_EMAIL_INFORM,true,false,false,null); 42 
43             /**
44  * 交换机和队列绑定 45  * param1:队列名称 46  * param2:交换机名称 47  * param3:路由key 48              */
49             channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_FANOUT_INFORM, ""); 50             // 定义消费方法
51             DefaultConsumer consumer = new DefaultConsumer(channel){ 52 
53                 /**
54  * 消费者接受消息调用此方法 55  * @param consumerTag 消费者标签,在channel,basicConsumer()去指定 56  * @param envelope 消息包含的的内容,能够从中获取消息id,消息routingkey,交换机, 57  * 消息和重传标志(收到消息失败后是否须要从新发送) 58  * @param properties 59  * @param body 60  * @throws IOException 61                  */
62  @Override 63                 public void handleDelivery(String consumerTag, 64  Envelope envelope, 65  AMQP.BasicProperties properties, 66                                            byte[] body) throws IOException { 67                     // 消息id
68                     long deliveryTag = envelope.getDeliveryTag(); 69                     // 交换机
70                     String exchange = envelope.getExchange(); 71                     // 消息内容
72                     String msg = new String(body, "utf-8"); 73                     System.out.println("receive message:" + msg); 74  } 75  }; 76             /**
77  * 监听队列String queue, boolean autoAck, Consumer callback 78  * param1:队列名称 79  * param2:是否自动回复,设置为true为表示消息接收到自动向mq回复收到了,mq接收到回复会删除消息 80  * 设置为false则须要手动回复 81  * param3:消费消息的方法,消费者接收到消息后调用此方法 82              */
83             channel.basicConsume(QUEUE_EMAIL_INFORM, true, consumer); 84         } catch (IOException e) { 85  e.printStackTrace(); 86         } catch (TimeoutException e) { 87  e.printStackTrace(); 88  } 89  } 90 }
Consumer02SubscribeEmail
    • 接收信息消费者
 1 public class Consumer02SubscribeSms {  2     /**
 3  * 队列名称  4      */
 5     private static final String QUEUE_SMS_INFORM = "queue sms inform";  6     /**
 7  * 声明EXCHANGE_FANOUT_INFORM交换机  8      */
 9     private static final String EXCHANGE_FANOUT_INFORM = "exchange fanout inform"; 10 
11     public static void main(String[] args) { 12         Connection connection = null; 13         Channel channel = null; 14 
15         // 建立一个与MQ的链接
16         ConnectionFactory factory = new ConnectionFactory(); 17         factory.setHost("localhost"); 18         factory.setPort(5672); 19         factory.setUsername("guest"); 20         factory.setPassword("guest"); 21         // rabbitmq默认虚拟机名称为“/”,虚拟机至关于一个独立的mq服务器
22         factory.setVirtualHost("/"); 23         try { 24             // 建立于RabbitMQ服务的链接
25             connection = factory.newConnection(); 26             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
27             channel = connection.createChannel(); 28             /**
29  * param1:交换机名称 30  * param2:交换机类型,fanout、topic、direct、headers 31              */
32  channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); 33             /**
34  * 声明队列,若是RabbitMQ中没有此队列将自动建立 35  * param1:队列名称 36  * param2:是否持久化 37  * param3:队列是否独占此链接 38  * param4:队列再也不使用时自动删除此列 39  * param5:队列参数 40              */
41             channel.queueDeclare(QUEUE_SMS_INFORM,true,false,false,null); 42 
43             /**
44  * 交换机和队列绑定 45  * param1:队列名称 46  * param2:交换机名称 47  * param3:路由key 48              */
49             channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_FANOUT_INFORM, ""); 50             // 定义消费方法
51             DefaultConsumer consumer = new DefaultConsumer(channel){ 52 
53                 /**
54  * 消费者接受消息调用此方法 55  * @param consumerTag 消费者标签,在channel,basicConsumer()去指定 56  * @param envelope 消息包含的的内容,能够从中获取消息id,消息routingkey,交换机, 57  * 消息和重传标志(收到消息失败后是否须要从新发送) 58  * @param properties 59  * @param body 60  * @throws IOException 61                  */
62  @Override 63                 public void handleDelivery(String consumerTag, 64  Envelope envelope, 65  AMQP.BasicProperties properties, 66                                            byte[] body) throws IOException { 67                     // 消息id
68                     long deliveryTag = envelope.getDeliveryTag(); 69                     // 交换机
70                     String exchange = envelope.getExchange(); 71                     // 消息内容
72                     String msg = new String(body, "utf-8"); 73                     System.out.println("receive message:" + msg); 74  } 75  }; 76             /**
77  * 监听队列String queue, boolean autoAck, Consumer callback 78  * param1:队列名称 79  * param2:是否自动回复,设置为true为表示消息接收到自动向mq回复收到了,mq接收到回复会删除消息 80  * 设置为false则须要手动回复 81  * param3:消费消息的方法,消费者接收到消息后调用此方法 82              */
83             channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 84         } catch (IOException e) { 85  e.printStackTrace(); 86         } catch (TimeoutException e) { 87  e.printStackTrace(); 88  } 89  } 90 }
Consumer02SubscribeSms
    • 测试 -- 执行生产者 的main方法,执行两个消费者的main方法,前后顺序随便
    • 发如今两个消费者端的控制台都能打印到消息
    • publish、subscribe与work queues的区别与相同点
      • work queues不用定义交换机,publish/subscribe须要定义交换机
      • publish/subscribe须要设置队列和交换机的绑定,work queues不须要设置,实际上workqueues会将队列绑定到默认的交换机
      • 二者实现的发布订阅效果是同样的,多个消费端监听同一个队列不会重复消费消息
    • 实际工做用publish/subscrib仍是work queues
      • 建议使用publish/subscribe,发布定语模式比工做队列模式更强大,并且发布订阅模式能够指定本身专用的交换机
  • Routing模式
    • 路由模式:

      • 每一个消费者监听本身的队列,并设置routingkey
      • 生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列
    • 生产者
      • 声明exchange_routing_inform交换机
      • 声明两个队列而且绑定到此交换机,绑定石须要指定routingkey
      • 发送消息时须要指定routingkey
      • 在生产者工程中添加生产者类Producer03Routing,在Produce03Routing中只绑定了接收信息消费者的路由信息
 1 public class Producer03Routing {  2     /**
 3  * 队列名称  4  * 路由名称设定与队列名称同样  5      */
 6     private static final String QUEUE_EMAIL_INFORM = "queue email inform";  7     private static final String QUEUE_SMS_INFORM = "queue sms inform";  8     /**
 9  * 声明EXCHANGE_ROUTING_INFORM交换机 10      */
11     private static final String EXCHANGE_ROUTING_INFORM = "exchange routing inform"; 12 
13     public static void main(String[] args) { 14         Connection connection = null; 15         Channel channel = null; 16 
17         // 建立一个与MQ的链接
18         ConnectionFactory factory = new ConnectionFactory(); 19         factory.setHost("localhost"); 20         factory.setPort(5672); 21         factory.setUsername("guest"); 22         factory.setPassword("guest"); 23         // RabbitMQ默认虚拟机名称为"/",虚拟机至关于一个独立的MQ服务器
24         factory.setVirtualHost("/"); 25 
26         try { 27             // 建立于RabbitMQ服务的链接
28             connection = factory.newConnection(); 29             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
30             channel = connection.createChannel(); 31             /**
32  * 声明交换机 33  * param1:交换机名称 34  * param2:交换机类型,fanout、topic、direct、headers 35              */
36  channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); 37             /**
38  * 声明队列,若是RabbitMQ中没有此队列将自动建立 39  * param1:队列名称 40  * param2:是否持久化 41  * param3:队列是否独占此链接 42  * param4:队列再也不使用时自动删除此列 43  * param5:队列参数 44              */
45             channel.queueDeclare(QUEUE_EMAIL_INFORM, true, false, false, null); 46             channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 47             /**
48  * 交换机和队列绑定 49  * param1:队列名称 50  * param2:交换机名称 51  * param3:路由key 52              */
53  channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_EMAIL_INFORM); 54  channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_SMS_INFORM); 55 
56             // 发送消息
57             for (int i = 0; i < 10; i++) { 58                 String message = "inform to user:" + i; 59                 /**
60  * 消息发布方法 61  * param1:Exchange的名称,若是没有指定,则使用Default Exchange 62  * param2:routingkey,消息的路由,适用于Exchange将消息转发到指定的消息队列 63  * param3:消息包含的属性 64  * param4:消息体 65                  */
66                 channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_SMS_INFORM, null, message.getBytes()); 67  } 68         } catch (IOException e) { 69  e.printStackTrace(); 70         } catch (TimeoutException e) { 71  e.printStackTrace(); 72         } finally { 73             if (channel != null) { 74                 try { 75  channel.close(); 76                 } catch (IOException e) { 77  e.printStackTrace(); 78                 } catch (TimeoutException e) { 79  e.printStackTrace(); 80  } 81  } 82             if (connection != null) { 83                 try { 84  connection.close(); 85                 } catch (IOException e) { 86  e.printStackTrace(); 87  } 88  } 89  } 90  } 91 }
Producer03Routing
    • 接收邮件消费者
 1 public class Consumer03RoutingEmail {  2     /**
 3  * 队列名称  4  * 路由名称设定与队列名称同样  5      */
 6     private static final String QUEUE_EMAIL_INFORM = "queue email inform";  7     /**
 8  * 声明EXCHANGE_ROUTING_INFORM交换机  9      */
10     private static final String EXCHANGE_ROUTING_INFORM = "exchange routing inform"; 11 
12     public static void main(String[] args) { 13         Connection connection = null; 14         Channel channel = null; 15 
16         // 建立一个与MQ的链接
17         ConnectionFactory factory = new ConnectionFactory(); 18         factory.setHost("localhost"); 19         factory.setPort(5672); 20         factory.setUsername("guest"); 21         factory.setPassword("guest"); 22         // rabbitmq默认虚拟机名称为“/”,虚拟机至关于一个独立的mq服务器
23         factory.setVirtualHost("/"); 24         try { 25             // 建立于RabbitMQ服务的链接
26             connection = factory.newConnection(); 27             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
28             channel = connection.createChannel(); 29             /**
30  * param1:交换机名称 31  * param2:交换机类型,fanout、topic、direct、headers 32              */
33  channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); 34             /**
35  * 声明队列,若是RabbitMQ中没有此队列将自动建立 36  * param1:队列名称 37  * param2:是否持久化 38  * param3:队列是否独占此链接 39  * param4:队列再也不使用时自动删除此列 40  * param5:队列参数 41              */
42             channel.queueDeclare(QUEUE_EMAIL_INFORM, true, false, false, null); 43 
44             /**
45  * 交换机和队列绑定 46  * param1:队列名称 47  * param2:交换机名称 48  * param3:路由key 49              */
50  channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_EMAIL_INFORM); 51             // 定义消费方法
52             DefaultConsumer consumer = new DefaultConsumer(channel) { 53 
54                 /**
55  * 消费者接受消息调用此方法 56  * @param consumerTag 消费者标签,在channel,basicConsumer()去指定 57  * @param envelope 消息包含的的内容,能够从中获取消息id,消息routingkey,交换机, 58  * 消息和重传标志(收到消息失败后是否须要从新发送) 59  * @param properties 60  * @param body 61  * @throws IOException 62                  */
63  @Override 64                 public void handleDelivery(String consumerTag, 65  Envelope envelope, 66  AMQP.BasicProperties properties, 67                                            byte[] body) throws IOException { 68                     // 消息id
69                     long deliveryTag = envelope.getDeliveryTag(); 70                     // 交换机
71                     String exchange = envelope.getExchange(); 72                     // 消息内容
73                     String msg = new String(body, "utf-8"); 74                     System.out.println("email receive message:" + msg); 75  } 76  }; 77             /**
78  * 监听队列String queue, boolean autoAck, Consumer callback 79  * param1:队列名称 80  * param2:是否自动回复,设置为true为表示消息接收到自动向mq回复收到了,mq接收到回复会删除消息 81  * 设置为false则须要手动回复 82  * param3:消费消息的方法,消费者接收到消息后调用此方法 83              */
84             channel.basicConsume(QUEUE_EMAIL_INFORM, true, consumer); 85         } catch (IOException e) { 86  e.printStackTrace(); 87         } catch (TimeoutException e) { 88  e.printStackTrace(); 89  } 90  } 91 }
Consumer03RoutingEmail
    • 接收信息消费者
 1 public class Consumer03RoutingSms {  2     /**
 3  * 队列名称  4  * 路由名称设定与队列名称同样  5      */
 6     private static final String QUEUE_SMS_INFORM = "queue sms inform";  7     /**
 8  * 声明EXCHANGE_ROUTING_INFORM交换机  9      */
10     private static final String EXCHANGE_ROUTING_INFORM = "exchange routing inform"; 11 
12     public static void main(String[] args) { 13         Connection connection = null; 14         Channel channel = null; 15 
16         // 建立一个与MQ的链接
17         ConnectionFactory factory = new ConnectionFactory(); 18         factory.setHost("localhost"); 19         factory.setPort(5672); 20         factory.setUsername("guest"); 21         factory.setPassword("guest"); 22         // rabbitmq默认虚拟机名称为“/”,虚拟机至关于一个独立的mq服务器
23         factory.setVirtualHost("/"); 24         try { 25             // 建立于RabbitMQ服务的链接
26             connection = factory.newConnection(); 27             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
28             channel = connection.createChannel(); 29             /**
30  * param1:交换机名称 31  * param2:交换机类型,fanout、topic、direct、headers 32              */
33  channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); 34             /**
35  * 声明队列,若是RabbitMQ中没有此队列将自动建立 36  * param1:队列名称 37  * param2:是否持久化 38  * param3:队列是否独占此链接 39  * param4:队列再也不使用时自动删除此列 40  * param5:队列参数 41              */
42             channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 43 
44             /**
45  * 交换机和队列绑定 46  * param1:队列名称 47  * param2:交换机名称 48  * param3:路由key 49              */
50  channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_SMS_INFORM); 51             // 定义消费方法
52             DefaultConsumer consumer = new DefaultConsumer(channel) { 53 
54                 /**
55  * 消费者接受消息调用此方法 56  * @param consumerTag 消费者标签,在channel,basicConsumer()去指定 57  * @param envelope 消息包含的的内容,能够从中获取消息id,消息routingkey,交换机, 58  * 消息和重传标志(收到消息失败后是否须要从新发送) 59  * @param properties 60  * @param body 61  * @throws IOException 62                  */
63  @Override 64                 public void handleDelivery(String consumerTag, 65  Envelope envelope, 66  AMQP.BasicProperties properties, 67                                            byte[] body) throws IOException { 68                     // 消息id
69                     long deliveryTag = envelope.getDeliveryTag(); 70                     // 交换机
71                     String exchange = envelope.getExchange(); 72                     // 消息内容
73                     String msg = new String(body, "utf-8"); 74                     System.out.println("sms receive message:" + msg); 75  } 76  }; 77             /**
78  * 监听队列String queue, boolean autoAck, Consumer callback 79  * param1:队列名称 80  * param2:是否自动回复,设置为true为表示消息接收到自动向mq回复收到了,mq接收到回复会删除消息 81  * 设置为false则须要手动回复 82  * param3:消费消息的方法,消费者接收到消息后调用此方法 83              */
84             channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 85         } catch (IOException e) { 86  e.printStackTrace(); 87         } catch (TimeoutException e) { 88  e.printStackTrace(); 89  } 90  } 91 }
Consumer03RoutingSms

 

    • 测试:执行两个消费者的main方法,执行生产者的main方法,发现只有接收信息的消费者可以接收到打印的消息
    • Routing与Publish/subscibe的区别
      • Routing模式要求队列在绑定交换机是要指定routingkey,消息会转发到符合routingkey的队列
  • Topics模式
    • 匹配模式

      • 每个消费者监听本身的队列,而且设置带通配符的routingkey
      • 生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列
    • 案例
      • 根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效
    • 生产者
      • 声明交换机,指定topics类型
 1 public class Producer04Topics {  2     /**
 3  * 队列名称  4      */
 5     private static final String QUEUE_EMAIL_INFORM = "queue email inform";  6     private static final String QUEUE_SMS_INFORM = "queue sms inform";  7     /**
 8  * 路由key,#表明能够匹配多个词,符号*能够匹配一个词语  9      */
 10     private static final String ROUTINGKEY_EMAIL = "inform.#.email.#";  11     private static final String ROUTINGKEY_SMS = "inform.#.sms.#";  12     /**
 13  * 声明EXCHANGE_TOPICS_INFORM交换机  14      */
 15     private static final String EXCHANGE_TOPICS_INFORM = "exchange topics inform";  16 
 17     public static void main(String[] args) {  18         Connection connection = null;  19         Channel channel = null;  20 
 21         // 建立一个与MQ的链接
 22         ConnectionFactory factory = new ConnectionFactory();  23         factory.setHost("localhost");  24         factory.setPort(5672);  25         factory.setUsername("guest");  26         factory.setPassword("guest");  27         // RabbitMQ默认虚拟机名称为"/",虚拟机至关于一个独立的MQ服务器
 28         factory.setVirtualHost("/");  29 
 30         try {  31             // 建立于RabbitMQ服务的链接
 32             connection = factory.newConnection();  33             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
 34             channel = connection.createChannel();  35             /**
 36  * 声明交换机  37  * param1:交换机名称  38  * param2:交换机类型,fanout、topic、direct、headers  39              */
 40  channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);  41             /**
 42  * 声明队列,若是RabbitMQ中没有此队列将自动建立  43  * param1:队列名称  44  * param2:是否持久化  45  * param3:队列是否独占此链接  46  * param4:队列再也不使用时自动删除此列  47  * param5:队列参数  48              */
 49             channel.queueDeclare(QUEUE_EMAIL_INFORM, true, false, false, null);  50             channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null);  51             /**
 52  * 交换机和队列绑定  53  * param1:队列名称  54  * param2:交换机名称  55  * param3:路由key  56              */
 57  channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_EMAIL);  58  channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS);  59 
 60 
 61             /**
 62  * 消息发布方法  63  * param1:Exchange的名称,若是没有指定,则使用Default Exchange  64  * param2:routingkey,消息的路由,适用于Exchange将消息转发到指定的消息队列  65  * param3:消息包含的属性  66  * param4:消息体  67              */
 68             for (int i = 0; i < 5; i++) {  69                 //发送消息的时候指定routingKey
 70                 String message = "send email inform message to user";  71                 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());  72                 System.out.println("send to mq " + message);  73  }  74             for (int i = 0; i < 5; i++) {  75                 //发送消息的时候指定routingKey
 76                 String message = "send sms inform message to user";  77                 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());  78                 System.out.println("send to mq " + message);  79  }  80             for (int i = 0; i < 5; i++) {  81                 //发送消息的时候指定routingKey
 82                 String message = "send sms and email inform message to user";  83                 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());  84                 System.out.println("send to mq " + message);  85  }  86         } catch (IOException e) {  87  e.printStackTrace();  88         } catch (TimeoutException e) {  89  e.printStackTrace();  90         } finally {  91             if (channel != null) {  92                 try {  93  channel.close();  94                 } catch (IOException e) {  95  e.printStackTrace();  96                 } catch (TimeoutException e) {  97  e.printStackTrace();  98  }  99  } 100             if (connection != null) { 101                 try { 102  connection.close(); 103                 } catch (IOException e) { 104  e.printStackTrace(); 105  } 106  } 107  } 108  } 109 }
Producer04Topics
    • 消费者
      • 队列绑定交换机指定通配符
      • 通配符规则
      • 中间以“.”分隔
      • 符号能够匹配多个词语,符号*能够匹配一个词语
    • 接收邮件消费者
 1 public class Consumer04TopicsEmail {  2     /**
 3  * 队列名称  4  * 路由名称设定与队列名称同样  5      */
 6     private static final String QUEUE_SMS_INFORM = "queue sms inform";  7 
 8     /**
 9  * 路由key,#表明能够匹配多个词,符号*能够匹配一个词语 10      */
11     private static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; 12     /**
13  * 声明EXCHANGE_TOPICS_INFORM交换机 14      */
15     private static final String EXCHANGE_TOPICS_INFORM = "exchange topics inform"; 16 
17     public static void main(String[] args) { 18         Connection connection = null; 19         Channel channel = null; 20 
21         // 建立一个与MQ的链接
22         ConnectionFactory factory = new ConnectionFactory(); 23         factory.setHost("localhost"); 24         factory.setPort(5672); 25         factory.setUsername("guest"); 26         factory.setPassword("guest"); 27         // rabbitmq默认虚拟机名称为“/”,虚拟机至关于一个独立的mq服务器
28         factory.setVirtualHost("/"); 29         try { 30             // 建立于RabbitMQ服务的链接
31             connection = factory.newConnection(); 32             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
33             channel = connection.createChannel(); 34             /**
35  * param1:交换机名称 36  * param2:交换机类型,fanout、topic、direct、headers 37              */
38  channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); 39             /**
40  * 声明队列,若是RabbitMQ中没有此队列将自动建立 41  * param1:队列名称 42  * param2:是否持久化 43  * param3:队列是否独占此链接 44  * param4:队列再也不使用时自动删除此列 45  * param5:队列参数 46              */
47             channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 48 
49             /**
50  * 交换机和队列绑定 51  * param1:队列名称 52  * param2:交换机名称 53  * param3:路由key 54              */
55  channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_EMAIL); 56             // 定义消费方法
57             DefaultConsumer consumer = new DefaultConsumer(channel) { 58 
59                 /**
60  * 消费者接受消息调用此方法 61  * @param consumerTag 消费者标签,在channel,basicConsumer()去指定 62  * @param envelope 消息包含的的内容,能够从中获取消息id,消息routingkey,交换机, 63  * 消息和重传标志(收到消息失败后是否须要从新发送) 64  * @param properties 65  * @param body 66  * @throws IOException 67                  */
68  @Override 69                 public void handleDelivery(String consumerTag, 70  Envelope envelope, 71  AMQP.BasicProperties properties, 72                                            byte[] body) throws IOException { 73                     // 消息id
74                     long deliveryTag = envelope.getDeliveryTag(); 75                     // 交换机
76                     String exchange = envelope.getExchange(); 77                     // 消息内容
78                     String msg = new String(body, "utf-8"); 79                     System.out.println("email receive message:" + msg); 80  } 81  }; 82             /**
83  * 监听队列String queue, boolean autoAck, Consumer callback 84  * param1:队列名称 85  * param2:是否自动回复,设置为true为表示消息接收到自动向mq回复收到了,mq接收到回复会删除消息 86  * 设置为false则须要手动回复 87  * param3:消费消息的方法,消费者接收到消息后调用此方法 88              */
89             channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 90         } catch (IOException e) { 91  e.printStackTrace(); 92         } catch (TimeoutException e) { 93  e.printStackTrace(); 94  } 95  } 96 }
Consumer04TopicsEmail
    • 接收信息消费者
 1 public class Consumer04TopicsSms {  2     /**
 3  * 队列名称  4  * 路由名称设定与队列名称同样  5      */
 6     private static final String QUEUE_SMS_INFORM = "queue sms inform";  7     /**
 8  * 路由key,#表明能够匹配多个词,符号*能够匹配一个词语  9      */
10     private static final String ROUTINGKEY_SMS = "inform.#.sms.#"; 11     /**
12  * 声明EXCHANGE_TOPICS_INFORM交换机 13      */
14     private static final String EXCHANGE_TOPICS_INFORM = "exchange topics inform"; 15 
16     public static void main(String[] args) { 17         Connection connection = null; 18         Channel channel = null; 19 
20         // 建立一个与MQ的链接
21         ConnectionFactory factory = new ConnectionFactory(); 22         factory.setHost("localhost"); 23         factory.setPort(5672); 24         factory.setUsername("guest"); 25         factory.setPassword("guest"); 26         // rabbitmq默认虚拟机名称为“/”,虚拟机至关于一个独立的mq服务器
27         factory.setVirtualHost("/"); 28         try { 29             // 建立于RabbitMQ服务的链接
30             connection = factory.newConnection(); 31             // 建立与Exchange的通道,每一个链接能够建立多个通道,每一个通道表明一个会话任务
32             channel = connection.createChannel(); 33             /**
34  * param1:交换机名称 35  * param2:交换机类型,fanout、topic、direct、headers 36              */
37  channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); 38             /**
39  * 声明队列,若是RabbitMQ中没有此队列将自动建立 40  * param1:队列名称 41  * param2:是否持久化 42  * param3:队列是否独占此链接 43  * param4:队列再也不使用时自动删除此列 44  * param5:队列参数 45              */
46             channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 47 
48             /**
49  * 交换机和队列绑定 50  * param1:队列名称 51  * param2:交换机名称 52  * param3:路由key 53              */
54  channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS); 55             // 定义消费方法
56             DefaultConsumer consumer = new DefaultConsumer(channel) { 57 
58                 /**
59  * 消费者接受消息调用此方法 60  * @param consumerTag 消费者标签,在channel,basicConsumer()去指定 61  * @param envelope 消息包含的的内容,能够从中获取消息id,消息routingkey,交换机, 62  * 消息和重传标志(收到消息失败后是否须要从新发送) 63  * @param properties 64  * @param body 65  * @throws IOException 66                  */
67  @Override 68                 public void handleDelivery(String consumerTag, 69  Envelope envelope, 70  AMQP.BasicProperties properties, 71                                            byte[] body) throws IOException { 72                     // 消息id
73                     long deliveryTag = envelope.getDeliveryTag(); 74                     // 交换机
75                     String exchange = envelope.getExchange(); 76                     // 消息内容
77                     String msg = new String(body, "utf-8"); 78                     System.out.println("sms receive message:" + msg); 79  } 80  }; 81             /**
82  * 监听队列String queue, boolean autoAck, Consumer callback 83  * param1:队列名称 84  * param2:是否自动回复,设置为true为表示消息接收到自动向mq回复收到了,mq接收到回复会删除消息 85  * 设置为false则须要手动回复 86  * param3:消费消息的方法,消费者接收到消息后调用此方法 87              */
88             channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 89         } catch (IOException e) { 90  e.printStackTrace(); 91         } catch (TimeoutException e) { 92  e.printStackTrace(); 93  } 94  } 95 }
Consumer04TopicsSms
    • 测试:使用生产者发送若干条消息,交换机根据routingkey通配符匹配并转发消息到指定的队列
    • 思考:本案例的需求使用Routing工做模式是否能实现
      • 使用Routing模式也能够实现本案例,共设置三个routingkey,分别是email、sms、all、email队列绑定email和all,sms队列绑定sms和all,这样就能够实现上面案例的功能,实现过程比topics复杂
      • topics模式更增强大,能够实现Routing。publish/subscribe模式的功能
  • Header模式
    • header模式与routing不一样的地方在于,header模式取消routingkey,使用header中的key/value(键值对)匹配队列
    • 案例:根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。
    • 生产者
      • 队列与交换机绑定的代码与以前不一样,以下:
1 Map<String, Object> headers_email = new Hashtable<String, Object>(); 2 headers_email.put("inform_type", "email"); 3 Map<String, Object> headers_sms = new Hashtable<String, Object>(); 4 headers_sms.put("inform_type", "sms"); 5 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 6 channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
    • 通知:
1 String message = "email inform to user"+i; 2 Map<String,Object> headers =  new Hashtable<String, Object>(); 3 headers.put("inform_type", "email");//匹配email通知消费者绑定的header 4 //headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
5 AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); 6 properties.headers(headers); 7 //Email通知
8 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
    • 发送邮件消费者
1 channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); 2 Map<String, Object> headers_email = new Hashtable<String, Object>(); 3 headers_email.put("inform_email", "email"); 4 //交换机和队列绑定
5 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 6 //指定消费队列
7 channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
  • RPC模式
    • RPC即客户端远程调用服务端的方法,使用MQ能够实现RPC的异步调用,基于Direct交换机实现,流程以下:

      • 客户端既是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列
      • 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,获得方法返回的结果
      • 服务端将RPC方法的结果发送到RPC响应队列
      • 客户端(RPC调用方)监听RPC响应队列,接收到RPC调用的结果

5、SpringBoot整合RabbitMQ

  5.1 利用SpringBoot快速建立生产者与消费者两个工程

  咱们选择基于Spring-Rabbit去操做RabbitMQ

  1.两个工程中添加一样的maven依赖

 1 <dependency>
 2     <groupId>org.springframework.boot</groupId>
 3     <artifactId>spring‐boot‐starter‐amqp</artifactId>
 4 </dependency>
 5 <dependency>
 6     <groupId>org.springframework.boot</groupId>
 7     <artifactId>spring‐boot‐starter‐test</artifactId>
 8 </dependency>
 9 <dependency>
10     <groupId>org.springframework.boot</groupId>
11     <artifactId>spring‐boot‐starter‐logging</artifactId>
12 </dependency>

  2.配置文件application.yml

 1 server:  2   port: 44000  3 spring:  4   application:  5     name: test‐rabbitmq‐producer/consumer #两个工程的名称  6   rabbitmq:  7     host: 127.0.0.1  8     port: 5672  9     username: guest 10     password: guest 11     virtualHost: /

  3.定义RabbitConfig类,配置Exchange、Queue、以及绑定交换机 -- 案例中使用Topics交换机

 1 @Configuration  2 public class RabbitmqConfig {  3     public static final String QUEUE_EMAIL_INFORM = "queue_email_inform";  4     public static final String QUEUE_SMS_INFORM = "queue_sms_inform";  5     public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";  6 
 7     /**
 8  * 声明交换机  9  * @return
10      */
11  @Bean(EXCHANGE_TOPICS_INFORM) 12     public Exchange EXCHANGE_TOPICS_INFORM(){ 13         // durable(true):持久化,消息队列重启后交换机仍然存在
14         return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); 15  } 16 
17     /**
18  * 声明队列 19  * @return
20      */
21  @Bean(QUEUE_EMAIL_INFORM) 22     public Queue QUEUE_EMAIL_INFORM(){ 23         return new Queue(QUEUE_EMAIL_INFORM); 24  } 25 
26  @Bean(QUEUE_SMS_INFORM) 27     public Queue QUEUE_SMS_INFORM(){ 28         return new Queue(QUEUE_SMS_INFORM); 29  } 30 
31     /**
32  * 绑定队列到交换机 33  * @param queue 34  * @param exchange 35  * @return
36      */
37     public Binding BINDING_QUEUE_EMAIL(@Qualifier(QUEUE_EMAIL_INFORM) Queue queue, 38  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ 39         return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs(); 40  } 41 
42     public Binding BINDING_QUEUE_SMS(@Qualifier(QUEUE_SMS_INFORM) Queue queue, 43  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ 44         return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs(); 45  } 46 }

  4.以上为两个工程中相同的地方,接下来配置不一样的地方

  • 消费端工程配置一个消息监听类ReceiveHandler

1 @Component 2 public class ReceiveHandler { 3 4 // 监听email队列 5 @RabbitListener(queues = {RabbitmqConfig.QUEUE_EMAIL_INFORM}) 6 public void receiveEmail(String msg) { 7 System.out.println("email队列接收到消息:" + msg); 8 } 9 10 // 监听sms队列 11 @RabbitListener(queues = {RabbitmqConfig.QUEUE_SMS_INFORM}) 12 public void receiveSMS(String msg) { 13 System.out.println("sms队列接收到消息:" + msg); 14 } 15 }

  • 生产端工程直接在测试方法中测试消息发送功能
 1 @RunWith(SpringRunner.class)  2 @SpringBootTest  3 public class SpringBootTopicsProducerTest {  4     
 5  @Autowired  6     public RabbitTemplate rabbitTemplate;  7 
 8  @Test  9     public void testSendMessageByTopics(){ 10         for (int i = 0; i < 5; i++) { 11             String message = "sms email inform to user"+i; 12             rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message); 13  } 14  } 15 }
  • 5.测试

  消费端执行工程中的SpringBoot的引导类的main方法执行消息的监听,生产端执行测试方法testSendMessageByTopics()

  控制台打印接收到的消息:

  由于队列消息的监听是异步的,因此会出现消息打印交替出现的现象,这里简单的SpringBoot整合RabbitMQ的案例就结束了。:)

相关文章
相关标签/搜索