上篇文章介绍了RocketMQ总体架构和原理有兴趣的能够阅读一下,在这篇文章中的延时消息部分,我写道开源版的RocketMQ只提供了18个层级的消息队列延时,这个功能在开源版中显得特别鸡肋,可是在阿里云中的RocketMQ却提供了支持40天以内任意秒级延时队列,果真有些功能你只能充钱才能拥有。固然你或许想换一个开源的消息队列,在开源社区中消息队列延时消息不少都没有被支持好比:RabbitMQ,Kafka等,都只能经过一些特殊方法才能完成延时的功能。为何这么多都没有实现这个功能呢?是由于技术难度比较复杂吗?接下来咱们分析一下如何才能实现一个延时消息。html
在实现分布式消息队列的延时消息以前,咱们想一想咱们平时是如何在本身的应用程序上实现一些延时功能的?在Java中能够经过下面的方式来完成咱们延时功能:mysql
ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,咱们提交任务的时候,会将任务首先提交到DelayedWorkQueue一个优先级队列中,按照过时时间进行排序,这个优先级队列也就是咱们堆结构,每次提交任务排序的复杂度是O(logN)。而后取任务的时候就会从堆顶取出咱们的任务,也就是咱们延迟时间最小的任务。ScheduledThreadPoolExecutor有个好处是执行延时任务能够支持多线程并行执行,由于他继承的是ThreadPoolExecutor。redis
Timer:Timer也是利用优先级队列结构作的,可是其没有继承线程池,相对来讲比较独立,不支持多线程,只能使用单独的一个线程。算法
咱们实现本地延时比较简单,直接使用Java中现成的便可,那咱们分布式消息队列的实现有哪些难点呢?sql
有不少同窗首先会想到咱们实现分布式消息队列的延时任务,可不能够直接使用本地的那一套,用ScheduledThreadPoolExecutor,Timer,固然这是能够的,前提是你的消息量很小,可是咱们分布式消息队列每每都是企业级别的中间件,数据量都是很是的大,那么咱们纯内存的方案确定是行不通的。因此咱们就有了下面这几个方案来解决咱们这个问题。数据库
数据库通常来讲是咱们很容易想到的一个办法,咱们一般能够创建下面这样一个表:数组
CREATE TABLE `delay_message` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `excute_time` bigint(16) DEFAULT NULL COMMENT '执行时间,ms级别', `body` varchar(4096) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '消息体', PRIMARY KEY (`id`), KEY `time_index` (`excute_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
这个表中咱们使用excute_time表明咱们真实的执行时间,而且对其创建索引,而后在咱们的消息服务中,启动一个定时任务,定时从数据库中扫描已经能够执行的消息,而后开始执行,具体流程以下面所示:缓存
使用数据库的方法是一个比较原始的方法,在没有延时消息这个概念以前,要作一个订单多少分钟过时的这种功能,一般使用这个方法去完成。而这个方法一般也比较局限于咱们单个业务,若是想扩展为咱们企业级的一个中间件的话是不行的,由于mysql因为BTree的特性,会随着维护二级索引的开销愈来愈大,致使写入会愈来愈慢,因此这个方案一般不会被考虑。数据结构
咱们以前介绍RocketMQ在开源版本中只实现了18个Level的延时消息,可是有不少公司基于RocketMQ作了本身的一套支持任意时间的延时消息,在美团内部封装了RocketMQ使用LevelDB作了对延时消息的封装,在滴滴开源的DDMQ中,使用了RocksDB对RocketMQ的延时消息部分进行了封装。多线程
其原理基本和Mysql相似,以下图所示:
为何一样是数据库RocksDB会比Mysql更加合适呢?由于RocksDB的特性是LSM树,其使用场景适用于大量写入,和消息队列的场景更加契合,因此这个也是滴滴和美团选择其做为延时消息封装的存储介质。
再说时间轮以前,让咱们再次回到咱们的实现本地延时的时候使用的ScheduledThreadPoolExecutor还有Timer,他们都是使用的优先级队列完成的,优先级队列本质上也就是堆结构,堆结构的插入的时间复杂度是O(LogN),若是将来咱们的内存能够作到无限,咱们使用使用优先级队列去作延时消息的存储,可是随着消息的增多,咱们的插入消息的效率也会愈来愈低,那么怎么才能让咱们的插入消息的效率不随着消息的增多而变低呢?答案就是时间轮。
什么是时间轮呢?其实咱们能够简单的将其看作是一个多维数组。在不少框架中都使用了时间轮来作一些定时的任务,用来替代咱们的Timer,好比我以前讲过的有关本地缓存Caffeine一篇文章,在Caffeine中是一个二层时间轮,也就是二维数组,其一维的数据表示较大的时间维度好比,秒,分,时,天等,其二维的数据表示该时间维度较小的时间维度,好比秒内的某个区间段。当定位到一个TimeWhile[i][j]以后,其数据结构实际上是一个链表,记录着咱们的Node。在Caffeine利用时间轮记录咱们在某个时间过时的数据,而后去处理。
因为时间轮是一个数组的结构,那么其插入复杂度是O(1)。咱们解决了效率以后,可是咱们的内存依旧不是无限的,咱们时间轮如何使用呢?答案固然就是磁盘,在去哪儿开源的QMQ中已经实现了时间轮+磁盘存储,这里为了方便描述我将其转化为RocketMQ中的结构来进行讲解,实现图以下:
时间轮+磁盘存储我我的以为比上面的RocksDB要更加正统一点,不依赖其余的中间件就能够完成,可用性天然也就更高,固然阿里云的RocketMQ具体怎么实现的这个两种方案都有可能。
在社区中也有不少公司使用的Redis作的延时消息,在Redis中有一个数据结构是Zest,也就是有序集合,他能够实现相似咱们的优先级队列的功能,一样的他也是堆结构,因此插入算法复杂度依然是O(logN),可是因为Redis足够快,因此这一块能够忽略。(这块没有作对比的基准测试,只是猜想)。有同窗会问,redis不是纯内存的k,v吗,一样的应该也会受到内存限制啊,为何还会选择他呢?
其实在这个场景中,Redis是很容易水平扩展的当一个Redis内存不够,这里可使用两个甚至更多,来知足咱们的须要,redis延时消息的原理图(原图出自:http://www.javashuo.com/article/p-pxsavpsj-ec.html)以下:
咱们怎么才能知道Delayed Queue中的消息到期了呢?这里有两种方法:
本文介绍了三种方式实现分布式延时消息,但愿能在你实现本身的延迟消息的时候提供一点思路。总的来讲可能前两种方法来讲适用面更加广一点,毕竟在RocketMQ这些大型的消息队列中间件,还有一些其余的集成功能,好比顺序消息,事务消息等,延时消息可能更加倾向因而分布式消息队列中的一个功能,而不是做为一个独立的组件存在。固然其中还有一些细节并无一一介绍,具体细节能够去参考QMQ和DDMQ的源码。
若是你们以为这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O: