工做线程的唤醒及建立(19)

本文是《Go语言调度器源代码情景分析》系列的第19篇,也是第四章《Goroutine被动调度》的第2小节。linux


本文须要重点关注:网络

  • 如何唤醒睡眠中的工做线程函数

  • 如何建立新的工做线程oop

上一篇文章咱们分析到了ready函数经过把须要唤醒的goroutine放入运行队列来唤醒它,本文接着上文继续分析。ui

唤醒空闲的Patom

为了充分利用CPU,ready函数在唤醒goroutine以后会去判断是否须要启动新工做线程出来工做,判断规则是,若是当前有空闲的p并且没有工做线程正在尝试从各个工做线程的本地运行队列偷取goroutine的话(没有处于spinning状态的工做线程),那么就须要把空闲的p唤醒起来工做,详见下面的ready函数:spa

runtime/proc.go : 639操作系统

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    ......
    // Mark runnable.
    _g_ := getg()
    ......
    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next) //放入运行队列
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
        //有空闲的p并且没有正在偷取goroutine的工做线程,则须要唤醒p出来工做
        wakep()
    }
    ......
}

而唤醒空闲的p是由wakep函数完成的。线程

runtime/proc.go : 2051指针

// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
    // be conservative about spinning threads
    if !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}

wakep首先经过cas操做再次确认是否有其它工做线程正处于spinning状态,这里之因此须要使用cas操做再次进行确认,缘由在于,在当前工做线程经过以下条件

atomic.Load(&sched.npidle) != 0 & &atomic.Load(&sched.nmspinning) == 0

判断到须要启动工做线程以后到真正启动工做线程以前的这一段时间以内,若是已经有工做线程进入了spinning状态而在四处寻找须要运行的goroutine,这样的话咱们就没有必要再启动一个多余的工做线程出来了。

若是cas操做成功,则继续调用startm建立一个新的或唤醒一个处于睡眠状态的工做线程出来工做。

runtime/proc.go : 1947

// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
    lock(&sched.lock)
    if _p_ == nil { //没有指定p的话须要从p的空闲队列中获取一个p
        _p_ = pidleget() //从p的空闲队列中获取空闲p
        if _p_ == nil {
            unlock(&sched.lock)
            if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                //spinning为true表示进入这个函数以前已经对sched.nmspinning加了1,须要还原
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            return //没有空闲的p,直接返回
        }
    }
    mp := mget() //从m空闲队列中获取正处于睡眠之中的工做线程,全部处于睡眠状态的m都在此队列中
    unlock(&sched.lock)
    if mp == nil {
        //没有处于睡眠状态的工做线程
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        newm(fn, _p_) //建立新的工做线程
        return
    }
    if mp.spinning {
        throw("startm: m is spinning")
    }
    if mp.nextp != 0 {
        throw("startm: m has p")
    }
    if spinning && !runqempty(_p_) {
        throw("startm: p has runnable gs")
    }
    // The caller incremented nmspinning, so set m.spinning in the new M.
    mp.spinning = spinning
    mp.nextp.set(_p_)
   
    //唤醒处于休眠状态的工做线程
    notewakeup(&mp.park)
}

startm函数首先判断是否有空闲的p结构体对象,若是没有则直接返回,若是有则须要建立或唤醒一个工做线程出来与之绑定,从这里能够看出所谓的唤醒p,其实就是把空闲的p利用起来。

在确保有能够绑定的p对象以后,startm函数首先尝试从m的空闲队列中查找正处于休眠状态的工做线程,若是找到则经过notewakeup函数唤醒它,不然调用newm函数建立一个新的工做线程出来。

下面咱们首先分析notewakeup函数是如何唤醒工做线程的,而后再讨论newm函数建立工做线程的流程。

唤醒睡眠中的工做线程

在第三章咱们讨论过,当找不到须要运行的goroutine时,工做线程会经过notesleep函数睡眠在m.park成员上,因此这里使用m.park成员做为参数调用notewakeup把睡眠在该成员之上的工做线程唤醒。

runtime/lock_futex.go : 130

func notewakeup(n *note) {
    //设置n.key = 1, 被唤醒的线程经过查看该值是否等于1来肯定是被其它线程唤醒仍是意外从睡眠中苏醒
    old := atomic.Xchg(key32(&n.key), 1)  
    if old != 0 {
        print("notewakeup - double wakeup (", old, ")\n")
        throw("notewakeup - double wakeup")
    }
    //调用futexwakeup唤醒
    futexwakeup(key32(&n.key), 1)
}

notewakeup函数首先使用atomic.Xchg设置note.key值为1,这是为了使被唤醒的线程能够经过查看该值是否等于1来肯定是被其它线程唤醒仍是意外从睡眠中苏醒了过来,若是该值为1则表示是被唤醒的,能够继续工做了,但若是该值为0则表示是意外苏醒,须要再次进入睡眠,工做线程苏醒以后的处理逻辑咱们已经在notesleep函数中见过,因此这里略过。 

把note.key的值设置为1后,notewakeup函数继续调用futexwakeup函数

runtime/os_linux.go : 66

// If any procs are sleeping on addr, wake up at most cnt.
//go:nosplit
func futexwakeup(addr *uint32, cnt uint32) {
    //调用futex函数唤醒工做线程
    ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
    if ret >= 0 {
        return
    }

    // I don't know that futex wakeup can return
    // EAGAIN or EINTR, but if it does, it would be
    // safe to loop and call futex again.
    systemstack(func() {
        print("futexwakeup addr=", addr, " returned ", ret, "\n")
    })

    *(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}

对于Linux平台来讲,工做线程经过note睡眠实际上是经过futex系统调用睡眠在内核之中,因此唤醒处于睡眠状态的线程也须要经过futex系统调用进入内核来唤醒,因此这里的futexwakeup又继续调用包装了futex系统调用的futex函数来实现唤醒睡眠在内核中的工做线程。

runtime/sys_linux_amd64.s : 525

// int64 futex(int32 *uaddr, int32 op, int32 val,
//struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
    MOVQ  addr+0(FP), DI #这6条指令在为futex系统调用准备参数
    MOVL  op+8(FP), SI
    MOVL  val+12(FP), DX
    MOVQ  ts+16(FP), R10
    MOVQ  addr2+24(FP), R8
    MOVL  val3+32(FP), R9
    MOVL  $SYS_futex, AX  #futex系统调用编号放入AX寄存器
    SYSCALL  #系统调用,进入内核
    MOVL  AX, ret+40(FP) #系统调用经过AX寄存器返回返回值,这里把返回值保存到内存之中
    RET

futex函数由汇编代码写成,前面的几条指令都在为futex系统调用准备参数,参数准备完成以后则经过SYSCALL指令进入操做系统内核完成线程的唤醒功能,内核在完成唤醒工做以后当前工做线程则从内核返回到futex函数继续执行SYSCALL指令以后的代码并按函数调用链原路返回,继续执行其它代码,而被唤醒的工做线程则由内核负责在适当的时候调度到CPU上运行。

看完唤醒流程,下面咱们来分析工做线程的建立。

建立工做线程

回到startm函数,若是没有正处于休眠状态的工做线程,则须要调用newm函数新建一个工做线程。

runtime/proc.go : 1807

// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrierrec
func newm(fn func(), _p_ *p) {
    mp := allocm(_p_, fn)
    mp.nextp.set(_p_)
    ......
    newm1(mp)
}

newm首先调用allocm函数从堆上分配一个m结构体对象,而后调用newm1函数。

runtime/proc.go : 1843

func newm1(mp *m) {
      //省略cgo相关代码.......
      execLock.rlock() // Prevent process clone.
      newosproc(mp)
      execLock.runlock()
}

newm1继续调用newosproc函数,newosproc的主要任务是调用clone函数建立一个系统线程,而新建的这个系统线程将从mstart函数开始运行。

runtime/os_linux.go : 143

// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrier
func newosproc(mp *m) {
    stk := unsafe.Pointer(mp.g0.stack.hi)                    
    ......
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0),         unsafe.Pointer(funcPC(mstart)))
    ......
}
//clone系统调用的Flags选项
cloneFlags = _CLONE_VM | /* share memory */ //指定父子线程共享进程地址空间
  _CLONE_FS | /* share cwd, etc */
  _CLONE_FILES | /* share fd table */
  _CLONE_SIGHAND | /* share sig handler table */
  _CLONE_SYSVSEM | /* share SysV semaphore undo lists (see issue #20763) */
  _CLONE_THREAD /* revisit - okay for now */  //建立子线程而不是子进程

clone函数是由汇编语言实现的,该函数使用clone系统调用完成建立系统线程的核心功能。咱们分段来看

runtime/sys_linux_amd64.s : 539

// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void));
TEXT runtime·clone(SB),NOSPLIT,$0
    MOVL  flags+0(FP), DI//系统调用的第一个参数
    MOVQ  stk+8(FP), SI  //系统调用的第二个参数
    MOVQ  $0, DX        //第三个参数
    MOVQ  $0, R10        //第四个参数

    // Copy mp, gp, fn off parent stack for use by child.
    // Careful:Linux system call clobbers CXand R11.
    MOVQ  mp+16(FP), R8
    MOVQ  gp+24(FP), R9
    MOVQ  fn+32(FP), R12

    MOVL  $SYS_clone, AX
    SYSCALL

clone函数首先用了4条指令为clone系统调用准备参数,该系统调用一共须要四个参数,根据Linux系统调用约定,这四个参数须要分别放入rdi, rsi,rdx和r10寄存器中,这里最重要的是第一个参数和第二个参数,分别用来指定内核建立线程时须要的选项和新线程应该使用的栈。由于即将被建立的线程与当前线程共享同一个进程地址空间,因此这里必须为子线程指定其使用的栈,不然父子线程会共享同一个栈从而形成混乱,从上面的newosproc函数能够看出,新线程使用的栈为m.g0.stack.lo~m.g0.stack.hi这段内存,而这段内存是newm函数在建立m结构体对象时从进程的堆上分配而来的。

准备好系统调用的参数以后,还有另一件很重的事情须要作,那就是把clone函数的其它几个参数(mp, gp和线程入口函数)保存到寄存器中,之因此须要在系统调用以前保存这几个参数,缘由在于这几个参数目前还位于父线程的栈之中,而一旦经过系统调用把子线程建立出来以后,子线程将会使用咱们在clone系统调用时给它指定的栈,因此这里须要把这几个参数先保存到寄存器,等子线程从系统调用返回后直接在寄存器中获取这几个参数。这里要注意的是虽然这个几个参数值保存在了父线程的寄存器之中,但建立子线程时,操做系统内核会把父线程的全部寄存器帮咱们复制一份给子线程,因此当子线程开始运行时就能拿到父线程保存在寄存器中的值,从而拿到这几个参数。这些准备工做完成以后代码调用syscall指令进入内核,由内核帮助咱们建立系统线程。

clone系统调用完成后实际上就多了一个操做系统线程,新建立的子线程和当前线程都得从系统调用返回而后继续执行后面的代码,那么从系统调用返回以后咱们怎么知道哪一个是父线程哪一个是子线程,从而来决定它们的执行流程?使用过fork系统调用的读者应该知道,咱们须要经过返回值来判断父子线程,系统调用的返回值若是是0则表示这是子线程,不为0则表示这个是父线程。用c代码来描述大概就是这个样子:

if (clone(...) == 0) { //子线程
    子线程代码
} else {//父线程
    父线程代码
}

虽然这里只有一次clone调用,但它却返回了2次,一次返回到父线程,一次返回到子线程,而后2个线程各自执行本身的代码流程。

回到clone函数,下面代码的第一条指令就在判断系统调用的返回值,若是是子线程则跳转到后面的代码继续执行,若是是父线程,它建立子线程的任务已经完成,因此这里把返回值保存在栈上以后就直接执行ret指令返回到newosproc函数了。

runtime/sys_linux_amd64.s : 555   

// In parent, return.
    CMPQ  AX, $0  #判断clone系统调用的返回值
    JEQ  3(PC) / #跳转到子线程部分
    MOVL  AX, ret+40(FP) #父线程须要执行的指令
    RET #父线程须要执行的指令

而对于子线程来讲,还有不少初始化工做要作,下面是子线程须要继续执行的指令。

runtime/sys_linux_amd64.s : 561

# In child, on new stack.
    #子线程须要继续执行的指令
    MOVQ  SI, SP #设置CPU栈顶寄存器指向子线程的栈顶,这条指令看起来是多余的?内核应该已经把SP设置好了

    # If g or m are nil, skip Go-related setup.
    CMPQ  R8, $0    # m,新建立的m结构体对象的地址,由父线程保存在R8寄存器中的值被复制到了子线程
    JEQ  nog
    CMPQ  R9, $0    # g,m.g0的地址,由父线程保存在R9寄存器中的值被复制到了子线程
    JEQ  nog

    # Initialize m->procid to Linux tid
    MOVL  $SYS_gettid, AX #经过gettid系统调用获取线程ID(tid)
    SYSCALL
    MOVQ  AX, m_procid(R8)  #m.procid = tid

    #Set FS to point at m->tls.
    #新线程刚刚建立出来,还未设置线程本地存储,即m结构体对象还未与工做线程关联起来,
    #下面的指令负责设置新线程的TLS,把m对象和工做线程关联起来
    LEAQ  m_tls(R8), DI #取m.tls字段的地址
    CALL  runtime·settls(SB)

    #In child, set up new stack
    get_tls(CX)
    MOVQ  R8, g_m(R9)  # g.m = m 
    MOVQ  R9, g(CX)      # tls.g = &m.g0
    CALL  runtime·stackcheck(SB)

nog:
    # Call fn
    CALL  R12 #这里调用mstart函数
    ......

这段代码的第一条指令把CPU寄存器的栈顶指针设置为新线程的的栈顶,这条指令看起来是多余的,由于咱们在clone系统调用时已经把栈信息告诉操做系统了,操做系统在把新线程调度起来运行时已经帮咱们把CPU的rsp寄存器设置好了,这里应该不必本身去设置。接下来的4条指令判断m和g是否为nil,若是是则直接去执行fn函数,对于咱们这个流程来讲,由于如今正在建立工做线程,因此m和g(实际上是m.g0)都不为空,于是须要继续对m进行初始化。

对新建立出来的工做线程的初始化过程从上面代码片断的第6条指令开始,它首先经过系统调用获取到子线程的线程id,并赋值给m.procid,而后调用settls设置线程本地存储并经过把m.g0的地址放入线程本地存储之中,从而实现了m结构体对象与工做线程之间的关联,settls函数咱们已经在第二章详细分析过,因此这里直接跳过。

新工做线程的初始化完成以后,便开始执行mstart函数,咱们在第二章也见过该函数,主线程初始化完成以后也是调用的它。回忆一下,mstart函数首先会去设置m.g0的stackguard成员,而后调用mstart1()函数把当前工做线程的g0的调度信息保存在m.g0.sched成员之中,最后经过调用schedule函数进入调度循环。

总结

本章仅以读写channel为例分析了goroutine因操做被阻塞而发生的被动调度,其实发生被动调度的状况还比较多,好比因读写网络链接而阻塞、加锁被阻塞或select操做阻塞等等都会发生被动调度,读者能够自行阅读相关源代码。

本章还分析了睡眠中的工做线程是如何被唤起起来工做的以及新工做线程的建立和初始化流程。

相关文章
相关标签/搜索