前言html
消息队列在现今数据量大,并发量高的系统中是十分经常使用的。本文将会对现时最经常使用到的几款消息队列框架 ActiveMQ、RabbitMQ、Kafka 进行分析对比。
详细介绍 RabbitMQ 在 Spring 框架下的结构及实现原理,从Producer 端的事务、回调函数(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 信息接收容器进行详细的分析。经过对 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等经常使用类型介绍,深刻剖析在消息处理各个传输环节中的原理及注意事项。
并举以实例对死信队列、持久化操做进行一一介绍。java
目录spring
6、死信队列框架
7、持久化操做dom
1、RabbitMQ 与 AMQP 的关系
1.1 AMQP简介
AMQP(Advanced Message Queue Protocol 高级消息队列协议)是一个消息队列协议,它支持符合条件的客户端和消息代理中间件(message middleware broker)进行通信。RabbitMQ 则是 AMQP 协议的实现者,主要用于在分布式系统中信息的存储发送与接收,RabbitMQ 的服务器端用 Erlang 语言编写,客户端支持多种开发语言:Python、.NET、Java、Ruby、C、PHP、ActionScript、XMPP、STOMP 等。
1.2 ActiveMQ、RabbitMQ、Kafka 对比
如今在市场上有 ActiveMQ、RabbitMQ、Kafka 等多个经常使用的消息队列框架,与其余框架对比起来,RabbitMQ 在易用性、扩展性、高可用性、多协议、支持多语言客户端等方面都有不俗表现。
1.2.1 AcitveMQ 特色
ActiveMQ 是 Apache 以 Java 语言开发的消息模型,它完美地支持 JMS(Java Message Service)消息服务,客户端支持 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多种开主发语言,支持OpenWire、Stomp、REST、XMPP、AMQP 等多种协议。ActiveMQ 采用异步消息传递方式,在设计上保证了多主机集群,客户端-服务器,点对点等模式的有效通讯。从开始它就是按照 JMS 1.1 和 J2EE 1.4 规范进行开发,实现了消息持久化,XA,事务支撑等功能。经历多年的升级完善,现今已成为 Java 应用开发中主流的消息解决方案。但相比起 RabbitMQ、Kafka 它的主要缺点表现为资源消耗比较大,吞吐量较低,在高并发的状况下系统支撑能力较弱。若是系统全程使用 Java 开发,其并发量在可控范围内,或系统须要支持多种不一样的协议,使用 ActiveMQ 可更轻便地搭建起消息队列服务。
1.2.2 Kafka 特色
Kafka 天生是面向分布式系统开发的消息队列,它具备高性能、容灾性、可动态扩容等特色。Kafka 与生俱来的特色在于它会把每一个Partition 的数据都备份到不一样的服务器当中,并与 ZooKeeper 配合,当某个Broker 故障失效时,ZooKeeper 服务就会将通知生产者和消费者,从备份服务器进行数据恢复。在性能上 Kafka 也大大超越了传统的 ActiveMQ、RabbitMQ ,因为 Kafka 集群可支持动态扩容,在负载量到达峰值时可动态增长新的服务器进集群而无需重启服务。但因为 Kafka 属于分布式系统,因此它只能在同一分区内实现消息有序,没法实现全局消息有序。并且它内部的监控机制不够完善,须要安装插件,依赖ZooKeeper 进行元数据管理。若是系统属于分布式管理机制,数据量较大且并发量难以预估的状况下,建议使用 Kafka 队列。
1.2.3 RabbitMQ 对比
因为 ActiveMQ 过于依赖 JMS 的规范而限制了它的发展,因此 RabbitMQ 在性能和吞吐量上明显会优于 ActiveMQ。
因为上市时间较长,在可用性、稳定性、可靠性上 RabbitMq 会比 Kafka 技术成熟,并且 RabbitMq 使用 Erlang 开发,因此天生具有高并发高可用的特色。而 Kafka 属于分布式系统,它的性能、吞吐量、TPS 都会比 RabbitMq 要强。
2、RabbitMQ 的实现原理
2.1 生产者(Producer)、消费者(Consumer)、服务中心(Broker)之间的关系
首先简单介绍 RabbitMQ 的运行原理,在 RabbitMQ 使用时,系统会先安装并启动 Broker Server,也就是 RabbitMQ 的服务中心。不管是生产者 (Producer),消费者(Consumer)都会经过链接池(Connection)使用 TCP/IP 协议(默认)来与 BrokerServer 进行链接。而后 Producer 会把 Exchange / Queue 的绑定信息发送到 Broker Server,Broker Server 根据 Exchange 的类型逻辑选择对应 Queue ,最后把信息发送到与 Queue 关联的对应 Consumer 。
2.2 交换器(Exchange)、队列(Queue)、信道(Channel)、绑定(Binding)的概念
2.2.1 交换器 Exchange
Producer 创建链接后,并不是直接将消息投递到队列 Queue 中,而是把消息发送到交换器 Exchange,由 Exchange 根据不一样逻辑把消息发送到一个或多个对应的队列当中。目前 Exchange 提供了四种不一样的经常使用类型:Fanout、Direct、Topic、Header。
此类型是最为常见的交换器,它会将消息转发给全部与之绑定的队列上。好比,有N个队列与 Fanout 交换器绑定,当产生一条消息时,Exchange 会将该消息的N个副本分别发给每一个队列,相似于广播机制。
此类型的 Exchange 会把消息发送到 Routing_Key 彻底相等的队列当中。多个 Cousumer 可使用相同的关键字进行绑定,相似于数据库的一对多关系。好比,Producer 以 Direct 类型的 Exchange 推送 Routing_Key 为 direct.key1 的队列,系统再指定多个 Cousumer 绑定 direct.key1。如此,消息就会被分发至多个不一样的 Cousumer 当中。
此类型是最灵活的一种方式配置方式,它可使用模糊匹配,根据 Routing_Key 绑定到包含该关键字的不一样队列中。好比,Producer 使用 Topic类型的 Exchange 分别推送 Routing_Key 设置为 topic.guangdong.guangzhou 、topic.guangdong.shenzhen 的不一样队列,Cousumer 只须要把 Routing_Key 设置为 topic.guangdong.# ,就能够把全部消息接收处理。
该类型的交换器与前面介绍的稍有不一样,它再也不是基于关键字 Routing_Key 进行路由,而是基于多个属性进行路由的,这些属性比路由关键字更容易表示为消息的头。也就是说,用于路由的属性是取自于消息 Header 属性,当消息 Header 的值与队列绑定时指定的值相同时,消息就会路由至相应的队列中。
2.2.2 Queue 队列
Queue 队列是消息的载体,每一个消息都会被投入到 Queue 当中,它包含 name,durable,arguments 等多个属性,name 用于定义它的名称,当 durable(持久化)为 true 时,队列将会持久化保存到硬盘上。反之为 false 时,一旦 Broker Server 被重启,对应的队列就会消失,后面还会有例子做详细介绍。
2.2.3 Channel 通道
当 Broker Server 使用 Connection 链接 Producer / Cousumer 时会使用到信道(Channel),一个 Connection上能够创建多个 Channel,每一个 Channel 都有一个会话任务,能够理解为逻辑上的链接。主要用做管理相关的参数定义,发送消息,获取消息,事务处理等。
2.2.4 Binding 绑定
Binding 主要用于绑定交换器 Exchange 与 队列 Queue 之间的对应关系,并记录路由的 Routing-Key。Binding 信息会保存到系统当中,用于 Broker Server 信息的分发依据。
3、RabbitMQ 应用实例
3.1 Rabbit 经常使用类说明
3.1.1 RabbitTemplate 类
Spring 框架已经封装了 RabbitTemplate 对 RabbitMQ 的绑定、队列发送、接收进行简化管理
方法 | 说明 |
void setExchange(String exchange) | 设置绑定的 exchange 名称 |
String getExchange() | 获取已绑定的 exchange 名称 |
void setRoutingKey(String routingKey) | 设置绑定的 routingKey |
String getRoutingKey() | 获取已绑定的 routingKey |
void send(String exchange, String routingKey, Message message,CorrelationData data) | 以Message方式发送信息到 Broken Server,CorrelationData 为标示符可为空 |
void convertAndSend(String exchange, String routingKey, Object object, CorrelationData data) | 以自定义对象方式发送信息到 Broken Server,系统将自动把 object转换成 Message,CorrelationData 为标示符可为空 |
Message receive(String queueName, long timeoutMillis) | 根据queueuName接收队列发送Message信息 |
Object receiveAndConvert(String queueName, long timeoutMillis) | 根据queueuName接收队列对象信息 |
void setReceiveTimeout(long receiveTimeout) | 设置接收过时时间 |
void setReplyTimeout(long replyTimeout) | 设置重发时间 |
void setMandatory(boolean mandatory) | 开启强制委托模式(下文会详细说明) |
void setConfirmCallback(confirmCallback) | 绑定消息确认回调方法(下文会详细说明) |
void setReturnCallback(returnCallback) | 绑定消息退出回调方法(下文会详细说明) |
3.2 初探 RabbitMQ
在官网下载并成功安装完 RabbitMQ 后,打开默认路径 http://localhost:15672/#/ 便可看到 RabbitMQ 服务中心的管理界面
3.2.1 Producer 端开发
先在 pom 中添加 RabbitMQ 的依赖,并在 application.yml 中加入 RabbitMQ 账号密码等信息。此例子,咱们尝试使用 Direct 交换器把队列发送到不一样的 Consumer。
1 **********************pom ************************* 2 <project> 3 ............. 4 <dependency> 5 <groupId>org.springframework.boot</groupId> 6 <artifactId>spring-boot-starter-amqp</artifactId> 7 <version>2.0.5.RELEASE</version> 8 </dependency> 9 </project> 10 11 **************** application.yml **************** 12 spring: 13 application: 14 name: rabbitMqProducer 15 rabbitmq: 16 host: localhost 17 port: 5672 18 username: admin 19 password: 12345678 20 virtual-host: /LeslieHost
首先使用 CachingConnectionFactory 创建连接,经过 BindingBuilder 绑定 Exchange、Queue、RoutingKey之间的关系。
而后经过 void convertAndSend (String exchange, String routingKey, Object object, CorrelationData data) 方法把信息发送到 Broken Server
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 public class BindingConfig { 32 public final static String first="direct.first"; 33 public final static String second="direct.second"; 34 public final static String Exchange_NAME="directExchange"; 35 public final static String RoutingKey1="directKey1"; 36 public final static String RoutingKey2="directKey2"; 37 38 @Bean 39 public Queue queueFirst(){ 40 return new Queue(first); 41 } 42 43 @Bean 44 public Queue queueSecond(){ 45 return new Queue(second); 46 } 47 48 @Bean 49 public DirectExchange directExchange(){ 50 return new DirectExchange(Exchange_NAME,true,true); 51 } 52 53 //利用BindingBuilder绑定Direct与queueFirst 54 @Bean 55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 57 } 58 59 //利用BindingBuilder绑定Direct与queueSecond 60 @Bean 61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 63 } 64 } 65 66 @Controller 67 @RequestMapping("/producer") 68 public class ProducerController { 69 @Autowired 70 private RabbitTemplate template; 71 72 @RequestMapping("/send") 73 public void send() { 74 for(int n=0;n<100;n++){ 75 76 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"I'm the first queue! "+String.valueOf(n),getCorrelationData()); 77 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey2,"I'm the second queue! "+String.valueOf(n),getCorrelationData()); 78 } 79 } 80 81 private CorrelationData getCorrelationData(){ 82 return new CorrelationData(UUID.randomUUID().toString()); 83 } 84 }
此时,打开 RabbitMQ 管理界面,可看到 Producer 已经向 Broken Server 的 direct.first / direct.second 两个 Queue 分别发送100 个 Message
3.2.2 Consumer 端开发
分别创建两个不一样的 Consumer ,一个绑定 direct.first 别一个绑定 direct.second , 而后经过注解 @RabbitListener 监听不一样的 queue,当接到到 Producer 推送队列时,显示队列信息。
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 public class BindingConfig { 32 public final static String first="direct.first"; 33 public final static String Exchange_NAME="directExchange"; 34 public final static String RoutingKey1="directKey1"; 35 36 @Bean 37 public Queue queueFirst(){ 38 return new Queue(first); 39 } 40 41 @Bean 42 public DirectExchange directExchange(){ 43 return new DirectExchange(Exchange_NAME); 44 } 45 46 //利用BindingBuilder绑定Direct与queueFirst 47 @Bean 48 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 49 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 50 } 51 } 52 53 @Configuration 54 @RabbitListener(queues="direct.first") 55 public class RabbitMqListener { 56 57 @RabbitHandler 58 public void handler(String message){ 59 System.out.println(message); 60 } 61 } 62 63 @SpringBootApplication 64 public class App { 65 66 public static void main(String[] args){ 67 SpringApplication.run(App.class, args); 68 } 69 }
运行后能够观察到不一样的 Consumer 会收到不一样队列的消息
若是以为使用 Binding 代码绑定过于繁琐,还能够直接在监听类RabbitMqListener中使用 @QueueBinding 注解绑定
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 @RabbitListener(bindings=@QueueBinding( 32 exchange=@Exchange(value="directExchange"), 33 value=@Queue(value="direct.second"), 34 key="directKey2")) 35 public class RabbitMqListener { 36 37 @RabbitHandler 38 public void handler(String message){ 39 System.out.println(message); 40 } 41 } 42 43 @SpringBootApplication 44 public class App { 45 46 public static void main(String[] args){ 47 SpringApplication.run(App.class, args); 48 } 49 }
运行结果
4、Producer 端的消息发送与监控
前面一节已经介绍了RabbitMQ的基本使用方法,这一节将从更深刻的层面讲述 Producer 的应用。
试想一下这种的情形,若是因 RabbitTemplate 发送时 Exchange 名称绑定错误,或 Broken Server 因网络问题或服务负荷过大引起异常,Producer 发送的队列丢失,系统没法正常工做。此时,开发人员应该进行一系列应对措施进行监测,确保每一个数据都能正常推送到 Broken Server 。有见及此,RabbitMQ 专门为你们提供了两种解决方案,一是使用传统的事务模式,二是使用回调函数,下面为你们做详介绍。
4.1 Producer 端的事务管理
在须要使用事务时,能够经过两种方法
第一能够调用 channel 类的方法以传统模式进行管理,事务开始时调用 channel.txSelect(),信息发送后进行确认 channel.txCommit(),一旦捕捉到异常进行回滚 channel.txRollback(),最后关闭事务。
1 @Controller 2 @RequestMapping("/producer") 3 public class ProducerController { 4 @Autowired 5 private RabbitTemplate template; 6 7 @RequestMapping("/send") 8 public void send1(HttpServletResponse response) 9 throws InterruptedException, IOException, TimeoutException{ 10 Channel channel=template.getConnectionFactory().createConnection().createChannel(true); 11 ....... 12 try{ 13 channel.txSelect(); 14 channel.basicPublish("ErrorExchange", BindingConfig.Routing_Key_First, new AMQP.BasicProperties(),"Nothing".getBytes()); 15 channel.txCommit(); 16 }catch(Exception e){ 17 channel.txRollback(); 18 }finally{ 19 channel.close(); 20 } 21 ...... 22 ...... 23 ...... 24 } 25 }
第二还能够直接经过 RabbitTemplate 的配置方法 void setChannelTransacted(bool isTransacted) 直接开启事务
1 public class ProducerController { 2 @Autowired 3 private ConnectionConfig connection; 4 5 @Autowired 6 @Bean 7 private RabbitTemplate template(){ 8 RabbitTemplate template=new RabbitTemplate(connection.getConnectionFactory()); 9 template.setChannelTransacted(true); 10 return template; 11 } 12 13 @RequestMapping("/send") 14 @Transactional(rollbackFor=Exception.class) 15 public void send(HttpServletResponse response) throws InterruptedException, IOException,TimeoutException{ 16 .......... 17 .......... 18 .......... 19 } 20 }
4.2 利用 ConfirmCallback 回调确认消息是否成功发送到 Exchange
使用事务模式消耗的系统资源比较大,系统每每会处理长期等待的状态,在并发量较高的时候也有可能形成死锁的隐患。有见及此,系统提供了轻量级的回调函数方式进行异步处理。
当须要确认消息是否成功发送到 Exchange 的时候,可使用 ConfirmCallback 回调函数。使用该函数,系统推送消息后,该线程便会获得释放,等 Exchange 接收到消息后系统便会异步调用 ConfirmCallback 绑定的方法进行处理。ConfirmCallback 只包含一个方法 void confirm(CorrelationData correlationData, boolean ack, String cause),此方法会把每条数据发送到 Exchange 时候的 ack 状态(成功/失败),cause 成败缘由,及对应的 correlationData(CorrelationData 只包含一个属性 id,是绑定发送对象的惟一标识符) 返还到 Producer,让Producer 进行相应处理。
注意:在绑定 ConfirmCallback 回调函数前,请先把 publisher-confirms 属性设置为 true
1 spring: 2 application: 3 name: rabbitmqproducer 4 rabbitmq: 5 host: 127.0.0.1 6 port: 5672 7 username: admin 8 password: 12345678 9 virtual-host: /LeslieHost
例如:下面的例子,特地将 RabbitTemplate 发送时所绑定的 Exchange 名称填写为错误名称 “ ErrorExchange ”,形成发送失败,而后在回调函数中检查失败的缘由。
Producer 端代码:
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 System.out.println(host); 22 factory.setHost(host); 23 factory.setPort(port); 24 factory.setUsername(username); 25 factory.setPassword(password); 26 factory.setVirtualHost(virtualHost); 27 factory.setPublisherConfirms(true); 28 factory.setPublisherReturns(true); 29 return factory; 30 } 31 } 32 33 @Configuration 34 public class BindingConfig { 35 public final static String first="direct.first"; 36 public final static String Exchange_NAME="directExchange"; 37 public final static String RoutingKey1="directKey1"; 38 39 @Bean 40 public Queue queueFirst(){ 41 return new Queue(first); 42 } 43 44 @Bean 45 public DirectExchange directExchange(){ 46 return new DirectExchange(Exchange_NAME); 47 } 48 49 @Bean 50 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 51 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 52 } 53 } 54 55 @Component 56 public class MyConfirmCallback implements ConfirmCallback { 57 58 @Override 59 public void confirm(CorrelationData correlationData, boolean ack, String cause) { 60 // TODO 自动生成的方法存根 61 // TODO 自动生成的方法存根 62 if(ack){ 63 System.out.println(correlationData.getId()+" ack is: true! \ncause:"+cause); 64 }else 65 System.out.println(correlationData.getId()+" ack is: false! \ncause:"+cause); 66 } 67 } 68 69 @Controller 70 @RequestMapping("/producer") 71 public class ProducerController { 72 @Autowired 73 private RabbitTemplate template; 74 @Autowired 75 private MyConfirmCallback confirmCallback; 76 77 @RequestMapping("/send") 78 public void send() { 79 template.setConfirmCallback(confirmCallback); 80 for(int n=0;n<2;n++){ 81 template.convertAndSend("ErrorExchange", 82 BindingConfig.RoutingKey1,"I'm the first queue! " 83 +String.valueOf(n),getCorrelationData()); 84 } 85 } 86 87 private CorrelationData getCorrelationData(){ 88 return new CorrelationData(UUID.randomUUID().toString()); 89 } 90 }
Consumer端代码
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 @RabbitListener(bindings=@QueueBinding( 32 exchange=@Exchange(value="directExchange"), 33 value=@Queue(value="direct.first"), 34 key="directKey1")) 35 public class RabbitMqListener { 36 37 @RabbitHandler 38 public void handler(String message){ 39 System.out.println(message); 40 } 41 } 42 43 @SpringBootApplication 44 public class App { 45 46 public static void main(String[] args){ 47 SpringApplication.run(App.class, args); 48 } 49 }
运行结果:
4.3 绑定 CorrelationData 与发送对象的关系
上面的例子当中,CorrelationData 只是用一个随机的 UUID 做为 CorrelationID,而在现实的应用场景中,因为 ConfirmCallback 只反回标识值 CorrelationData,而没有把队列里的对象值也一同返回。因此,在推送队列时能够先用 Key-Value 保存 CorrelationID 与所发送信息的关系,这样当 ConfirmCallback 回调时,就可根据 CorrelationID 找回对象,做进一步处理。
下面例子,咱们把要发送的对象放在虚拟数据 DataSource 类中,用 DataRelation 记录 CorrelationID 与发送对象 OrderID 的关系,而后在回调函数 ConfirmCallback 中根据 CorrelationID 查找对应的 OrderEntity,若是发送成功,则删除绑定。若是发送失败,能够从新发送或根据状况再做处理。
Producer端代码:
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 System.out.println(host); 22 factory.setHost(host); 23 factory.setPort(port); 24 factory.setUsername(username); 25 factory.setPassword(password); 26 factory.setVirtualHost(virtualHost); 27 factory.setPublisherConfirms(true); 28 factory.setPublisherReturns(true); 29 return factory; 30 } 31 } 32 33 @Configuration 34 public class BindingConfig { 35 public final static String first="direct.first"; 36 //Exchange 使用 direct 模式 37 public final static String Exchange_NAME="directExchange"; 38 public final static String RoutingKey1="directKey1"; 39 40 @Bean 41 public Queue queueFirst(){ 42 return new Queue(first); 43 } 44 45 @Bean 46 public DirectExchange directExchange(){ 47 return new DirectExchange(Exchange_NAME); 48 } 49 50 @Bean 51 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 52 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 53 } 54 } 55 56 @Data 57 public class OrderEntity implements Serializable{ 58 private String id; 59 private String goods; 60 private Double price; 61 private Integer count; 62 63 public OrderEntity(String id,String goods,Double price,Integer count){ 64 this.id=id; 65 this.goods=goods; 66 this.price=price; 67 this.count=count; 68 } 69 70 public OrderEntity(){} 71 72 public String getId() { 73 return id; 74 } 75 public void setId(String id) { 76 this.id = id; 77 } 78 79 public String getGoods() { 80 return goods; 81 } 82 83 public void setGoodsId(String goods) { 84 this.goods = goods; 85 } 86 87 public Integer getCount() { 88 return count; 89 } 90 91 public void setCount(Integer count) { 92 this.count = count; 93 } 94 95 public Double getPrice() { 96 return price; 97 } 98 99 public void setPrice(Double price) { 100 this.price = price; 101 } 102 } 103 104 @Component 105 public class DataSource { 106 //加入虚拟数据 107 private static List<OrderEntity> list=new ArrayList<OrderEntity>( 108 Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1), 109 new OrderEntity("002","Huwei P30 Plus",5400.00,1), 110 ..........)); 111 112 public DataSource(){ 113 } 114 115 public List<OrderEntity> getOrderList(){ 116 return list; 117 } 118 119 //根据Id获取对应order 120 public OrderEntity getOrder(String id){ 121 for(OrderEntity order:list){ 122 if(order.getId()==id) 123 return order; 124 } 125 return null; 126 } 127 } 128 129 public class DataRelation { 130 public static Map map=new HashMap(); 131 132 //绑定关系 133 public static void add(String key,String value){ 134 if(!map.containsKey(key)) 135 map.put(key,value); 136 } 137 138 //返回orderId 139 public static Object get(String key){ 140 if(map.containsKey(key)) 141 return map.get(key); 142 else 143 return null; 144 } 145 146 //根据 orderId 删除绑定关系 147 public static void del(String key){ 148 if(map.containsKey(key)) 149 map.remove(key); 150 } 151 } 152 153 @Component 154 public class MyConfirmCallback implements ConfirmCallback { 155 @Autowired 156 private DataSource datasource; 157 158 @Override 159 public void confirm(CorrelationData correlationData, boolean ack, String cause) { 160 String correlationId=correlationData.getId(); 161 //根据 correclationId取回对应的orderId 162 String orderId=DataRelation.get(correlationId).toString(); 163 //在datasource中找回对应的order 164 OrderEntity order=datasource.getOrder(orderId); 165 166 if(ack){ 167 System.out.println("--------------------ConfirmCallback-------------------\n" 168 +" order's ack is true!\nId:"+order.getId()+" Goods:"+order.getGoods() 169 +" Count:"+order.getCount().toString()+" Price:"+order.getPrice()); 170 DataRelation.del(correlationId); //操做完成删除对应绑定 171 }else { 172 System.out.println(order.getId()+" order's ack is: false! \ncause:"+cause); 173 //可在记录日志后把Order推送到队列进行从新发送 174 ....... 175 } 176 } 177 } 178 179 @Controller 180 @RequestMapping("/producer") 181 public class ProducerController { 182 @Autowired 183 private RabbitTemplate template; 184 @Autowired 185 private MyConfirmCallback confirmCallback; 186 @Autowired 187 private DataSource dataSource; 188 189 @RequestMapping("/send") 190 public void send() throws InterruptedException, IOException{ 191 //绑定 ConfirmCallback 回调函数 192 template.setConfirmCallback(confirmCallback); 193 194 for(OrderEntity order:dataSource.getOrderList()){ 195 CorrelationData correlationData=getCorrelationData(); 196 //保存 CorrelationId 与 orderId关系 197 DataRelation.add(correlationData.getId(), order.getId()); 198 //把 order 插入队列 199 template.convertAndSend("directExchange",BindingConfig.RoutingKey1,order,correlationData); 200 } 201 } 202 203 private CorrelationData getCorrelationData(){ 204 return new CorrelationData(UUID.randomUUID().toString()); 205 } 206 }
Consumer 端代码
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 @RabbitListener(bindings=@QueueBinding( 32 exchange=@Exchange(value="directExchange"), 33 value=@Queue(value="direct.first"), 34 key="directKey1")) 35 public class RabbitMqListener { 36 37 @RabbitHandler 38 public void handler(String message){ 39 System.out.println(message); 40 } 41 } 42 43 @SpringBootApplication 44 public class App { 45 46 public static void main(String[] args){ 47 SpringApplication.run(App.class, args); 48 } 49 }
运行结果
4.4 利用 ReturnCallback 处理队列 Queue 错误
使用 ConfirmCallback 函数只能判断消息是否成功发送到 Exchange,但并不能保证消息已经成功进行队列 Queue。因此,系统预备了另外一个回调函数 ReturnCallback 来监听 Queue 队列处理的成败。若是队列错误绑定不存在的 queue,或者 Broken Server 瞬间出现问题末能找到对应的 queue,系统就会激发 Producer 端 ReturnCallback 的回调函数来进行错误处理。 ReturnCallback 回调接口只包含一个方法 void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey),它会把出错的 replyCode,replyText,exchange,routingKey等值都一块儿返还。与 ConfirmCallback 不一样的是,returnedMessage 会把队列中的对象保存到 Message 的 Body 属性中并返还到回调函数。
注意:在绑定 ReturnCallback 回调函数前,请先把 publisher-returns 及 mandatory 属性设置为 true 。 mandatory 参数默认为 false,用于判断 broken server是否把错误的对象返还到 Producer。如末进行设置,系统将把错误的消息丢弃。
下面例子咱们在调用 convertAndSend 方法时特地把 routingKey 设置为 ErrorKey,触发 ReturnCallback 回调,而后在 ReturenCallback 的回调方法显示 replyCode,replyText,exchange,routingKey 等值,并把队列中对象属性一并显示。
Producer 端代码
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 System.out.println(host); 22 factory.setHost(host); 23 factory.setPort(port); 24 factory.setUsername(username); 25 factory.setPassword(password); 26 factory.setVirtualHost(virtualHost); 27 factory.setPublisherConfirms(true); 28 factory.setPublisherReturns(true); 29 return factory; 30 } 31 } 32 33 @Configuration 34 public class BindingConfig { 35 public final static String first="direct.first"; 36 public final static String Exchange_NAME="directExchange"; 37 public final static String RoutingKey1="directKey1"; 38 39 @Bean 40 public Queue queueFirst(){ 41 return new Queue(first); 42 } 43 44 @Bean 45 public DirectExchange directExchange(){ 46 return new DirectExchange(Exchange_NAME); 47 } 48 49 @Bean 50 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 51 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 52 } 53 } 54 55 @Data 56 public class OrderEntity implements Serializable{ 57 private String id; 58 private String goods; 59 private Double price; 60 private Integer count; 61 62 public OrderEntity(String id,String goods,Double price,Integer count){ 63 this.id=id; 64 this.goods=goods; 65 this.price=price; 66 this.count=count; 67 } 68 69 public OrderEntity(){} 70 71 public String getId() { 72 return id; 73 } 74 public void setId(String id) { 75 this.id = id; 76 } 77 78 public String getGoods() { 79 return goods; 80 } 81 82 public void setGoodsId(String goods) { 83 this.goods = goods; 84 } 85 86 public Integer getCount() { 87 return count; 88 } 89 90 public void setCount(Integer count) { 91 this.count = count; 92 } 93 94 public Double getPrice() { 95 return price; 96 } 97 98 public void setPrice(Double price) { 99 this.price = price; 100 } 101 } 102 103 @Component 104 public class DataSource { 105 //虚拟数据 106 private static List<OrderEntity> list=new ArrayList<OrderEntity>( 107 Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1), 108 new OrderEntity("002","Huwei P30 Plus",5400.00,1), 109 ......)); 110 public DataSource(){ 111 } 112 113 public List<OrderEntity> getOrderList(){ 114 return list; 115 } 116 117 //根据Id获取对应order 118 public OrderEntity getOrder(String id){ 119 for(OrderEntity order:list){ 120 if(order.getId()==id) 121 return order; 122 } 123 return null; 124 } 125 } 126 127 @Component 128 public class MyReturnCallback implements ReturnCallback { 129 130 @Override 131 public void returnedMessage(Message message, int replyCode, 132 String replyText, String exchange, String routingKey){ 133 //把messageBody反序列化为 OrderEntity对象 134 OrderEntity order=convertToOrder(message.getBody()); 135 //显示错误缘由 136 System.out.println("-------------ReturnCallback!------------\n" 137 +" exchange:"+exchange+" replyCode:"+String.valueOf(replyCode) 138 +" replyText:"+replyText+" key:"+routingKey+"\n OrderId:"+order.getId() 139 +" Goods:"+order.getGoods()+" Count:"+order.getCount().toString() 140 +" Price:"+order.getPrice()+" "); 141 } 142 143 //把byte[]反序列化为 OrderEntity对象 144 private OrderEntity convertToOrder(byte[] bytes){ 145 OrderEntity order=null; 146 ByteArrayInputStream bis = new ByteArrayInputStream (bytes); 147 ObjectInputStream ois; 148 try { 149 ois = new ObjectInputStream (bis); 150 Object obj = ois.readObject(); 151 order=(OrderEntity)obj; 152 ois.close(); 153 bis.close(); 154 } catch (IOException | ClassNotFoundException e) { 155 // TODO 自动生成的 catch 块 156 e.printStackTrace(); 157 } 158 return order; 159 } 160 } 161 162 @Controller 163 @RequestMapping("/producer") 164 public class ProducerController { 165 @Autowired 166 private RabbitTemplate template; 167 @Autowired 168 private MyReturnCallback returnCallback; 169 @Autowired 170 private DataSource dataSource; 171 172 173 @RequestMapping("/send") 174 public void send() throws InterruptedException, IOException{ 175 //把 mandatory 属性设定为true 176 template.setMandatory(true); 177 //绑定 ReturnCallback 回调函数 178 template.setReturnCallback(returnCallback); 179 180 for(OrderEntity order:dataSource.getOrderList()){ 181 CorrelationData correlationData=getCorrelationData(); 182 template.convertAndSend("directExchange","ErrorKey",order,correlationData); 183 } 184 } 185 186 private CorrelationData getCorrelationData(){ 187 return new CorrelationData(UUID.randomUUID().toString()); 188 } 189 }
Consumer 代码
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 @RabbitListener(bindings=@QueueBinding( 32 exchange=@Exchange(value="directExchange"), 33 value=@Queue(value="direct.first"), 34 key="directKey1")) 35 public class RabbitMqListener { 36 37 @RabbitHandler 38 public void handler(String message){ 39 System.out.println(message); 40 } 41 } 42 43 @SpringBootApplication 44 public class App { 45 46 public static void main(String[] args){ 47 SpringApplication.run(App.class, args); 48 } 49 }
运行结果:
5、Consumer 消息接收管控
在第四节主要介绍了 Producer 端的队列发送与监控,它只能管理 Producer 与 Broker Server 之间的通讯,但并不能确认 Consumer 是否能成功接收到队列,在这节内容将介绍 Consumer 端的队列接收与监听。前面几节里,Consumer 端都是简单地直接使用 RabbitListener 对队列进行监听,其实 RabbitMQ 已经为用户准备了功能更强大的 MessageListenerContainer 容器用于管理 Message ,下面将为你们介绍。
5.1 AbstractMessageListenerContainer 介绍
AbstractMeessageListenerContainer 虚拟类是 RabbitMQ 封装好的一个容器,自己并无对消息进行处理,而是把消息的处理方式交给了 MessageListener 。而它的主要功能是实现 MessageListener 的绑定,ApplicationContext 上下文的绑定,ErrorHandler 的错误处理方法的绑定、对消息消费的开始、结束等等默认参数进行配置,让开发人员能够在容器中对 Consumer 实现统一管理。SimpleMessageListenerContainer、DirectMessageLinstenerCoontainer 都是它的子类,分别应用于不一样的场景,在下面会再做详细介绍。
方法 | 说明 |
void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) | 设定消息接收确认的模式(下文会有详细介绍) |
AcknowledgeMode getAcknowledgeMode() | 获取消息接收确认的模式(下文会有详细介绍) |
void setPrefetchCount(int prefetchCount) | 设置每一个 consumer 每次可接收到消息的最大数量 |
void setQueues(Queue... queues) | 设定监听Queue队列 |
void addQueues(Queue... queues) | 加入监听Queue队列 |
void setMessageListener(Object messageListener) | 绑定MessageListener,对信息进行处理 |
void setChannelAwareMessageListener(ChannelAwareMessageListener messageListener) | 绑定ChannelAwareMessageListener,对信息进行处理,同时可获取当前使用的channel信息 |
Object getMessageListener() | 获取MessageListener对象 |
void setMessageConverter(MessageConverter messageConverter) | 绑定MessageConverter消息转换对象 |
void setApplicationContext(ApplicationContext applicationContext) | 绑定ApplicationContext上下文 |
ConnectionFactory getConnectionFactory() | 获取ConnectionFactory链接工厂 |
void setListenerId(String listenerId) | 设定ListenerId |
MessageListener 是监听消息最经常使用 Listener,它只包含了一个方法 void onMessage(Message message),这是消息接收最经常使用的一个方法,开发者只须要实现此方法便可对接收到的 Message 进行处理。
ChannelAwareMessageListener 至关因而 MessageListener的一个扩展,包含了方法 void onMessage(Message message, Channel channel),除了对 Message 进行处理外,还能够对接收此 Message 的 Channel 进行检测。
5.2 SimpleMessageListenerContainer 经常使用方法
SimpleMessageListenerContainer 是最经常使用的 MessageListener 容器,它能够经过下面的方法设置默认消费者数量与最大的消费者数量。下面例子中尝试把 consurrentConsumers 设置为3,把maxConcurrentConsumers 设置为4,并同时监控 direct 模式交换器的 direct.first,direct.second 队列。
方法 | 说明 |
void setConcurrentConsumers(final int concurrentConsumers) | 设定当前队列中消费者数量 |
void setMaxConcurrentConsumers(int maxConcurrentConsumers) | 设定当前队列中最大消费者数量 |
经过截图能够看到,系统默认会为每一个 queue 都建立 3 个 consumers,不一样的 queue 中的 consumers 是共享相同的 3 个 channel 。
当 Producer 端发送消息时,consumers 的实际数量可根据 maxConcurrentConsumers 的配置限制进行扩展。
Producer 端代码
1 @Configuration 2 public class BindingConfig { 3 public final static String first="direct.first"; 4 public final static String second="direct.second"; 5 public final static String Exchange_NAME="directExchange"; 6 public final static String RoutingKey1="directKey1"; 7 public final static String RoutingKey2="directKey2"; 8 9 @Bean 10 public Queue queueFirst(){ 11 return new Queue(first); 12 } 13 14 @Bean 15 public Queue queueSecond(){ 16 return new Queue(second); 17 } 18 19 @Bean 20 public DirectExchange directExchange(){ 21 return new DirectExchange(Exchange_NAME); 22 } 23 24 //利用BindingBuilder绑定Direct与queueFirst 25 @Bean 26 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 27 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 28 } 29 30 //利用BindingBuilder绑定Direct与queueSecond 31 @Bean 32 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 33 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 34 } 35 } 36 37 @Configuration 38 public class ConnectionConfig { 39 @Value("${spring.rabbitmq.host}") 40 public String host; 41 42 @Value("${spring.rabbitmq.port}") 43 public int port; 44 45 @Value("${spring.rabbitmq.username}") 46 public String username; 47 48 @Value("${spring.rabbitmq.password}") 49 public String password; 50 51 @Value("${spring.rabbitmq.virtual-host}") 52 public String virtualHost; 53 54 @Bean 55 public ConnectionFactory getConnectionFactory(){ 56 CachingConnectionFactory factory=new CachingConnectionFactory(); 57 factory.setHost(host); 58 factory.setPort(port); 59 factory.setUsername(username); 60 factory.setPassword(password); 61 factory.setVirtualHost(virtualHost); 62 return factory; 63 } 64 } 65 66 @Controller 67 @RequestMapping("/producer") 68 public class ProducerController { 69 @Autowired 70 private RabbitTemplate template; 71 72 @RequestMapping("/send") 73 public void send(HttpServletResponse response) throws InterruptedException, IOException{ 74 for(Integer n=0;n<100;n++){ 75 CorrelationData correlationData=getCorrelationData(); 76 template.convertAndSend("directExchange","directKey1", 77 "queue1"+" "+n.toString(),correlationData); 78 template.convertAndSend("directExchange","directKey2"," queue2"+" "+n.toString(),correlationData); 79 Thread.currentThread().sleep(30); 80 } 81 } 82 83 private CorrelationData getCorrelationData(){ 84 return new CorrelationData(UUID.randomUUID().toString()); 85 } 86 }
Consumer 端代码:
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 public class BindingConfig { 32 public final static String first="direct.first"; 33 public final static String second="direct.second"; 34 public final static String Exchange_NAME="directExchange"; 35 public final static String RoutingKey1="directKey1"; 36 public final static String RoutingKey2="directKey2"; 37 38 @Bean 39 public Queue queueFirst(){ 40 return new Queue(first); 41 } 42 43 @Bean 44 public Queue queueSecond(){ 45 return new Queue(second); 46 } 47 48 @Bean 49 public DirectExchange directExchange(){ 50 return new DirectExchange(Exchange_NAME); 51 } 52 53 //利用BindingBuilder绑定Direct与queueFirst 54 @Bean 55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 57 } 58 59 //利用BindingBuilder绑定Direct与queueSecond 60 @Bean 61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 63 } 64 } 65 @Configuration 66 public class SimpleMessListener { 67 @Autowired 68 private RabbitTemplate template; 69 private int index=0; 70 71 @Bean 72 public SimpleMessageListenerContainer messageContainer(){ 73 SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(); 74 container.setConnectionFactory(connectionConfig.getConnectionFactory()); 75 // 绑定Queue1/Queue2 76 container.setQueueNames("direct.first"); 77 container.addQueueNames("direct.second"); 78 //设置默认 consumer 数为3 79 container.setConcurrentConsumers(3); 80 //设置最大 consumer 数为4 81 container.setMaxConcurrentConsumers(4); 82 //标记 consumerTag 83 container.setConsumerTagStrategy(queue -> "consumer"+(++index)); 84 //绑定MessageListener显示接收信息 85 container.setMessageListener(new MessageListener(){ 86 @Override 87 public void onMessage(Message message) { 88 // TODO 自动生成的方法存根 89 Thread thread=Thread.currentThread(); 90 MessageProperties messProp=message.getMessageProperties(); 91 try { 92 System.out.println(" ConsumerTag:"+messProp.getConsumerTag() 93 +" ThreadId is:"+thread.getId()+" Queue:"+messProp.getConsumerQueue() 94 +" "+new String(message.getBody(),"UTF-8")); 95 } catch (UnsupportedEncodingException e) { 96 // TODO 自动生成的 catch 块 97 e.printStackTrace(); 98 } 99 } 100 101 }); 102 return container; 103 } 104 }
运行结果
5.3 SimpleMessageListenerContainer 的运做原理
在 SimpleMessageListenerContainer 模式中,不管系统监听多少个 queue 队列,channel 都是共享的,相似上面的例子,4个 channel 会把接收到不一样的队列请求并分发到对应的 consumer 进行处理。这样作的好处是系统能够经过 concurrentConsumers、maxConcurrentConsumers 灵活设定当前队列中消费者的数量,系统能够跟据实际需求灵活处理。但因为每一个 channel 都是在固定线程中运行的,一个 channel 要游走于多个 consumer 当中,这无疑增长了系统在上下文切换中的开销。下面用系统提供的 ChannelAwareMessageListener 接口,以更直观的例子说明一下 SimpleMessageListenerContainer 当中 channel、queue、consumer 之间的关系。
Producer 端代码
1 @Configuration 2 public class BindingConfig { 3 public final static String first="direct.first"; 4 public final static String second="direct.second"; 5 public final static String Exchange_NAME="directExchange"; 6 public final static String RoutingKey1="directKey1"; 7 public final static String RoutingKey2="directKey2"; 8 9 @Bean 10 public Queue queueFirst(){ 11 return new Queue(first); 12 } 13 14 @Bean 15 public Queue queueSecond(){ 16 return new Queue(second); 17 } 18 19 @Bean 20 public DirectExchange directExchange(){ 21 return new DirectExchange(Exchange_NAME); 22 } 23 24 //利用BindingBuilder绑定Direct与queueFirst 25 @Bean 26 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 27 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 28 } 29 30 //利用BindingBuilder绑定Direct与queueSecond 31 @Bean 32 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 33 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 34 } 35 } 36 37 @Configuration 38 public class ConnectionConfig { 39 @Value("${spring.rabbitmq.host}") 40 public String host; 41 42 @Value("${spring.rabbitmq.port}") 43 public int port; 44 45 @Value("${spring.rabbitmq.username}") 46 public String username; 47 48 @Value("${spring.rabbitmq.password}") 49 public String password; 50 51 @Value("${spring.rabbitmq.virtual-host}") 52 public String virtualHost; 53 54 @Bean 55 public ConnectionFactory getConnectionFactory(){ 56 CachingConnectionFactory factory=new CachingConnectionFactory(); 57 factory.setHost(host); 58 factory.setPort(port); 59 factory.setUsername(username); 60 factory.setPassword(password); 61 factory.setVirtualHost(virtualHost); 62 return factory; 63 } 64 } 65 66 @Controller 67 @RequestMapping("/producer") 68 public class ProducerController { 69 @Autowired 70 private RabbitTemplate template; 71 72 @RequestMapping("/send") 73 public void send(HttpServletResponse response) throws InterruptedException, IOException{ 74 for(Integer n=0;n<100;n++){ 75 CorrelationData correlationData=getCorrelationData(); 76 template.convertAndSend("directExchange","directKey1", 77 " queue1"+" "+n.toString(),correlationData); 78 template.convertAndSend("directExchange","directKey2", 79 "queue2"+" "+n.toString(),correlationData); 80 Thread.currentThread().sleep(30); 81 } 82 } 83 84 private CorrelationData getCorrelationData(){ 85 return new CorrelationData(UUID.randomUUID().toString()); 86 } 87 }
Consumer 端代码
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 public class BindingConfig { 32 public final static String first="direct.first"; 33 public final static String second="direct.second"; 34 public final static String Exchange_NAME="directExchange"; 35 public final static String RoutingKey1="directKey1"; 36 public final static String RoutingKey2="directKey2"; 37 38 @Bean 39 public Queue queueFirst(){ 40 return new Queue(first); 41 } 42 43 @Bean 44 public Queue queueSecond(){ 45 return new Queue(second); 46 } 47 48 @Bean 49 public DirectExchange directExchange(){ 50 return new DirectExchange(Exchange_NAME); 51 } 52 53 //利用BindingBuilder绑定Direct与queueFirst 54 @Bean 55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 57 } 58 59 //利用BindingBuilder绑定Direct与queueSecond 60 @Bean 61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 63 } 64 } 65 @Configuration 66 public class SimpleMessListener { 67 @Autowired 68 private RabbitTemplate template; 69 @Autowired 70 private ConnectionConfig connectionConfig; 71 private int index=0; 72 73 @Bean 74 public SimpleMessageListenerContainer messageContainer(){ 75 SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(); 76 container.setConnectionFactory(connectionConfig.getConnectionFactory()); 77 // 绑定Queue1/Queue2 78 container.setQueueNames("direct.first"); 79 container.addQueueNames("direct.second"); 80 //设置默认 consumer 数为3 81 container.setConcurrentConsumers(3); 82 //设置最大 consumer 数为4 83 container.setMaxConcurrentConsumers(4); 84 //标记 consumerTag 85 container.setConsumerTagStrategy(queue -> "consumer"+(++index)); 86 //绑定ChannelAwareMessageListener显示接收信息 87 container.setChannelAwareMessageListener(new ChannelAwareMessageListener(){ 88 @Override 89 public void onMessage(Message message, com.rabbitmq.client.Channel channel) 90 throws Exception { 91 // TODO 自动生成的方法存根 92 // TODO 自动生成的方法存根 93 Thread thread=Thread.currentThread(); 94 System.out.println("Channel:"+channel.getChannelNumber() 95 +" ThreadId is:"+thread.getId() 96 +" ConsumerTag:"+message.getMessageProperties().getConsumerTag() 97 +" Queue:"+message.getMessageProperties().getConsumerQueue()); 98 99 } 100 101 }); 102 return container; 103 } 104 }
运行结果:
观察运行结果能够看到:每一个 channel 都在固定的线程中运行,一个 channel 会向不一样的 consumer 发送队列信息。了解 channel、thread、queue、consumer 之间的关系,会对 SimpleMessageListenerContainer 有更深刻认识。
5.4 DirectMessageListenerContainer
SimpleMessageListenerContainer 是经典的容器,使用 channel 共享,一旦某个 channel 关闭或重启,意味着每一个队列 queue 中使用当前 channel 的 consumer 都会受到影响。 有见及此,在 RabbitMQ 2.0 后,系统引入了 DirectMessageListenerContainer ,它容许每一个 consumer 都有各自的对应的 channel 的,channel 只管理负责管理当前 consumer 的通道。这样令 consumer 运用更灵活,同时线程并无跟 channel 绑定,而是由独立的线程池进行管理,这是更好地解决了 SimpleMessageListenerContainer 中上下文切换所带来的资源消耗问题。
下面的例子,咱们尝试使用把 consumersPerQueue 设置为 4,并同时监控 direct 模式 exchange 的 direct.first,direct.second 队列。
从管理界面能够看到,系统会为每一个 consumer 都生成一个独立的 channel 进行管理。
Producer 端代码
1 @Configuration 2 public class BindingConfig { 3 public final static String first="direct.first"; 4 public final static String second="direct.second"; 5 public final static String Exchange_NAME="directExchange"; 6 public final static String RoutingKey1="directKey1"; 7 public final static String RoutingKey2="directKey2"; 8 9 @Bean 10 public Queue queueFirst(){ 11 return new Queue(first); 12 } 13 14 @Bean 15 public Queue queueSecond(){ 16 return new Queue(second); 17 } 18 19 @Bean 20 public DirectExchange directExchange(){ 21 return new DirectExchange(Exchange_NAME); 22 } 23 24 //利用BindingBuilder绑定Direct与queueFirst 25 @Bean 26 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 27 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 28 } 29 30 //利用BindingBuilder绑定Direct与queueSecond 31 @Bean 32 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 33 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 34 } 35 } 36 37 @Configuration 38 public class ConnectionConfig { 39 @Value("${spring.rabbitmq.host}") 40 public String host; 41 42 @Value("${spring.rabbitmq.port}") 43 public int port; 44 45 @Value("${spring.rabbitmq.username}") 46 public String username; 47 48 @Value("${spring.rabbitmq.password}") 49 public String password; 50 51 @Value("${spring.rabbitmq.virtual-host}") 52 public String virtualHost; 53 54 @Bean 55 public ConnectionFactory getConnectionFactory(){ 56 CachingConnectionFactory factory=new CachingConnectionFactory(); 57 factory.setHost(host); 58 factory.setPort(port); 59 factory.setUsername(username); 60 factory.setPassword(password); 61 factory.setVirtualHost(virtualHost); 62 return factory; 63 } 64 } 65 66 @Controller 67 @RequestMapping("/producer") 68 public class ProducerController { 69 @Autowired 70 private RabbitTemplate template; 71 72 @RequestMapping("/send") 73 public void send(HttpServletResponse response) throws InterruptedException, IOException{ 74 for(Integer n=0;n<100;n++){ 75 CorrelationData correlationData=getCorrelationData(); 76 template.convertAndSend("directExchange","directKey1", 77 " queue1"+" "+n.toString(),correlationData); 78 template.convertAndSend("directExchange","directKey2", 79 "queue2"+" "+n.toString(),correlationData); 80 Thread.currentThread().sleep(30); 81 } 82 } 83 84 private CorrelationData getCorrelationData(){ 85 return new CorrelationData(UUID.randomUUID().toString()); 86 } 87 }
Consumer 端代码
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 public class BindingConfig { 32 public final static String first="direct.first"; 33 public final static String second="direct.second"; 34 public final static String Exchange_NAME="directExchange"; 35 public final static String RoutingKey1="directKey1"; 36 public final static String RoutingKey2="directKey2"; 37 38 @Bean 39 public Queue queueFirst(){ 40 return new Queue(first); 41 } 42 43 @Bean 44 public Queue queueSecond(){ 45 return new Queue(second); 46 } 47 48 @Bean 49 public DirectExchange directExchange(){ 50 return new DirectExchange(Exchange_NAME); 51 } 52 53 //利用BindingBuilder绑定Direct与queueFirst 54 @Bean 55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 57 } 58 59 //利用BindingBuilder绑定Direct与queueSecond 60 @Bean 61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 63 } 64 } 65 66 @Configuration 67 public class DirectMessListener { 68 @Autowired 69 private ConnectionConfig connectionConfig; 70 @Autowired 71 private RabbitTemplate template; 72 private int index=0; 73 74 @Bean 75 public DirectMessageListenerContainer messageContainer(){ 76 DirectMessageListenerContainer container=new DirectMessageListenerContainer(); 77 container.setConnectionFactory(connectionConfig.getConnectionFactory()); 78 // 设置每一个队列的 consumer 数量 79 container.setConsumersPerQueue(4); 80 container.addQueueNames("direct.first"); 81 container.addQueueNames("direct.second"); 82 container.setConsumerTagStrategy(queue -> "consumer"+(++index)); 83 container.setMessageListener(new ChannelAwareMessageListener(){ 84 @Override 85 public void onMessage(Message message, com.rabbitmq.client.Channel channel) 86 throws Exception { 87 // TODO 自动生成的方法存根 88 // TODO 自动生成的方法存根 89 Thread thread=Thread.currentThread(); 90 91 System.out.println("Channel:"+channel.getChannelNumber() 92 +" ThreadId is:"+thread.getId() 93 +" ConsumerTag:"+message.getMessageProperties().getConsumerTag() 94 +" Queue:"+message.getMessageProperties().getConsumerQueue()); 95 } 96 }); 97 return container; 98 } 99 }
经过运行结果进一步能够证明,consumer 信息接收是由独立的线程池进行管理的,并无与 channel 绑定,每一个 consumer 都有本身单独的 channel,即便 channel 发生问题时,也不会对其余的 consumer 发生影响,这正是 DirectMessageListenerContainer 的优胜之处。
5.5 Consumer 的信息接收确认方式
在第四节曾经介绍过在 Producer 端利用 ConfirmCallback / ReturnCallback 监控信息的发送,在这节将为你们在 Consumer 端监控信息的接收。
Consumer 的信息接收确认模式能够经过 AcknowledgeMode 设定,一共有三种模式:NONE、MANUAL、AUTO,默认是 AUTO 模式。其中 NONE 为系统确认,MANUAL 是手动确认。
而 AUTO 为自动模式,系统能够根据执行状况自动发送 ack / nack。若是方法未抛出异常,则发送 ack。若是抛出异常 AmqpRejectAndDontRequeueException 顾名思义消息被拒绝且不会从新加入队列。若是方法抛出非 AmqpRejectAndDontRequeueException 异常,则系统发送 nack 消息重归队列。
Channel 消息接收的经常使用方法
方法 | 说明 |
void basicAck(long deliveryTag, boolean multiple) | deliveryTag 为该消息的标识,multiple 为 true 表明批量确认同一批次的信息接收成功,为 false 时表明单独断定某个消息接收成功。 |
void basicReject(long deliveryTag, boolean requeue) | deliveryTag 为该消息的标识,requeue 为 true时,被拒绝的消息会从新进入队列进行推送,为false时消息将再也不进入队列 |
void basicNack(long deliveryTag, boolean multiple, boolean requeue) | deliveryTag 为该消息的标识,multiple 为 true 表明批量确认同一批次的信息接收失败,为 false 时表明单独断定某个消息接收失败。requeue 为 true时,消息会从新进入队列进行推送,为false时消息将再也不进入队列 |
AcknowledgeMode 配置为 MANUAL 后,用户可经过 Channel 类的 void basicAck(long deliveryTag, boolean multiple) 方法手动确认消息接收是否成功。
若检测到有异常,可经过void basicReject(long deliveryTag, boolean requeue) 或 void basicNack(long deliveryTag, boolean multiple, boolean requeue) 确认是否从新把消息推送。
经过配置 prefetchCount 可设置 consumer 每次接收到的信息数量,系统默认值为 250,这表示当 consumer 队列接收到 250 请求其状态皆为 unacked 时,broker server 将暂停向 consumer 发送消息,待消息处理后再继续。
下面例子中咱们尝试把 prefetchCount 设置为 10,即每一个 consumer 单次最多接收到的消息为 10 条,并把 consumersPerQueue 设置为 4,而后把 AcknowledgeMode 设置为 MANUAL,经过手动确认消息接收,一旦发生错误,消息从新加入队列。
Producer 端代码
1 @Configuration 2 public class BindingConfig { 3 public final static String first="direct.first"; 4 public final static String second="direct.second"; 5 public final static String Exchange_NAME="directExchange"; 6 public final static String RoutingKey1="directKey1"; 7 public final static String RoutingKey2="directKey2"; 8 9 @Bean 10 public Queue queueFirst(){ 11 return new Queue(first); 12 } 13 14 @Bean 15 public Queue queueSecond(){ 16 return new Queue(second); 17 } 18 19 @Bean 20 public DirectExchange directExchange(){ 21 return new DirectExchange(Exchange_NAME); 22 } 23 24 //利用BindingBuilder绑定Direct与queueFirst 25 @Bean 26 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 27 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 28 } 29 30 //利用BindingBuilder绑定Direct与queueSecond 31 @Bean 32 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 33 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 34 } 35 } 36 37 @Configuration 38 public class ConnectionConfig { 39 @Value("${spring.rabbitmq.host}") 40 public String host; 41 42 @Value("${spring.rabbitmq.port}") 43 public int port; 44 45 @Value("${spring.rabbitmq.username}") 46 public String username; 47 48 @Value("${spring.rabbitmq.password}") 49 public String password; 50 51 @Value("${spring.rabbitmq.virtual-host}") 52 public String virtualHost; 53 54 @Bean 55 public ConnectionFactory getConnectionFactory(){ 56 CachingConnectionFactory factory=new CachingConnectionFactory(); 57 factory.setHost(host); 58 factory.setPort(port); 59 factory.setUsername(username); 60 factory.setPassword(password); 61 factory.setVirtualHost(virtualHost); 62 return factory; 63 } 64 } 65 66 @Controller 67 @RequestMapping("/producer") 68 public class ProducerController { 69 @Autowired 70 private RabbitTemplate template; 71 72 @RequestMapping("/send") 73 public void send(HttpServletResponse response) throws InterruptedException, IOException{ 74 for(Integer n=0;n<100;n++){ 75 CorrelationData correlationData=getCorrelationData(); 76 template.convertAndSend("directExchange","directKey1", 77 " queue1"+" "+n.toString(),correlationData); 78 template.convertAndSend("directExchange","directKey2", 79 "queue2"+" "+n.toString(),correlationData); 80 } 81 } 82 83 private CorrelationData getCorrelationData(){ 84 return new CorrelationData(UUID.randomUUID().toString()); 85 } 86 }
运行后可看到 Broker Server 每条 queue 会有 100 条数据处于待处理状态
Consumer 端代码
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 public class BindingConfig { 32 public final static String first="direct.first"; 33 public final static String second="direct.second"; 34 public final static String Exchange_NAME="directExchange"; 35 public final static String RoutingKey1="directKey1"; 36 public final static String RoutingKey2="directKey2"; 37 38 @Bean 39 public Queue queueFirst(){ 40 return new Queue(first); 41 } 42 43 @Bean 44 public Queue queueSecond(){ 45 return new Queue(second); 46 } 47 48 @Bean 49 public DirectExchange directExchange(){ 50 return new DirectExchange(Exchange_NAME); 51 } 52 53 //利用BindingBuilder绑定Direct与queueFirst 54 @Bean 55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 57 } 58 59 //利用BindingBuilder绑定Direct与queueSecond 60 @Bean 61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 63 } 64 } 65 66 @Configuration 67 public class DirectMessListener { 68 @Autowired 69 private ConnectionConfig connectionConfig; 70 @Autowired 71 private RabbitTemplate template; 72 private int index=0; 73 74 @Bean 75 public DirectMessageListenerContainer messageContainer(){ 76 DirectMessageListenerContainer container=new DirectMessageListenerContainer(); 77 container.setConnectionFactory(connectionConfig.getConnectionFactory()); 78 // 设置每一个队列的 consumer 数量 79 container.setConsumersPerQueue(4); 80 // 设置每一个 consumer 每次的接收的消息数量为10个 81 container.setPrefetchCount(10); 82 // 使用MANUAL进行手动确认 83 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 84 container.addQueueNames("direct.first"); 85 container.addQueueNames("direct.second"); 86 container.setConsumerTagStrategy(queue -> "consumer"+(++index)); 87 container.setMessageListener(new ChannelAwareMessageListener(){ 88 @Override 89 public void onMessage(Message message, com.rabbitmq.client.Channel channel) 90 throws Exception { 91 Thread thread=Thread.currentThread(); 92 MessageProperties prop=message.getMessageProperties(); 93 try{ 94 System.out.println("Channel:"+channel.getChannelNumber() 95 +" ThreadId is:"+thread.getId() 96 +" ConsumerTag:"+prop.getConsumerTag() 97 +" Queue:"+prop.getConsumerQueue()); 98 //经过Tag单个确认 99 channel.basicAck(prop.getDeliveryTag(), false); 100 }catch(Exception ex){ 101 //断定单个接收失败,从新加入consumer队列 102 channel.basicReject(prop.getDeliveryTag(), true); 103 } 104 thread.sleep(1000); 105 } 106 }); 107 return container; 108 } 109 }
观察信息接收状况,每一个 consumer 一次可处理10条信息,对队列进行分批处理。
6、死信队列
死信队列(Dead-Letter-Exchange) 可被看做是死信交换器。当消息在一个队列中变成死信后,它能被从新被发送到特定的交换器中,这个交换器就是DLX ,绑定DLX 的队列就称之为死信队列。消息变成死信通常是因为如下几种状况:
其实死信队列 DLX 也是一个正常的交换器,和通常的交换器没有什么区别,咱们能够用通常创建队列的方法,创建一个死信队列。而后创建一个正常的队列,在正常队列中加入参数 x-dead-letter-exchange、x-dead-letter-routing-key 与死信队列进行绑定,完成绑定后在管理界面 Features 选项中 direct.queue.first 会显示 DLX DLK。这时当被绑定的队列出现超时,超长,或被拒绝时(注意requeue被设置为false时,对会激发死信),信息就会流入死信队列被处理。
具体的例子Producer端:
1 @Configuration 2 public class BindingConfig { 3 public final static String Queue_First="direct.queue.first"; 4 public final static String Exchange_Name="directExchange"; 5 public final static String Routing_Key_First="directKey1"; 6 7 @Bean 8 public Queue queueFirst(){ 9 return new Queue(this.Queue_First); 10 } 11 12 @Bean 13 public DirectExchange directExchange(){ 14 return new DirectExchange(this.Exchange_Name); 15 } 16 17 @Bean 18 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 19 return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First); 20 } 21 } 22 23 @Configuration 24 public class ConnectionConfig { 25 @Value("${spring.rabbitmq.host}") 26 public String host; 27 28 @Value("${spring.rabbitmq.port}") 29 public int port; 30 31 @Value("${spring.rabbitmq.username}") 32 public String username; 33 34 @Value("${spring.rabbitmq.password}") 35 public String password; 36 37 @Value("${spring.rabbitmq.virtual-host}") 38 public String virtualHost; 39 40 @Bean 41 public ConnectionFactory getConnectionFactory(){ 42 CachingConnectionFactory factory=new CachingConnectionFactory(); 43 System.out.println(host); 44 factory.setHost(host); 45 factory.setPort(port); 46 factory.setUsername(username); 47 factory.setPassword(password); 48 factory.setVirtualHost(virtualHost); 49 return factory; 50 } 51 } 52 53 @Controller 54 @RequestMapping("/producer") 55 public class ProducerController { 56 @Autowired 57 private RabbitTemplate template; 58 59 @RequestMapping("/send") 60 public void send() { 61 for(int n=0;n<10;n++){ 62 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"Hello World! " 63 +String.valueOf(n),getCorrelationData()); 64 } 65 } 66 67 private CorrelationData getCorrelationData(){ 68 return new CorrelationData(UUID.randomUUID().toString()); 69 } 70 }
Customer 端
1 @Configuration 2 public class BindingConfig { 3 //普通队列参数 4 public final static String Queue_First="direct.queue.first"; 5 public final static String Exchange_Name="directExchange"; 6 public final static String Routing_Key_First="directKey1"; 7 //死信队列参数 8 public final static String Queue_Dead="direct.queue.dead"; 9 public final static String Exchange_Dead="directDead"; 10 public final static String Routing_Key_Dead="directDeadKey"; 11 12 @Bean 13 public Queue queueFirst(){ 14 Map<String, Object> args=new HashMap<String,Object>(); 15 //声明当前死信的 Exchange 16 args.put("x-dead-letter-exchange", this.Exchange_Dead); 17 //声明当前队列的死信路由key 18 args.put("x-dead-letter-routing-key", this.Routing_Key_Dead); 19 //把死信队列的参数绑定到当前队列中 20 return QueueBuilder.durable(Queue_First).withArguments(args).build(); 21 } 22 23 @Bean 24 public DirectExchange directExchange(){ 25 return new DirectExchange(this.Exchange_Name); 26 } 27 28 @Bean 29 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 30 return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First); 31 } 32 33 @Bean 34 public Queue queueDead(){ 35 return new Queue(this.Queue_Dead); 36 } 37 38 @Bean 39 public DirectExchange directExchangeDead(){ 40 return new DirectExchange(this.Exchange_Dead); 41 } 42 43 @Bean 44 public Binding bindingExchangeDead(Queue queueDead,DirectExchange directExchangeDead){ 45 return BindingBuilder.bind(queueDead).to(directExchangeDead).with(this.Routing_Key_Dead); 46 } 47 } 48 49 @Configuration 50 public class ConnectionConfig { 51 @Value("${spring.rabbitmq.host}") 52 public String host; 53 54 @Value("${spring.rabbitmq.port}") 55 public int port; 56 57 @Value("${spring.rabbitmq.username}") 58 public String username; 59 60 @Value("${spring.rabbitmq.password}") 61 public String password; 62 63 @Value("${spring.rabbitmq.virtual-host}") 64 public String virtualHost; 65 66 @Bean 67 public ConnectionFactory getConnectionFactory(){ 68 CachingConnectionFactory factory=new CachingConnectionFactory(); 69 factory.setHost(host); 70 factory.setPort(port); 71 factory.setUsername(username); 72 factory.setPassword(password); 73 factory.setVirtualHost(virtualHost); 74 return factory; 75 } 76 } 77 78 @Configuration 79 public class DirectMessListener { 80 @Autowired 81 private ConnectionConfig connectionConfig; 82 @Autowired 83 private RabbitTemplate template; 84 private int index=0,normalIndex=0,deadIndex=0; 85 86 @Bean 87 public DirectMessageListenerContainer messageContainer(){ 88 DirectMessageListenerContainer container=new DirectMessageListenerContainer(); 89 container.setConnectionFactory(connectionConfig.getConnectionFactory()); 90 // 设置每一个队列的 consumer 数量 91 container.setConsumersPerQueue(4); 92 // 设置每一个 consumer 每次的接收的消息数量 93 container.setPrefetchCount(10); 94 // 使用MANUAL手动确认 95 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 96 // 监听队列 97 container.addQueueNames(BindingConfig.Queue_First); 98 container.addQueueNames(BindingConfig.Queue_Dead); 99 container.setConsumerTagStrategy(queue -> "consumer"+(++index)); 100 101 container.setMessageListener(new ChannelAwareMessageListener(){ 102 @Override 103 public void onMessage(Message message, com.rabbitmq.client.Channel channel) 104 throws Exception { 105 MessageProperties prop=message.getMessageProperties(); 106 if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_First)){ 107 System.out.println("This is a normal queue! "+(++normalIndex)); 108 //把当前的队列转送到死信队列中 109 channel.basicReject(prop.getDeliveryTag(), false); 110 } 111 if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_Dead)){ 112 System.out.println("This is a dead queue! "+(++deadIndex)); 113 //模拟对死信队列处理 114 Thread.currentThread().sleep(5000); 115 ....... 116 //处理完毕 117 channel.basicAck(prop.getDeliveryTag(), false); 118 } 119 120 } 121 }); 122 return container; 123 } 124 }
经过管理界面能够看,信息会先发送到 direct.queue.first,而后被放进死信队列做处理。
运行结果
死信队列最经常使用的场景能够在订单支付,流程审批等环节。例如在 京*、淘* 等平台,当下单成功后,客户要在必定的时间内完成支付操做,不然订单被视做无效,这些业务流程就可使用死信队列来处理。
7、持久化操做
RabbitMq 的持久化操做包含有 Queue 持久化、Message 持久化和 Exchange 持久化三类。
7.1 Queue 的持久化
队列持久化只须要在 Queue 的构造函数 public Queue(String name, boolean durable) 把 durable 参数置为 true 就可实现。若是队列不设置持久化( (durable 默认为 false), 那么在RabbitMQ 服务重启以后,相关队列的元数据会丢失,此时数据也会丢失。
7.2 Message 持久化
设置了Queue 持久化之后,当 RabbitMQ 服务重启以后,队列依然存在,但消息已经消失,可见单单设置队列的持久化而不设置消息持久化显得毫无心义,因此一般列队持久化会与消息持久化共同使用。
在 RabbitMQ 原生态的框架下,须要把信息属性设置为 MessageProperties.PERSISTENT TEXT PLAIN 才会实现消息的持久化。
而在 Spring 框架下,因为在使用回调函数时须要把 Message 从新返回队列再进行处理,因此 Message 默认已是持久化的。
7.3 Exchage 的持久化
交换器持久化可经过构造函数 public DirectExchange(String name, boolean durable, boolean autoDelete) 把 durable 参数置为 true 就可实现,而 autoDelete 则是指在所在消费者都解除订阅的状况下自动删除。若是交换器不设置持久化,那么在 RabbitMQ 服务重启以后,相关的交换器元数据会丢失,不过消息不会丢失,只是消息再也不发送到该 Exchange 。对一个长期使用的交换器来讲,持久化仍是有其必要性的。
本章总结
RabbitMQ 发展至今,被愈来愈多的人承认,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是密不可分的。
相比于传统的 ActiveMQ 和分布式 Kafka,它具备本身独有的特色。
但愿文章有帮于你们对 RabbitMQ 消息队列方面有更深刻的了解,在不一样的开发环境中灵活运用。
因为时间仓促,文章当中有不明确的地方或有错漏敬请点明。
对 JAVA 开发有兴趣的朋友欢迎加入QQ群:174850571 共同探讨!
对 .NET 开发有兴趣的朋友欢迎加入QQ群:230564952 共同探讨 !
相关文章
深刻剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议
做者:风尘浪子
http://www.javashuo.com/article/p-pfjqdpop-cs.html
原创做品,转载时请注明做者及出处