RabbitMQ如何解决各类状况下丢数据的问题

1.生产者丢数据spring

若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。api

 
生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。 transaction机制就是说,发送消息前,开启事物(channel.txSelect()),而后发送消息,若是发送过程当中出现什么异常,事物就会回滚(channel.txRollback()),若是发送成功则提交事 物(channel.txCommit())。 然而缺点就是吞吐量降低了。所以,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,全部在该信道上面发布的消息都将会被指派一个惟一的ID(从1开始),一旦 消息被投递到全部匹配的队列以后,rabbitMQ就会发送一个Ack给生产者(包含消息的惟一ID),这就使得生产者知道消息已经正确到达目的队列了.若是rabiitMQ没能处理该消息,则会发送一个N ack消息给你,你能够进行重试操做。

下面演示一下confirm模式:服务器

 
//测试确认后回调 @Service public class HelloSender1 implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String context = "你好如今是 " + new Date() +""; System.out.println("HelloSender发送内容 : " + context); this.rabbitTemplate.setConfirmCallback(this); //exchange,queue 都正确,confirm被回调, ack=true //this.rabbitTemplate.convertAndSend("exchange","topic.message", context); //exchange 错误,queue 正确,confirm被回调, ack=false //this.rabbitTemplate.convertAndSend("fasss","topic.message", context); //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE //this.rabbitTemplate.convertAndSend("exchange","", context); //exchange 错误,queue 错误,confirm被回调, ack=false this.rabbitTemplate.convertAndSend("fasss","fass", context); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause); } }

2.消息队列丢数据分布式

 
处理消息队列丢数据的状况,通常是开启持久化磁盘的配置。这个持久化配置能够和confirm机制配合使用,你能够在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,若是消息持久化磁盘 以前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。 那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步 ①、将queue的持久化标识durable设置为true,则表明是一个持久的队列 ②、发送消息的时候将deliveryMode=2 这样设置之后,rabbitMQ就算挂了,重启后也能恢复数据。在消息尚未持久化到硬盘时,可能服务已经死掉,这种状况能够经过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢 失(整个集群都挂掉)
 
/** * 第二个参数:queue的持久化是经过durable=true来实现的。 * 第三个参数:exclusive:排他队列,若是一个队列被声明为排他队列,该队列仅对首次申明它的链接可见,并在链接断开时自动删除。这里须要注意三点:    1. 排他队列是基于链接可见的,同一链接的不一样信道是能够同时访问同一链接建立的排他队列;    2.“首次”,若是一个链接已经声明了一个排他队列,其余链接是不容许创建同名的排他队列的,这个与普通队列不一样;    3.即便该队列是持久化的,一旦链接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。 * 第四个参数:自动删除,若是该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。 * @param * @return * @Author zxj */ @Bean public Queue queue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 25000);//25秒自动删除 Queue queue = new Queue("topic.messages", true, false, true, arguments); return queue; }
 
MessageProperties properties=new MessageProperties(); properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE); properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);//持久化设置 properties.setExpiration("2018-12-15 23:23:23");//设置到期时间 Message message=new Message("hello".getBytes(),properties); this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

3.消费者丢数据ide

 
启用手动确认模式能够解决这个问题 ①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直处处理成功。不会丢失消息,即使服务挂掉,没有处理完成的消息会重回队列,可是异常会让 消息不断重试。 ②手动确认模式 ③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会当即在队列移除,不管客户端异常仍是断开,只要发送完就移除,不会重发。

若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。微服务

 
指定Acknowledge的模式: spring.rabbitmq.listener.direct.acknowledge-mode=manual,表示该监听器手动应答消息 针对手动确认模式,有如下特色: 1.使用手动应答消息,有一点须要特别注意,那就是不能忘记应答消息,由于对于RabbitMQ来讲处理消息没有超时,只要不该答消息,他就会认为仍在正常处理消息,致使消息队列出现阻塞,影响 业务执行。 2.若是消费者来不及处理就死掉时,没有响应ack时,会项目启动后会重复发送一条信息给其余消费者; 3.能够选择丢弃消息,这其实也是一种应答,以下,这样就不会再次收到这条消息。 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); 4.若是消费者设置了手动应答模式,而且设置了重试,出现异常时不管是否捕获了异常,都是不会重试的 5.若是消费者没有设置手动应答模式,而且设置了重试,那么在出现异常时没有捕获异常会进行重试,若是捕获了异常不会重试。

重试机制:源码分析

 
spring.rabbitmq.listener.simple.retry.max-attempts=5 最大重试次数 spring.rabbitmq.listener.simple.retry.enabled=true 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息) spring.rabbitmq.listener.simple.retry.initial-interval=5000 重试间隔时间(单位毫秒) spring.rabbitmq.listener.simple.default-requeue-rejected=false 重试次数超过上面的设置以后是否丢弃(false不丢弃时须要写相应代码将该消息加入死信队列)

若是设置了重试模式,那么在出现异常时没有捕获异常会进行重试,若是捕获了异常不会重试。性能

当出现异常时,咱们须要把这个消息回滚到消息队列,有两种方式:学习

//ack返回false,并从新回到队列,api里面解释得很清楚测试

 
//ack返回false,并从新回到队列,api里面解释得很清楚 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //拒绝消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

通过开发中的实际测试,当消息回滚到消息队列时,这条消息不会回到队列尾部,而是还是在队列头部,这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行。这种状况会致使消息队列处理出现阻塞,消息堆积,致使正常消息也没法运行。对于消息回滚到消息队列,咱们但愿比较理想的方式时出现异常的消息到 达消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行,所以咱们采起的解决方案是,将消息进行应答,这时消息队列会删除该消息,同时咱们再次发送该消息 到消息队列,这时就实现了错误消息进行消息队列尾部的方案。

 
//手动进行应答 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //从新发送消息到队尾 channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(new Object()));

若是一个消息体自己有误,会致使该消息体,一直没法进行处理,而服务器中刷出大量无用日志。解决这个问题能够采起两种方案:

1.一种是对于平常细致处理,分清哪些是能够恢复的异常,哪些是不能够恢复的异常。对于能够恢复的异常咱们采起第三条中的解决方案,对于不能够处理的异常,咱们采用记录日志,直接丢弃该消息方案。

2.另外一种是咱们对每条消息进行标记,记录每条消息的处理次数,当一条消息,屡次处理仍不能成功时,处理次数到达咱们设置的值时,咱们就丢弃该消息,但须要记录详细的日志。

消息监听内的异常处理有两种方式:

1.内部catch后直接处理,而后使用channel对消息进行确认

2.配置RepublishMessageRecoverer将处理异常的消息发送到指定队列专门处理或记录。监听的方法内抛出异常貌似没有太大用处。由于抛出异常就算是重试也很是有可能会继续出现异常,当重试次数完了以后消息就只有重启应用才能接收到了,颇有可能致使消息消费不及时。固然能够配置RepublishMessageRecoverer来解决,可是万一RepublishMessageRecoverer发送失败了呢。。那就可能形成消息消费不及时了。因此即便须要将处理出现异常的消息统一放到另外队列去处理,我的建议两种方式:

①catch异常后,手动发送到指定队列,而后使用channel给rabbitmq确认消息已消费

②给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败

若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。

相关文章
相关标签/搜索