Go语言GC实现原理及源码分析 (go1.15.7)[2]

Posted on 2021年3月20日 by luozhiyun
转载请声明出处哦~,本篇文章发布于luozhiyun的博客: https://www.luozhiyun.com/arc...缓存

本文使用的 Go 的源码1.15.7安全

stopTheWorldWithSema 与 startTheWorldWithSema

stopTheWorldWithSemastartTheWorldWithSema是一对用于暂停和恢复程序的核心函数。并发

func stopTheWorldWithSema() {
    _g_ := getg() 

    lock(&sched.lock)
    sched.stopwait = gomaxprocs
    // 标记 gcwaiting,调度时看见此标记会进入等待
    atomic.Store(&sched.gcwaiting, 1)
    // 发送抢占信号
    preemptall() 
    // 暂停当前 P
    _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
    sched.stopwait--
    // 遍历全部的 P ,修改 P 的状态为 _Pgcstop 中止运行
    for _, p := range allp {
        s := p.status
        if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
            if trace.enabled {
                traceGoSysBlock(p)
                traceProcStop(p)
            }
            p.syscalltick++
            sched.stopwait--
        }
    }
    // 中止空闲的 P 列表
    for {
        p := pidleget()
        if p == nil {
            break
        }
        p.status = _Pgcstop
        sched.stopwait--
    }
    wait := sched.stopwait > 0
    unlock(&sched.lock)
    if wait {
        for {
            //  等待 100 us
            if notetsleep(&sched.stopnote, 100*1000) {
                noteclear(&sched.stopnote)
                break
            }
            // 再次进行发送抢占信号
            preemptall()
        }
    }
    // 安全检测
    bad := ""
    if sched.stopwait != 0 {
        bad = "stopTheWorld: not stopped (stopwait != 0)"
    } else {
        for _, p := range allp {
            if p.status != _Pgcstop {
                bad = "stopTheWorld: not stopped (status != _Pgcstop)"
            }
        }
    }
    if atomic.Load(&freezing) != 0 {
        lock(&deadlock)
        lock(&deadlock)
    }
    if bad != "" {
        throw(bad)
    }
}

这个方法会经过sched.stopwait来检测是否全部的P都已暂停。首先会经过调用preemptall发送抢占信号进行抢占全部运行中的G,而后遍历P将全部状态为_Psyscall、空闲的P都暂停,若是仍有须要中止的P, 则等待它们中止。函数

func startTheWorldWithSema(emitTraceEvent bool) int64 {
    mp := acquirem() // disable preemption because it can be holding p in a local var
    // 判断收到的 netpoll 事件并添加对应的G到待运行队列
    if netpollinited() {
        list := netpoll(0) // non-blocking
        injectglist(&list)
    }
    lock(&sched.lock)

    procs := gomaxprocs
    if newprocs != 0 {
        procs = newprocs
        newprocs = 0
    }
  // 扩容或者缩容全局的处理器
    p1 := procresize(procs)
    // 取消GC等待标记
    sched.gcwaiting = 0
    // 若是 sysmon (后台监控线程) 在等待则唤醒它
    if sched.sysmonwait != 0 {
        sched.sysmonwait = 0
        notewakeup(&sched.sysmonnote)
    }
    unlock(&sched.lock)
    // 唤醒有可运行任务的P
    for p1 != nil {
        p := p1
        p1 = p1.link.ptr()
        if p.m != 0 {
            mp := p.m.ptr()
            p.m = 0
            if mp.nextp != 0 {
                throw("startTheWorld: inconsistent mp->nextp")
            }
            mp.nextp.set(p)
            notewakeup(&mp.park)
        } else {
            // Start M to run P 
            newm(nil, p, -1)
        }
    } 
    startTime := nanotime()
    if emitTraceEvent {
        traceGCSTWDone()
    }
    // 若是有空闲的P,而且没有自旋中的M则唤醒或者建立一个M
    wakep() 
    releasem(mp) 
    return startTime
}

建立后台标记 Worker

func gcBgMarkStartWorkers() {
    // 遍历全部 P
    for _, p := range allp {
        // 若是已启动则不重复启动
        if p.gcBgMarkWorker == 0 {
            // 为全局每一个处理器建立用于执行后台标记任务的 Goroutine
            go gcBgMarkWorker(p)
            // 启动后等待该任务通知信号量 bgMarkReady 再继续
            notetsleepg(&work.bgMarkReady, -1)
            noteclear(&work.bgMarkReady)
        }
    }
}

gcBgMarkStartWorkers会为全局每一个 P 建立用于执行后台标记任务的Goroutine,每个 Goroutine 都会运行gcBgMarkWorkernotetsleepg 会等待gcBgMarkWorker通知信号量bgMarkReady再继续。ui

这里虽然为每一个P启动了一个后台标记任务, 可是能够同时工做的只有25%,调度器在调度循环runtime.schedule中经过调用 gcController.findRunnableGCWorker方法进行控制。atom

在看这个方法以前,先来了解一个概念, Mark Worker Mode 标记工做模式,目前来讲有三种,这三种是为了保证后台的标记线程的利用率。spa

type gcMarkWorkerMode int

const (
    // gcMarkWorkerDedicatedMode indicates that the P of a mark
    // worker is dedicated to running that mark worker. The mark
    // worker should run without preemption.
    gcMarkWorkerDedicatedMode gcMarkWorkerMode = iota

    // gcMarkWorkerFractionalMode indicates that a P is currently
    // running the "fractional" mark worker. The fractional worker
    // is necessary when GOMAXPROCS*gcBackgroundUtilization is not
    // an integer. The fractional worker should run until it is
    // preempted and will be scheduled to pick up the fractional
    // part of GOMAXPROCS*gcBackgroundUtilization.
    gcMarkWorkerFractionalMode

    // gcMarkWorkerIdleMode indicates that a P is running the mark
    // worker because it has nothing else to do. The idle worker
    // should run until it is preempted and account its time
    // against gcController.idleMarkTime.
    gcMarkWorkerIdleMode
)

经过代码注释能够知道:pwa

  • gcMarkWorkerDedicatedMode :P 专门负责标记对象,不会被调度器抢占;
  • gcMarkWorkerFractionalMode:主要是因为如今默认标记线程的占用率要为 25%,因此若是 CPU 核数不是4的倍数,就没法除得整数,启动该类型的工做模式帮助垃圾收集达到利用率的目标;
  • gcMarkWorkerIdleMode:表示 P 当前只有标记线程在跑,没有其余能够执行的 G ,它会运行垃圾收集的标记任务直到被抢占;
func (c *gcControllerState) findRunnableGCWorker(_p_ *p) *g {
    ...
    // 原子减小对应的值, 若是减小后大于等于0则返回true, 不然返回false
    decIfPositive := func(ptr *int64) bool {
        if *ptr > 0 {
            if atomic.Xaddint64(ptr, -1) >= 0 {
                return true
            }
            // We lost a race
            atomic.Xaddint64(ptr, +1)
        }
        return false
    }
    // 减小dedicatedMarkWorkersNeeded, 成功时后台标记任务的模式是Dedicated
    if decIfPositive(&c.dedicatedMarkWorkersNeeded) { 
        _p_.gcMarkWorkerMode = gcMarkWorkerDedicatedMode
    } else if c.fractionalUtilizationGoal == 0 {
        // No need for fractional workers.
        return nil
    } else {
        // 执行标记任务的时间
        delta := nanotime() - gcController.markStartTime 
        if delta > 0 && float64(_p_.gcFractionalMarkTime)/float64(delta) > c.fractionalUtilizationGoal {
            // Nope. No need to run a fractional worker.
            return nil
        }
        _p_.gcMarkWorkerMode = gcMarkWorkerFractionalMode
    }

    gp := _p_.gcBgMarkWorker.ptr()
    casgstatus(gp, _Gwaiting, _Grunnable) 
    return gp
}

看过个人《详解Go语言调度循环源码实现》的同窗应该都知道,抢占调度运行到这里的时候,一般是 P 抢占不到 G 了,打算进行休眠了,所以在休眠以前能够安全的进行标记任务的执行。线程

没看过调度循环的同窗能够看这里:详解Go语言调度循环源码实现 https://www.luozhiyun.com/arc...code

并发扫描标记

并发扫描标记能够大概归纳为如下几个部分:

  1. 将当前传入的 P 打包成 parkInfo ,而后调用 gopark 让当前 G 进入休眠,在休眠前会将 P 的 gcBgMarkWorker 与 G 进行绑定,等待唤醒;
  2. 根据 Mark Worker Mode 调用不一样的策略调用 gcDrain 执行标记;
  3. 判断是否全部后台标记任务都完成, 而且没有更多的任务,调用 gcMarkDone 准备进入完成标记阶段;

后台标记休眠等待

func gcBgMarkWorker(_p_ *p) {
    gp := getg()

    type parkInfo struct {
        m      muintptr  
        attach puintptr  
    }  
    gp.m.preemptoff = "GC worker init"
    // 初始化 park
    park := new(parkInfo)
    gp.m.preemptoff = ""
    // 设置当前的M并禁止抢占
    park.m.set(acquirem())
    // 设置当前的P
    park.attach.set(_p_) 
    // 通知gcBgMarkStartWorkers能够继续处理
    notewakeup(&work.bgMarkReady)

    for { 
        // 让当前 G 进入休眠
        gopark(func(g *g, parkp unsafe.Pointer) bool {
            park := (*parkInfo)(parkp)

            releasem(park.m.ptr()) 
            // 设置关联的 P
            if park.attach != 0 {
                p := park.attach.ptr()
                park.attach.set(nil) 
                // 把当前的G设到P的gcBgMarkWorker成员
                if !p.gcBgMarkWorker.cas(0, guintptr(unsafe.Pointer(g))) { 
                    return false
                }
            }
            return true
        }, unsafe.Pointer(park), waitReasonGCWorkerIdle, traceEvGoBlock, 0)
        ...
    }
}

在 gcBgMarkStartWorkers 中咱们看到,它会遍历全部的 P ,而后为每一个 P 建立一个负责 Mark Work 的 G,这里虽然为每一个 P 启动了一个后台标记任务, 可是不可能每一个 P 都会去执行标记任务,后台标记任务默认资源占用率是 25%,因此 gcBgMarkWorker 中会初始化 park 并将 G 和 P 的 gcBgMarkWorker 进行绑定后进行休眠。

调度器在调度循环runtime.schedule中经过调用gcController.findRunnableGCWorker方法进行控制,让哪些 Mark Work 能够执行,上面代码已经贴过了,这里就不重复了

后台标记

在唤醒后,咱们会根据gcMarkWorkerMode选择不一样的标记执行策略,不一样的执行策略都会调用runtime.gcDrain:

func gcBgMarkWorker(_p_ *p) {
    gp := getg()
    ...
    for { 
        ...
        // 检查P的gcBgMarkWorker是否和当前的G一致, 不一致时结束当前的任务
        if _p_.gcBgMarkWorker.ptr() != gp {
            break
        }
        // 禁止G被抢占
        park.m.set(acquirem())

        // 记录开始时间
        startTime := nanotime()
        _p_.gcMarkWorkerStartTime = startTime

        decnwait := atomic.Xadd(&work.nwait, -1)

        systemstack(func() { 
            // 设置G的状态为等待中这样它的栈能够被扫描
            casgstatus(gp, _Grunning, _Gwaiting)
            // 判断后台标记任务的模式
            switch _p_.gcMarkWorkerMode {
            default:
                throw("gcBgMarkWorker: unexpected gcMarkWorkerMode")
            case gcMarkWorkerDedicatedMode:
                // 这个模式下P应该专心执行标记
                gcDrain(&_p_.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit)
                if gp.preempt { 
                    // 被抢占时把本地运行队列中的全部G都踢到全局运行队列
                    lock(&sched.lock)
                    for {
                        gp, _ := runqget(_p_)
                        if gp == nil {
                            break
                        }
                        globrunqput(gp)
                    }
                    unlock(&sched.lock)
                }
                // 继续执行标记
                gcDrain(&_p_.gcw, gcDrainFlushBgCredit)
            case gcMarkWorkerFractionalMode:
                // 执行标记
                gcDrain(&_p_.gcw, gcDrainFractional|gcDrainUntilPreempt|gcDrainFlushBgCredit)
            case gcMarkWorkerIdleMode:
                // 执行标记, 直到被抢占或者达到必定的量
                gcDrain(&_p_.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit)
            }
            // 恢复G的状态到运行中
            casgstatus(gp, _Gwaiting, _Grunning)
        }) 
        ...
    }
}

在上面已经讲了不一样的 Mark Worker Mode 的区别,不记得的同窗能够往上翻一下。执行标记这部分主要在 switch 判断中,根据不一样的模式传入不一样的参数到 gcDrain 函数中执行。

须要注意的是,传入到 gcDrain 中的是一个 gcWork 的结构体,它至关于每一个 P 的私有缓存空间,存放须要被扫描的对象,为垃圾收集器提供了生产和消费任务的抽象,,该结构体持有了两个重要的工做缓冲区 wbuf1 和 wbuf2:

当咱们向该结构体中增长或者删除对象时,它总会先操做 wbuf1 缓冲区,一旦 wbuf1 缓冲区空间不足或者没有对象,会触发缓冲区的切换,而当两个缓冲区空间都不足或者都为空时,会从全局的工做缓冲区中插入或者获取对象:

func (w *gcWork) tryGet() uintptr {
    wbuf := w.wbuf1
    ...
    // wbuf1缓冲区无数据时
    if wbuf.nobj == 0 {
        // wbuf1 与 wbuf2 进行对象互换
        w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1
        wbuf = w.wbuf1
        if wbuf.nobj == 0 {
            owbuf := wbuf
            // 从 work 的 full 队列中获取
            wbuf = trygetfull()
            ...
        }
    } 
    wbuf.nobj--
    return wbuf.obj[wbuf.nobj]
}

当咱们向该结构体中增长或者删除对象时,它总会先操做 wbuf1 缓冲区,一旦 wbuf1 缓冲区空间不足或者没有对象,会触发缓冲区的切换,而当两个缓冲区空间都不足或者都为空时,会从全局的工做缓冲区中插入或者获取对象:

func (w *gcWork) tryGet() uintptr {
    wbuf := w.wbuf1
    ...
    // wbuf1缓冲区无数据时
    if wbuf.nobj == 0 {
        // wbuf1 与 wbuf2 进行对象互换
        w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1
        wbuf = w.wbuf1
        if wbuf.nobj == 0 {
            owbuf := wbuf
            // 从 work 的 full 队列中获取
            wbuf = trygetfull()
            ...
        }
    } 
    wbuf.nobj--
    return wbuf.obj[wbuf.nobj]
}

继续上面的 gcBgMarkWorker 方法,在标记完以后就要进行标记完成:

func gcBgMarkWorker(_p_ *p) {
    gp := getg()
    ...
    for { 
        ...  
        // 累加所用时间
        duration := nanotime() - startTime
        switch _p_.gcMarkWorkerMode {
        case gcMarkWorkerDedicatedMode:
            atomic.Xaddint64(&gcController.dedicatedMarkTime, duration)
            atomic.Xaddint64(&gcController.dedicatedMarkWorkersNeeded, 1)
        case gcMarkWorkerFractionalMode:
            atomic.Xaddint64(&gcController.fractionalMarkTime, duration)
            atomic.Xaddint64(&_p_.gcFractionalMarkTime, duration)
        case gcMarkWorkerIdleMode:
            atomic.Xaddint64(&gcController.idleMarkTime, duration)
        }

        incnwait := atomic.Xadd(&work.nwait, +1)

        // 判断是否全部后台标记任务都完成, 而且没有更多的任务
        if incnwait == work.nproc && !gcMarkWorkAvailable(nil) { 
            // 取消和P的关联
            _p_.gcBgMarkWorker.set(nil)
            // 容许G被抢占
            releasem(park.m.ptr())
            // 准备进入完成标记阶段
            gcMarkDone()

            // 休眠以前会从新关联P
            // 由于上面容许被抢占, 到这里的时候可能就会变成其余P
            // 若是从新关联P失败则这个任务会结束
            park.m.set(acquirem())
            park.attach.set(_p_)
        }
    }
}

gcBgMarkWorker 会根据 incnwait 来检查是不是最后一个 worker,而后调用 gcMarkWorkAvailable 函数来校验 gcwork的任务和全局任务是否已经所有都处理完了,若是都确认没问题,那么调用 gcMarkDone 进入完成标记阶段。

标记扫描

下面咱们来看看 gcDrain:

func gcDrain(gcw *gcWork, flags gcDrainFlags) {
    gp := getg().m.curg
    // 看到抢占标志时是否要返回
    preemptible := flags&gcDrainUntilPreempt != 0
    // 是否计算后台的扫描量来减小协助线程和唤醒等待中的G
    flushBgCredit := flags&gcDrainFlushBgCredit != 0
    // 是否只执行必定量的工做
    idle := flags&gcDrainIdle != 0
    // 记录初始的已扫描数量
    initScanWork := gcw.scanWork

    checkWork := int64(1<<63 - 1)
    var check func() bool
    if flags&(gcDrainIdle|gcDrainFractional) != 0 {
        // drainCheckThreshold 默认 100000
        checkWork = initScanWork + drainCheckThreshold
        if idle {
            check = pollWork
        } else if flags&gcDrainFractional != 0 {
            check = pollFractionalWorkerExit
        }
    } 
    // 若是根对象未扫描完, 则先扫描根对象
    if work.markrootNext < work.markrootJobs {
        // 一直循环直到被抢占或 STW
        for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
            // 从根对象扫描队列取出一个值
            job := atomic.Xadd(&work.markrootNext, +1) - 1
            if job >= work.markrootJobs {
                break
            }
            // 执行根对象扫描工做
            markroot(gcw, job)
            if check != nil && check() {
                goto done
            }
        }
    } 
    ...
}

gcDrain 函数在开始的时候,会根据 flags 不一样而选择不一样的策略。

  • gcDrainUntilPreempt:当 G 被抢占时返回;
  • gcDrainIdle:调用 runtime.pollWork,当 P 上包含其余待执行 G 时返回;
  • gcDrainFractional:调用 runtime.pollFractionalWorkerExit,当 CPU 的占用率超过 fractionalUtilizationGoal 的 20% 时返回;
    设置完 check 变量后就能够执行 runtime.markroot进行根对象扫描,每次扫描完毕都会调用 check 函数校验是否应该退出标记任务,若是是那么就跳到 done 代码块中退出标记。

完成标记后会获取待执行的任务:

func gcDrain(gcw *gcWork, flags gcDrainFlags) {
    ...
    // 根对象已经在标记队列中, 消费标记队列
    // 一直循环直到被抢占或 STW
    for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) { 
        // 将本地一部分工做放回全局队列中
        if work.full == 0 {
            gcw.balance()
        }
        // 获取任务
        b := gcw.tryGetFast()
        if b == 0 {
            b = gcw.tryGet()
            if b == 0 { 
                wbBufFlush(nil, 0)
                b = gcw.tryGet()
            }
        }
        // 获取不到对象, 标记队列已为空, 跳出循环
        if b == 0 { 
            break
        }
        // 扫描获取到的对象
        scanobject(b, gcw)

        // 若是已经扫描了必定数量的对象,gcCreditSlack值是2000
        if gcw.scanWork >= gcCreditSlack {
            // 把扫描的对象数量添加到全局
            atomic.Xaddint64(&gcController.scanWork, gcw.scanWork)
            if flushBgCredit {
                // 记录此次扫描的内存字节数用于减小辅助标记的工做量
                gcFlushBgCredit(gcw.scanWork - initScanWork)
                initScanWork = 0
            }
            checkWork -= gcw.scanWork
            gcw.scanWork = 0

            if checkWork <= 0 {
                checkWork += drainCheckThreshold
                if check != nil && check() {
                    break
                }
            }
        }
    }

done:
    // 把扫描的对象数量添加到全局
    if gcw.scanWork > 0 {
        atomic.Xaddint64(&gcController.scanWork, gcw.scanWork)
        if flushBgCredit {
            // 记录此次扫描的内存字节数用于减小辅助标记的工做量
            gcFlushBgCredit(gcw.scanWork - initScanWork)
        }
        gcw.scanWork = 0
    }
}

这里在获取缓存队列以前会调用runtime.gcWork.balance,会将gcWork缓存一部分工做放回全局队列中,这个方法主要是用来平衡一下不一样 P 的负载状况。

而后获取gcWork的缓存任务,并将获取到的任务交给scanobject执行,该函数会从传入的位置开始扫描,并会给找到的活跃对象上色。runtime.gcFlushBgCredit会记录此次扫描的内存字节数用于减小辅助标记的工做量。

这里我来总结一下gcWork出入队状况。gcWork的出队就是咱们上面的scanobject方法,会获取到 gcWork 缓存对象并执行,可是同时若是找到活跃对象也会再次的入队到 gcWork 中。

除了 scanobject 之外,写屏障、根对象扫描和栈扫描都会向 gcWork 中增长额外的灰色对象等待处理。

相关文章
相关标签/搜索