[TOC]java
在许多业务场景中,咱们都会碰到延迟任务,定时任务这种需求。特别的,在网络链接的场景中,经常会出现一些超时控制。因为服务端的链接数量很大,这些超时任务的数量每每也是很庞大的。实现对大量任务的超时管理并非一个容易的事情。算法
本章咱们将介绍几种用于实现超时任务的数据结构,而且最后分析 Netty 在超时任务上采起的结构和代码。数组
欢迎加入技术交流群186233599讨论交流,也欢迎关注笔者公众号:风火说。网络
JDK 在 1.3 的时候引入了Timer
数据结构用于实现定时任务。Timer
的实现思路比较简单,其内部有两个主要属性:数据结构
TaskQueue
:定时任务抽象类TimeTask
的列表。TimerThread
:用于执行定时任务的线程。Timer
结构还定义了一个抽象类TimerTask
而且继承了Runnable
接口。业务系统实现了这个抽象类的run
方法用于提供具体的延时任务逻辑。多线程
TaskQueue
内部采用大顶堆的方式,依据任务的触发时间进行排序。而TimerThread
则以死循环的方式从TaskQueue
获取队列头,等待队列头的任务的超时时间到达后触发该任务,而且将任务从队列中移除。并发
Timer
的数据结构和算法都很容易理解。全部的超时任务都首先进入延时队列。后台超时线程不断的从延迟队列中获取任务而且等待超时时间到达后执行任务。延迟队列采用大顶堆排序,在延迟任务的场景中有三种操做,分别是:添加任务,提取队列头任务,查看队列头任务。数据结构和算法
查看队列头任务的事件复杂度是 O(1) 。而添加任务和提取队列头任务的时间复杂度都是 O(Log2n) 。当任务数量较大时,添加和删除的开销也是比较大的。此外,因为Timer
内部只有一个处理线程,若是有一个延迟任务的处理消耗了较多的时间,会对应的延迟后续任务的处理。性能
因为Timer
只有一个线程用来处理延迟任务,在任务数量不少的时候显然是不足够的。在 JDK1.5 引入线程池接口ExecutorService
后,也对应的提供了一个用于处理延时任务的ScheduledExecutorService
子类接口。该接口内部也同样使用了一个使用小顶堆进行排序的延迟队列存听任务。线程池中的线程会在这个队列上等待直到有任务能够提取。this
ScheduledExecutorService
的实现上有一些特殊,只有一个线程可以提取到延迟队列头的任务,而且根据任务的超时时间进行等待。在这个等待期间,其余的线程是没法获取任务的。这样的实现是为了不多个线程同时获取任务,致使超时时间未到达就职务触发或者在等待任务超时时间时有新的任务被加入而没法响应。
因为ScheduledExecutorService
可使用多个线程,这样也缓解了由于个别任务执行时间长致使的后续任务被阻塞的状况。不过延迟队列也是同样采用小顶堆的排序方式,所以添加任务和删除任务的时间复杂度都是 O(Log2n) 。在任务数量很大的状况下,性能表现比较差。
虽然Timer
和ScheduledThreadPoolExecutor
都提供了对延迟任务的支撑能力,可是因为新增任务和提取任务的时间复杂度都是 O(Log2n) ,在任务数量很大,好比几万,十几万的时候,性能的开销就变得很巨大。
那么,是否存在新增任务和提取任务比 O(Log2n) 复杂度更低的数据结构呢?答案是存在的。在论文《Hashed and Hierarchical Timing Wheels》中设计了一种名为时间轮( Timing Wheels )的数据结构,这种结构在处理延迟任务时,其新增任务和删除任务的时间复杂度下降到了 O(1) 。
时间轮的数据结构很相似于咱们钟表上的数据指针,故而得名时间轮。其数据结构用图示意以下
每个时间“格子”咱们称之为槽位,槽位中存放着延迟任务队列。槽位自己表明着一个时间单位,好比 1 秒。时间轮拥有的槽位个数就是该时间轮可以处理的最大延迟跨度的任务,槽位的时间单位表明着时间轮的精度。这意味着小于时间单位的时间在该时间轮是没法被区分的。
槽位上的延迟任务队列中的任务都有相同的延迟时间。每个单位时间,指针都会移动到下一个槽位。当指针指向某一个槽位时,该槽位的延迟任务队列中的任务都会被触发。
当有一个延迟任务要插入时间轮时,首先计算其延迟时间与单位时间的余值,从指针指向的当前槽位移动余值的个数槽位,就是该延迟任务须要被放入的槽位。
举个例子,时间轮有8个槽位,编号为 0 ~ 7 。指针当前指向槽位 2 。新增一个延迟时间为 4 秒的延迟任务,4 % 8 = 4,所以该任务会被插入 4 + 2 = 6,也就是槽位6的延迟任务队列。
时间轮的槽位实现能够采用循环数组的方式达成,也就是让指针在越过数组的边界后从新回到起始下标。归纳来讲,能够将时间轮的算法描述为
用队列来存储延迟任务,同一个队列中的任务,其延迟时间相同。用循环数组的方式来存储元素,数组中的每个元素都指向一个延迟任务队列。
有一个当前指针指向数组中的某一个槽位,每间隔一个单位时间,指针就移动到下一个槽位。被指针指向的槽位的延迟队列,其中的延迟任务所有被触发。
在时间轮中新增一个延迟任务,将其延迟时间除以单位时间获得的余值,从当前指针开始,移动余值对应个数的槽位,就是延迟任务被放入的槽位。
基于这样的数据结构,插入一个延迟任务的时间复杂度就降低到 O(1) 。而当指针指向到一个槽位时,该槽位链接的延迟任务队列中的延迟任务所有被触发。
延迟任务的触发和执行不该该影响指针向后移动的时间精确性。所以通常状况下,用于移动指针的线程只负责任务的触发,任务的执行交由其余的线程来完成。好比,能够将槽位上的延迟任务队列放入到额外的线程池中执行,而后在槽位上新建一个空白的新的延迟任务队列用于后续任务的添加。
在基本原理中咱们分析了时间轮的基础结构。不过当时咱们假设须要插入的延迟任务的时间不会超过期间轮的长度,也就是说每个槽位上的延迟任务队列中的任务的延迟时间都是相同的。
在这种状况下,要支持更大时间跨度的延迟任务,要么增长时间轮的槽位数,要么减小时间轮的精度,也就是每个槽位表明的单位时间。时间轮的精度显然是一个业务上的硬性要求,那么只能增长槽位数。假设要求精度为 1 秒,要能支持延迟时间为 1 天的延迟任务,时间轮的槽位数须要 60 × 60 × 24 = 86400 。这就须要消耗更多的内存。显然,单纯增长槽位数并非一个好的解决方案。
在论文中,针对大跨度的延迟任务支持,提供了两种扩展方案。
在该方案中,算法引入了“轮次”的概念,延迟任务的延迟时间除以时间轮长度获得的商值为轮次。延迟任务的延迟时间除以时间轮长度获得的余数为要插入的槽位偏移量。
当插入延迟任务时首先计算轮次和槽位偏移量,经过槽位偏移量肯定延迟任务插入的槽位。当指针指向某一个槽位时,对槽位指向的延迟任务队列进行遍历,其中轮次为0的延迟任务所有触发,其他任务则等待下一个周期。
经过引入轮次,就能够在有限的槽位上支持无穷时间范围的延迟任务。可是虽然插入任务的时间复杂度仍然是 O(1) ,可是在延迟任务触发时却须要遍历延迟任务队列来确认其轮次是否为0。任务触发时的时间复杂却上升为了 O(n) 。
对于这个状况,还有一个变化的细节能够采用,就是将延迟任务队列按照轮次进行排序,比方说使用小顶堆对延迟任务队列进行排序。这样,当指针指向一个槽位触发延迟任务时,只须要不断的从队列头取出任务进行轮次检查,一旦任务轮次不等于0就能够中止。任务触发的时间复杂度降低为 O(1) 。对应的,因为队列是排序的了,任务插入的时候除了须要定位插入的槽位,还须要定位在队列中的插入位置。插入的时间复杂度变化为 O(1) 和 O(Log2n) ,n 为该槽位上延迟任务队列的长度。
看看手表的设计,有秒针,分针,时针。像秒针与分针,虽然都有 60 格 ,可是各自的格子表明的时间长度不一样。参考这个思路,咱们能够声明多个不一样层级的时间轮,每个时间轮的槽位的时间跨度是其次级时间轮的总体时间范围。
当低层级的时间轮的指针完整的走完一圈,其对应的高层级时间轮对应的移动一个槽位。而且高层级时间轮指针指向的槽位中的任务按照延迟时间计算,从新放入到低层级时间轮的不一样槽位中。这样的方式,保证了每个时间轮中的每个槽位的延迟任务队列中的任务都具有相同时间精度的延迟时间。
以精度为 1 秒,时间范围为 1 天的时间轮为例子,能够设计三级时间轮:秒级时间轮有 60 个槽位,每一个槽位的时间为 1 秒;分钟级时间轮有 60 个槽位,每一个槽位的时间为 60 秒;小时级时间轮有24个槽位,每一个槽位的时间为 60 分钟。当秒级时间轮走完 60 秒后,秒级时间轮的指针再次指向下标为0的槽位,而分钟级时间轮的指针向后移动一个槽位,而且将该槽位上的延迟任务所有取出而且从新计算后放入秒级时间轮。
总共只须要 60 + 60 + 24 = 144 个槽位便可支撑。对比上面提到的单级时间轮须要 86400 个槽位而言,节省了至关的内存。
层级时间轮有两种常见的作法:
时间轮算法的核心思想就是经过循环数组和指针移动的方式,将新增延迟任务的时间复杂度降低到 O(1) ,可是在具体实现上,包括如何处理更大时间跨度的延迟任务上,各家不一样的实现都会有一些细节上的变化。下面咱们以 Netty 中都时间轮实现为例子来进行代码分析。
Netty 的实现自定义了一个超时器的接口io.netty.util.Timer
,其方法以下
public interface Timer {
//新增一个延时任务,入参为定时任务TimerTask,和对应的延迟时间
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
//中止时间轮的运行,而且返回全部未被触发的延时任务
Set < Timeout > stop();
}
public interface Timeout {
Timer timer();
TimerTask task();
boolean isExpired();
boolean isCancelled();
boolean cancel();
}
复制代码
Timeout
接口是对延迟任务的一个封装,其接口方法说明其实现内部须要维持该延迟任务的状态。后续咱们分析其实现内部代码时能够更容易的看到。
Timer
接口有惟一实现HashedWheelTimer
。首先来看其构造方法,以下
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
//省略代码,省略参数非空检查内容。
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
//省略代码,省略槽位时间范围检查,避免溢出以及小于 1 毫秒。
workerThread = threadFactory.newThread(worker);
//省略代码,省略资源泄漏追踪设置以及时间轮实例个数检查
}
复制代码
首先是方法createWheel
,用于建立时间轮的核心数据结构,循环数组。来看下其方法内容
private static HashedWheelBucket[] createWheel(int ticksPerWheel)
{
//省略代码,确认 ticksPerWheel 处于正确的区间
//将 ticksPerWheel 规范化为 2 的次方幂大小。
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for(int i = 0; i < wheel.length; i++)
{
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
复制代码
数组的长度为 2 的次方幂方便进行求商和取余计算。
HashedWheelBucket
内部存储着由HashedWheelTimeout
节点构成的双向链表,而且存储着链表的头节点和尾结点,方便于任务的提取和插入。
方法HashedWheelTimer#newTimeout
用于新增延迟任务,下面来看下代码
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
//省略代码,用于参数检查
start();
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
if(delay > 0 && deadline < 0)
{
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
复制代码
能够看到,在新增任务的时候,任务并非直接进入到循环数组中,而是首先被放入到一个队列,也就是属性timeouts
,该队列是一个 MPSC 类型的队列,采用这个模式主要出于提高并发性能考虑,由于这个队列只有线程workerThread
会进行任务提取操做。
该线程是在构造方法中经过调用workerThread = threadFactory.newThread(worker)
被建立。可是建立以后并非立刻执行线程的start
方法,其启动的时机是这个时间轮第一次新增延迟任务的时候,也就是本方法中的start
方法的内容。下面是其代码
public void start() {
switch(WORKER_STATE_UPDATER.get(this))
{
case WORKER_STATE_INIT:
if(WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED))
{
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
while(startTime == 0)
{
try
{
startTimeInitialized.await();
}
catch(InterruptedException ignore)
{
// Ignore - it will be ready very soon.
}
}
}
复制代码
方法很明显的分为两个部分,第一部分为Switch
方法块,经过对状态变量的 CAS 操做,确保只有一个线程可以执行workerThread.start()
方法来启动工做线程,避免并发异常。第二部分为阻塞等待,经过CountDownLatch
类型变量startTimeInitialized
执行阻塞等待,用于等待工做线程workerThread
真正进入工做状态。
从newTimeout
方法的角度来看,插入延迟任务首先是放入队列中,以前分析数据结构的时候也说过任务的触发是指针指向时间轮中某个槽位时进行,那么必然存在一个须要将队列中的延迟任务放入到时间轮的数组之中的工做。这个动做显然就是就是由workerThread
工做线程来完成。下面就来看下这个线程的具体代码内容。
工做线程是依托于HashedWheelTimer.Worker
这个实现了Runnable
接口的类进行工做的,那下面看下其对run
方法的实现代码,以下
public void run() {
{//代码块①
startTime = System.nanoTime();
if(startTime == 0)
{
//使用startTime==0 做为线程进入工做状态模式标识,所以这里从新赋值为1
startTime = 1;
}
//通知外部初始化工做线程的线程,工做线程已经启动完毕
startTimeInitialized.countDown();
}
{//代码块②
do {
final long deadline = waitForNextTick();
if(deadline > 0)
{
int idx = (int)(tick & mask);
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
}
{//代码块③
for(HashedWheelBucket bucket: wheel)
{
bucket.clearTimeouts(unprocessedTimeouts);
}
for(;;)
{
HashedWheelTimeout timeout = timeouts.poll();
if(timeout == null)
{
break;
}
if(!timeout.isCancelled())
{
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
}
复制代码
为了方便阅读,这边将run
方法的内容分为三个代码块。首先来看代码块①。经过系统调用System.nanoTime
为启动时间startTime
设置初始值,该变量表明了时间轮的基线时间,用于后续相对时间的计算。赋值完毕后,经过startTimeInitialized
变量对外部的等待线程进行通知。
接着来看代码块②。这是主要的工做部分,总体是在一个while
循环中,确保工做线程只在时间轮没有被终止的时候工做。首先来看方法waitForNextTick
,在时间轮中,指针移动一次,称之为一个tick
,这个方法显然内部应该是用于等待指针移动到下一个tick
,来看具体代码,以下
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
for(;;)
{
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if(sleepTimeMs <= 0)
{
if(currentTime == Long.MIN_VALUE)
{
return -Long.MAX_VALUE;
}
else
{
return currentTime;
}
}
if(PlatformDependent.isWindows())
{
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try
{
Thread.sleep(sleepTimeMs);
}
catch(InterruptedException ignored)
{
if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
{
return Long.MIN_VALUE;
}
}
}
}
复制代码
整个方法的思路很简单,前面说过,时间轮每移动一次指针,意味着一个tick
。这里tick
能够当作是指针移动的次数。因为槽位的时间范围是固定的,所以能够简单的计算出来指针移动到下一个槽位,理论上应该通过的时间,也就是long deadline = tickDuration * (tick + 1)
。以后再计算从时间轮启动到当前,实际通过的时间,也就是long currentTime = System.nanoTime() - startTime
。两者的差值就是线程所须要睡眠的时间。
若是差值小于0,意味着实际通过的时间超过了理论时间,此时已经超出了应该休眠的范围,方法须要当即返回。因为在这个方法的执行过程当中,可能会遇到时间轮被中止的状况,所以使用一个特殊值来表达这个事件,也就是Long.MIN_VALUE
,这也是为何currentTime
要避开这个值的缘由。
还有一点须要注意,Thread.sleep
方法的实现是依托于操做系统提供的中断检查,也就是操做系统会在每个中断的时候去检查是否有线程须要唤醒而且提供CPU资源。默认状况下 Linux 的中断间隔是 1 毫秒,而 Windows 的中断间隔是 10 毫秒或者 15 毫秒,具体取决于硬件识别。
若是是在 Windows 平台下,当方法调用Thread.sleep
传入的参数不是10的整数倍时,其内部会调用系统方法timeBeginPeriod()
和timeEndPeriod()
来修改中断周期为 1 毫秒,而且在休眠结束后再次设置回默认值。这样的目的是为了保证休眠时间的准确性。可是在 Windows 平台下,频繁的调用修改中断周期会致使 Windows 时钟出现异常,大多数时候的表现是致使时钟加快。这将致使好比尝试休眠 10 秒时,实际上只休眠了 9 秒。因此在这里,经过sleepTimeMs = sleepTimeMs / 10 * 10
保证了sleepTimeMs
是 10 的整数倍,从而避免了 Windows 的这个 BUG 。
当方法waitForNextTick
返回后,而且返回的值是正数,意味着当前tick
的休眠等待已经完成,能够进行延迟任务的触发处理了。经过int idx = (int)(tick & mask)
调用,肯定下一个被触发延迟任务的槽位在循环数组中的下标。在处理触发任务以前,首先将已经取消的延迟任务从槽位所指向的延迟任务队列中删除。每次调用HashedWheelTimer#newTimeout
新增延迟任务时都会返回一个Timeout
对象,能够经过cancle
方法将这个延迟任务取消。当执行取消动做的时候,并不会直接从延迟队列中删除,而是将这个对象放入到取消队列,也就是HashedWheelTimer.cancelledTimeouts
属性。在准备遍历槽位上延迟任务队列以前,经过方法processCancelledTasks
来遍历这个取消队列,将其中的延迟任务从各自槽位上的延迟任务队列中删除。使用这种方式的好处在于延迟任务的删除只有一个线程会进行,避免了多线程带来的并发干扰,减小了开发难度。
在处理完取消的延迟任务后,调用方法transferTimeoutsToBuckets
来将新增延迟任务队列HashedWheelTimer.timeouts
中的延迟任务分别添加到合适其延迟时间的槽位中。方法的代码很简单,就是循环不断从timeouts
取出任务,而且计算其延迟时间与时间轮范围的商值和余数,结果分别为其轮次与槽位下标。根据槽位下标将该任务添加到槽位对应的延迟任务队列中。
在这里能够看到 Netty 做者对时间轮这一结构的并发设计,新增任务是向 MPSC 队列新增元素实现。而槽位上的延迟任务队列只有时间轮自己的线程可以进行新增和删除,设计为了 SPSC 模式。前者是为了提升无锁并发下的性能,后者则是经过约束,减小了设计难度。
transferTimeoutsToBuckets
方法每次最多只会转移 100000 个延迟任务到合适的槽位中,这是为了不外部循环添加任务致使的饿死。方法执行完毕后,就到了槽位上延迟任务的触发处理,也就是方法HashedWheelBucket#expireTimeouts
的功能,方法内的逻辑也很简单。遍历队列,若是延迟任务的轮次不为 0,则减 1。不然触发任务执行方法,也就是HashedWheelTimeout#expire
。该方法内部依然经过 CAS 方式对状态进行更新,避免方法的触发和取消之间的竞争冲突。从这个方法的实现能够看到,Netty 采用了轮次的方式来对超出时间轮范围的延迟时间进行支持。多层级时间轮的实现相比轮次概念的实现更为复杂,考虑到在网络IO应用中,超出时间轮范围的场景比较少,使用轮次的方式去支撑更大的时间,是一个相对容易实现的方案。
当须要被触发的延迟任务都被触发后,经过tick
加 1 来表达指针移动到下一个槽位。
外部线程经过调用HashedWheelTimer#stop
方法来中止时间轮,中止的方式很简单,就是经过 CAS 调用来修改时间轮的状态属性。而在代码块②中经过循环的方式在每一次tick
都会检查这个状态位。代码块③的内容很简单,遍历全部的槽位,而且遍历槽位的延迟任务队列,将全部未到达延迟时间而且未取消的任务,都放入到一个集合中,最终将这个集合返回。这个集合内存储的就是全部未能执行的延迟任务。
在处理大量延迟任务的场景中,时间轮是一个很高效的算法与数据结构。Netty 在对时间轮的实现上,在添加任务,过时任务,删除任务等环节进行了一些细节上的调整。实际上,不一样中间件中都有对时间轮的一些实现,各自也都有区别,可是核心都是围绕在循环数组与槽位过时这个概念上。不一样的细节变化有各自适合的场景和考量。