RocketMQ消息发送常见错误与解决方案

文将结合本身使用RocketMQ的经验,对消息发送常见的问题进行分享,基本会遵循出现,分析问题、解决问题。web

一、No route info of this topic


没法找到路由信息,其完整的错误堆栈信息以下:
缓存

且不少读者朋友会说Broker端开启了自动建立主题也会出现上述问题。

RocketMQ的路由寻找流程以下图所示:
服务器

上面的核心关键点以下:
  • 若是Broker开启了自动建立Topic,在启动的时候会默认建立主题:TBW102,并会随着Broker发送到Nameserver的心跳包汇报给Nameserver,继而从Nameserver查询路由信息时能返回路由信息。微信

  • 消息发送者在消息发送时首先会查本地缓存,若是本地缓存中存在,直接返回路由信息。网络

  • 若是缓存不存在,则向Nameserver查询路由信息,若是Nameserver存在该路由信息,就直接返回。并发

  • 若是Nameserver不存在该topic的路由信息,若是没有开启自动建立主题,则抛出 No route info of this topic。异步

  • 若是开启了自动建立主题,则使用默认主题向Nameserver查询路由信息,并使用默认Topic的路由信息为本身的路由信息,将不会抛出 No route info of this topic。高并发

一般状况下 No route info of this topic 这个错误通常是在刚搭建RocketMQ,刚入门 RocketMQ遇到的比较多,一般的排查思路以下:性能

  • 能够经过rocketmq-console查询路由信息是否存在,或使用以下命令查询路由信息:学习

    cd ${ROCKETMQ_HOME}/bin
    sh ./mqadmin topicRoute -n 127.0.0.1:9876 -t dw_test_0003

    其输出结果以下所示:

  • 若是经过命令没法查询到路由信息,则查看Broker是否开启了自动建立topic,参数为:autoCreateTopicEnable,该参数默认为true。但在生产环境不建议开启。

  • 若是开启了自动建立路由信息,但仍是抛出这个错误,这个时候请检查客户端(Producer)链接的Nameserver地址是否与Broker中配置的nameserver地址是否一致。

通过上面的步骤,基本就能解决该错误。

二、消息发送超时


消息发送超时,一般客户端的日志以下:

客户端报消息发送超时,一般第一怀疑的对象是RocketMQ服务器,是否是Broker性能出现了抖动,没法抗住当前的量。

那我们如何来排查RocketMQ当前是否有性能瓶颈呢?

首先咱们执行以下命令查看RocketMQ 消息写入的耗时分布状况:

cd /${USER.HOME}/logs/rocketmqlogs/
grep -n 'PAGECACHERT' store.log | more

输出结果以下所示:

RocketMQ会每一分钟打印前一分钟内消息发送的耗时状况分布,咱们从这里就能窥探RocketMQ消息写入是否存在明细的性能瓶颈,其区间以下:
  • [<=0ms] 小于0ms,即微妙级别的。

  • [0~10ms] 小于10ms的个数。

  • [10~50ms] 大于10ms小

  • 于50ms的个数

其余区间显示,绝大多数会落在微妙级别完成,按照笔者的经验若是100-200ms及以上的区间超过20个后,说明Broker确实存在必定的瓶颈,若是只是少数几个,说明这个是内存或pagecache的抖动,问题不大。

一般状况下超时一般与Broker端的处理能力关系不大,还有另一个佐证,在RocketMQ broker中还存在快速失败机制,即当Broker收到客户端的请求后会将消息先放入队列,而后顺序执行,若是一条消息队列中等待超过200ms就会启动快速失败,向客户端返回[TIMEOUT_CLEAN_QUEUE]broker busy,这个在本文的第3部分会详细介绍。

在RocketMQ客户端遇到网络超时,一般能够考虑一些应用自己的垃圾回收,是否因为GC的停顿时间致使的消息发送超时,这个我在测试环境进行压力测试时遇到过,但生产环境暂时没有遇到过,你们稍微留意一下。

在RocketMQ中一般遇到网络超时,一般与网络的抖动有关系,但因为我对网络不是特别擅长,故暂时没法找到直接证据,但能找到一些间接证据,例如在一个应用中同时链接了kafka、RocketMQ集群,发如今出现超时的同一时间发现链接到RocketMQ集群内全部Broker,链接到kafka集群都出现了超时。

但出现网络超时,咱们总得解决,那有什么解决方案吗?

咱们对消息中间件的最低指望就是高并发低延迟,从上面的消息发送耗时分布状况也能够看出RocketMQ确实符合咱们的指望,绝大部分请求都是在微妙级别内,故我给出的方案时,减小消息发送的超时时间,增长重试次数,并增长快速失败的最大等待时长。具体措施以下:

  • 增长Broker端快速失败的时长,建议为1000,在broker的配置文件中增长以下配置:

    maxWaitTimeMillsInQueue=1000

    主要缘由是在当前的RocketMQ版本中,快速失败致使的错误为SYSTEM_BUSY,并不会触发重试,适当增大该值,尽量避免触发该机制,详情能够参考本文第3部份内容,会重点介绍system_busy、broker_busy。

  • 若是RocketMQ的客户端版本为4.3.0如下版本(不含4.3.0)
    将超时时间设置消息发送的超时时间为500ms,并将重试次数设置为6次(这个能够适当进行调整,尽可能大于3),其背后的哲学是尽快超时,并进行重试,由于发现局域网内的网络抖动是瞬时的,下次重试的是就能恢复,而且RocketMQ有故障规避机制,重试的时候会尽可能选择不一样的Broker,相关的代码以下:

    DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer_group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.setRetryTimesWhenSendFailed(5);// 同步发送模式:重试次数
    producer.setRetryTimesWhenSendAsyncFailed(5);// 异步发送模式:重试次数
    producer.start();
    producer.send(msg,500);//消息发送超时时间
  • 若是RocketMQ的客户端版本为4.3.0及以上版本

    若是客户端版本为4.3.0及其以上版本,因为其设置的消息发送超时时间为全部重试的总的超时时间,故不能直接经过设置RocketMQ的发送API的超时时间,而是须要对其API进行包装,重试须要在外层收到进行,例如示例代码以下:

    public static SendResult send(DefaultMQProducer producer, Message msg, int 
                                retryCount)
     
    {
      Throwable e = null;
      for(int i =0; i < retryCount; i ++ ) {
          try {
              return producer.send(msg,500); //设置超时时间,为500ms,内部有重试机制
          } catch (Throwable e2) {
              e = e2;
          }
      }
      throw new RuntimeException("消息发送异常",e);
    }

三、System busy、Broker busy


在使用RocketMQ中,若是RocketMQ集群达到1W/tps的压力负载水平,System busy、Broker busy就会是你们常常会遇到的问题。例如以下图所示的异常栈。

纵观RocketMQ与system busy、broker busy相关的错误关键字,总共包含以下5个:

  • [REJECTREQUEST]system busy

  • too many requests and system thread pool busy

  • [PC_SYNCHRONIZED]broker busy

  • [PCBUSY_CLEAN_QUEUE]broker busy

  • [TIMEOUT_CLEAN_QUEUE]broker busy

3.1 原理分析

咱们先用一张图来阐述一下在消息发送的全生命周期中分别在何时会抛出上述错误。

根据上述5类错误日志,其触发的原有能够概括为以下3种。
  • pagecache压力较大

    其中以下三类错误属于此种状况

  • [REJECTREQUEST]system busy

  • [PC_SYNCHRONIZED]broker busy

  • [PCBUSY_CLEAN_QUEUE]broker busy

    判断pagecache是否忙的依据就是在写入消息时,在向内存追加消息时加锁的时间,默认的判断标准是加锁时间超过1s,就认为是pagecache压力大,向客户端抛出相关的错误日志。

  • 发送线程池挤压的拒绝策略
    在RocketMQ中处理消息发送的是一个只有一个线程的线程池,内部会维护一个有界队列,默认长度为1W,若是当前队列中挤压的数量超过1w,执行线程池的拒绝策略,从而抛出[too many requests and system thread pool busy]错误。

  • Broker端快速失败

    默认状况下Broker端开启了快速失败机制,就是在Broker端还未发生pagecache繁忙(加锁超过1s)的状况,但存在一些请求在消息发送队列中等待200ms的状况,RocketMQ会再也不继续排队,直接向客户端返回system busy,但因为rocketmq客户端目前对该错误没有进行重试处理,因此在解决这类问题的时候须要额外处理。

3.2 PageCache繁忙解决方案

一旦消息服务器出现大量pagecache繁忙(在向内存追加数据加锁超过1s)的状况,这个是比较严重的问题,须要人为进行干预解决,解决的问题思路以下:

  • transientStorePoolEnable

    开启transientStorePoolEnable机制,即在broker中配置文件中增长以下配置:

    transientStorePoolEnable=true

    transientStorePoolEnable的原理以下图所示:

 引入transientStorePoolEnable能缓解pagecache的压力背后关键以下:
  • 消息先写入到堆外内存中,该内存因为启用了内存锁定机制,故消息的写入是接近直接操做内存,性能能获得保证。

  • 消息进入到堆外内存后,后台会启动一个线程,一批一批将消息提交到pagecache,即写消息时对pagecache的写操做由单条写入变成了批量写入,下降了对pagecache的压力。

    引入transientStorePoolEnable会增长数据丢失的可能性,若是Broker JVM进程异常退出,提交到PageCache中的消息是不会丢失的,但存在堆外内存(DirectByteBuffer)中但还未提交到PageCache中的这部分消息,将会丢失。但一般状况下,RocketMQ进程退出的可能性不大,一般状况下,若是启用了transientStorePoolEnable,消息发送端须要有从新推送机制(补偿思想)。

  • 扩容

    若是在开启了transientStorePoolEnable后,还会出现pagecache级别的繁忙,那须要集群进行扩容,或者对集群中的topic进行拆分,即将一部分topic迁移到其余集群中,下降集群的负载。


舒适提示:在RocketMQ出现pagecache繁忙形成的broker busy,RocketMQ Client会有重试机制。

3.3 TIMEOUT_CLEAN_QUEUE 解决方案

因为若是出现TIMEOUT_CLEAN_QUEUE的错误,客户端暂时不会对其进行重试,故现阶段的建议是适当增长快速失败的判断标准,即在broker的配置文件中增长以下配置:

#该值默认为200,表示200ms
waitTimeMillsInSendQueue=1000

本文来自笔者的另外一力做《RocketMQ实战与进阶》,专栏从使用场景入手介绍如何使用 RocketMQ,使用过程当中遇到什么问题,如何解决这些问题,以及为何能够这样解决,即原理讲解(图)穿插在实战中。专栏的设计思路重在强调实战二字,旨在让一位 RocketMQ 初学者经过对本专栏的学习,快速“打怪升级”,理论与实战结合,成为该领域的佼佼者。

本文分享自微信公众号 - 瓜农老梁(gh_01130ae30a83)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索