定时器的实现通常有如下几种:linux
建立 Timer 的代码:nginx
func NewTimer(d Duration) *Timer { c := make(chan Time, 1) t := &Timer{ C: c, r: runtimeTimer{ when: when(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t }
建立 Ticker 的代码:redis
func NewTicker(d Duration) *Ticker { if d <= 0 { panic(errors.New("non-positive interval for NewTicker")) } // Give the channel a 1-element time buffer. // If the client falls behind while reading, we drop ticks // on the floor until the client catches up. c := make(chan Time, 1) t := &Ticker{ C: c, r: runtimeTimer{ when: when(d), period: int64(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t }
Timer 和 Ticker 都是调用的 startTimer(*runtimeTimer)
,区别是 Ticker 比 Timer 多了一个 period 字段。缓存
其中 startTimer()
的声明以下:app
func startTimer(*runtimeTimer)
没有实现,对应的是 runtime/time.go 中的以下函数:函数
// startTimer adds t to the timer heap. //go:linkname startTimer time.startTimer func startTimer(t *timer) { if raceenabled { racerelease(unsafe.Pointer(t)) } addtimer(t) }
这里的参数是 runtime.timer,而传进来时是 time.runtimeTimer,这两个结构体字段是一一对应的:性能
// Interface to timers implemented in package runtime. // Must be in sync with ../runtime/time.go:/^type timer type runtimeTimer struct { tb uintptr i int when int64 period int64 f func(interface{}, uintptr) // NOTE: must not be closure arg interface{} seq uintptr } type timer struct { tb *timersBucket // the bucket the timer lives in i int // heap index // Timer wakes up at when, and then at when+period, ... (period > 0 only) // each time calling f(arg, now) in the timer goroutine, so f must be // a well-behaved function and not block. when int64 period int64 f func(interface{}, uintptr) arg interface{} seq uintptr }
在 startTimer()
中调用了 addtimer()
:ui
func addtimer(t *timer) { tb := t.assignBucket() lock(&tb.lock) ok := tb.addtimerLocked(t) unlock(&tb.lock) if !ok { badTimer() } }
在 runtime/time.go 中有一个全局变量 timers:this
var timers [timersLen]struct { timersBucket // The padding should eliminate false sharing // between timersBucket values. pad [cpu.CacheLinePadSize - unsafe.Sizeof(timersBucket{})%cpu.CacheLinePadSize]byte }
它的结构大概是这样子的:code
timers 包含固定的 64 个 timersBucket,而每一个 timersBucket 中包含多个 *timer
(字段 t)。timersBucket 中的多个 timer 使用最小堆来组织的。
为何是 64 个?
个数最好应该是 GOMAXPROCS 个,可是这样的话就须要动态分配了,64 是根据内存使用和性能之间平衡得出的。
addtimer()
首先肯定一个 timersBucket,而后将 timer 放入这个 bucket 中。
怎么肯定 bucket 的?
func (t *timer) assignBucket() *timersBucket { id := uint8(getg().m.p.ptr().id) % timersLen t.tb = &timers[id].timersBucket return t.tb }
根据当前 G 所在的 P 的 id。
而后是放入 bucket 中的逻辑:
func (tb *timersBucket) addtimerLocked(t *timer) bool { // when must never be negative; otherwise timerproc will overflow // during its delta calculation and never expire other runtime timers. if t.when < 0 { t.when = 1<<63 - 1 } t.i = len(tb.t) tb.t = append(tb.t, t) if !siftupTimer(tb.t, t.i) { return false } if t.i == 0 { // siftup moved to top: new earliest deadline. if tb.sleeping && tb.sleepUntil > t.when { tb.sleeping = false notewakeup(&tb.waitnote) } if tb.rescheduling { tb.rescheduling = false goready(tb.gp, 0) } if !tb.created { tb.created = true go timerproc(tb) } } return true }
首先是加入到 t 切片中,而后使用 siftupTimer()
来维护最小堆的性质。t.i == 0
说明当前 bucket 中没有其余 timer。
bucket 第一个添加 timer 时会启动一个协程调用 timerproc
,代码以下:
func timerproc(tb *timersBucket) { tb.gp = getg() for { lock(&tb.lock) tb.sleeping = false now := nanotime() delta := int64(-1) for { // 列表是空的,跳出循环 if len(tb.t) == 0 { delta = -1 break } // 堆上最小的 timer,最老的那个 t := tb.t[0] delta = t.when - now // 还没到时间 if delta > 0 { break } ok := true // ticker,从新计算到期时间,不从堆上删除 if t.period > 0 { // leave in heap but adjust next time to fire t.when += t.period * (1 + -delta/t.period) if !siftdownTimer(tb.t, 0) { ok = false } } else { // timer, remove from heap last := len(tb.t) - 1 if last > 0 { tb.t[0] = tb.t[last] tb.t[0].i = 0 } tb.t[last] = nil tb.t = tb.t[:last] if last > 0 { if !siftdownTimer(tb.t, 0) { ok = false } } t.i = -1 // mark as removed } f := t.f arg := t.arg seq := t.seq unlock(&tb.lock) if !ok { badTimer() } if raceenabled { raceacquire(unsafe.Pointer(t)) } f(arg, seq) lock(&tb.lock) } if delta < 0 || faketime > 0 { // No timers left - put goroutine to sleep. tb.rescheduling = true goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1) continue } // At least one timer pending. Sleep until then. tb.sleeping = true tb.sleepUntil = now + delta noteclear(&tb.waitnote) unlock(&tb.lock) notetsleepg(&tb.waitnote, delta) } }
若是当前 t 列表是空的,那么 rescheduling = true
,而后将当前协程挂起。何时再唤醒呢? 在 addtimer()
中若是 rescheduling
为 true,那么就将协程唤醒继续 for 循环。
若是堆上最小的元素(最早到期的)还没到期,那么 sleeping = true
,同时会 sleep 知道该元素到期。若是在 sleep 期间又添加了一个元素,而这个元素比堆上全部的 timer 都更快到期,在 addtimer()
中会经过 waitnote 来唤醒,继续 for 循环来处理。
若是堆上最小的元素已经到期了,应该给这个到期的 timer.C 发送当前时间。若是 timer 是一个 Ticker,那么会修改它的到期时间,不从堆上移走。若是 timer 是一个 Timer,是一次性的,那么会从堆上删除它。
如何计算 Ticker 的下次到期时间?
t.when += t.period * (1 + -delta/t.period)
这里的 delta 是 t.when - now
的结果,表示距离过时时间已通过去了多久,计算新的过时时间时将这个值减去了。
处理 timer 就是调用 timer.f()
,对应的是 timer.sendTime()
:
func sendTime(c interface{}, seq uintptr) { // Non-blocking send of time on c. // Used in NewTimer, it cannot block anyway (buffer). // Used in NewTicker, dropping sends on the floor is // the desired behavior when the reader gets behind, // because the sends are periodic. select { case c.(chan Time) <- Now(): default: } }
Timer 和 Ticker 的 c 都是 make(chan Time, 1)
。对于 Timer 来讲,由于有一个缓存,因此会执行到 case 分支。对于 Ticker 来讲,由于会屡次调用这个方法,若是一直没有从 Ticker.C 中拿取时间,那么这里会调用 default 分支,也就是后面的时间会被丢弃,以此来保证 timerproc
不会阻塞。
Timer 和 Ticker 都是经过 runtime/time.go 中的 stopTimer()
来中止的:
// stopTimer removes t from the timer heap if it is there. // It returns true if t was removed, false if t wasn't even there. //go:linkname stopTimer time.stopTimer func stopTimer(t *timer) bool { return deltimer(t) } // Delete timer t from the heap. // Do not need to update the timerproc: if it wakes up early, no big deal. func deltimer(t *timer) bool { if t.tb == nil { // t.tb can be nil if the user created a timer // directly, without invoking startTimer e.g // time.Ticker{C: c} // In this case, return early without any deletion. // See Issue 21874. return false } tb := t.tb lock(&tb.lock) removed, ok := tb.deltimerLocked(t) unlock(&tb.lock) if !ok { badTimer() } return removed } func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) { // t may not be registered anymore and may have // a bogus i (typically 0, if generated by Go). // Verify it before proceeding. i := t.i last := len(tb.t) - 1 if i < 0 || i > last || tb.t[i] != t { return false, true } if i != last { tb.t[i] = tb.t[last] tb.t[i].i = i } tb.t[last] = nil tb.t = tb.t[:last] ok = true if i != last { if !siftupTimer(tb.t, i) { ok = false } if !siftdownTimer(tb.t, i) { ok = false } } return true, ok }
timer.i
表示这个 timer 在堆上的索引。对于 Timer 来讲,在到期后可能会从堆上删掉了,这时 timerproc()
函数会将 timer.i 标记为 -1。
删除就是将 timer 和堆上最后一个元素交换,而后从 t 中删除,最后从新维护下堆的性质。
若是不调用 Timer.Stop()/Ticker.Stop() 会发生什么?
Timer 在到期后会被 timerproc()
函数删除,但及时主动删除能够减轻 timersBucket 的压力,尤为是在定时器比较多的状况下。
Ticker 若是不调用 Stop 会一直存在堆上。