Goroutine被动调度之一(18)

本文是《Go语言调度器源代码情景分析》系列的第18篇,也是第四章《Goroutine被动调度》的第1小节。编程


前一章咱们详细分析了调度器的调度策略,即调度器如何选取下一个进入运行的goroutine,但咱们还不清楚何时以及什么状况下会发生调度,从这一章开始咱们就来讨论这个问题。缓存

整体说来,go语言的调度器会在如下三种状况下对goroutine进行调度:并发

  1. goroutine执行某个操做因条件不知足须要等待而发生的调度;dom

  2. goroutine主动调用Gosched()函数让出CPU而发生的调度;函数

  3. goroutine运行时间太长或长时间处于系统调用之中而被调度器剥夺运行权而发生的调度。ui

本章主要分析咱们称之为被动调度的第1种调度,剩下的两种调度将在后面两章分别进行讨论。this

Demo例子atom

咱们以一个demo程序为例来分析因阻塞而发生的被动调度。spa

package main

func start(c chan int) {
    c<-100
}

func main() {
    c:=make(chan int)

    go start(c)

    <-c
}

 该程序启动时,main goroutine首先会建立一个无缓存的channel,而后启动一个goroutine(为了方便讨论咱们称它为g2)向channel发送数据,而main本身则去读取这个channel。线程

这两个goroutine读写channel时必定会发生一次阻塞,不是main goroutine读取channel时发生阻塞就是g2写入channel时发生阻塞。

建立g2 goroutine

首先用gdb反汇编一下main函数,看看汇编代码。

0x44f4d0<+0>: mov   %fs:0xfffffffffffffff8,%rcx
0x44f4d9<+9>: cmp   0x10(%rcx),%rsp
0x44f4dd<+13>: jbe   0x44f549 <main.main+121>
0x44f4df<+15>: sub   $0x28,%rsp
0x44f4e3<+19>: mov   %rbp,0x20(%rsp)
0x44f4e8<+24>: lea   0x20(%rsp),%rbp
0x44f4ed<+29>: lea   0xb36c(%rip),%rax       
0x44f4f4<+36>: mov   %rax,(%rsp)
0x44f4f8<+40>: movq   $0x0,0x8(%rsp)
0x44f501<+49>: callq    0x404330 <runtime.makechan>  #建立channel
0x44f506<+54>: mov   0x10(%rsp),%rax
0x44f50b<+59>: mov   %rax,0x18(%rsp)
0x44f510<+64>: movl   $0x8,(%rsp)
0x44f517<+71>: lea   0x240f2(%rip),%rcx       
0x44f51e<+78>: mov   %rcx,0x8(%rsp)
0x44f523<+83>: callq   0x42c1b0 <runtime.newproc> #建立goroutine
0x44f528<+88>: mov   0x18(%rsp),%rax
0x44f52d<+93>: mov   %rax,(%rsp)
0x44f531<+97>: movq   $0x0,0x8(%rsp)
0x44f53a<+106>: callq   0x405080 <runtime.chanrecv1> #从channel读取数据
0x44f53f<+111>: mov   0x20(%rsp),%rbp
0x44f544<+116>: add   $0x28,%rsp
0x44f548<+120>: retq   
0x44f549<+121>: callq 0x447390 <runtime.morestack_noctxt>
0x44f54e<+126>: jmp   0x44f4d0 <main.main>

从main函数的汇编代码咱们能够看到,建立goroutine的go关键字被编译器翻译成了对runtime.newproc函数的调用,第二章咱们对这个函数的主要流程作过详细分析,这里简单的回顾一下:

  1. 切换到g0栈;

  2. 分配g结构体对象;

  3. 初始化g对应的栈信息,并把参数拷贝到新g的栈上;

  4. 设置好g的sched成员,该成员包括调度g时所必须pc, sp, bp等调度信息;

  5. 调用runqput函数把g放入运行队列;

  6. 返回

由于当时咱们的主要目标是调度器的初始化部分,因此并无详细分析上述流程中的第5步,也就是runqput是如何把goroutine放入运行队列的,如今就回头分析一下这个过程,下面咱们直接从runqput函数开始。

经过runqput函数把goroutine挂入运行队列

runtime/proc.go : 4746

// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool)   {
    if randomizeScheduler && next && fastrand() % 2 == 0  {
        next = false
    }

    if next  {
        //把gp放在_p_.runnext成员里,
        //runnext成员中的goroutine会被优先调度起来运行
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp)))  {
             //有其它线程在操做runnext成员,须要重试
            goto retryNext
        }
        if oldnext == 0  { //本来runnext为nil,因此没任何事情可作了,直接返回
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr() //本来存放在runnext的gp须要放入runq的尾部
    }

retry:
    //可能有其它线程正在并发修改runqhead成员,因此须要跟其它线程同步
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    if t - h < uint32(len(_p_.runq))  { //判断队列是否满了
        //队列尚未满,能够放入
        _p_.runq[t % uint32(len(_p_.runq))].set(gp)
       
        // store-release, makes it available for consumption
        //虽然没有其它线程并发修改这个runqtail,但其它线程会并发读取该值以及p的runq成员
        //这里使用StoreRel是为了:
        //1,原子写入runqtail
        //2,防止编译器和CPU乱序,保证上一行代码对runq的修改发生在修改runqtail以前
        //3,可见行屏障,保证当前线程对运行队列的修改对其它线程立马可见
        atomic.StoreRel(&_p_.runqtail, t + 1)
        return
    }
    //p的本地运行队列已满,须要放入全局运行队列
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // the queue is not full, now the put above must succeed
    goto retry
}

runqput函数流程很清晰,它首先尝试把gp放入_p_的本地运行队列,若是本地队列满了,则经过runqputslow函数把gp放入全局运行队列。

runtime/proc.go : 4784

// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool  {
    var batch [len(_p_.runq) / 2 + 1]*g  //gp加上_p_本地队列的一半

    // First, grab a batch from local queue.
    n := t - h
    n = n / 2
    if n != uint32(len(_p_.runq) / 2)  {
        throw("runqputslow: queue is not full")
    }
    for i := uint32(0); i < n; i++ { //取出p本地队列的一半
        batch[i] = _p_.runq[(h+i) % uint32(len(_p_.runq))].ptr()
    }
    if !atomic.CasRel(&_p_.runqhead, h, h + n)  { // cas-release, commits consume
        //若是cas操做失败,说明已经有其它工做线程从_p_的本地运行队列偷走了一些goroutine,因此直接返回
        return false
    }
    batch[n] = gp

    if randomizeScheduler {
        for i := uint32(1); i <= n; i++ {
            j := fastrandn(i + 1)
            batch[i], batch[j] = batch[j], batch[i]
        }
    }

    // Link the goroutines.
    //全局运行队列是一个链表,这里首先把全部须要放入全局运行队列的g连接起来,
    //减小后面对全局链表的锁住时间,从而下降锁冲突
    for i := uint32(0); i < n; i++  {
        batch[i].schedlink.set(batch[i+1])
    }
    var q gQueue
    q.head.set(batch[0])
    q.tail.set(batch[n])

    // Now put the batch on global queue.
    lock(&sched.lock)
    globrunqputbatch(&q, int32(n+1))
    unlock(&sched.lock)
    return true
}

runqputslow函数首先使用链表把从_p_的本地队列中取出的一半连同gp一块儿串联起来,而后在加锁成功以后经过globrunqputbatch函数把该链表链入全局运行队列(全局运行队列是使用链表实现的)。值的一提的是runqputslow函数并无一开始就把全局运行队列锁住,而是等全部的准备工做作完以后才锁住全局运行队列,这是并发编程加锁的基本原则,须要尽可能减少锁的粒度,下降锁冲突的几率。

分析完runqput函数是如何把goroutine放入运行队列以后,接下来咱们继续分析main goroutine因读取channel而发生的阻塞流程。

因读取channel阻塞而发生的被动调度

从代码逻辑的角度来讲,咱们不能肯定main goroutine和新建立出来的g2谁先运行,但对于咱们分析来讲咱们能够假定某个goroutine先运行,由于无论谁先运行,都会阻塞在channel的读或则写上,因此这里咱们假设main建立好g2后首先阻塞在了对channel的读操做上。下面咱们看看读取channel的过程。

从前面的反汇编代码咱们知道读取channel是经过调用runtime.chanrecv1函数来完成的,咱们就从它开始分析,不过在分析过程当中咱们不会把精力放在对channel的操做上,而是分析这个过程当中跟调度有关的细节。

runtime/chan.go : 403

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

// runtime/chan.go : 415
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ......
    //省略部分的代码逻辑主要在判断读取操做是否能够当即完成,若是不能当即完成
    //就须要把g挂在channel c的读取队列上,而后调用goparkunlock函数阻塞此goroutine
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
    ......
}

chanrecv1直接调用chanrecv函数实现读取操做,chanrecv首先会判断channel是否有数据可读,若是有数据则直接读取并返回,但若是没有数据,则须要把当前goroutine挂入channel的读取队列之中并调用goparkunlock函数阻塞该goroutine.

runtime/proc.go : 304

// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
func goparkunlock(lock*mutex, reasonwaitReason, traceEvbyte, traceskipint) {
    gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}

// runtime/proc.go : 276
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
// Reason explains why the goroutine has been parked.
// It is displayed in stack traces and heap dumps.
// Reasons should be unique and descriptive.
// Do not re-use reasons, add new ones.
func gopark(unlockffunc(*g, unsafe.Pointer) bool, lockunsafe.Pointer, reason    waitReason, traceEvbyte, traceskipint) {
    ......
    // can't do anything that might move the G between Ms here.
    mcall(park_m) //切换到g0栈执行park_m函数
}

goparkunlock函数直接调用gopark函数,gopark则调用mcall从当前main goroutine切换到g0去执行park_m函数(mcall前面咱们分析过,其主要做用就是保存当前goroutine的现场,而后切换到g0栈去调用做为参数传递给它的函数)

runtime/proc.go : 2581

// park continuation on g0.
func park_m(gp*g) {
    _g_ := getg()

    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    casgstatus(gp, _Grunning, _Gwaiting)
    dropg()  //解除g和m之间的关系

    ......
   
    schedule()
}

park_m首先把当前goroutine的状态设置为_Gwaiting(由于它正在等待其它goroutine往channel里面写数据),而后调用dropg函数解除g和m之间的关系,最后经过调用schedule函数进入调度循环,schedule函数咱们也详细分析过,它首先会从运行队列中挑选出一个goroutine,而后调用gogo函数切换到被挑选出来的goroutine去运行。由于main goroutine在读取channel被阻塞以前已经把建立好的g2放入了运行队列,因此在这里schedule会把g2调度起来运行,这里完成了一次从main goroutine到g2调度(咱们假设只有一个工做线程在进行调度)。

唤醒阻塞在channel上的goroutine

g2 goroutine的入口是start函数,下面咱们就从该函数开始分析g2写channel的流程,看它如何唤醒正在等待着读取channel的main goroutine。仍是先来反汇编一下start函数的代码:

0x44f480<+0>:mov   %fs:0xfffffffffffffff8,%rcx
0x44f489<+9>:cmp   0x10(%rcx),%rsp
0x44f48d<+13>:jbe   0x44f4c1 <main.start+65>
0x44f48f<+15>:sub   $0x18,%rsp
0x44f493<+19>:mov   %rbp,0x10(%rsp)
0x44f498<+24>:lea   0x10(%rsp),%rbp
0x44f49d<+29>:mov   0x20(%rsp),%rax
0x44f4a2<+34>:mov   %rax,(%rsp)
0x44f4a6<+38>:lea   0x2d71b(%rip),%rax       
0x44f4ad<+45>:mov   %rax,0x8(%rsp)
0x44f4b2<+50>:callq   0x404560 <runtime.chansend1> #写channel
0x44f4b7<+55>:mov   0x10(%rsp),%rbp
0x44f4bc<+60>:add   $0x18,%rsp
0x44f4c0<+64>:retq   
0x44f4c1<+65>:callq    0x447390 <runtime.morestack_noctxt>
0x44f4c6<+70>:jmp   0x44f480 <main.start>

能够看到,编译器把对channel的发送操做翻译成了对runtime.chansend1函数的调用

runtime/chan.go : 124

/ entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

// runtime/chan.go : 142
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        //能够直接发送数据给sg
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    ......
}

// runtime/chan.go : 269
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ......
    goready(gp, skip+1)
}

// runtime/proc.go : 310
func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

channel发送和读取的流程相似,若是可以当即发送则当即发送并返回,若是不能当即发送则须要阻塞,在咱们这个场景中,由于main goroutine此时此刻正挂在channel的读取队列上等待数据,因此这里直接调用send函数发送给main goroutine,send函数则调用goready函数切换到g0栈并调用ready函数来唤醒sg对应的goroutine,即正在等待读channel的main goroutine。

runtime/proc.go : 639

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    ......
    // Mark runnable.
    _g_ := getg()
    ......
    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next) //放入运行队列
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
        //有空闲的p并且没有正在偷取goroutine的工做线程,则须要唤醒p出来工做
        wakep()
    }
    ......
}

ready函数首先把须要唤醒的goroutine的状态设置为_Grunnable,而后把其放入运行队列之中等待调度器的调度。

对于本章咱们分析的场景,执行到这里main goroutine已经被放入了运行队列,但还未被调度起来运行,而g2 goroutine在向channel写完数据以后就从这里的ready函数返回并退出了,从第二章咱们对goroutine的退出流程的分析能够得知,在g2的退出过程当中将会在goexit0函数中调用schedule函数进入下一轮调度,从而把刚刚放入运行队列的main goroutine调度起来运行。

在上面分析ready函数时咱们略过了一种状况:若是当前有空闲的p并且没有工做线程正在尝试从各个工做线程的本地运行队列偷取goroutine的话(没有处于spinning状态的工做线程),那么就须要经过wakep函数把空闲的p唤醒起来工做。为了避免让篇幅过长,下一节咱们再来分析wakep如何去唤醒和建立新的工做线程。

相关文章
相关标签/搜索