最近看了 @JavaGuide 发布的一篇『面试官问我如何保证Kafka不丢失消息?我哭了!』,这篇文章承接这个主题,来聊聊如何保证 RocketMQ 不丢失消息。java
一条消息从生产到被消费,将会经历三个阶段:git
以上任一阶段均可能会丢失消息,咱们只要找到这三个阶段丢失消息缘由,采用合理的办法避免丢失,就能够完全解决消息丢失的问题。github
生产者(Producer) 经过网络发送消息给 Broker,当 Broker 收到以后,将会返回确认响应信息给 Producer。因此生产者只要接收到返回的确认响应,就表明消息在生产阶段未丢失。面试
RocketMQ 发送消息示例代码以下:apache
DefaultMQProducer mqProducer=new DefaultMQProducer("test"); // 设置 nameSpace 地址 mqProducer.setNamesrvAddr("namesrvAddr"); mqProducer.start(); Message msg = new Message("test_topic" /* Topic */, "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送消息到一个Broker try { SendResult sendResult = mqProducer.send(msg); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
send
方法是一个同步操做,只要这个方法不抛出任何异常,就表明消息已经发送成功。网络
消息发送成功仅表明消息已经到了 Broker 端,Broker 在不一样配置下,可能会返回不一样响应状态:异步
SendStatus.SEND_OK
SendStatus.FLUSH_DISK_TIMEOUT
SendStatus.FLUSH_SLAVE_TIMEOUT
SendStatus.SLAVE_NOT_AVAILABLE
引用官方状态说明:ide
上图中不一样 broker 端配置将会在下文详细解释
另外 RocketMQ 还提供异步的发送的方式,适合于链路耗时较长,对响应时间较为敏感的业务场景。工具
DefaultMQProducer mqProducer = new DefaultMQProducer("test"); // 设置 nameSpace 地址 mqProducer.setNamesrvAddr("127.0.0.1:9876"); mqProducer.setRetryTimesWhenSendFailed(5); mqProducer.start(); Message msg = new Message("test_topic" /* Topic */, "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); try { // 异步发送消息到,主线程不会被阻塞,马上会返回 mqProducer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 消息发送成功, } @Override public void onException(Throwable e) { // 消息发送失败,能够持久化这条数据,后续进行补偿处理 } }); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
异步发送消息必定要注意重写回调方法,在回调方法中检查发送结果。性能
不论是同步仍是异步的方式,都会碰到网络问题致使发送失败的状况。针对这种状况,咱们能够设置合理的重试次数,当出现网络问题,能够自动重试。设置方式以下:
// 同步发送消息重试次数,默认为 2 mqProducer.setRetryTimesWhenSendFailed(3); // 异步发送消息重试次数,默认为 2 mqProducer.setRetryTimesWhenSendAsyncFailed(3);
默认状况下,消息只要到了 Broker 端,将会优先保存到内存中,而后马上返回确认响应给生产者。随后 Broker 按期批量的将一组消息从内存异步刷入磁盘。
这种方式减小 I/O 次数,能够取得更好的性能,可是若是发生机器掉电,异常宕机等状况,消息还未及时刷入磁盘,就会出现丢失消息的状况。
若想保证 Broker 端不丢消息,保证消息的可靠性,咱们须要将消息保存机制修改成同步刷盘方式,即消息存储磁盘成功,才会返回响应。
修改 Broker 端配置以下:
## 默认状况为 ASYNC_FLUSH flushDiskType = SYNC_FLUSH
若 Broker 未在同步刷盘时间内(默认为 5s)完成刷盘,将会返回 SendStatus.FLUSH_DISK_TIMEOUT
状态给生产者。
集群部署
为了保证可用性,Broker 一般采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还须要复制到 slave 节点。
默认方式下,消息写入 master 成功,就能够返回确认响应给生产者,接着消息将会异步复制到 slave 节点。
注:master 配置:flushDiskType = SYNC_FLUSH
此时若 master 忽然宕机且不可恢复,那么还未复制到 slave 的消息将会丢失。
为了进一步提升消息的可靠性,咱们能够采用同步的复制方式,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。
异步复制与同步复制区别以下图:
注: 你们不要被上图误导,broker master 只能配置一种复制方式,上图只为解释同步复制的与异步复制的概念。
Broker master 节点 同步复制配置以下:
## 默认为 ASYNC_MASTER brokerRole=SYNC_MASTER
若是 slave 节点未在指定时间内同步返回响应,生产者将会收到 SendStatus.FLUSH_SLAVE_TIMEOUT
返回状态。
小结
结合生产阶段与存储阶段,若须要严格保证消息不丢失,broker 须要采用以下配置:
## master 节点配置 flushDiskType = SYNC_FLUSH brokerRole=SYNC_MASTER ## slave 节点配置 brokerRole=slave flushDiskType = SYNC_FLUSH
同时这个过程咱们还须要生产者配合,判断返回状态是不是 SendStatus.SEND_OK
。如果其余状态,就须要考虑补偿重试。
虽然上述配置提升消息的高可靠性,可是会下降性能,生产实践中须要综合选择。
消费者从 broker 拉取消息,而后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
状态给 Broker。
若是 Broker 未收到消费确认响应或收到其余状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的状况。
消息消费的代码以下:
// 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer"); // 设置NameServer的地址 consumer.setNamesrvAddr("namesrvAddr"); // 订阅一个或者多个Topic,以及Tag来过滤须要消费的消息 consumer.subscribe("test_topic", "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 执行业务逻辑 // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start();
以上消费消息过程的,咱们须要注意返回消息状态。只有当业务逻辑真正执行成功,咱们才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
。不然咱们须要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
,稍后再重试。
看完 RocketMQ 不丢消息处理办法,回头再看这篇 kafka,有没有发现,二者解决思路是同样的,区别就是参数配置不同而已。
因此下一次,面试官再问你 XX 消息队列如何保证不丢消息?若是你没用过这个消息队列,也不要哭,微笑面对他,从容给他分析那几步会丢失,而后大体解决思路。
最后咱们还能够说出咱们的思考,虽然提升消息可靠性,可是可能致使消息重发,重复消费。因此对于消费客户端,须要注意保证幂等性。
可是要注意了,这时面试官可能就会跟你的话题,让你来聊聊如何保证幂等性,必定先想好再说哦。
什么?你还不知道如何实现幂等?那就赶忙关注@程序通事,后面文章咱们就来聊聊幂等这个话题。
才疏学浅,不免会有纰漏,若是你发现了错误的地方,还请你留言给我指出来,我对其加以修改。
再次感谢您的阅读,我是楼下小黑哥,一位还未秃头的工具猿,下篇文章咱们再见~
欢迎关注个人公众号:程序通事,得到平常干货推送。若是您对个人专题内容感兴趣,也能够关注个人博客: studyidea.cn