转载请声明出处哦~,本篇文章发布于luozhiyun的博客:https://www.luozhiyun.com/archives/444java
最近在工做中有一个需求,简单来讲就是在短期内会建立上百万个定时任务,建立的时候会将对应的金额相加,防止超售,须要过半个小时再去核对数据,若是数据对不上就须要将加上的金额再减回去。git
这个需求若是用Go内置的Timer来作的话性能比较低下,由于Timer是使用最小堆来实现的,建立和删除的时间复杂度都为 O(log n)。若是使用时间轮的话则是O(1)性能会好不少。github
对于时间轮来讲,我之前写过一篇java版的时间轮算法分析:https://www.luozhiyun.com/archives/59,此次来看看Go语言的时间轮实现,顺便你们有兴趣的也能够对比一下二者的区别,以及我写文章的水平和一年多前有没有提高,哈哈哈。算法
时间轮的运用实际上是很是的普遍的,在 Netty、Akka、Quartz、ZooKeeper、Kafka 等组件中都存在时间轮的踪迹。下面用Go实现的时间轮是以Kafka的代码为原型来实现的,完整代码:https://github.com/devYun/timingwheel。apache
在时间轮中存储任务的是一个环形队列,底层采用数组实现,数组中的每一个元素能够存放一个定时任务列表。定时任务列表是一个环形的双向链表,链表中的每一项表示的都是定时任务项,其中封装了真正的定时任务。数组
时间轮由多个时间格组成,每一个时间格表明当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用 wheelSize 来表示,那么整个时间轮的整体时间跨度(interval)能够经过公式 tickMs×wheelSize 计算得出。并发
时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍。currentTime指向的地方是表示到期的时间格,表示须要处理的时间格所对应的链表中的全部任务。app
以下图是一个tickMs为1s,wheelSize等于10的时间轮,每一格里面放的是一个定时任务链表,链表里面存有真正的任务项:异步
初始状况下表盘指针 currentTime 指向时间格0,若时间轮的 tickMs 为 1ms 且 wheelSize 等于10,那么interval则等于10s。以下图此时有一个定时为2s的任务插进来会存放到时间格为2的任务链表中,用红色标记。随着时间的不断推移,指针 currentTime 不断向前推动,若是过了2s,那么 currentTime 会指向时间格2的位置,会将此时间格的任务链表获取出来处理。函数
若是当前的指针 currentTime 指向的是2,此时若是插入一个9s的任务进来,那么新来的任务会服用原来的时间格链表,会存放到时间格1中
这里所讲的时间轮都是简单时间轮,只有一层,整体时间范围在 currentTime 和 currentTime+interval 之间。若是如今有一个15s的定时任务是须要从新开启一个时间轮,设置一个时间跨度至少为15s的时间轮才够用。可是这样扩充是没有底线的,若是须要一个1万秒的时间轮,那么就须要一个这么大的数组去存放,不只占用很大的内存空间,并且也会由于须要遍历这么大的数组从而拉低效率。
所以引入了层级时间轮的概念。
如图是一个两层的时间轮,第二层时间轮也是由10个时间格组成,每一个时间格的跨度是10s。第二层的时间轮的 tickMs 为第一层时间轮的 interval,即10s。每一层时间轮的 wheelSize 是固定的,都是10,那么第二层的时间轮的整体时间跨度 interval 为100s。
图中展现了每一个时间格对应的过时时间范围, 咱们能够清晰地看到, 第二层时间轮的第0个时间格的过时时间范围是 [0,9]。也就是说, 第二层时间轮的一个时间格就能够表示第一层时间轮的全部(10个)时间格;
若是向该时间轮中添加一个15s的任务,那么当第一层时间轮容纳不下时,进入第二层时间轮,并插入到过时时间为[10,19]的时间格中。
随着时间的流逝,当本来15s的任务还剩下5s的时候,这里就有一个时间轮降级的操做,此时第一层时间轮的整体时间跨度已足够,此任务被添加到第一层时间轮到期时间为5的时间格中,以后再经历5s后,此任务真正到期,最终执行相应的到期操做。
由于咱们这个Go语言版本的时间轮代码是仿照Kafka写的,因此在具体实现时间轮 TimingWheel 时还有一些小细节:
type TimingWheel struct { // 时间跨度,单位是毫秒 tick int64 // in milliseconds // 时间轮个数 wheelSize int64 // 总跨度 interval int64 // in milliseconds // 当前指针指向时间 currentTime int64 // in milliseconds // 时间格列表 buckets []*bucket // 延迟队列 queue *delayqueue.DelayQueue // 上级的时间轮引用 overflowWheel unsafe.Pointer // type: *TimingWheel exitC chan struct{} waitGroup waitGroupWrapper }
tick、wheelSize、interval、currentTime都比较好理解,buckets字段表明的是时间格列表,queue是一个延迟队列,全部的任务都是经过延迟队列来进行触发,overflowWheel是上层时间轮的引用。
type bucket struct { // 任务的过时时间 expiration int64 mu sync.Mutex // 相同过时时间的任务队列 timers *list.List }
bucket里面实际上封装的是时间格里面的任务队列,里面放入的是相同过时时间的任务,到期后会将队列timers拿出来进行处理。这里有个有意思的地方是因为会有多个线程并发的访问bucket,因此须要用到原子类来获取int64位的值,为了保证32位系统上面读取64位数据的一致性,须要进行64位对齐。具体的能够看这篇:https://www.luozhiyun.com/archives/429,讲的是对内存对齐的思考。
type Timer struct { // 到期时间 expiration int64 // in milliseconds // 要被执行的具体任务 task func() // Timer所在bucket的指针 b unsafe.Pointer // type: *bucket // bucket列表中对应的元素 element *list.Element }
Timer是时间轮的最小执行单元,是定时任务的封装,到期后会调用task来执行任务。
例如如今初始化一个tick是1s,wheelSize是10的时间轮:
func main() { tw := timingwheel.NewTimingWheel(time.Second, 10) tw.Start() } func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel { // 将传入的tick转化成毫秒 tickMs := int64(tick / time.Millisecond) // 若是小于零,那么panic if tickMs <= 0 { panic(errors.New("tick must be greater than or equal to 1ms")) } // 设置开始时间 startMs := timeToMs(time.Now().UTC()) // 初始化TimingWheel return newTimingWheel( tickMs, wheelSize, startMs, delayqueue.New(int(wheelSize)), ) } func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel { // 初始化buckets的大小 buckets := make([]*bucket, wheelSize) for i := range buckets { buckets[i] = newBucket() } // 实例化TimingWheel return &TimingWheel{ tick: tickMs, wheelSize: wheelSize, // currentTime必须是tickMs的倍数,因此这里使用truncate进行修剪 currentTime: truncate(startMs, tickMs), interval: tickMs * wheelSize, buckets: buckets, queue: queue, exitC: make(chan struct{}), } }
初始化十分简单,你们能够看看上面的代码注释便可。
下面咱们看看start方法:
func (tw *TimingWheel) Start() { // Poll会执行一个无限循环,将到期的元素放入到queue的C管道中 tw.waitGroup.Wrap(func() { tw.queue.Poll(tw.exitC, func() int64 { return timeToMs(time.Now().UTC()) }) }) // 开启无限循环获取queue中C的数据 tw.waitGroup.Wrap(func() { for { select { // 从队列里面出来的数据都是到期的bucket case elem := <-tw.queue.C: b := elem.(*bucket) // 时间轮会将当前时间 currentTime 往前移动到 bucket的到期时间 tw.advanceClock(b.Expiration()) // 取出bucket队列的数据,并调用addOrRun方法执行 b.Flush(tw.addOrRun) case <-tw.exitC: return } } }) }
这里使用了util封装的一个Wrap方法,这个方法会起一个goroutines异步执行传入的函数,具体的能够到我上面给出的连接去看源码。
Start方法会启动两个goroutines。第一个goroutines用来调用延迟队列的queue的Poll方法,这个方法会一直循环获取队列里面的数据,而后将到期的数据放入到queue的C管道中;第二个goroutines会无限循环获取queue中C的数据,若是C中有数据表示已经到期,那么会先调用advanceClock方法将当前时间 currentTime 往前移动到 bucket的到期时间,而后再调用Flush方法取出bucket中的队列,并调用addOrRun方法执行。
func (tw *TimingWheel) advanceClock(expiration int64) { currentTime := atomic.LoadInt64(&tw.currentTime) // 过时时间大于等于(当前时间+tick) if expiration >= currentTime+tw.tick { // 将currentTime设置为expiration,从而推动currentTime currentTime = truncate(expiration, tw.tick) atomic.StoreInt64(&tw.currentTime, currentTime) // Try to advance the clock of the overflow wheel if present // 若是有上层时间轮,那么递归调用上层时间轮的引用 overflowWheel := atomic.LoadPointer(&tw.overflowWheel) if overflowWheel != nil { (*TimingWheel)(overflowWheel).advanceClock(currentTime) } } }
advanceClock方法会根据到期时间来重新设置currentTime,从而推动时间轮前进。
func (b *bucket) Flush(reinsert func(*Timer)) { var ts []*Timer b.mu.Lock() // 循环获取bucket队列节点 for e := b.timers.Front(); e != nil; { next := e.Next() t := e.Value.(*Timer) // 将头节点移除bucket队列 b.remove(t) ts = append(ts, t) e = next } b.mu.Unlock() b.SetExpiration(-1) // TODO: Improve the coordination with b.Add() for _, t := range ts { reinsert(t) } }
Flush方法会根据bucket里面timers列表进行遍历插入到ts数组中,而后调用reinsert方法,这里是调用的addOrRun方法。
func (tw *TimingWheel) addOrRun(t *Timer) { // 若是已通过期,那么直接执行 if !tw.add(t) { // 异步执行定时任务 go t.task() } }
addOrRun会调用add方法检查传入的定时任务Timer是否已经到期,若是到期那么异步调用task方法直接执行。add方法咱们下面会接着分析。
整个start执行流程如图:
func main() { tw := timingwheel.NewTimingWheel(time.Second, 10) tw.Start() // 添加任务 tw.AfterFunc(time.Second*15, func() { fmt.Println("The timer fires") exitC <- time.Now().UTC() }) }
咱们经过AfterFunc方法添加一个15s的定时任务,若是到期了,那么执行传入的函数。
func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer { t := &Timer{ expiration: timeToMs(time.Now().UTC().Add(d)), task: f, } tw.addOrRun(t) return t }
AfterFunc方法回根据传入的任务到期时间,以及到期须要执行的函数封装成Timer,调用addOrRun方法。addOrRun方法咱们上面已经看过了,会根据到期时间来决定是否须要执行定时任务。
下面咱们来看一下add方法:
func (tw *TimingWheel) add(t *Timer) bool { currentTime := atomic.LoadInt64(&tw.currentTime) // 已通过期 if t.expiration < currentTime+tw.tick { // Already expired return false // 到期时间在第一层环内 } else if t.expiration < currentTime+tw.interval { // Put it into its own bucket // 获取时间轮的位置 virtualID := t.expiration / tw.tick b := tw.buckets[virtualID%tw.wheelSize] // 将任务放入到bucket队列中 b.Add(t) // 若是是相同的时间,那么返回false,防止被屡次插入到队列中 if b.SetExpiration(virtualID * tw.tick) { // 将该bucket加入到延迟队列中 tw.queue.Offer(b, b.Expiration()) } return true } else { // Out of the interval. Put it into the overflow wheel // 若是放入的到期时间超过第一层时间轮,那么放到上一层中去 overflowWheel := atomic.LoadPointer(&tw.overflowWheel) if overflowWheel == nil { atomic.CompareAndSwapPointer( &tw.overflowWheel, nil, // 须要注意的是,这里tick变成了interval unsafe.Pointer(newTimingWheel( tw.interval, tw.wheelSize, currentTime, tw.queue, )), ) overflowWheel = atomic.LoadPointer(&tw.overflowWheel) } // 往上递归 return (*TimingWheel)(overflowWheel).add(t) } }
add方法根据到期时间来分红了三部分,第一部分是小于当前时间+tick,表示已经到期,那么返回false执行任务便可;
第二部分的判断会根据expiration是否小于时间轮的跨度,若是小于的话表示该定时任务能够放入到当前时间轮中,经过取模找到buckets对应的时间格并放入到bucket队列中,SetExpiration方法会根据传入的参数来判断是否已经执行过延迟队列的Offer方法,防止重复插入;
第三部分表示该定时任务的时间跨度超过了当前时间轮,须要升级到上一层的时间轮中。须要注意的是,上一层的时间轮的tick是当前时间轮的interval,延迟队列仍是同一个,而后设置为指针overflowWheel,并调用add方法往上层递归。
到这里时间轮已经讲完了,不过还有须要注意的地方,咱们在用上面的时间轮实现中,使用了DelayQueue加环形队列的方式实现了时间轮。对定时任务项的插入和删除操做而言,TimingWheel时间复杂度为 O(1),在DelayQueue中的队列使用的是优先队列,时间复杂度是O(log n),可是因为buckets列表其实是很是小的,因此并不会影响性能。
https://github.com/RussellLuo/timingwheel