处理线上RabbitMQ队列阻塞

前言

  那天我和同事一块儿吃完晚饭回公司加班,而后就群里就有人@我说xxx商户说收不到推送,一开始以为没啥。我第一反应是否是极光没注册上,就让客服通知商户,从新登陆下试试。这边打开极光推送的后台进行检查。后面反应收不到推送的愈来愈多,我就知道这事情不简单。java

事故通过

  因为大量商户反应收不到推送,我第一反应是否是推送系统挂了,致使没有进行推送。因而让运维老哥检查推送系统各节点的状况,发现都正常。因而打开RabbitMQ的管控台看了一下,人都蒙了。已经有几万条消息处于ready状态,还有几百条unacked的消息。git

  我觉得推送服务和MQ链接断开了,致使没法推送消息,因而让运维重启推送服务,将全部的推送服务重启完,发现unacked的消息所有变成ready,可是没过多久又有几百条unacked的消息了,这个就很明显了能消费,没有进行ack呀。spring

  当时我觉得是网络问题,致使mq没法接收到ack,让运维老哥检查了一下,发现网络没问题。如今看是真的是傻,网络有问题链接都连不上。因为肯定的是没法ack形成的,立马将ack模式由原来的manual 改为auto紧急发布。将全部的节点升级好之后,发现推送正常了。shell

  你觉得这就结束了其实并无,没过多久发现有一台MQ服务出现异常,因为生产采用了镜像队列,当即将这台有问题的MQ从集群中移除。直接进行重置,而后加入回集群。这事情算是告一段落了。此时已经接近24:00了。json

  时间来到次日上午10:00,运维那边又出现报警了,说推送系统有台机器,磁盘快被写满了,而且占用率很高。个人乖乖从昨晚到如今写了快40G的日志,一看报错信息瞬间就明白问题出在哪里了。麻溜的把bug修了紧急发布。网络

吐槽一波公司的ELK,压根就没有收集到这个报错信息,致使我没有及时发现。

事故重现-队列阻塞

MQ配置

spring:
  # 消息队列
  rabbitmq:
    host: 10.0.0.53
    username: guest
    password: guest
    virtual-host: local
    port: 5672
    # 消息发送确认
    publisher-confirm-type: correlated
    # 开启发送失败退回
    publisher-returns: true
    listener:
      simple:
        # 消费端最小并发数
        concurrency: 1
        # 消费端最大并发数
        max-concurrency: 5
        # 一次请求中预处理的消息数量
        prefetch: 2
        # 手动应答
        acknowledge-mode: manual

问题代码

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                                      @Headers Map<String,Object> headers,
                                      Channel channel) throws Exception {
    // 解密和解析
    String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
    OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

    try {
        // 模拟推送
        pushMsg(orderDto);
    }catch (Exception e){
        log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));
    }finally {
        // 消息签收
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    }

}

  看起来好像没啥问题。因为和交易系统约定好,订单数据须要先转换json串,而后再使用AES进行加密,因此这边须要,先进行解密而后在进行解析。才能获得订单数据。并发

  为了防止消息丢失,交易系统作了失败重发机制,防止消息丢失,不巧的是重发的时候没有对订单数据进行加密。这就致使推送系统,在解密的时候出异常,从而没法进行ack运维

默默的吐槽一句:人在家中坐,锅从天上来。

模拟推送

推送代码curl

发送3条正常的消息fetch

curl http://localhost:8080/sendMsg/3

发送1条错误的消息

curl http://localhost:8080/sendErrorMsg/1

再发送3条正常的消息

curl http://localhost:8080/sendMsg/3

  观察日志发下,虽然有报错,可是还能正常进行推送。可是RabbitMQ已经出现了一条unacked的消息。

继续发送1条错误的消息

curl http://localhost:8080/sendErrorMsg/1

再发送3条正常的消息

curl http://localhost:8080/sendMsg/3

  这个时候你会发现控制台报错,固然错误信息是解密失败,可是正常的消息却没有被消费,这个时候其实队列已经阻塞了。

  从RabbitMQ管控台也能够看到,刚刚发送的的3条消息处于ready状态。这个时候就若是一直有消息进入,都会堆积在队里里面没法被消费。

再发送3条正常的消息

curl http://localhost:8080/sendMsg/3

分析缘由

  上面说了是因为没有进行ack致使队里阻塞。那么问题来了,这是为何呢?其实这是RabbitMQ的一种保护机制。防止当消息激增的时候,海量的消息进入consumer而引起consumer宕机。

  RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认的消息的前提下,限制信道上的消费者所能保持的最大未确认的数量。能够经过设置PrefetchCount实现。

  举例说明:能够理解为在consumer前面加了一个缓冲容器,容器能容纳最大的消息数量就是PrefetchCount。若是容器没有满RabbitMQ就会将消息投递到容器内,若是满了就不投递了。当consumer对消息进行ack之后就会将此消息移除,从而放入新的消息。

listener:
  simple:
    # 消费端最小并发数
    concurrency: 1
    # 消费端最大并发数
    max-concurrency: 5
    # 一次处理的消息数量
    prefetch: 2
    # 手动应答
    acknowledge-mode: manual
prefetch参数就是PrefetchCount

  经过上面的配置发现prefetch我只配置了2,而且concurrency配置的只有1,因此当我发送了2条错误消息之后,因为解密失败这2条消息一直没有被ack。将缓冲区沾满了,这个时候RabbitMQ认为这个consumer已经没有消费能力了就不继续给它推送消息了,因此就形成了队列阻塞。

判断队列是否有阻塞的风险。

  当ack模式为manual,而且线上出现了unacked消息,这个时候不用慌。因为QOS是限制信道channel上的消费者所能保持的最大未确认的数量。因此容许出现unacked的数量能够经过channelCount * prefetchCount * 节点数量 得出。

channlCount就是由 concurrency, max-concurrency决定的。
  • min = concurrency * prefetch * 节点数量
  • max = max-concurrency * prefetch * 节点数量

由此能够的出结论

  • unacked_msg_count < min 队列不会阻塞。但须要及时处理unacked的消息。
  • unacked_msg_count >= min 可能会出现堵塞。
  • unacked_msg_count >= max 队列必定阻塞。

这里须要好好理解一下。

处理方法

  其实处理的方法很简单,将解密和解析的方法放入try catch中就解决了这样无论解密正常与否,消息都会被签收。若是出错将会输出错误日志,让开发人员进行处理了。

对于这个就须要有日志监控系统,来及时告警了。
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                                      @Headers Map<String,Object> headers,
                                      Channel channel) throws Exception {
    try {

        // 解密和解析
        String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
        OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
        
        // 模拟推送
        pushMsg(orderDto);
    }catch (Exception e){
        log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
    }finally {
        // 消息签收
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    }

}

注意的点

  unacked的消息在consumer切断链接后(重启),会自动回到队头。

事故重现-磁盘占用飙升

  一开始我不知道代码有问题,就是觉得单纯的没有进行ack因此将ack模式改为auto自动,紧急升级了,这样无论正常与否,消息都会被签收,因此在当时确实是解决了问题。

  其实如今回想起来是很是危险的操做的,将ack模式改为auto自动,这样会使QOS不生效。会出现大量消息涌入consumer从而形成consumer宕机,能够是由于当时在晚上,交易比较少,而且推送系统有多个节点,才没出现问题。

问题代码

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                                      @Headers Map<String,Object> headers,
                                      Channel channel) throws Exception {
    // 解密和解析
    String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
    OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
    
    try {

        // 模拟推送
        pushMsg(orderDto);
    }catch (Exception e){
        log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
    }finally {
        // 消息签收
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    }

}

配置文件

listener:
  simple:
    # 消费端最小并发数
    concurrency: 1
    # 消费端最大并发数
    max-concurrency: 5
    # 一次处理的消息数量
    prefetch: 2
    # 手动应答
    acknowledge-mode: auto

  因为当时不知道交易系统的重发机制,重发时没有对订单数据加密的bug,因此仍是会发出少许有误的消息。

发送1条错误的消息

curl http://localhost:8080/sendErrorMsg/1

缘由

  RabbitMQ消息监听程序异常时,consumer会向rabbitmq server发送Basic.Reject,表示消息拒绝接受,因为Spring默认requeue-rejected配置为true,消息会从新入队,而后rabbitmq server从新投递。就至关于死循环了,因此控制台在疯狂刷错误日志形成磁盘利用率飙升的缘由。

解决方法

  将default-requeue-rejected: false便可。

总结

  • 我的建议,生产环境不建议使用自动ack,这样会QOS没法生效。
  • 在使用手动ack的时候,须要很是注意消息签收。
  • 其实在将有问题的MQ重置时,是将错误的消息给清除才没有问题了,至关因而消息丢失了。
try {
    // 业务逻辑。
}catch (Exception e){
    // 输出错误日志。
}finally {
    // 消息签收。
}

参考资料

代码地址

https://gitee.com/huangxunhui...

结尾

  若是有人告诉你遇到线上事故不要慌,除非是超级大佬久经沙场。不然就是瞎扯淡,你让他来试试,看看他会不会大脑一片空白,直冒汗。

  若是以为对你有帮助,能够多多评论,多多点赞哦,也能够到个人主页看看,说不定有你喜欢的文章,也能够随手点个关注哦,谢谢。

相关文章
相关标签/搜索