你们好,我是yes。html
最近看 Kafka 看到了时间轮算法,记得之前看 Netty 也看到过这玩意,没太过关注。今天就来看看时间轮究竟是什么东西。面试
为何要用时间轮算法来实现延迟操做?算法
延时操做 Java 不是提供了 Timer 么?数组
还有 DelayQueue 配合线程池或者 ScheduledThreadPool 不香吗?缓存
咱们先来简单看看 Timer、DelayQueue 和 ScheduledThreadPool 的相关实现,看看它们是如何实现延时任务的,源码之下无秘密。再来剖析下为什么 Netty 和 Kafka 特地实现了时间轮来处理延迟任务。markdown
若是在手机上阅读其实纯看字也行,不用看代码,我都会先用文字描述清楚。不过电脑上看效果更佳。多线程
Timer 能够实现延时任务,也能够实现周期性任务。咱们先来看看 Timer 核心属性和构造器。并发
核心就是一个优先队列和封装的执行任务的线程,从这咱们也能够看到一个 Timer 只有一个线程执行任务。异步
再来看看如何实现延时和周期性任务的。我先简单的归纳一下,首先维持一个小顶堆,即最快须要执行的任务排在优先队列的第一个,根据堆的特性咱们知道插入和删除的时间复杂度都是 O(logn)。分布式
而后 TimerThread 不断地拿排着的第一个任务的执行时间和当前时间作对比。若是时间到了先看看这个任务是否是周期性执行的任务,若是是则修改当前任务时间为下次执行的时间,若是不是周期性任务则将任务从优先队列中移除。最后执行任务。若是时间还未到则调用 wait()
等待。
再看下图,整理下流程。
流程知道了再对着看下代码,这块就差很少了。看代码不爽的能够跳过代码部分,影响不大。
先来看下 TaskQueue,就简单看下插入任务的过程,就是个普通的堆插入操做。
再来看看 TimerThread 的 run
操做。
能够看出 Timer 实际就是根据任务的执行时间维护了一个优先队列,而且起了一个线程不断地拉取任务执行。
有什么弊端呢?
首先优先队列的插入和删除的时间复杂度是O(logn),当数据量大的时候,频繁的入堆出堆性能有待考虑。
而且是单线程执行,那么若是一个任务执行的时间太久则会影响下一个任务的执行时间(固然你任务的run要是异步执行也行)。
而且从代码能够看到对异常没有作什么处理,那么一个任务出错的时候会致使以后的任务都没法执行。
在说 ScheduledThreadPoolExecutor 以前咱们再看下 Timer 的注释,注释可都是干货千万不要错过。我作了点修改,突出了下重点。
Java 5.0 introduced ScheduledThreadPoolExecutor, It is effectively a more versatile replacement for the Timer, it allows multiple service threads. Configuring with one thread makes it equivalent to Timer。
简单翻译下:1.5 引入了 ScheduledThreadPoolExecutor,它是一个具备更多功能的 Timer 的替代品,容许多个服务线程。若是设置一个服务线程和 Timer 没啥差异。
从注释看出相对于 Timer ,可能就是单线程跑任务和多线程跑任务的区别。咱们来看下。
继承了 ThreadPoolExecutor,实现了 ScheduledExecutorService。能够定性操做就是正常线程池差很少了。区别就在于两点,一个是 ScheduledFutureTask ,一个是 DelayedWorkQueue。
其实 DelayedWorkQueue 就是优先队列,也是利用数组实现的小顶堆。而 ScheduledFutureTask 继承自 FutureTask 重写了 run 方法,实现了周期性任务的需求。
ScheduledThreadPoolExecutor 大体的流程和 Timer 差很少,也是维护一个优先队列,而后经过重写 task 的 run 方法来实现周期性任务,主要差异在于能多线程运行任务,不会单线程阻塞。
而且 Java 线程池的设定是 task 出错会把错误吃了,无声无息的。所以一个任务出错也不会影响以后的任务。
Java 中还有个延迟队列 DelayQueue,加入延迟队列的元素都必须实现 Delayed 接口。延迟队列内部是利用 PriorityQueue 实现的,因此仍是利用优先队列!Delayed 接口继承了Comparable 所以优先队列是经过 delay 来排序的。
而后咱们再来看下延迟队列是如何获取元素的。
也是利用优先队列实现的,元素经过实现 Delayed 接口来返回延迟的时间。不过延迟队列就是个容器,须要其余线程来获取和执行任务。
这下是搞明白了 Timer 、ScheduledThreadPool 和 DelayQueue,总结的说下它们都是经过优先队列来获取最先须要执行的任务,所以插入和删除任务的时间复杂度都为O(logn),而且 Timer 、ScheduledThreadPool 的周期性任务是经过重置任务的下一次执行时间来完成的。
问题就出在时间复杂度上,插入删除时间复杂度是O(logn),那么假设频繁插入删除次数为 m,总的时间复杂度就是O(mlogn),这种时间复杂度知足不了 Kafka 这类中间件对性能的要求,而时间轮算法的插入删除时间复杂度是O(1)。咱们来看看时间轮算法是如何实现的。
俗话说艺术源于生活,技术也能从平常生活中找到灵感。我们先来看块表,嗯金色的表。
都看清楚了吧,时间轮就是和手表时钟很类似的存在。时间轮用环形数组实现,数组的每一个元素能够称为槽,和 HashMap同样称呼。
槽的内部用双向链表存着待执行的任务,添加和删除的链表操做时间复杂度都是 O(1),槽位自己也指代时间精度,好比一秒扫一个槽,那么这个时间轮的最高精度就是 1 秒。
也就是说延迟 1.2 秒的任务和 1.5 秒的任务会被加入到同一个槽中,而后在 1 秒的时候遍历这个槽中的链表执行任务。
从图中能够看到此时指针指向的是第一个槽,一共有八个槽0~7,假设槽的时间单位为 1 秒,如今要加入一个延时 5 秒的任务,计算方式就是 5 % 8 + 1 = 6,即放在槽位为 6,下标为 5 的那个槽中。更具体的就是拼到槽的双向链表的尾部。
而后每秒指针顺时针移动一格,这样就扫到了下一格,遍历这格中的双向链表执行任务。而后再循环继续。
能够看到插入任务从计算槽位到插入链表,时间复杂度都是O(1)。那假设如今要加入一个50秒后执行的任务怎么办?这槽好像不够啊?难道要加槽嘛?和HashMap同样扩容?
不是的,常见有两种方式,一种是经过增长轮次的概念。50 % 8 + 1 = 3,即应该放在槽位是 3,下标是 2 的位置。而后 (50 - 1) / 8 = 6,即轮数记为 6。也就是说当循环 6 轮以后扫到下标的 2 的这个槽位会触发这个任务。Netty 中的 HashedWheelTimer 使用的就是这种方式。
还有一种是经过多层次的时间轮,这个和咱们的手表就更像了,像咱们秒针走一圈,分针走一格,分针走一圈,时针走一格。
多层次时间轮就是这样实现的。假设上图就是第一层,那么第一层走了一圈,第二层就走一格,能够得知第二层的一格就是8秒,假设第二层也是 8 个槽,那么第二层走一圈,第三层走一格,能够得知第三层一格就是 64 秒。那么一格三层,每层8个槽,一共 24 个槽时间轮就能够处理最多延迟 512 秒的任务。
而多层次时间轮还会有降级的操做,假设一个任务延迟 500 秒执行,那么刚开始加进来确定是放在第三层的,当时间过了 436 秒后,此时还须要 64 秒就会触发任务的执行,而此时相对而言它就是个延迟 64 秒后的任务,所以它会被下降放在第二层中,第一层还放不下它。
再过个 56 秒,相对而言它就是个延迟 8 秒后执行的任务,所以它会再被降级放在第一层中,等待执行。
降级是为了保证时间精度一致性。Kafka内部用的就是多层次的时间轮算法。
在 Netty 中时间轮的实现类是 HashedWheelTimer,代码中的 wheel 就是上图画的循环数组,mask 的设计和HashMap同样,经过限制数组的大小为2的次方,利用位运算来替代取模运算,提升性能。tickDuration 就是每格的时间即精度。能够看到配备了一个工做线程来处理任务的执行。
接下来咱们再来看看任务是如何添加的。
能够看到任务并无直接添加到时间轮中,而是先入了一个 mpsc 队列,我简单说下 mpsc 是 JCTools 中的并发队列,用在多个生产者可同时访问队列,但只有一个消费者会访问队列的状况。篇幅有限,有兴趣的朋友自行了解实现。
而后咱们再来看看工做线程是如何运做的。
很直观没什么花头,咱们先来看看 waitForNextTick,是如何获得下一次执行时间的。
简单的说就是经过 tickDuration 和此时已经滴答的次数算出下一次须要检查的时间,时候未到就sleep等着。
再来看下任务如何入槽的。
注释的很清楚了,实现也和上述分析的一致。
最后再来看下如何执行的。
就是经过轮数和时间双重判断,执行完了移除任务。
整体上看 Netty 的实现就是上文说的时间轮经过轮数的实现,彻底一致。能够看出时间精度由 TickDuration 把控,而且工做线程的除了处理执行到时的任务还作了其余操做,所以任务不必定会被精准的执行。
并且任务的执行若是不是新起一个线程,或者将任务扔到线程池执行,那么耗时的任务会阻塞下个任务的执行。
而且会有不少无用的 tick 推动,例如 TickDuration 为1秒,此时就一个延迟350秒的任务,那就是有349次无用的操做。
可是从另外一面来看,若是任务都执行很快(固然你也能够异步执行),而且任务数不少,经过分批执行,而且增删任务的时间复杂度都是O(1)来讲。时间轮仍是比经过优先队列实现的延时任务来的合适些。
上面咱们说到 Kafka 中的时间轮是多层次时间轮实现,总的而言实现和上述说的思路一致。不过细节有些不一样,而且作了点优化。
先看看添加任务的方法。在添加的时候就设置任务执行的绝对时间。
那么时间轮是如何推进的呢?Netty 中是经过固定的时间间隔扫描,时候未到就等待来进行时间轮的推进。上面咱们分析到这样会有空推动的状况。
而 Kafka 就利用了空间换时间的思想,经过 DelayQueue,来保存每一个槽,经过每一个槽的过时时间排序。这样拥有最先须要执行任务的槽会有优先获取。若是时候未到,那么 delayQueue.poll 就会阻塞着,这样就不会有空推动的状况发送。
咱们来看下推动的方法。
从上面的 add 方法咱们知道每次对比都是根据expiration < currentTime + interval
来进行对比的,而advanceClock
就是用来推动更新 currentTime 的。
Kafka 用了多层次时间轮来实现,而且是按需建立时间轮,采用任务的绝对时间来判断延期,而且对于每一个槽(槽内存放的也是任务的双向链表)都会维护一个过时时间,利用 DelayQueue 来对每一个槽的过时时间排序,来进行时间的推动,防止空推动的存在。
每次推动都会更新 currentTime 为当前时间戳,固然作了点微调使得 currentTime 是 tickMs 的整数倍。而且每次推动都会把能降级的任务从新插入降级。
能够看到这里的 DelayQueue 的元素是每一个槽,而不是任务,所以数量就少不少了,这应该是权衡了对于槽操做的延时队列的时间复杂度与空推动的影响。
首先介绍了 Timer、DelayQueue 和 ScheduledThreadPool,它们都是基于优先队列实现的,O(logn) 的时间复杂度在任务数多的状况下频繁的入队出队对性能来讲有损耗。所以适合于任务数很少的状况。
Timer 是单线程的会有阻塞的风险,而且对异常没有作处理,一个任务出错 Timer 就挂了。而 ScheduledThreadPool 相比于 Timer 首先能够多线程来执行任务,而且线程池对异常作了处理,使得任务之间不会有影响。
而且 Timer 和 ScheduledThreadPool 能够周期性执行任务。 而 DelayQueue 就是个具备优先级的阻塞队列。
对比而言时间轮更适合任务数很大的延时场景,它的任务插入和删除时间复杂度都为O(1)。对于延迟超过期间轮所能表示的范围有两种处理方式,一是经过增长一个字段-轮数,Netty 就是这样实现的。二是多层次时间轮,Kakfa 是这样实现的。
相比而言 Netty 的实现会有空推动的问题,而 Kafka 采用 DelayQueue 以槽为单位,利用空间换时间的思想解决了空推动的问题。
能够看出延迟任务的实现都不是很精确的,而且或多或少都会有阻塞的状况,即便你异步执行,线程不够的状况下仍是会阻塞。
《深刻理解Kafka:核心设计与实践原理》
我是 yes,从一点点到亿点点,咱们下篇见。