RocketMQ 一行代码形成大量消息丢失

作积极的人,越努力越幸运!
RocketMQ 一行代码形成大量消息丢失数据库

一、问题现象


首先接到项目反馈使用 RocketMQ 会出现以下错误:网络

在这里插入图片描述架构

错误信息关键点:MQBrokerException:CODE:2 DESC:[TIMEOUT_CLEAN_QUEUE]broker busy,start flow control for a while,period in queue:205ms,size of queue:880。
因为项目组并无对消息发送失败作任何补偿,致使丢失消息发送失败,故须要对这个问题进行深层次的探讨,并加以解决。ide

二、问题分析


首先咱们根据关键字:TIMEOUT_CLEAN_QUEUE 去 RocketMQ 中查询,去探究在何时会抛出如上错误。根据全文搜索以下图所示:
RocketMQ 一行代码形成大量消息丢失函数

该方法是在 BrokerFastFailure 中定义的,经过名称便可以当作其设计目的:Broker端快速失败机制。源码分析

Broker 端快速失败其原理图以下:

RocketMQ 一行代码形成大量消息丢失

  • 消息发送者向 Broker 发送消息写入请求,Broker 端在接收到请求后会首先放入一个队列中(SendThreadPoolQueue),默认容量为 10000。
  • Broker 会专门使用一个线程池(SendMessageExecutor)去从队列中获取任务并执行消息写入请求,为了保证消息的顺序处理,该线程池默认线程个数为1。

若是 Broker 端受到垃圾回收等等因素形成单条写入数据发生抖动,单个 Broker 端积压的请求太多从而得不到及时处理,会极大的形成客户端消息发送的时间延长。线程

设想一下,若是因为 Broker 压力增大,写入一条消息须要500ms甚至超过1s,而且队列中积压了5000条消息,消息发送端的默认超时时间为3s,若是按照这样的速度,这些请求在轮到 Broker 执行写入请求时,客户端已经将这个请求超时了,这样不只会形成大量的无效处理,还会致使客户端发送超时。架构设计

故 RocketMQ 为了解决该问题,引入 Broker 端快速失败机制,即开启一个定时调度线程,每隔10毫秒去检查队列中的第一个排队节点,若是该节点的排队时间已经超过了 200ms,就会取消该队列中全部已超过 200ms 的请求,当即向客户端返回失败,这样客户端能尽快进行重试,由于 Broker 都是集群部署,下次重试能够发送到其余 Broker 上,这样能最大程度保证消息发送在默认 3s 的时间内通过重试机制,能有效避免某一台 Broker 因为瞬时压力大而形成的消息发送不可用,从而实现消息发送的高可用。设计

从 Broker 端快速失败机制引入的初衷来看,快速失败后会发起重试,除非同一时刻集群内全部的 Broker 都繁忙,否则消息会发送成功,用户是不会感知这个错误的,那为何用户感知了呢?难道 TIMEOUT CLEAN QUEUE 错误,Broker 不重试?code

为了解开这个谜团,接下来会采用源码分析的手段去探究真相。接下来将以消息同步发送为例揭示其消息发送处理流程中的核心关键点。

MQ Client 消息发送端首先会利用网络通道将请求发送到 Broker,而后接收到请求结果后并调用 processSendResponse 方法对响应结果进行解析,以下图所示:
RocketMQ 一行代码形成大量消息丢失

在这里返回的 code 为 RemotingSysResponseCode . SYSTEM_BUSY。
咱们从 proccessSendResponse 方法中能够得知若是 code 为 SYSTEM_BUSY,该方法会抛出 MQBrokerException,响应 code 为 SYSTEM_BUSY,其错误描述为开头部分的错误信息。

那咱们沿着该方法的调用链路,能够找到其直接调用方:DefaultMQProducerImpl 的 sendKernelImpl,咱们重点考虑若是底层方法抛出 MQBrokerException 该方法会如何处理。

其关键代码以下图所示:
RocketMQ 一行代码形成大量消息丢失

能够看出在 sendKernelImpl 方法中首先会捕捉异常,先执行注册的钩子函数,即就算执行失败,对应的消息发送后置钩子函数也会执行,而后再原封不动的将该异常向上抛出。
sendKernelImpl 方法被 DefaultMQProducerImpl 的 sendDefaultImpl 方法调用,下面是其核心实现截图:
RocketMQ 一行代码形成大量消息丢失

从这里能够看出 RocketMQ 消息发送高可用设计一个很是关键的点,重试机制,其实现是在 for 循环中 使用 try catch 将 sendKernelImpl 方法包裹,就能够保证该方法抛出异常后能继续重试。从上文可知,若是 SYSTEM_BUSY 会抛出 MQBrokerException,但发现只有上述几个错误码才会重试,由于若是不是上述错误码,会继续向外抛出异常,此时 for 循环会被中断,即不会重试。
这里很是使人意外的是连 SYSTEM_ERROR 都会重试,却没有包含 SYSTEM_BUSY,显然违背了快速失败的设计初衷,故笔者判定,这是 RocketMQ 的一个BUG,将 SYSTEM_BUSY 遗漏了,后续会提一个 PR,增长一行代码,将 SYSTEM_BUSY 加上便可。

问题分析到这里,该问题应该就很是明了。

三、解决方案


若是你们在网上搜索 TIMEOUT_CLEAN_QUEUE 的解决方法,你们不约而同提出的解决方案是增长 waitTimeMillsInSendQueue 的值,该值默认为 200ms,例如将其设置为 1000s 等等,之前我是反对的,由于个人认知里 Broker 会重试,但如今发现 Broker 不会重试,因此我如今认为该 BUG未解决的状况下适当提升该值能有效的缓解。

但这是并非好的解决方案,我会在近期向官方提交一个PR,将这个问题修复,建议你们在公司尽可能对本身使用的版本进行修改,从新打一个包便可,由于这已经违背了 Broker 端快速失败的设计初衷。

但在消息发送的业务方,尽可能本身实现消息的重试机制,即不依赖 RocketMQ 自己提供的重试机制,由于受制于网络等因素,消息发送不可能百分之百成功,建议你们在消息发送时捕获一下异常,若是发送失败,能够将消息存入数据库,再结合定时任务对消息进行重试,尽最大程度保证消息不丢失。

原创不易,若是对你有所帮助请你为本文点个【在看】吧,这将是我写做更多优质文章的最强动力。

欢迎加入个人知识星球,一块儿交流源码,探讨架构,揭秘亿级订单的架构设计与实践经验,打造高质量的技术交流圈,为广大星友提供高质量问答服务,长按以下二维码加入。
RocketMQ 一行代码形成大量消息丢失

丁威素质三连是对我最大的鼓励

相关文章
相关标签/搜索