在业务发展过程当中,会出现一些须要延时处理的场景,好比:java
a.订单下单以后超过30分钟用户未支付,须要取消订单
b.订单一些评论,若是48h用户未对商家评论,系统会自动产生一条默认评论
c.点我达订单下单后,超过必定时间订单未派出,须要超时取消订单等。。。
处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是彻底没问题,可是当数据量大的时候高频的轮训数据库就会比较的耗资源,致使数据库的慢查或者查询超时。因此在处理这类需求时候,采用了延时队列来完成。数据库
延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列:
1.Java中java.util.concurrent.DelayQueue
优势:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化
2.Rocketmq延时队列
优势:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息
3.Rabbitmq延时队列(TTL+DLX实现)
优势:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列jvm
根据自身业务和公司状况,若是实现一个本身的延时队列服务须要考虑一下几点:分布式
* 消息存储
* 过时延时消息实时获取
* 高可用性优化
* 消息可靠性,消息持久化,消息至少被消费一次
* 实时性:存在必定的时间偏差(定时任务间隔)
* 支持指定消息remove
* 高可用性线程
- Messages Pool全部的延时消息存放,结构为KV结构,key为消息ID,value为一个具体的message(这里选择Redis Hash结构主要是由于hash结构能存储较大的数据量,数据较多时候会进行渐进式rehash扩容,而且对于HSET和HGET命令来讲时间复杂度都是O(1))
- Delayed Queue是16个有序队列(队列支持水平扩展),结构为ZSET,value为messages pool中消息ID,score为过时时间(分为多个队列是为了提升扫描的速度)
- Timed Task定时任务,负责扫描处理每一个队列过时消息blog
每一个延时消息必须包括如下参数:排序
* tags:消息过时以后发送mq的tags
* keys:消息过时以后发送mq的keys
* body:消息过时以后发送mq的body,提供给消费这作具体的消息处理
* delayTime:延时发送时间(默认,delayTime、expectDate有一个便可)
* expectDate:指望发送时间队列
注:上图一、二、3或者二、3是一个事务操做
取出过时消息过程是经过一个外部定时任务每隔1min分钟去查询队列中过时的消息,而后发送mq && remove事务
1.0上有一个可改进的地方就是队列中过时的消息是经过定时任务触发查询。全部有了2.0
2.0版本在1.0上作了一个优化,废弃掉了1min定时任务触发过时消息发送,采用了java Lock await/singlal方式实现过时消息的实时发送低延时
- pull job:这里分别为每个队列建立了一个pull job thread,功能很简单,就是负责去队列中拉取过时的消息数据(这里保证一个队列有且只有一个pull job)
- worker:pull job拉取到的过时消息会交给一个worker thread去处理,这样的好处是处理过时的消息实时性更高(pull job没必要等去除过时消息所有处理完成在继续去拉取新的过时数据)
- zookeeper coordinate:经过zk的操做来完成对队列的从新分配工做,daemon thread监听zk节点的建立和删除
主要流程:
服务启动会注册zk,获取分配处理的queues,启动后台线程监听zk
为每一个分配queue建立一个pull job
pull job首先会去queue中查询是否有过时消息:
Y:将取出消息交给worker处理
N:查询queue中最后一个成员(zset结构默认按score递增排序),若是为空,则await;不为空则await(成员score-System.currentTimeMillis())
因为过时消息发送成功才会从队列中remove,因此pull job会记录上一次查询队列的一个offset,每次获取到过时消息会将offset向前偏移,过时消息交给worker处理,当worker因为某些异常缘由处理失败会重置pull job中offset,这样能够避免消息发送一次失败以后没办法在继续处理(除了新节点add || remove时候)当部署服务有新增,延时队列服务会从新计算获得当前处理队列,并将以前建立pull job cancel,为新处理队列从新建立pull job。删除同理。</ol>