本文是《Go语言调度器源代码情景分析》系列的第16篇,也是第三章《Goroutine调度策略》的第1小节。算法
在调度器概述一节咱们提到过,所谓的goroutine调度,是指程序代码按照必定的算法在适当的时候挑选出合适的goroutine并放到CPU上去运行的过程。这句话揭示了调度系统须要解决的三大核心问题:数组
调度时机:何时会发生调度?并发
调度策略:使用什么策略来挑选下一个进入运行的goroutine?负载均衡
切换机制:如何把挑选出来的goroutine放到CPU上运行?函数
对这三大问题的解决构成了调度器的全部工做,于是咱们对调度器的分析也必将围绕着它们所展开。ui
第二章咱们已经详细的分析了调度器的初始化以及goroutine的切换机制,本章将重点讨论调度器如何挑选下一个goroutine出来运行的策略问题,而剩下的与调度时机相关的内容咱们将在第4~6章进行全面的分析。atom
再探schedule函数spa
在讨论main goroutine的调度时咱们已经见过schedule函数,由于当时咱们的主要关注点在于main goroutine是如何被调度到CPU上运行的,因此并未对schedule函数如何挑选下一个goroutine出来运行作深刻的分析,如今是从新回到schedule函数详细分析其调度策略的时候了。线程
runtime/proc.go : 2467指针
// One round of scheduler: find a runnable goroutine and execute it. // Never returns. func schedule() { _g_ := getg() //_g_ = m.g0 ...... var gp *g ...... if gp == nil { // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. //为了保证调度的公平性,每一个工做线程每进行61次调度就须要优先从全局运行队列中获取goroutine出来运行, //由于若是只调度本地运行队列中的goroutine,则全局运行队列中的goroutine有可能得不到运行 if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) //全部工做线程都能访问全局运行队列,因此须要加锁 gp = globrunqget(_g_.m.p.ptr(), 1) //从全局运行队列中获取1个goroutine unlock(&sched.lock) } } if gp == nil { //从与m关联的p的本地运行队列中获取goroutine gp, inheritTime = runqget(_g_.m.p.ptr()) if gp != nil && _g_.m.spinning { throw("schedule: spinning with local work") } } if gp == nil { //若是从本地运行队列和全局运行队列都没有找到须要运行的goroutine, //则调用findrunnable函数从其它工做线程的运行队列中偷取,若是偷取不到,则当前工做线程进入睡眠, //直到获取到须要运行的goroutine以后findrunnable函数才会返回。 gp, inheritTime = findrunnable() // blocks until work is available } ...... //当前运行的是runtime的代码,函数调用栈使用的是g0的栈空间 //调用execte切换到gp的代码和栈空间去运行 execute(gp, inheritTime) }
schedule函数分三步分别从各运行队列中寻找可运行的goroutine:
第一步,从全局运行队列中寻找goroutine。为了保证调度的公平性,每一个工做线程每通过61次调度就须要优先尝试从全局运行队列中找出一个goroutine来运行,这样才能保证位于全局运行队列中的goroutine获得调度的机会。全局运行队列是全部工做线程均可以访问的,因此在访问它以前须要加锁。
第二步,从工做线程本地运行队列中寻找goroutine。若是不须要或不能从全局运行队列中获取到goroutine则从本地运行队列中获取。
第三步,从其它工做线程的运行队列中偷取goroutine。若是上一步也没有找到须要运行的goroutine,则调用findrunnable从其余工做线程的运行队列中偷取goroutine,findrunnable函数在偷取以前会再次尝试从全局运行队列和当前线程的本地运行队列中查找须要运行的goroutine。
下面咱们先来看如何从全局运行队列中获取goroutine。
从全局运行队列中获取goroutine
从全局运行队列中获取可运行的goroutine是经过globrunqget函数来完成的,该函数的第一个参数是与当前工做线程绑定的p,第二个参数max表示最多能够从全局队列中拿多少个g到当前工做线程的本地运行队列中来。
runtime/proc.go : 4663
// Try get a batch of G's from the global runnable queue. // Sched must be locked. func globrunqget(_p_ *p, max int32) *g { if sched.runqsize == 0 { //全局运行队列为空 return nil } //根据p的数量平分全局运行队列中的goroutines n := sched.runqsize / gomaxprocs + 1 if n > sched.runqsize { //上面计算n的方法可能致使n大于全局运行队列中的goroutine数量 n = sched.runqsize } if max > 0 && n > max { n = max //最多取max个goroutine } if n > int32(len(_p_.runq)) / 2 { n = int32(len(_p_.runq)) / 2 //最多只能取本地队列容量的一半 } sched.runqsize -= n //直接经过函数返回gp,其它的goroutines经过runqput放入本地运行队列 gp := sched.runq.pop() //pop从全局运行队列的队列头取 n-- for ; n > 0; n-- { gp1 := sched.runq.pop() //从全局运行队列中取出一个goroutine runqput(_p_, gp1, false) //放入本地运行队列 } return gp }
globrunqget函数首先会根据全局运行队列中goroutine的数量,函数参数max以及_p_的本地队列的容量计算出到底应该拿多少个goroutine,而后把第一个g结构体对象经过返回值的方式返回给调用函数,其它的则经过runqput函数放入当前工做线程的本地运行队列。这段代码值得一提的是,计算应该从全局运行队列中拿走多少个goroutine时根据p的数量(gomaxprocs)作了负载均衡。
若是没有从全局运行队列中获取到goroutine,那么接下来就在工做线程的本地运行队列中寻找须要运行的goroutine。
从工做线程本地运行队列中获取goroutine
从代码上来看,工做线程的本地运行队列其实分为两个部分,一部分是由p的runq、runqhead和runqtail这三个成员组成的一个无锁循环队列,该队列最多可包含256个goroutine;另外一部分是p的runnext成员,它是一个指向g结构体对象的指针,它最多只包含一个goroutine。
从本地运行队列中寻找goroutine是经过runqget函数完成的,寻找时,代码首先查看runnext成员是否为空,若是不为空则返回runnext所指的goroutine,并把runnext成员清零,若是runnext为空,则继续从循环队列中查找goroutine。
runtime/proc.go : 4825
// Get g from local runnable queue. // If inheritTime is true, gp should inherit the remaining time in the // current time slice. Otherwise, it should start a new time slice. // Executed only by the owner P. func runqget(_p_ *p) (gp *g, inheritTime bool) { // If there's a runnext, it's the next G to run. //从runnext成员中获取goroutine for { //查看runnext成员是否为空,不为空则返回该goroutine next := _p_.runnext if next == 0 { break } if _p_.runnext.cas(next, 0) { return next.ptr(), true } } //从循环队列中获取goroutine for { h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := _p_.runqtail if t == h { return nil, false } gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume return gp, false } } }
这里首先须要注意的是不论是从runnext仍是从循环队列中拿取goroutine都使用了cas操做,这里的cas操做是必需的,由于可能有其余工做线程此时此刻也正在访问这两个成员,从这里偷取可运行的goroutine。
其次,代码中对runqhead的操做使用了atomic.LoadAcq和atomic.CasRel,它们分别提供了load-acquire和cas-release语义。
对于atomic.LoadAcq来讲,其语义主要包含以下几条:
原子读取,也就是说无论代码运行在哪一种平台,保证在读取过程当中不会有其它线程对该变量进行写入;
位于atomic.LoadAcq以后的代码,对内存的读取和写入必须在atomic.LoadAcq读取完成后才能执行,编译器和CPU都不能打乱这个顺序;
当前线程执行atomic.LoadAcq时能够读取到其它线程最近一次经过atomic.CasRel对同一个变量写入的值,与此同时,位于atomic.LoadAcq以后的代码,无论读取哪一个内存地址中的值,均可以读取到其它线程中位于atomic.CasRel(对同一个变量操做)以前的代码最近一次对内存的写入。
对于atomic.CasRel来讲,其语义主要包含以下几条:
原子的执行比较并交换的操做;
位于atomic.CasRel以前的代码,对内存的读取和写入必须在atomic.CasRel对内存的写入以前完成,编译器和CPU都不能打乱这个顺序;
线程执行atomic.CasRel完成后其它线程经过atomic.LoadAcq读取同一个变量能够读到最新的值,与此同时,位于atomic.CasRel以前的代码对内存写入的值,能够被其它线程中位于atomic.LoadAcq(对同一个变量操做)以后的代码读取到。
由于可能有多个线程会并发的修改和读取runqhead,以及须要依靠runqhead的值来读取runq数组的元素,因此须要使用atomic.LoadAcq和atomic.CasRel来保证上述语义。
咱们可能会问,为何读取p的runqtail成员不须要使用atomic.LoadAcq或atomic.load?由于runqtail不会被其它线程修改,只会被当前工做线程修改,此时没有人修改它,因此也就不须要使用原子相关的操做。
最后,由p的runq、runqhead和runqtail这三个成员组成的这个无锁循环队列很是精妙,咱们会在后面的章节对这个循环队列进行分析。
CAS操做与ABA问题
咱们知道使用cas操做须要特别注意ABA的问题,那么runqget函数这两个使用cas的地方会不会有问题呢?答案是这两个地方都不会有ABA的问题。缘由分析以下:
首先来看对runnext的cas操做。只有跟_p_绑定的当前工做线程才会去修改runnext为一个非0值,其它线程只会把runnext的值从一个非0值修改成0值,然而跟_p_绑定的当前工做线程正在此处执行代码,因此在当前工做线程读取到值A以后,不可能有线程修改其值为B(0)以后再修改回A。
再来看对runq的cas操做。当前工做线程操做的是_p_的本地队列,只有跟_p_绑定在一块儿的当前工做线程才会由于往该队列里面添加goroutine而去修改runqtail,而其它工做线程不会往该队列里面添加goroutine,也就不会去修改runqtail,它们只会修改runqhead,因此,当咱们这个工做线程从runqhead读取到值A以后,其它工做线程也就不可能修改runqhead的值为B以后再第二次把它修改成值A(由于runqtail在这段时间以内不可能被修改,runqhead的值也就没法越过runqtail再回绕到A值),也就是说,代码从逻辑上已经杜绝了引起ABA的条件。
到此,咱们已经分析完工做线程从全局运行队列和本地运行队列获取goroutine的代码,因为篇幅的限制,咱们下一节再来分析从其它工做线程的运行队列偷取goroutine的流程。