一条消费成功被消费经历了生产者->MQ->消费者,所以在这三个步骤中都有可能形成消息丢失。java
AMQP
协议提供了事务机制,在投递消息时开启事务支持,若是消息投递失败,则回滚事务。spring
自定义事务管理器缓存
@Configuration public class RabbitTranscation { @Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ return new RabbitTemplate(connectionFactory); } }
修改ymlide
spring: rabbitmq: # 消息在未被队列收到的状况下返回 publisher-returns: true
开启事务支持性能
rabbitTemplate.setChannelTransacted(true);
消息未接收时调用ReturnCallbackfetch
rabbitTemplate.setMandatory(true);
生产者投递消息this
@Service public class ProviderTranscation implements RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ // 设置channel开启事务 rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("这条消息发送失败了"+message+",请处理"); } @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager") public void publishMessage(String message) throws Exception { rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatrip",message); } }
可是,不多有人这么干,由于这是同步操做,一条消息发送以后会使发送端阻塞,以等待RabbitMQ-Server的回应,以后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大下降。code
发送消息时将信道设置为confirm
模式,消息进入该信道后,都会被指派给一个惟一ID,一旦消息被投递到所匹配的队列后,RabbitMQ
就会发送给生产者一个确认。rabbitmq
开启消息确认机制队列
spring: rabbitmq: # 消息在未被队列收到的状况下返回 publisher-returns: true # 开启消息确认机制 publisher-confirm-type: correlated
消息未接收时调用ReturnCallback
rabbitTemplate.setMandatory(true);
生产者投递消息
@Service public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("确认了这条消息:"+correlationData); }else{ System.out.println("确认失败了:"+correlationData+";出现异常:"+cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("这条消息发送失败了"+message+",请处理"); } public void publisMessage(String message){ rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatrip",message); } }
若是消息确认失败后,咱们能够进行消息补偿,也就是消息的重试机制。当未收到确认信息时进行消息的从新投递。设置以下配置便可完成。
spring: rabbitmq: # 支持消息发送失败后重返队列 publisher-returns: true # 开启消息确认机制 publisher-confirm-type: correlated listener: simple: retry: # 开启重试 enabled: true # 最大重试次数 max-attempts: 5 # 重试时间间隔 initial-interval: 3000
消息在MQ中有可能发生丢失,这时候咱们就须要将队列和消息都进行持久化。
@Queue注解为咱们提供了队列相关的一些属性,具体以下:
持久化队列
建立队列的时候将持久化属性durable设置为true,同时要将autoDelete设置为false
@Queue(value = "javatrip",durable = "false",autoDelete = "false")
持久化消息
发送消息的时候将消息的deliveryMode设置为2,在Spring Boot中消息默认就是持久化的。
消费者刚消费了消息,尚未处理业务,结果发生异常。这时候就须要关闭自动确认,改成手动确认消息。
修改yml为手动签收模式
spring: rabbitmq: listener: simple: # 手动签收模式 acknowledge-mode: manual # 每次签收一条消息 prefetch: 1
消费者手动签收
@Component @RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true")) public class Consumer { @RabbitHandler public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{ System.out.println(message); // 惟一的消息ID Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 确认该条消息 if(...){ channel.basicAck(deliverTag,false); }else{ // 消费失败,消息重返队列 channel.basicNack(deliverTag,false,true); } } }
生产者、MQ、消费者都有可能形成消息丢失