简单说说Kafka中的时间轮算法

零、时间轮定义

简单说说时间轮吧,它是一个高效的延时队列,或者说定时器。实际上如今网上对于时间轮算法的解释不少,定义也很全,这里引用一下朱小厮博客里出现的定义:java

参考下图,Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每一个元素能够存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务TimerTask。git

时间轮

若是你理解了上面的定义,那么就没必要往下看了。但若是你第一次看到和我同样懵比,而且有很多疑问,那么这篇博文将带你进一步了解时间轮,甚至理解时间轮算法。github

若是有兴趣,能够去看看其余的定时器 你真的了解延时队列吗。博主认为,时间轮定时器最大的优势:算法

    1. 是任务的添加与移除,都是O(1)级的复杂度;
    1. 不会占用大量的资源;
    1. 只须要有一个线程去推动时间轮就能够工做了。

咱们将对时间轮作层层推动的解析:

1、为何使用环形队列

假设咱们如今有一个很大的数组,专门用于存放延时任务。它的精度达到了毫秒级!那么咱们的延迟任务实际上须要将定时的那个时间简单转换为毫秒便可,而后将定时任务存入其中:数组

好比说当前的时间是2018/10/24 19:43:45,那么就将任务存入Task[1540381425000],value则是定时任务的内容。数据结构

private Task[很长] tasks;

public List<Task> getTaskList(long timestamp) {
	return task.get(timestamp)
}

// 伪装这里真的能一毫秒一个循环
public void run(){
	while (true){
		getTaskList(System.currentTimeMillis()).后台执行()
		Thread.sleep(1);
	}
}

假如这个数组长度达到了亿亿级,咱们确实能够这么干。 那若是将精度缩减到秒级呢?咱们也须要一个百亿级长度的数组。post

先不说内存够不够,显然你的定时器要这么大的内存显然很浪费。this

固然若是咱们本身写一个map,并保证它不存在hash冲突问题,那也是彻底可行的。(我不肯定个人想法是否正确,若是错误,请指出).net

/* 一个精度为秒级的延时任务管理类 */
private Map<Long, Task> taskMap;

public List<Task> getTaskList(long timestamp) {
	return taskMap.get(timestamp - timestamp % 1000)
}

// 新增一个任务
public void addTask(long timestamp, Task task) {
	List<Task> taskList = getTaskList(timestamp - timestamp % 1000);
		if (taskList == null){
			taskList = new ArrayList();
		}
	taskList.add(task);
}

// 伪装这里真的能一秒一个循环
public void run(){
	while (true){
		getTaskList(System.currentTimeMillis()).后台执行()
		Thread.sleep(1000);
	}
}

其实时间轮就是一个不存在hash冲突的数据结构

抛开其余疑问,咱们看看手腕上的手表(若是没有去找个钟表,或者想象一个),是否是不管当前是什么时间,总能用咱们的表盘去表示它(忽略精度)线程

就拿秒表来讲,它老是落在 0 - 59 秒,每走一圈,又会从新开始。

用伪代码模拟一下咱们这个秒表:

private Bucket[60] buckets;// 表示60秒

public void addTask(long timestamp, Task task) {
	Bucket bucket = buckets[timestamp / 1000 % 60];
	bucket.add(task);
}

public Bucket getBucket(long timestamp) {
	return buckets[timestamp / 1000 % 60];
}

// 伪装这里真的能一秒一个循环
public void run(){
	while (true){
		getBucket(System.currentTimeMillis()).后台执行()
		Thread.sleep(1000);
	}
}

这样,咱们的时间总能落在0 - 59任意一个bucket上,就如同咱们的秒钟老是落在0 - 59刻度上同样,这即是时间轮的环形队列

2、表示的时间有限

可是细心的小伙伴也会发现这么一个问题:若是只能表示60秒内的定时任务应该怎么存储与取出,那是否是太有局限性了?若是想要加入一小时后的延迟任务,该怎么办?

其实仍是能够看一看钟表,对于只有三个指针的表(通常的表)来讲,最大能表示12个小时,超过了12小时这个范围,时间就会产生歧义。若是咱们加多几个指针呢?好比说咱们有秒针,分针,时针,上下午针,天针,月针,年针...... 那不就能表示很长很长的一段时间了?并且,它并不须要占用很大的内存。

好比说秒针咱们能够用一个长度为60的数组来表示,分针也一样能够用一个长度为60的数组来表示,时针能够用一个长度为24的数组来表示。那么表示一天内的全部时间,只须要三个数组便可。


动手来作吧,咱们将这个数据结构称做时间轮,tickMs表示一个刻度,好比说上面说的一秒。wheelSize表示一圈有多少个刻度,即上面说的60。interval表示一圈能表示多少时间,即 tickMs * wheelSize = 60秒。

overflowWheel表示上一层的时间轮,好比说,对于秒钟来讲,overflowWheel就表示分钟,以此类推。

public class TimeWheel {

    /** 一个时间槽的时间 */
    private long tickMs;

    /** 时间轮大小 */
    private int wheelSize;

    /** 时间跨度 */
    private long interval;

    /** 槽 */
    private Bucket[] buckets;

    /** 时间轮指针 */
    private long currentTimestamp;

    /** 上层时间轮 */
    private volatile TimeWheel overflowWheel;

    public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
        this.currentTimestamp = currentTimestamp;
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.interval = tickMs * wheelSize;
        this.buckets = new Bucket[wheelSize];
        this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);

        for (int i = 0; i < wheelSize; i++) {
            buckets[i] = new Bucket();
        }
    }
}

将任务添加到时间轮中十分简单,对于每一个时间轮来讲,好比说秒级时间轮,和分级时间轮,都有它本身的过时槽。也就是delayMs < tickMs的时候。

添加延时任务的时候一共就这几种状况:

####1、时间到期

  • 1)好比说有一个任务要在 16:29:07 执行,从秒级时间轮中来看,当咱们的当前时间走到16:29:06的时候,则表示这个任务已通过期了。由于它的delayMs = 1000ms,小于了咱们的秒级时间轮的tickMs(1000ms)。
    1. 好比说有一个任务要在 16:41:25 执行,从分级时间轮中来看,当咱们的当前时间走到 16:41的时候(分级时间轮没有秒针!它的最小精度是分钟(必定要理解这一点)),则表示这个任务已经到期,由于它的delayMs = 25000ms,小于了咱们的分级时间轮的tickMs(60000ms)。

2、时间未到期,且delayMs小于interval。

对于秒级时间轮来讲,就是延迟时间小于60s,那么确定能找到一个秒钟槽扔进去。

3、时间未到期,且delayMs大于interval。

对于妙级时间轮来讲,就是延迟时间大于等于60s,这时候就须要借助上层时间轮的力量了,很简单的代码实现,就是拿到上层时间轮,而后相似递归同样,把它扔进去。


好比说一个有一个延时为一年后的定时任务,就会在这个递归中不断建立更上层的时间轮,直到找到知足delayMs小于interval的那个时间轮。

这里为了避免把代码写的那么复杂,咱们每一层时间轮的刻度都同样,也就是秒级时间轮表示60秒,上面则表示60分钟,再上面则表示60小时,再上层则表示60个60小时,再上层则表示60个60个60小时 = 216000小时。

也就是若是将最底层时间轮的tickMs(精度)设置为1000ms。wheelSize设置为60。那么只须要5层时间轮,可表示的时间跨度已经长达24年(216000小时)

/**
     * 添加任务到某个时间轮
     */
    public boolean addTask(TimedTask timedTask) {
        long expireTimestamp = timedTask.getExpireTimestamp();
        long delayMs = expireTimestamp - currentTimestamp;
        if (delayMs < tickMs) {// 到期了
            return false;
        } else {

            // 扔进当前时间轮的某个槽中,只有时间【大于某个槽】,才会放进去
            if (delayMs < interval) {
                int bucketIndex = (int) (((delayMs + currentTimestamp) / tickMs) % wheelSize);

                Bucket bucket = buckets[bucketIndex];
                bucket.addTask(timedTask);
            } else {
			// 当maybeInThisBucket大于等于wheelSize时,须要将它扔到上一层的时间轮
                TimeWheel timeWheel = getOverflowWheel();
                timeWheel.addTask(timedTask);
            }
        }
        return true;
    }


   /**
     * 获取或建立一个上层时间轮
     */
	private TimeWheel getOverflowWheel() {
        if (overflowWheel == null) {
            synchronized (this) {
                if (overflowWheel == null) {
                    overflowWheel = new TimeWheel(interval, wheelSize, currentTimestamp, delayQueue);
                }
            }
        }
        return overflowWheel;
    }

固然咱们的时间轮还须要一个指针的推动机制,总不能让时间永远停留在当前吧?推动的时候,同时相似递归,去推动一下上一层的时间轮。

注意:要强调一点的是,咱们这个时间轮更像是电子表,它不存在时间的中间状态,也就是精度这个概念必定要理解好。好比说,对于秒级时间轮来讲,它的精度只能保证到1秒,小于1秒的,都会当成是已到期

对于分级时间轮来讲,它的精度只能保证到1分,小于1分的,都会当成是已到期

/**
     * 尝试推动一下指针
     */
    public void advanceClock(long timestamp) {
        if (timestamp >= currentTimestamp + tickMs) {
            currentTimestamp = timestamp - (timestamp % tickMs);

            if (overflowWheel != null) {
                this.getOverflowWheel()
                    .advanceClock(timestamp);
            }
        }
    }

3、对于高层时间轮来讲,精度愈来愈不许,会不会有影响?

上面说到,分级时间轮,精度只有分钟级,总不能延迟1秒的定时任务和延迟59秒的定时任务同时执行吧?

有这个疑问的同窗很好!实际上很好解决,只需再入时间轮便可。好比说,对于分钟级时间轮来讲,delayMs为1秒和delayMs为59秒的都已通过期,咱们将其取出,再扔进底层的时间轮不就能够了?

1秒的会被扔到秒级时间轮的下一个执行槽中,而59秒的会被扔到秒级时间轮的后59个时间槽中。

细心的同窗会发现,咱们的添加任务方法,返回的是一个bool

public boolean addTask(TimedTask timedTask)

再倒回去好好看看,添加到最底层时间轮失败的(咱们只能直接操做最底层的时间轮,不能直接操做上层的时间轮),是否是会直接返回flase?对于再入失败的任务,咱们直接执行便可。

/**
     * 将任务添加到时间轮
     */
    public void addOrSubmitTask(TimedTask timedTask) {
        if (!timeWheel.addTask(timedTask)) {
            taskExecutor.submit(timedTask.getTask());
        }
    }

4、如何知道一个任务已通过期?

记得咱们将任务存储在槽中嘛?好比说秒级时间轮中,有60个槽,那么一共有60个槽。若是时间轮共有两层,也仅仅只有120个槽。咱们只需将槽扔进一个delayedQueue之中便可。

咱们轮询地从delayedQueue取出已通过期的槽便可。(前面的全部代码,为了简单说明,并无引入这个DelayQueue的概念,因此不用去上面翻了,并无。博主以为...已经看到这里了,应该很明白这个DelayQueue的意义了。

其实简单来讲,实际上定时任务单单使用DelayQueue来实现,也是能够的,可是一旦任务的数量多了起来,达到了百万级,千万级,针对这个delayQueue的增删,将很是的慢。

** 1、面向槽的delayQueue**

而对于时间轮来讲,它只须要往delayQueue里面扔各类槽便可,好比咱们的定时任务长短不一,最长的跨度到了24年,这个delayQueue也仅仅只有300个元素。

** 2、处理过时的槽**

而这个槽到期后,也就是被咱们从delayQueue中poll出来后,咱们只须要将槽中的全部任务循环一次,从新加到新的槽中(添加失败则直接执行)便可。

/**
     * 推动一下时间轮的指针,而且将delayQueue中的任务取出来再从新扔进去
     */
    public void advanceClock(long timeout) {
        try {
            Bucket bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
            if (bucket != null) {
                timeWheel.advanceClock(bucket.getExpire());
                bucket.flush(this::addTask);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

完整的时间轮GitHub,其实就是半抄半本身撸的Kafka时间轮简化版 Timer#main 中模拟了六百万个简单的延时任务,执行的效率很高 ~

相关文章
相关标签/搜索