Posted on 2021年3月20日 by luozhiyun
转载请声明出处哦~,本篇文章发布于luozhiyun的博客: https://www.luozhiyun.com/arc...缓存
本文使用的 Go 的源码1.15.7安全
stopTheWorldWithSema
与startTheWorldWithSema
是一对用于暂停和恢复程序的核心函数。并发
func stopTheWorldWithSema() { _g_ := getg() lock(&sched.lock) sched.stopwait = gomaxprocs // 标记 gcwaiting,调度时看见此标记会进入等待 atomic.Store(&sched.gcwaiting, 1) // 发送抢占信号 preemptall() // 暂停当前 P _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic. sched.stopwait-- // 遍历全部的 P ,修改 P 的状态为 _Pgcstop 中止运行 for _, p := range allp { s := p.status if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) { if trace.enabled { traceGoSysBlock(p) traceProcStop(p) } p.syscalltick++ sched.stopwait-- } } // 中止空闲的 P 列表 for { p := pidleget() if p == nil { break } p.status = _Pgcstop sched.stopwait-- } wait := sched.stopwait > 0 unlock(&sched.lock) if wait { for { // 等待 100 us if notetsleep(&sched.stopnote, 100*1000) { noteclear(&sched.stopnote) break } // 再次进行发送抢占信号 preemptall() } } // 安全检测 bad := "" if sched.stopwait != 0 { bad = "stopTheWorld: not stopped (stopwait != 0)" } else { for _, p := range allp { if p.status != _Pgcstop { bad = "stopTheWorld: not stopped (status != _Pgcstop)" } } } if atomic.Load(&freezing) != 0 { lock(&deadlock) lock(&deadlock) } if bad != "" { throw(bad) } }
这个方法会经过sched.stopwait
来检测是否全部的P都已暂停。首先会经过调用preemptall
发送抢占信号进行抢占全部运行中的G,而后遍历P将全部状态为_Psyscall
、空闲的P都暂停,若是仍有须要中止的P, 则等待它们中止。函数
func startTheWorldWithSema(emitTraceEvent bool) int64 { mp := acquirem() // disable preemption because it can be holding p in a local var // 判断收到的 netpoll 事件并添加对应的G到待运行队列 if netpollinited() { list := netpoll(0) // non-blocking injectglist(&list) } lock(&sched.lock) procs := gomaxprocs if newprocs != 0 { procs = newprocs newprocs = 0 } // 扩容或者缩容全局的处理器 p1 := procresize(procs) // 取消GC等待标记 sched.gcwaiting = 0 // 若是 sysmon (后台监控线程) 在等待则唤醒它 if sched.sysmonwait != 0 { sched.sysmonwait = 0 notewakeup(&sched.sysmonnote) } unlock(&sched.lock) // 唤醒有可运行任务的P for p1 != nil { p := p1 p1 = p1.link.ptr() if p.m != 0 { mp := p.m.ptr() p.m = 0 if mp.nextp != 0 { throw("startTheWorld: inconsistent mp->nextp") } mp.nextp.set(p) notewakeup(&mp.park) } else { // Start M to run P newm(nil, p, -1) } } startTime := nanotime() if emitTraceEvent { traceGCSTWDone() } // 若是有空闲的P,而且没有自旋中的M则唤醒或者建立一个M wakep() releasem(mp) return startTime }
func gcBgMarkStartWorkers() { // 遍历全部 P for _, p := range allp { // 若是已启动则不重复启动 if p.gcBgMarkWorker == 0 { // 为全局每一个处理器建立用于执行后台标记任务的 Goroutine go gcBgMarkWorker(p) // 启动后等待该任务通知信号量 bgMarkReady 再继续 notetsleepg(&work.bgMarkReady, -1) noteclear(&work.bgMarkReady) } } }
gcBgMarkStartWorkers
会为全局每一个 P 建立用于执行后台标记任务的Goroutine,每个 Goroutine 都会运行gcBgMarkWorker
,notetsleepg
会等待gcBgMarkWorker
通知信号量bgMarkReady
再继续。ui
这里虽然为每一个P启动了一个后台标记任务, 可是能够同时工做的只有25%,调度器在调度循环runtime.schedule
中经过调用 gcController.findRunnableGCWorker
方法进行控制。atom
在看这个方法以前,先来了解一个概念, Mark Worker Mode
标记工做模式,目前来讲有三种,这三种是为了保证后台的标记线程的利用率。spa
type gcMarkWorkerMode int const ( // gcMarkWorkerDedicatedMode indicates that the P of a mark // worker is dedicated to running that mark worker. The mark // worker should run without preemption. gcMarkWorkerDedicatedMode gcMarkWorkerMode = iota // gcMarkWorkerFractionalMode indicates that a P is currently // running the "fractional" mark worker. The fractional worker // is necessary when GOMAXPROCS*gcBackgroundUtilization is not // an integer. The fractional worker should run until it is // preempted and will be scheduled to pick up the fractional // part of GOMAXPROCS*gcBackgroundUtilization. gcMarkWorkerFractionalMode // gcMarkWorkerIdleMode indicates that a P is running the mark // worker because it has nothing else to do. The idle worker // should run until it is preempted and account its time // against gcController.idleMarkTime. gcMarkWorkerIdleMode )
经过代码注释能够知道:pwa
func (c *gcControllerState) findRunnableGCWorker(_p_ *p) *g { ... // 原子减小对应的值, 若是减小后大于等于0则返回true, 不然返回false decIfPositive := func(ptr *int64) bool { if *ptr > 0 { if atomic.Xaddint64(ptr, -1) >= 0 { return true } // We lost a race atomic.Xaddint64(ptr, +1) } return false } // 减小dedicatedMarkWorkersNeeded, 成功时后台标记任务的模式是Dedicated if decIfPositive(&c.dedicatedMarkWorkersNeeded) { _p_.gcMarkWorkerMode = gcMarkWorkerDedicatedMode } else if c.fractionalUtilizationGoal == 0 { // No need for fractional workers. return nil } else { // 执行标记任务的时间 delta := nanotime() - gcController.markStartTime if delta > 0 && float64(_p_.gcFractionalMarkTime)/float64(delta) > c.fractionalUtilizationGoal { // Nope. No need to run a fractional worker. return nil } _p_.gcMarkWorkerMode = gcMarkWorkerFractionalMode } gp := _p_.gcBgMarkWorker.ptr() casgstatus(gp, _Gwaiting, _Grunnable) return gp }
看过个人《详解Go语言调度循环源码实现》的同窗应该都知道,抢占调度运行到这里的时候,一般是 P 抢占不到 G 了,打算进行休眠了,所以在休眠以前能够安全的进行标记任务的执行。线程
没看过调度循环的同窗能够看这里:详解Go语言调度循环源码实现 https://www.luozhiyun.com/arc... 。code
并发扫描标记能够大概归纳为如下几个部分:
func gcBgMarkWorker(_p_ *p) { gp := getg() type parkInfo struct { m muintptr attach puintptr } gp.m.preemptoff = "GC worker init" // 初始化 park park := new(parkInfo) gp.m.preemptoff = "" // 设置当前的M并禁止抢占 park.m.set(acquirem()) // 设置当前的P park.attach.set(_p_) // 通知gcBgMarkStartWorkers能够继续处理 notewakeup(&work.bgMarkReady) for { // 让当前 G 进入休眠 gopark(func(g *g, parkp unsafe.Pointer) bool { park := (*parkInfo)(parkp) releasem(park.m.ptr()) // 设置关联的 P if park.attach != 0 { p := park.attach.ptr() park.attach.set(nil) // 把当前的G设到P的gcBgMarkWorker成员 if !p.gcBgMarkWorker.cas(0, guintptr(unsafe.Pointer(g))) { return false } } return true }, unsafe.Pointer(park), waitReasonGCWorkerIdle, traceEvGoBlock, 0) ... } }
在 gcBgMarkStartWorkers 中咱们看到,它会遍历全部的 P ,而后为每一个 P 建立一个负责 Mark Work 的 G,这里虽然为每一个 P 启动了一个后台标记任务, 可是不可能每一个 P 都会去执行标记任务,后台标记任务默认资源占用率是 25%,因此 gcBgMarkWorker 中会初始化 park 并将 G 和 P 的 gcBgMarkWorker 进行绑定后进行休眠。
调度器在调度循环runtime.schedule
中经过调用gcController.findRunnableGCWorker
方法进行控制,让哪些 Mark Work 能够执行,上面代码已经贴过了,这里就不重复了
在唤醒后,咱们会根据gcMarkWorkerMode
选择不一样的标记执行策略,不一样的执行策略都会调用runtime.gcDrain
:
func gcBgMarkWorker(_p_ *p) { gp := getg() ... for { ... // 检查P的gcBgMarkWorker是否和当前的G一致, 不一致时结束当前的任务 if _p_.gcBgMarkWorker.ptr() != gp { break } // 禁止G被抢占 park.m.set(acquirem()) // 记录开始时间 startTime := nanotime() _p_.gcMarkWorkerStartTime = startTime decnwait := atomic.Xadd(&work.nwait, -1) systemstack(func() { // 设置G的状态为等待中这样它的栈能够被扫描 casgstatus(gp, _Grunning, _Gwaiting) // 判断后台标记任务的模式 switch _p_.gcMarkWorkerMode { default: throw("gcBgMarkWorker: unexpected gcMarkWorkerMode") case gcMarkWorkerDedicatedMode: // 这个模式下P应该专心执行标记 gcDrain(&_p_.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit) if gp.preempt { // 被抢占时把本地运行队列中的全部G都踢到全局运行队列 lock(&sched.lock) for { gp, _ := runqget(_p_) if gp == nil { break } globrunqput(gp) } unlock(&sched.lock) } // 继续执行标记 gcDrain(&_p_.gcw, gcDrainFlushBgCredit) case gcMarkWorkerFractionalMode: // 执行标记 gcDrain(&_p_.gcw, gcDrainFractional|gcDrainUntilPreempt|gcDrainFlushBgCredit) case gcMarkWorkerIdleMode: // 执行标记, 直到被抢占或者达到必定的量 gcDrain(&_p_.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit) } // 恢复G的状态到运行中 casgstatus(gp, _Gwaiting, _Grunning) }) ... } }
在上面已经讲了不一样的 Mark Worker Mode 的区别,不记得的同窗能够往上翻一下。执行标记这部分主要在 switch 判断中,根据不一样的模式传入不一样的参数到 gcDrain 函数中执行。
须要注意的是,传入到 gcDrain 中的是一个 gcWork 的结构体,它至关于每一个 P 的私有缓存空间,存放须要被扫描的对象,为垃圾收集器提供了生产和消费任务的抽象,,该结构体持有了两个重要的工做缓冲区 wbuf1 和 wbuf2:
当咱们向该结构体中增长或者删除对象时,它总会先操做 wbuf1 缓冲区,一旦 wbuf1 缓冲区空间不足或者没有对象,会触发缓冲区的切换,而当两个缓冲区空间都不足或者都为空时,会从全局的工做缓冲区中插入或者获取对象:
func (w *gcWork) tryGet() uintptr { wbuf := w.wbuf1 ... // wbuf1缓冲区无数据时 if wbuf.nobj == 0 { // wbuf1 与 wbuf2 进行对象互换 w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1 wbuf = w.wbuf1 if wbuf.nobj == 0 { owbuf := wbuf // 从 work 的 full 队列中获取 wbuf = trygetfull() ... } } wbuf.nobj-- return wbuf.obj[wbuf.nobj] }
当咱们向该结构体中增长或者删除对象时,它总会先操做 wbuf1 缓冲区,一旦 wbuf1 缓冲区空间不足或者没有对象,会触发缓冲区的切换,而当两个缓冲区空间都不足或者都为空时,会从全局的工做缓冲区中插入或者获取对象:
func (w *gcWork) tryGet() uintptr { wbuf := w.wbuf1 ... // wbuf1缓冲区无数据时 if wbuf.nobj == 0 { // wbuf1 与 wbuf2 进行对象互换 w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1 wbuf = w.wbuf1 if wbuf.nobj == 0 { owbuf := wbuf // 从 work 的 full 队列中获取 wbuf = trygetfull() ... } } wbuf.nobj-- return wbuf.obj[wbuf.nobj] }
继续上面的 gcBgMarkWorker 方法,在标记完以后就要进行标记完成:
func gcBgMarkWorker(_p_ *p) { gp := getg() ... for { ... // 累加所用时间 duration := nanotime() - startTime switch _p_.gcMarkWorkerMode { case gcMarkWorkerDedicatedMode: atomic.Xaddint64(&gcController.dedicatedMarkTime, duration) atomic.Xaddint64(&gcController.dedicatedMarkWorkersNeeded, 1) case gcMarkWorkerFractionalMode: atomic.Xaddint64(&gcController.fractionalMarkTime, duration) atomic.Xaddint64(&_p_.gcFractionalMarkTime, duration) case gcMarkWorkerIdleMode: atomic.Xaddint64(&gcController.idleMarkTime, duration) } incnwait := atomic.Xadd(&work.nwait, +1) // 判断是否全部后台标记任务都完成, 而且没有更多的任务 if incnwait == work.nproc && !gcMarkWorkAvailable(nil) { // 取消和P的关联 _p_.gcBgMarkWorker.set(nil) // 容许G被抢占 releasem(park.m.ptr()) // 准备进入完成标记阶段 gcMarkDone() // 休眠以前会从新关联P // 由于上面容许被抢占, 到这里的时候可能就会变成其余P // 若是从新关联P失败则这个任务会结束 park.m.set(acquirem()) park.attach.set(_p_) } } }
gcBgMarkWorker 会根据 incnwait 来检查是不是最后一个 worker,而后调用 gcMarkWorkAvailable 函数来校验 gcwork的任务和全局任务是否已经所有都处理完了,若是都确认没问题,那么调用 gcMarkDone 进入完成标记阶段。
下面咱们来看看 gcDrain:
func gcDrain(gcw *gcWork, flags gcDrainFlags) { gp := getg().m.curg // 看到抢占标志时是否要返回 preemptible := flags&gcDrainUntilPreempt != 0 // 是否计算后台的扫描量来减小协助线程和唤醒等待中的G flushBgCredit := flags&gcDrainFlushBgCredit != 0 // 是否只执行必定量的工做 idle := flags&gcDrainIdle != 0 // 记录初始的已扫描数量 initScanWork := gcw.scanWork checkWork := int64(1<<63 - 1) var check func() bool if flags&(gcDrainIdle|gcDrainFractional) != 0 { // drainCheckThreshold 默认 100000 checkWork = initScanWork + drainCheckThreshold if idle { check = pollWork } else if flags&gcDrainFractional != 0 { check = pollFractionalWorkerExit } } // 若是根对象未扫描完, 则先扫描根对象 if work.markrootNext < work.markrootJobs { // 一直循环直到被抢占或 STW for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) { // 从根对象扫描队列取出一个值 job := atomic.Xadd(&work.markrootNext, +1) - 1 if job >= work.markrootJobs { break } // 执行根对象扫描工做 markroot(gcw, job) if check != nil && check() { goto done } } } ... }
gcDrain 函数在开始的时候,会根据 flags 不一样而选择不一样的策略。
完成标记后会获取待执行的任务:
func gcDrain(gcw *gcWork, flags gcDrainFlags) { ... // 根对象已经在标记队列中, 消费标记队列 // 一直循环直到被抢占或 STW for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) { // 将本地一部分工做放回全局队列中 if work.full == 0 { gcw.balance() } // 获取任务 b := gcw.tryGetFast() if b == 0 { b = gcw.tryGet() if b == 0 { wbBufFlush(nil, 0) b = gcw.tryGet() } } // 获取不到对象, 标记队列已为空, 跳出循环 if b == 0 { break } // 扫描获取到的对象 scanobject(b, gcw) // 若是已经扫描了必定数量的对象,gcCreditSlack值是2000 if gcw.scanWork >= gcCreditSlack { // 把扫描的对象数量添加到全局 atomic.Xaddint64(&gcController.scanWork, gcw.scanWork) if flushBgCredit { // 记录此次扫描的内存字节数用于减小辅助标记的工做量 gcFlushBgCredit(gcw.scanWork - initScanWork) initScanWork = 0 } checkWork -= gcw.scanWork gcw.scanWork = 0 if checkWork <= 0 { checkWork += drainCheckThreshold if check != nil && check() { break } } } } done: // 把扫描的对象数量添加到全局 if gcw.scanWork > 0 { atomic.Xaddint64(&gcController.scanWork, gcw.scanWork) if flushBgCredit { // 记录此次扫描的内存字节数用于减小辅助标记的工做量 gcFlushBgCredit(gcw.scanWork - initScanWork) } gcw.scanWork = 0 } }
这里在获取缓存队列以前会调用runtime.gcWork.balance
,会将gcWork缓存一部分工做放回全局队列中,这个方法主要是用来平衡一下不一样 P 的负载状况。
而后获取gcWork的缓存任务,并将获取到的任务交给scanobject执行,该函数会从传入的位置开始扫描,并会给找到的活跃对象上色。runtime.gcFlushBgCredit
会记录此次扫描的内存字节数用于减小辅助标记的工做量。
这里我来总结一下gcWork出入队状况。gcWork的出队就是咱们上面的scanobject
方法,会获取到 gcWork 缓存对象并执行,可是同时若是找到活跃对象也会再次的入队到 gcWork 中。
除了 scanobject 之外,写屏障、根对象扫描和栈扫描都会向 gcWork 中增长额外的灰色对象等待处理。