消息服务中如何确保消息至少被消费一次

对消息服务须要了解的朋友,能够移步:网络

  1. 聊聊mq的使用场景
  2. 聊聊业务系统中投递消息到mq的几种方式
  3. 谈谈mq消息消费的几种方式

本章讨论主题

  1. 如何确保消息至少消费一次,确保消费者最大程度消费成功

消费者消费消息有2中方式:异步

1. push方式ide

消息服务接收到消息以后,主动将消息推送给消费者消费设计

2. pull方式开发

消费者定时从消息服务中拉取消息进行消费get

下面咱们将讨论2中方式中如何确保消息至少被消费一次。it

push模式

消费的过程:class

  1. 消息服务查询待消费的消息列表
  2. 轮询待消息列表
  3. 调用消费者
  4. 消费者收到消费请求,执行业务处理,将处理结果返回给消息服务
  5. 消息服务接收到消费成功的信息,将消息状态置为消费成功状态
  6. 继续消费下一条消息

探讨一下上面须要考虑的问题:监控

若消息一直消费失败如何处理?select

先说一下影响:

  1. 消息被阻塞

    消息若是一直消费失败,消息服务会不断调用消费者进行消费,会阻塞其余消息的消费,直接影响到业务的正常进行.

消费失败的缘由:

  1. 代码问题

    这种状况无论尝试多少次,消息都会消费失败,须要人工介入修复bug,这个能够依靠监控系统发现bug,同时开发进行修复。

  2. 系统运行异常

    如调用超时、网络问题等一些不可控的因素。产生这种错误,继续重试,最终会处理成功。

此处我们只用讨论消息服务中重试机制如何设计?

系统异常状况下,可能过一段时间,系统恢复了,此时去重试,消费也就成功了。

因此咱们对于消费失败的消息采用延迟处理的方式,能够这么实现:

消息中增长几个字段用于重试:next_dispose_time【下次处理时间】、max_failure【最大容许失败次数】、failure【当前失败次数】,消息入库时:next_dispose_time=需消费的时间,max_failure = 运行最大失败次数, failure=0;

当消费失败时,处理过程:

  1. 计算下次处理时间(next_dispose_time),能够在当前时间上面作指数递增,好比根据失败次数依次在当前时间上递增2的failure次方秒,如:

    第1次失败:当前时间 + 2秒

    第2次失败:当前时间 + 4秒

    第3次失败:当前时间 + 8秒

    第4次失败:当前时间 + 16秒

    .......

    第n次失败:当前时间 + 2的n次方秒

  2. failure++

消息服务查询待消费的消息也须要作调整:

select * from 消息表 where next_dispose_time<=当前时间 and failure<max_failure and status = 待处理;

此时可以最大程度保证消息最少消费成功一次。

pull方式

这种会复杂一些,为什么会复杂一些,我们先看一下常规的流程:

  1. 消费者从消息服务中拉取消息
  2. 本地进行处理
  3. 从消息服务中删除此消息
  4. 继续拉取下一条进行处理

若是本地一直处理失败,那么后面拉取到的都是同一条消息,这条消息直接阻塞后续消息的消费,这种状况如何解?

我们先分析一下出现这种问题的后果及缘由:

  1. 后果:消息被阻塞,业务没法正常运行
  2. 缘由:代码问题或其余异常
  3. 确保代码没问题,能够解决上面问题,及时性不够高,线上要考虑系统的容错能力。

遇到这种问题仍是挺严重了,业务方都是没法接受的,一条消息消费失败,会影响到其余全部消息的消费,这个咱们仍是得想办法解决,能够这样:

  1. 消费者拉取消息
  2. 落地到本地
  3. 从消息服务中删除此消息
  4. 异步去消费本地落地的消息

消息先落地,而后异步处理,本地须要有个补偿的job,去处理本地消费失败的消息,这个能够参考push方式消费的过程。

相关文章
相关标签/搜索