本文是《Go语言调度器源代码情景分析》系列的第17篇,也是第三章《Goroutine调度策略》的第2小节。linux
上一小节咱们分析了从全局运行队列与工做线程的本地运行队列获取goroutine的过程,这一小节咱们继续分析因没法从上述两个队列中拿到须要运行的goroutine而致使的从其它工做线程的本地运行队列中盗取goroutine的过程。算法
findrunnable() 函数负责处理与盗取相关的逻辑,该函数代码很繁杂,由于它还作了与gc和netpoll等相关的事情,为了避免影响咱们的分析思路,这里咱们仍然把不相关的代码删掉了,不过代码仍是比较多,但总结起来就一句话:尽力去各个运行队列中寻找goroutine,若是实在找不到则进入睡眠状态。下面是代码细节:app
runtime/proc.go : 2176dom
// Finds a runnable goroutine to execute. // Tries to steal from other P's, get g from global queue, poll network. func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() // The conditions here and in handoffp must agree: if // findrunnable would return a G to run, handoffp must start // an M. top: _p_ := _g_.m.p.ptr() ...... // local runq //再次看一下本地运行队列是否有须要运行的goroutine if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // global runq //再看看全局运行队列是否有须要运行的goroutine if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } ...... // Steal work from other P's. //若是除了当前工做线程还在运行外,其它工做线程已经处于休眠中,那么也就不用去偷了,确定没有 procs := uint32(gomaxprocs) if atomic.Load(&sched.npidle) == procs-1 { // Either GOMAXPROCS=1 or everybody, except for us, is idle already. // New work can appear from returning syscall/cgocall, network or timers. // Neither of that submits to local run queues, so no point in stealing. goto stop } // If number of spinning M's >= number of busy P's, block. // This is necessary to prevent excessive CPU consumption // when GOMAXPROCS>>1 but the program parallelism is low. // 这个判断主要是为了防止由于寻找可运行的goroutine而消耗太多的CPU。 // 由于已经有足够多的工做线程正在寻找可运行的goroutine,让他们去找就行了,本身偷个懒去睡觉 if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { goto stop } if !_g_.m.spinning { //设置m的状态为spinning _g_.m.spinning = true //处于spinning状态的m数量加一 atomic.Xadd(&sched.nmspinning, 1) } //从其它p的本地运行队列盗取goroutine for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { goto top } stealRunNextG := i > 2 // first look for ready queues with more than 1 g if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { return gp, false } } } stop: ...... // Before we drop our P, make a snapshot of the allp slice, // which can change underfoot once we no longer block // safe-points. We don't need to snapshot the contents because // everything up to cap(allp) is immutable. allpSnapshot := allp // return P and block lock(&sched.lock) ...... if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } // 当前工做线程解除与p之间的绑定,准备去休眠 if releasep() != _p_ { throw("findrunnable: wrong p") } //把p放入空闲队列 pidleput(_p_) unlock(&sched.lock) // Delicate dance: thread transitions from spinning to non-spinning state, // potentially concurrently with submission of new goroutines. We must // drop nmspinning first and then check all per-P queues again (with // #StoreLoad memory barrier in between). If we do it the other way around, // another thread can submit a goroutine after we've checked all run queues // but before we drop nmspinning; as the result nobody will unpark a thread // to run the goroutine. // If we discover new work below, we need to restore m.spinning as a signal // for resetspinning to unpark a new worker thread (because there can be more // than one starving goroutine). However, if after discovering new work // we also observe no idle Ps, it is OK to just park the current thread: // the system is fully loaded so no spinning threads are required. // Also see "Worker thread parking/unparking" comment at the top of the file. wasSpinning := _g_.m.spinning if _g_.m.spinning { //m即将睡眠,状态再也不是spinning _g_.m.spinning = false if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } } // check all runqueues once again // 休眠以前再看一下是否有工做要作 for _, _p_ := range allpSnapshot { if !runqempty(_p_) { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } goto top } break } } ...... //休眠 stopm() goto top }
从上面的代码能够看到,工做线程在放弃寻找可运行的goroutine而进入睡眠以前,会反复尝试从各个运行队列寻找须要运行的goroutine,可谓是全力以赴了。这个函数须要重点注意如下两点:ide
第一点,工做线程M的自旋状态(spinning)。工做线程在从其它工做线程的本地运行队列中盗取goroutine时的状态称为自旋状态。从上面代码能够看到,当前M在去其它p的运行队列盗取goroutine以前把spinning标志设置成了true,同时增长处于自旋状态的M的数量,而盗取结束以后则把spinning标志还原为false,同时减小处于自旋状态的M的数量,从后面的分析咱们能够看到,当有空闲P又有goroutine须要运行的时候,这个处于自旋状态的M的数量决定了是否须要唤醒或者建立新的工做线程。函数
第二点,盗取算法。盗取过程用了两个嵌套for循环。内层循环实现了盗取逻辑,从代码能够看出盗取的实质就是遍历allp中的全部p,查看其运行队列是否有goroutine,若是有,则取其一半到当前工做线程的运行队列,而后从findrunnable返回,若是没有则继续遍历下一个p。但这里为了保证公平性,遍历allp时并非固定的从allp[0]即第一个p开始,而是从随机位置上的p开始,并且遍历的顺序也随机化了,并非如今访问了第i个p下一次就访问第i+1个p,而是使用了一种伪随机的方式遍历allp中的每一个p,防止每次遍历时使用一样的顺序访问allp中的元素。下面是这个算法的伪代码:ui
offset := uint32(random()) % nprocs coprime := 随机选取一个小于nprocs且与nprocs互质的数 for i := 0; i < nprocs; i++ { p := allp[offset] 从p的运行队列偷取goroutine if 偷取成功 { break } offset += coprime offset = offset % nprocs }
下面举例说明一下上述算法过程,现假设nprocs为8,也就是一共有8个p。atom
若是第一次随机选择的offset = 6,coprime = 3(3与8互质,知足算法要求)的话,则从allp切片中偷取的下标顺序为6, 1, 4, 7, 2, 5, 0, 3,计算过程:spa
6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3
若是第二次随机选择的offset = 4,coprime = 5的话,则从allp切片中偷取的下标顺序为1, 6, 3, 0, 5, 2, 7, 4,计算过程:操作系统
1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4
能够看到只要随机数不同,偷取p的顺序也不同,但能够保证通过8次循环,每一个p都会被访问到。能够用数论知识证实,无论nprocs是多少,这个算法均可以保证通过nprocs次循环,每一个p均可以获得访问。
挑选出盗取的对象p以后,则调用runqsteal盗取p的运行队列中的goroutine,runqsteal函数再调用runqgrap从p的队列中批量拿出多个goroutine,这两个函数自己比较简单,但runqgrab有一个小细节须要注意一下,见下面代码:
runtime/proc.go : 4854
// Grabs a batch of goroutines from _p_'s runnable queue into batch. // Batch is a ring buffer starting at batchHead. // Returns number of grabbed goroutines. // Can be executed by any P. func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 { for { h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer n := t - h //计算队列中有多少个goroutine n = n - n/2 //取队列中goroutine个数的一半 if n == 0 { ...... return ...... } //小细节:按理说队列中的goroutine个数最多就是len(_p_.runq), //因此n的最大值也就是len(_p_.runq)/2,那为何须要这个判断呢? if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t continue } ...... } }
代码中n的计算很简单,从计算过程来看n应该是runq队列中goroutine数量的一半,它的最大值不会超过队列容量的一半,但为何这里的代码却恰恰要去判断n是否大于队列容量的一半呢?这里关键点在于读取runqhead和runqtail是两个操做而非一个原子操做,当咱们读取runqhead以后但还未读取runqtail以前,若是有其它线程快速的在增长(这是彻底有可能的,其它偷取者从队列中偷取goroutine会增长runqhead,而队列的全部者往队列中添加goroutine会增长runqtail)这两个值,则会致使咱们读取出来的runqtail已经远远大于咱们以前读取出来放在局部变量h里面的runqhead了,也就是代码注释中所说的h和t已经不一致了,因此这里须要这个if判断来检测异常状况。
工做线程进入睡眠
分析完盗取过程,咱们继续回到findrunnable函数。
若是工做线程通过屡次努力一直找不到须要运行的goroutine则调用stopm进入睡眠状态,等待被其它工做线程唤醒。
runtime/proc.go : 1918
// Stops execution of the current m until new work is available. // Returns with acquired P. func stopm() { _g_ := getg() if _g_.m.locks != 0 { throw("stopm holding locks") } if _g_.m.p != 0 { throw("stopm holding p") } if _g_.m.spinning { throw("stopm spinning") } lock(&sched.lock) mput(_g_.m) //把m结构体对象放入sched.midle空闲队列 unlock(&sched.lock) notesleep(&_g_.m.park) //进入睡眠状态 //被其它工做线程唤醒 noteclear(&_g_.m.park) acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 }
stopm的核心是调用mput把m结构体对象放入sched的midle空闲队列,而后经过notesleep(&m.park)函数让本身进入睡眠状态。
note是go runtime实现的一次性睡眠和唤醒机制,一个线程能够经过调用notesleep(*note)进入睡眠状态,而另一个线程则能够经过notewakeup(*note)把其唤醒。note的底层实现机制跟操做系统相关,不一样系统使用不一样的机制,好比linux下使用的futex系统调用,而mac下则是使用的pthread_cond_t条件变量,note对这些底层机制作了一个抽象和封装,这种封装给扩展性带来了很大的好处,好比当睡眠和唤醒功能须要支持新平台时,只须要在note层增长对特定平台的支持便可,不须要修改上层的任何代码。
回到stopm,当从notesleep函数返回后,须要再次绑定一个p,而后返回到findrunnable函数继续从新寻找可运行的goroutine,一旦找到可运行的goroutine就会返回到schedule函数,并把找到的goroutine调度起来运行,如何把goroutine调度起来运行的代码咱们已经分析过了。如今继续看notesleep函数。
runtime/lock_futex.go : 139
func notesleep(n *note) { gp := getg() if gp != gp.m.g0 { throw("notesleep not on g0") } ns := int64(-1) //超时时间设置为-1,表示无限期等待 if *cgo_yield != nil { // Sleep for an arbitrary-but-moderate interval to poll libc interceptors. ns = 10e6 } //使用循环,保证不是意外被唤醒 for atomic.Load(key32(&n.key)) == 0 { gp.m.blocked = true futexsleep(key32(&n.key), 0, ns) if *cgo_yield != nil { asmcgocall(*cgo_yield, nil) } gp.m.blocked = false } }
notesleep函数调用futexsleep进入睡眠,这里之因此须要用一个循环,是由于futexsleep有可能意外从睡眠中返回,因此从futexsleep函数返回后还须要检查note.key是否仍是0,若是是0则表示并非其它工做线程唤醒了咱们,只是futexsleep意外返回了,须要再次调用futexsleep进入睡眠。
futexsleep调用futex函数进入睡眠。
runtime/os_linux.go : 32
// Atomically, // if(*addr == val) sleep // Might be woken up spuriously; that's allowed. // Don't sleep longer than ns; ns < 0 means forever. //go:nosplit func futexsleep(addr *uint32, val uint32, ns int64) { var ts timespec // Some Linux kernels have a bug where futex of // FUTEX_WAIT returns an internal error code // as an errno. Libpthread ignores the return value // here, and so can we: as it says a few lines up, // spurious wakeups are allowed. if ns < 0 { //调用futex进入睡眠 futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0) return } // It's difficult to live within the no-split stack limits here. // On ARM and 386, a 64-bit divide invokes a general software routine // that needs more stack than we can afford. So we use timediv instead. // But on real 64-bit systems, where words are larger but the stack limit // is not, even timediv is too heavy, and we really need to use just an // ordinary machine instruction. if sys.PtrSize == 8 { ts.set_sec(ns / 1000000000) ts.set_nsec(int32(ns % 1000000000)) } else { ts.tv_nsec = 0 ts.set_sec(int64(timediv(ns, 1000000000, (*int32)(unsafe.Pointer(&ts.tv_nsec))))) } futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0) }
futex是go汇编实现的函数,主要功能就是执行futex系统调用进入操做系统内核进行睡眠。
runtime/sys_linux_amd64.s : 525
// int64 futex(int32 *uaddr, int32 op, int32 val, // struct timespec *timeout, int32 *uaddr2, int32 val2); TEXT runtime·futex(SB),NOSPLIT,$0 #下面的6条指令在为futex系统调用准备参数 MOVQ addr+0(FP), DI MOVL op+8(FP), SI MOVL val+12(FP), DX MOVQ ts+16(FP), R10 MOVQ addr2+24(FP), R8 MOVL val3+32(FP), R9 MOVL $SYS_futex, AX #系统调用编号放入AX寄存器 SYSCALL #执行futex系统调用进入睡眠,从睡眠中被唤醒后接着执行下一条MOVL指令 MOVL AX, ret+40(FP) #保存系统调用的返回值 RET
futex系统的参数比较多,其函数原型为
int64 futex(int32*uaddr, int32op, int32val, structtimespec*timeout, int32*uaddr2, int32val2);
这里,futex系统调用为咱们提供的功能为若是 *uaddr == val 则进入睡眠,不然直接返回。顺便说一下,为何futex系统调用须要第三个参数val,须要在内核判断*uaddr与val是否相等,而不能在用户态先判断它们是否相等,若是相等才进入内核睡眠岂不是更高效?缘由在于判断*uaddr与val是否相等和进入睡眠这两个操做必须是一个原子操做,不然会存在一个竞态条件:若是不是原子操做,则当前线程在第一步判断完*uaddr与val相等以后进入睡眠以前的这一小段时间内,有另一个线程经过唤醒操做把*uaddr的值修改了,这就会致使当前工做线程永远处于睡眠状态而无人唤醒它。而在用户态没法实现判断与进入睡眠这两步为一个原子操做,因此须要内核来为其实现原子操做。
咱们知道线程一旦进入睡眠状态就中止了运行,那么若是后来又有可运行的goroutine须要工做线程去运行,正在睡眠的线程怎么知道有工做可作了呢?
从前面的代码咱们已经看到,stopm调用notesleep时给它传递的参数是m结构体的park成员,而m又早已经过mput放入了全局的milde空闲队列,这样其它运行着的线程一旦发现有更多的goroutine须要运行时就能够经过全局的m空闲队列找处处于睡眠状态的m,而后调用notewakeup(&m.park)将其唤醒,至于怎么唤醒,咱们在其它章节继续讨论。
到此,咱们已经完整分析了调度器的调度策略,从下一章起咱们将开始讨论有关调度的另一个话题:调度时机,即何时会发生调度。