前情提要:rabbitmq 管理界面查看姿式java
1、快速搭建/基本信息发送和消费git
一、引入依赖web
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二、application.ymlspring
spring: rabbitmq: host: ipXXX port: 5672 username: 帐户XXX password: 密码XXX virtual-host: /wen # 交换器名称
以 direct模式为例数据库
一、配置文件 import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class RabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue emailQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前链接有效 // exclusive:默认也是false,只能被当前建立的链接使用,并且当链接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //通常设置一下队列的持久化就好,其他两个就是默认false return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前链接有效 // exclusive:默认也是false,只能被当前建立的链接使用,并且当链接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //通常设置一下队列的持久化就好,其他两个就是默认false return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前链接有效 // exclusive:默认也是false,只能被当前建立的链接使用,并且当链接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //通常设置一下队列的持久化就好,其他两个就是默认false return new Queue("weixin.fanout.queue", true); } @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(16); map.put("x-message-ttl", 30000); // 队列中的消息未被消费则30秒后过时 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public DirectExchange TTLExchange() { return new DirectExchange("TTL_EXCHANGE", true, false); } //Direct交换机 起名:TestDirectExchange @Bean public DirectExchange fanoutOrderExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("fanout_exchange", true, false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean public Binding bindingDirect() { return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL"); } @Bean public Binding bindingDirect1() { return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with(""); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with(""); } @Bean public Binding bindingDirect3() { return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with(""); } } 二、生产者 package com.pit.barberShop.common.MQ.Rabbit.fanout; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author :wenye * @date :Created in 2021/6/15 21:41 * @description:广播模式 * @version: $ */ @RestController @RequestMapping("/rabbitmq") public class ProducerFanout { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定义交换机 private String exchangeName = "fanout_exchange"; // 2: 路由key private String routeKey = ""; @RequestMapping("/fanout") public void markerFanout() { String message ="shua"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, routeKey, message); } @RequestMapping("/ttl") public String testTTL() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("20000"); // 设置过时时间,单位:毫秒 byte[] msgBytes = "测试消息自动过时".getBytes(); Message message = new Message(msgBytes, messageProperties); rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message); return "ok"; } } 三、消费者 package com.pit.barberShop.common.MQ.Rabbit.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * @author :wenye * @date :Created in 2021/6/15 22:07 * @description:fanout消费者 * @version: $ */ @Component public class ConsumerFanout { @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你能够自定随便定义。 value = @Queue(value = "sms.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_exchange", // 这里是肯定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.DIRECT) )) public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("sms-two111------------->" + message); } @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你能够自定随便定义。 value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_exchange", // 这里是肯定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.DIRECT) )) public void messageWXrevice(String message){ // 此处省略发邮件的逻辑 System.out.println("weixin----two---------->" + message); } }
2、过时时间缓存
一、生产者发送消息时设置过时时间 public String testTTL() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("20000"); // 设置过时时间,单位:毫秒 byte[] msgBytes = "测试消息自动过时".getBytes(); Message message = new Message(msgBytes, messageProperties); rabbitTemplate.convertAndSend("TTL_EXCHANGE", "", message); return "ok"; } 二、队列中的全部消息设置过时时间 配置中添加 @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); // 队列中的消息未被消费则30秒后过时 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); // 队列中的消息未被消费则30秒后过时 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public DirectExchange TTLExchange() { return new DirectExchange("TTL_EXCHANGE", true, false); } @Bean public Binding bindingDirect() { return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL"); }
3、消息确认机制配置
参考:https://blog.csdn.net/qq33098...
默认是自动应答app
spring: rabbitmq: # 开启发送确认 publisher-confirms: true # 开启发送失败退回 publisher-returns: true
目前回调存在ConfirmCallback和ReturnCallback二者。他们的区别在于
若是消息没有到exchange,则ConfirmCallback回调,ack=false,
若是消息到达exchange,则ConfirmCallback回调,ack=true
exchange到queue成功,则不回调ReturnCallback
rabbitMQ 消息生产者发送消息的流程less
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { /** * correlationData:对象内部只有一个 id 属性,用来表示当前消息的惟一性。 * ack:消息投递到broker 的状态,true表示成功。 * cause:表示投递失败的缘由。 **/ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause){ if (!ack) { log.error("消息发送异常!"); } else { log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); } } } import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { //重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey路由(队列) @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); } } 配置文件 一、防止重复签发ack须要在配置类中重写 @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); //此处也设置为手动ack factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } 二、从新建立设置交换器和队列属性 @Bean public Queue chongfuQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前链接有效 // exclusive:默认也是false,只能被当前建立的链接使用,并且当链接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //通常设置一下队列的持久化就好,其他两个就是默认false return new Queue("chongfu.fanout.queue", true); } //Direct交换机 起名:TestDirectExchange @Bean public DirectExchange chongfuExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("chongfu_exchange", true, false); } @Bean public Binding bindingDirect4() { return BindingBuilder.bind(chongfuQueue()).to(chongfuExchange()).with(""); } 生产者 public void markerchongfu() { /** * 确保消息发送失败后能够从新返回到队列中 * 注意:yml须要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消费者确认收到消息后,手动ack回执回调处理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投递到队列失败回调处理 */ rabbitTemplate.setReturnCallback(returnCallbackService); /** * 发送消息 */ String s = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("chongfu_exchange", routeKey, "帅哥", message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(s)); } 消费者 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你能够自定随便定义。 value = @Queue(value = "chongfu.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "chongfu_exchange", // 这里是肯定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.DIRECT) )) public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到消息:{}", msg); // log.info("序号:{}", message.getMessageProperties().getDeliveryTag()); // System.out.println(msg); //TODO 具体业务 // 收到消息 basicAck() channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重复处理失败,拒绝再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息 } else { log.error("消息即将再次返回队列处理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
消费消息有三种回执方法
一、basicAckdom
basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
void basicAck(long deliveryTag, boolean multiple)
分布式
举个栗子: 假设我先发送三条消息deliveryTag分别是五、六、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将五、六、七、8的消息所有进行确认。
二、basicNack
basicNack :表示失败确认,通常在消费消息业务异常时用到此方法,能够将消息从新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
三、basicReject
basicReject:拒绝消息,与basicNack区别在于不能进行批量操做,其余用法很类似。
void basicReject(long deliveryTag, boolean requeue)
4、死信队列
死信队列其实和普通的队列没啥大的区别,都须要建立本身的Queue、Exchange,而后经过RoutingKey绑定到Exchange上去,只不过死信队列的RoutingKey和Exchange要做为参数,绑定到正常的队列上去,一种应用场景是正常队列里面的消息被basicNack或者reject时,消息就会被路由到正常队列绑定的死信队列中,还有一种还有经常使用的场景就是开启了自动签收,而后消费者消费消息时出现异常,超过了重试次数,那么这条消息也会进入死信队列,若是配置了话,
例子
//模拟异经常使用的交换器 ,topic交换器会通配符匹配,固然字符串如出一辙也会匹配 @Bean TopicExchange emailExchange() { return new TopicExchange("demoTopicExchange"); } //死信队列 @Bean public Queue deadLetterQueue() { return new Queue("demo.dead.letter"); } //死信交换器 @Bean TopicExchange deadLetterExchange() { return new TopicExchange("demoDeadLetterTopicExchange"); } //绑定死信队列 @Bean Binding bindingDeadLetterQueue() { return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("demo.dead.letter"); } 生产者 @RequestMapping("/sixin") public void sendEmailMessage() { CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("demoTopicExchange","demo.email","11",correlationData); log.info("---发送 email 消息---{}---messageId---{}","111",correlationData.getId()); } 消费者 /** * 邮件消费者 * @param message * @param channel * @throws IOException */ @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你能够自定随便定义。 value = @Queue(value = "demo.email",autoDelete = "false", arguments = { @Argument(name = "x-dead-letter-exchange", value = "demoDeadLetterTopicExchange"), @Argument(name = "x-dead-letter-routing-key",value = "demo.dead.letter"), @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") }), key = "demo.email", // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "demoTopicExchange", // 这里是肯定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.TOPIC) )) public void handleEmailMessage(Message message, Channel channel,String msg) throws IOException { try { log.info("---接受到消息---{}",msg); //主动异常 int m=1/0; //手动签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { //异常,ture 从新入队,或者false,进入死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } } /** * 死信消费者,自动签收开启状态下,超太重试次数,或者手动签收,reject或者Nack * @param message */ @RabbitListener(queues = "demo.dead.letter") public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException { //能够考虑数据库记录,每次进来查数量,达到必定的数量,进行预警,人工介入处理 log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation")); //回复ack channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
一样也可以使用java类配置
@Bean public Queue emailQueue() { Map<String, Object> arguments = new HashMap<>(2); // 绑定死信交换机 arguments.put("x-dead-letter-exchange", "demoDeadLetterTopicExchange"); // 绑定死信的路由key arguments.put("x-dead-letter-routing-key", "demo.dead.letter"); arguments.put("x-message-ttl", 3000); return new Queue(emailQueue,true,false,false,arguments); } @Bean TopicExchange emailExchange() { return new TopicExchange(topicExchange); } @Bean Binding bindingEmailQueue() { return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#"); }
5、持久化机制和内存磁盘监控
一、持久化
RabbitMQ的持久化队列分为:
1:队列持久化
2:消息持久化
3:交换机持久化
不管是持久化的消息仍是非持久化的消息均可以写入到磁盘中,只不过非持久的是等内存不足的状况下才会被写入到磁盘中。
二、内存磁盘监控
6、分布式事务
7、配置详解
rabbitmq: addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client链接到的server的地址,多个以逗号分隔(优先取addresses,而后再取host) # port: ##集群配置 addresses之间用逗号隔开 # addresses: ip:port,ip:port password: admin username: 123456 virtual-host: / # 链接到rabbitMQ的vhost requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s publisher-confirms: #是否启用 发布确认 publisher-reurns: # 是否启用发布返回 connection-timeout: #链接超时,单位毫秒,0表示无穷大,不超时 cache: channel.size: # 缓存中保持的channel数量 channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;若是为0,则老是建立一个新channel connection.size: # 缓存的链接数,只有是CONNECTION模式时生效 connection.mode: # 链接工厂缓存模式:CHANNEL 和 CONNECTION listener: simple.auto-startup: # 是否启动时自动启动容器 simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto simple.concurrency: # 最小的消费者数量 simple.max-concurrency: # 最大的消费者数量 simple.prefetch: # 指定一个请求能处理多少个消息,若是有事务的话,必须大于等于transaction数量. simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量. simple.default-requeue-rejected: # 决定被拒绝的消息是否从新入队;默认是true(与参数acknowledge-mode有关系) simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒 simple.retry.enabled: # 监听重试是否可用 simple.retry.max-attempts: # 最大重试次数 simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔 simple.retry.multiplier: # 应用于上一重试间隔的乘数 simple.retry.max-interval: # 最大重试时间间隔 simple.retry.stateless: # 重试是有状态or无状态 template: mandatory: # 启用强制信息;默认false receive-timeout: # receive() 操做的超时时间 reply-timeout: # sendAndReceive() 操做的超时时间 retry.enabled: # 发送重试是否可用 retry.max-attempts: # 最大重试次数 retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔 retry.multiplier: # 应用于上一重试间隔的乘数 retry.max-interval: #最大重试时间间隔