【九】kafka延迟队列、重试队列、死信队列

1、延迟队列

实现方案:spa

在发送延时消息的时候并非先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,线程

而后经过一个自定义的服务拉取这些内部主题中的消息,并将知足条件的消息再投递到要发送的真实的主题中,消费者所订阅的仍是真实的主题。设计

若是采用这种方案,那么通常是按照不一样的延时等级来划分的,好比设定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour这些按延时时间递增的延时等级,延时的消息按照延时时间投递到不一样等级的主题中,投递到同一主题中的消息的延时时间会被强转为与此主题延时等级一致的延时时间,这样延时偏差控制在两个延时等级的时间差范围以内(好比延时时间为17s的消息投递到30s的延时主题中,以后按照延时时间为30s进行计算,延时偏差为13s)。虽然有必定的延时偏差,可是偏差可控,而且这样只需增长少量的主题就能实现延时队列的功能。blog

发送到内部主题(delaytopic*)中的消息会被一个独立的 DelayService 进程消费,这个 DelayService 进程和 Kafka broker 进程以一对一的配比进行同机部署(参考下图),以保证服务的可用性。排序

针对不一样延时级别的主题,在 DelayService 的内部都会有单独的线程来进行消息的拉取,以及单独的 DelayQueue(这里用的是 JUC 中 DelayQueue)进行消息的暂存。与此同时,在 DelayService 内部还会有专门的消息发送线程来获取 DelayQueue 的消息并转发到真实的主题中。从消费、暂存再到转发,线程之间都是一一对应的关系。以下图所示,DelayService 的设计应当尽可能保持简单,避免锁机制产生的隐患。队列

为了保障内部 DelayQueue 不会由于未处理的消息过多而致使内存的占用过大,DelayService 会对主题中的每一个分区进行计数,当达到必定的阈值以后,就会暂停拉取该分区中的消息。进程

由于一个主题中通常不止一个分区,分区之间的消息并不会按照投递时间进行排序,DelayQueue的做用是将消息按照再次投递时间进行有序排序,这样下游的消息发送线程就可以按照前后顺序获取最早知足投递条件的消息。内存

 2、重试队列和死信队列

死信能够看做消费者不能处理收到的消息,也能够看做消费者不想处理收到的消息,还能够看做不符合处理要求的消息。好比消息内包含的消息内容没法被消费者解析,为了确保消息的可靠性而不被随意丢弃,故将其投递到死信队列中,这里的死信就能够看做消费者不能处理的消息。再好比超过既定的重试次数以后将消息投入死信队列,这里就能够将死信看做不符合处理要求的消息。部署

重试队列其实能够看做一种回退队列,具体指消费端消费消息失败时,为了防止消息无端丢失而从新将消息回滚到 broker 中。与回退队列不一样的是,重试队列通常分红多个重试等级,每一个重试等级通常也会设置从新投递延时,重试次数越多投递延时就越大。get

理解了他们的概念以后咱们就能够为每一个主题设置重试队列,消息第一次消费失败入重试队列 Q1,Q1 的从新投递延时为5s,5s事后从新投递该消息;若是消息再次消费失败则入重试队列 Q2,Q2 的从新投递延时为10s,10s事后再次投递该消息。

而后再设置一个主题做为死信队列,重试越屡次从新投递的时间就越久,而且须要设置一个上限,超过投递次数就进入死信队列。重试队列与延时队列有相同的地方,都须要设置延时级别。

 

参考https://www.luozhiyun.com/archives/58