kafka中对时间轮的应用分析

kafka中存在着大量的延时操作,比如延迟生产,延迟拉取,延迟删除等,这些延时操作并不是基于JDK 自带的Timer或者DelayQueue 实现,而是基于时间轮的概念自己实现了一个延时定时器,JDK中Timer和DelayQueue的插入和删除操作的平均时间复杂度为O(nlogn)并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为 O(1)。

kafka中的时间轮是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList),TimerTaskList是一个环形的双向链表,链表中的每个元素TimerTaskEntry封装了一个真正的定时任务TimerTask
时间轮由固定格数(wheelSize)的时间格组成,每一格都代表当前时间轮的基本时间跨度(tickMs),整个时间轮的总体时间跨度(interval)就是 wheelSize*tickMs
时间轮还有一个表盘指针(currentTime),其值是tickMs的整数倍,用来表示时间轮当前所处的时间,表示当前需要处理的时间格对应的TimeTaskList 中的所有任务。

  • 时间轮的tickMs为1ms,wheelSize等于20,总体时间跨度interval就是20ms,初始情况下currentTime指向时间格0。
  • 此时有一个定时为2ms的任务插进来,就会放到时间格为2的TimeTaskList中,当currentTime指向时间格2时,就需要执行时间格为2对应的TimeTaskList中的任务。
  • 此时若又一个定时为8ms的任务插进来,则会放在时间格10中。当currentTime指向时间格10时,同理执行对应的任务。如果此时又插入了一个定时19ms的任务怎么办呢?新来的TimerTaskEntry会复用原来的TimerTaskList,所以它会插入到原本已经到期的时间格1中。

总之,整个时间轮的跨度是不会变的,随着currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围就是currentTimecurrentTime + interval之间。

那么,问题又来了,如果一个新的定时任务远远超过了当前的总体时间范围,比如350ms,那怎么办呢?

为此,kafka引入了层级时间轮的概念,当任务到期时间远远超过当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。

如上图:

  • 第一层时间轮:tickMs= 1ms 、wheelSize=20 、interval=20ms
  • 第二层时间轮的tickMs 为第一层时间轮的interval,即20ms,每一层时间轮的wheelSize是固定的,都是20,那么第二层时间轮的总体时间跨度就是400ms。
  • 依次类推,第三层的时间轮的interval为400ms,那么总体时间跨度就是8000ms。

生活中我们常见的钟表就是一种具有三层结构的时间轮,第一层时间轮 tickMs=1ms 、wheelSize=60 、interval=1min,此为秒钟 ; 第二层 tickMs= 1min、wheelSize=60 、interval= 1hour,此为分钟 ;第三层 tickMs=1hour 、 wheelSize= 12 、 interval= 12hours,此为时钟 。

引入多层时间轮之后,对于之前所说的350ms的定时任务,就会升级到第二层时间轮的时间格17所对应的TimerTaskList中去。

那么随着时间的推移,之前定时为350ms的任务执行时间也会越来越接近,比如,距离该任务执行时间还有15ms的时候,15ms已经处于第一层时间轮的interval范围之内了,显然该任务继续放在第二层的时间格17的位置是不合理的,这里就有一个时间轮降级的操作,会将这个任务重新提交到层级时间轮中,此时该任务就会重新放在第一层时间轮中currentTime所指的时间格后的第15个时间格内等待被执行。

所以,所有位于第二及第二层时间轮以上的任务在执行前都会有一个时间轮降级的过程,会从第n级,降到第n-1级,n-2级……直到降到第一级为止。

我们可以总结出,kafka的定时器只是持有第一层时间轮的引用,并不会直接持有其他高层时间轮的引用,但是每个时间轮都会有一个指向更高一层时间轮的引用,随着时间的推移,高层时间轮内的定时任务也会重新插入到时间轮内,直到插入到第一层时间轮内等待被最终的执行。

当然Kafka的定时器任务执行并没有这么简单,在kafka中,时间轮是专门用来执行插入和删除TimeTaskEntry的(即封装真正的定时任务TimeTask的实例),而时间轮的推进照样是借用了JDK的DelayQueue来实现的。具体做法是将每个用到的TimerTaskList放入DelayQueue,在DelayQueue中会按照TimerTaskList的过期时间expiration来排序,expiration最小的排在头部,Kafka中有一个专门的线程来从DelayQueue中获取到期的任务列表,然后就可以根据expiration来推进时间轮的时间,也可以处理获取到的TimeTaskList,对里面的TimerTaskEntry 执行过期操作或降级时间轮 。

这里最难懂的就是DelayQueue 与 时间轮的关系,文章开头说了DelayQueue不能满足kafka的高性能要求,那么这里怎么还要用到DelayQueue呢?

对于时间轮而言,TimerTaskEntry的插入以及删除,时间复杂度都为O(1),而DelayQueue是一个有序无界的BlockingQueue ,当一个新的定时任务放入DelayQueue的时候,首先要在队列中找到该任务的位置,然后插进去,即使将若干个TimeTaskEntry按照一定的规则封装到TimerTaskList,然后将TimerTaskList插入到DelayQueue,这时当一个新的TimerTaskEntry插入的时候,也是比较麻烦的一个操作,另外如果只用DelayQueue,时间推进的单位设置过大,则精度不足,过小则浪费资源,显然是满足不了kafka的高性能要求。

因此,kafka的设计者就使用了DelayQueue+时间轮的方式,来保证kafka的高性能定时任务的执行,Delayqueue负责时间轮的推进工作,时间轮则负责将每个定时任务TimerTaskEntry按照时间顺序插入以及删除,然后又使用专门的一个线程来从DelayQueue中获取到期的任务列表,然后执行对应的操作,这样就保证了kafka的高性能运行。