One of the keys to Go's scheduling efficiency is its preemptive scheduling. The sysmon monitor thread continuously checks the execution state of the tasks on every P; when it finds a G that has run past its allotted time slice, it initiates a preemption. This, too, is implemented in the retake function. The previous section described retake's role in system calls; here we look at how it carries out preemption.
retake() iterates over all Ps. If a P is in the running state and its current G has been running continuously for too long, the G gets preempted. retake() calls preemptone(), which sets the stackguard0 of the G running on that P to stackPreempt (for background on stackguard, see Split Stacks). As a result, the next function call that G makes will fail its stack-space check, which triggers morestack() (assembly code, located in asm_XXX.s) and then a chain of function calls whose main path is:
morestack() (assembly) -> newstack() -> gopreempt_m() -> goschedImpl() -> schedule()
Reference: http://ga0.github.io/golang/2...
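Because this mechanism piggybacks on the stack check in function prologues, a loop that makes no function calls has no preemption point at all. The runnable sketch below (my own example, not from the runtime) makes this visible: on Go versions before 1.14, or on newer versions with GODEBUG=asyncpreemptoff=1, the busy loop can never be preempted and the program hangs on a single P, whereas the signal-based asynchronous preemption added in Go 1.14 lets it finish.

package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1) // single P: the busy loop and main must share it

	go func() {
		for {
			// No function calls here, so no stack check is ever executed
			// and cooperative (stackguard0-based) preemption never fires.
		}
	}()

	// main parks in Sleep and the busy loop takes the only P. Whether we
	// ever wake up again depends on the runtime being able to preempt it.
	time.Sleep(100 * time.Millisecond)
	fmt.Println("busy loop was preempted")
}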
func retake(now int64) uint32 {
	n := 0
	lock(&allpLock)
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				// The G has run past its time slice: preempt it.
				preemptone(_p_)
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}
		// The P is in a system call.
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			// Skip if there is nothing to schedule and the blocking time is still below the threshold.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			// This is where rescheduling of a long-blocking system call is triggered.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				// Key call: hand the long-blocked P off to be rescheduled.
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
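How long is "too long"? The threshold that retake() compares schedwhen against is a constant in runtime/proc.go:

// forcePreemptNS is the time slice given to a G before it is preempted.
forcePreemptNS = 10 * 1000 * 1000 // 10ms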
// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being Grunning.
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	// Mark the G as preemptible.
	gp.preempt = true

	// Every call in a go routine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt

	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}

	return true
}
As you can see, preemptone mostly just sets two flags (gp.preempt and gp.stackguard0); it does not perform the actual preemption itself. The process is asynchronous: the real preemption is carried out elsewhere. (On Go 1.14+, preemptM additionally signals the M to force an asynchronous preemption.)
stackguard0 itself exists to detect whether a goroutine's stack needs to grow. When it is set to stackPreempt, the stack check at the next function call fails and the stack-growth path is entered: morestack() is called, which calls newstack(). newstack() normally grows the G's stack, but it also moonlights as the goroutine preemption handler.
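newstack's day job, stack growth, is easy to observe from ordinary code. The runnable sketch below (my own example; the recursion depth and padding size are arbitrary) recurses far past the initial 8KB goroutine stack, forcing morestack -> newstack to allocate a bigger stack and copy the old one over. The copy is visible because the address of a local variable in main changes:

package main

import (
	"fmt"
	"unsafe"
)

// grow forces stack growth by recursing; each frame carries some padding
// so a few thousand frames easily exceed the initial 8KB stack.
func grow(n int) int {
	var pad [128]byte
	if n == 0 {
		return int(pad[0])
	}
	return grow(n-1) + int(pad[0])
}

func main() {
	var local byte
	before := uintptr(unsafe.Pointer(&local)) // address on the original stack
	grow(10000)                               // triggers morestack -> newstack -> stack copy
	after := uintptr(unsafe.Pointer(&local))  // address after the stack moved
	fmt.Printf("&local before: %#x, after: %#x, stack moved: %v\n",
		before, after, before != after)
}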
The preempt field is a backup mechanism: when stackguard0 has been set to stackPreempt but newstack did not carry out the preemption, other places in the runtime that see this flag will set stackguard0 to stackPreempt again, re-triggering the preemption.
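For reference, the stackPreempt sentinel that both mechanisms rely on is defined in runtime/stack.go; because it is larger than any real stack pointer, every prologue comparison against it fails:

// Goroutine preemption request.
// Stored into g->stackguard0 to cause split stack check failure.
// Must be greater than any real sp.
// 0xfffffade in hex.
stackPreempt = uintptrMask & -1314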
func newstack() {
	thisg := getg()
	gp := thisg.m.curg

	// NOTE: stackguard0 may change underfoot, if another thread is about
	// to try to preempt gp. Read it once now and use that value to decide
	// whether a preemption was requested.
	preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
	if preempt {
		// In the following states the G must not be preempted.
		if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
			// Let the goroutine keep running for now.
			// gp->preempt is set, so it will be preempted next time.
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // never return
		}
	}

	if preempt {
		casgstatus(gp, _Grunning, _Gwaiting)
		// Preemption requested by the GC scan.
		if gp.preemptscan {
			for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
			}
			if !gp.gcscandone {
				// Scan the current gp's stack.
				gcw := &gp.m.p.ptr().gcw
				scanstack(gp, gcw)
				if gcBlackenPromptly {
					gcw.dispose()
				}
				gp.gcscandone = true
			}
			gp.preemptscan = false
			gp.preempt = false
			casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
			// This clears gcscanvalid.
			casgstatus(gp, _Gwaiting, _Grunning)
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // resume execution after the scan
		}

		// Switch back to _Grunning before handing off to the scheduler.
		casgstatus(gp, _Gwaiting, _Grunning)
		gopreempt_m(gp) // never return
	}
	...
}
This path ultimately unbinds the G from its M, puts the G onto the global run queue, and then lets the M schedule and run a new task.
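gopreempt_m funnels into goschedImpl, which is the same path runtime.Gosched takes (Gosched -> gosched_m -> goschedImpl). That makes the unbind-and-requeue behavior easy to exercise voluntarily, as in this runnable sketch (my own example):

package main

import (
	"fmt"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(1)
	done := make(chan struct{})

	go func() {
		fmt.Println("the other goroutine got the P")
		close(done)
	}()

	// Gosched -> goschedImpl: unbind this G from its M, put it on the
	// global run queue, and let the M call schedule() to pick new work.
	runtime.Gosched()
	<-done
}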
That covers the basics of Go's preemptive scheduling. To summarize:
Preemption of a normally running goroutine is initiated by the sysmon monitor thread: a goroutine that has exceeded its time slice is marked as preemptible. (Preemption also happens during the GC scan phase, mainly so the stack of a running G can be scanned.)
The compiler inserts stack-check code into every function of a task; whenever the stack needs to grow or a preemption is pending, execution enters morestack, which calls newstack.
newstack checks whether a preemption was requested and of which kind. A preemption triggered by the GC scan scans the contents of the current G's stack and then resumes that G; an ordinary preemption unbinds the current G, puts it onto the global queue, and continues scheduling.
When a system call runs too long, handoffp() is called:
// Hands off P from syscall or locked M.
func handoffp(_p_ *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on _p_.

	// if it has local work, start it straight away
	// The P's local queue or the global queue has work: kick off a scheduling round.
	// startm() was described above: it obtains an M to run this P's tasks
	// (called with a nil P, it finds an idle P to schedule instead).
	if !runqempty(_p_) || sched.runqsize != 0 {
		startm(_p_, false)
		return
	}
	// We are in the GC mark phase and this P has mark work: schedule it.
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
		startm(_p_, false)
		return
	}
	// no local work, check that there are no spinning/idle M's,
	// otherwise our help is not required
	// No spinning M and no idle P anywhere: start a spinning M ourselves.
	if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
		startm(_p_, true)
		return
	}
	lock(&sched.lock)
	if sched.gcwaiting != 0 {
		_p_.status = _Pgcstop
		sched.stopwait--
		if sched.stopwait == 0 {
			notewakeup(&sched.stopnote)
		}
		unlock(&sched.lock)
		return
	}
	if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
		sched.safePointFn(_p_)
		sched.safePointWait--
		if sched.safePointWait == 0 {
			notewakeup(&sched.safePointNote)
		}
	}
	// The global queue is not empty.
	if sched.runqsize != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}

	// The scheduler lock cannot be held when calling wakeNetPoller below
	// because wakeNetPoller may call wakep which may call startm.
	when := nobarrierWakeTime(_p_)
	// Really no work anywhere: put the P on the idle list.
	pidleput(_p_)
	unlock(&sched.lock)

	if when != 0 {
		wakeNetPoller(when)
	}
}
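The practical effect of handoffp is that a goroutine stuck in a blocking system call cannot starve the rest of the program. The runnable sketch below (my own example, for Linux/macOS; it reads from a raw pipe fd via the syscall package so the read is a genuine blocking read(2) rather than going through the netpoller) shows this: even with a single P, main keeps running while the reader's M stays parked in the kernel, because sysmon retakes the P and hands it off.

package main

import (
	"fmt"
	"runtime"
	"syscall"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)

	var fds [2]int
	if err := syscall.Pipe(fds[:]); err != nil {
		panic(err)
	}

	go func() {
		buf := make([]byte, 1)
		// Blocks forever in read(2): nobody ever writes to the pipe.
		// The P enters _Psyscall; after ~20us sysmon's retake() calls
		// handoffp() so other goroutines can use it.
		syscall.Read(fds[0], buf)
	}()

	time.Sleep(50 * time.Millisecond) // give the reader time to block
	fmt.Println("main is still running while the reader is parked in a syscall")
}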