本文分析了Golang的socket文件描述符和goroutine阻塞调度的原理。代码中大部分是Go代码,小部分是汇编代码。完整理解本文须要Go语言知识,而且用Golang写过网络程序。更重要的是,须要提早理解goroutine的调度原理。cookie
在net.go中有一个名为Conn
的接口,提供了对于链接的读写和其余操做:网络
type Conn interface { Read(b []byte) (n int, err error) Write(b []byte) (n int, err error) Close() error LocalAddr() Addr RemoteAddr() Addr SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error }
这个接口就是对下面的结构体conn
的抽象。conn
结构体包含了对链接的读写和其余操做:数据结构
type conn struct { fd *netFD }
// Read implements the Conn Read method. func (c *conn) Read(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } return c.fd.Read(b) }
// Write implements the Conn Write method. func (c *conn) Write(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } return c.fd.Write(b) }
// Close closes the connection. func (c *conn) Close() error { if !c.ok() { return syscall.EINVAL } return c.fd.Close() }
// SetDeadline implements the Conn SetDeadline method. func (c *conn) SetDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } return c.fd.setDeadline(t) } // SetReadDeadline implements the Conn SetReadDeadline method. func (c *conn) SetReadDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } return c.fd.setReadDeadline(t) } // SetWriteDeadline implements the Conn SetWriteDeadline method. func (c *conn) SetWriteDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } return c.fd.setWriteDeadline(t) }
*netFD
的操做上。咱们继续跟踪c.fd.Read()
函数.net/fd_unix.go:
app
// Network file descriptor. type netFD struct { // locking/lifetime of sysfd + serialize access to Read and Write methods fdmu fdMutex // immutable until Close sysfd int family int sotype int isConnected bool net string laddr Addr raddr Addr // wait server pd pollDesc }
func (fd *netFD) Read(p []byte) (n int, err error) { if err := fd.readLock(); err != nil { return 0, err } defer fd.readUnlock() if err := fd.pd.PrepareRead(); err != nil { return 0, &OpError{"read", fd.net, fd.raddr, err} } // 调用system call,循环从fd.sysfd读取数据 for { // 系统调用Read读取数据 n, err = syscall.Read(int(fd.sysfd), p) // 若是发生错误,则须要处理 // 而且只处理EAGAIN类型的错误,其余错误一概返回给调用者 if err != nil { n = 0 // 对于非阻塞的网络链接的文件描述符,若是错误是EAGAIN // 说明Socket的缓冲区为空,未读取到任何数据 // 则调用fd.pd.WaitRead, if err == syscall.EAGAIN { if err = fd.pd.WaitRead(); err == nil { continue } } } err = chkReadErr(n, err, fd) break } if err != nil && err != io.EOF { err = &OpError{"read", fd.net, fd.raddr, err} } return }
网络轮询器是Golang中针对每一个socket文件描述符创建的轮询机制。 此处的轮询并非通常意义上的轮询,而是Golang的runtime在调度goroutine或者GC完成以后或者指定时间以内,调用epoll_wait获取全部产生IO事件的socket文件描述符。固然在runtime轮询以前,须要将socket文件描述符和当前goroutine的相关信息加入epoll维护的数据结构中,并挂起当前goroutine,当IO就绪后,经过epoll返回的文件描述符和其中附带的goroutine的信息,从新恢复当前goroutine的执行。socket
// Integrated network poller (platform-independent part). // 网络轮询器(平台独立部分) // A particular implementation (epoll/kqueue) must define the following functions: // 实际的实现(epoll/kqueue)必须定义如下函数: // func netpollinit() // to initialize the poller,初始化轮询器 // func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications, 为fd和pd启动边缘触发通知 // and associate fd with pd. // 一个实现必须调用下面的函数,用来指示pd已经准备好 // An implementation must call the following function to denote that the pd is ready. // func netpollready(gpp **g, pd *pollDesc, mode int32) // pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer // goroutines respectively. The semaphore can be in the following states: // pollDesc包含了2个二进制的信号,分别负责读写goroutine的暂停. // 信号可能处于下面的状态: // pdReady - IO就绪通知被挂起; // 一个goroutine将次状态置为nil来消费一个通知。 // pdReady - io readiness notification is pending; // a goroutine consumes the notification by changing the state to nil. // pdWait - 一个goroutine准备暂停在信号上,可是尚未完成暂停。 // 这个goroutine经过把这个状态改变为G指针去提交这个暂停动做。 // 或者,替代性的,并行的其余通知将状态改变为READY. // 或者,替代性的,并行的超时/关闭会将次状态变为nil // pdWait - a goroutine prepares to park on the semaphore, but not yet parked; // the goroutine commits to park by changing the state to G pointer, // or, alternatively, concurrent io notification changes the state to READY, // or, alternatively, concurrent timeout/close changes the state to nil. // G指针 - 阻塞在信号上的goroutine // IO通知或者超时/关闭会分别将此状态置为READY或者nil. // G pointer - the goroutine is blocked on the semaphore; // io notification or timeout/close changes the state to READY or nil respectively // and unparks the goroutine. // nil - nothing of the above. const ( pdReady uintptr = 1 pdWait uintptr = 2 )
网络轮询器的数据结构以下:函数
// Network poller descriptor. // 网络轮询器描述符 type pollDesc struct { link *pollDesc // in pollcache, protected by pollcache.lock // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations. // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime. // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification) // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated // in a lock-free way by all operations. // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg), // that will blow up when GC starts moving objects. // // lock锁对象保护了pollOpen, pollSetDeadline, pollUnblock和deadlineimpl操做。 // 而这些操做又彻底包含了对seq, rt, tw变量。 // fd在PollDesc整个生命过程当中都是一个常量。 // 处理pollReset, pollWait, pollWaitCanceled和runtime.netpollready(IO就绪通知)不须要用到锁。 // 因此closing, rg, rd, wg和wd的全部操做都是一个无锁的操做。 lock mutex // protectes the following fields fd uintptr closing bool seq uintptr // protects from stale timers and ready notifications rg uintptr // pdReady, pdWait, G waiting for read or nil rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline wg uintptr // pdReady, pdWait, G waiting for write or nil wt timer // write deadline timer wd int64 // write deadline user unsafe.Pointer // user settable cookie }
pd.WaitRead():
ui
func (pd *pollDesc) WaitRead() error { return pd.Wait('r') } func (pd *pollDesc) Wait(mode int) error { res := runtime_pollWait(pd.runtimeCtx, mode) return convertErr(res) }
res是runtime_pollWait函数返回的结果,由conevertErr函数包装后返回:this
func convertErr(res int) error { switch res { case 0: return nil case 1: return errClosing case 2: return errTimeout } println("unreachable: ", res) panic("unreachable") }
runtime_pollWait会调用runtime/thunk.s中的函数:spa
TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0 JMP runtime·netpollWait(SB)
这是一个包装函数,没有参数,直接跳转到runtime/netpoll.go中的函数netpollWait:.net
func netpollWait(pd *pollDesc, mode int) int { // 检查pd的状态是否异常 err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // As for now only Solaris uses level-triggered IO. if GOOS == "solaris" { onM(func() { netpollarm(pd, mode) }) } // 循环中检查pd的状态是否是已经被设置为pdReady // 即检查IO是否是已经就绪 for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // Can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // Pretend it has not happened and retry. } return 0 }
netpollcheckerr
函数检查pd是否出现异常:
// 检查pd的异常 func netpollcheckerr(pd *pollDesc, mode int32) int { // 是否已经关闭 if pd.closing { return 1 // errClosing } // 当读写状态下,deadline小于0,表示pd已通过了超时时间 if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) { return 2 // errTimeout } // 正常状况返回0 return 0 }
netpollblock():
// returns true if IO is ready, or false if timedout or closed // waitio - wait only for completed IO, ignore errors // 这个函数被netpollWait循环调用 // 返回true说明IO已经准备好,返回false说明IO操做已经超时或者已经关闭 func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { // 获取pd的rg gpp := &pd.rg // 若是模式是w,则获取pd的wg if mode == 'w' { gpp = &pd.wg } // set the gpp semaphore to WAIT // 在循环中设置pd的gpp为pdWait // 由于casuintptr是自旋锁,因此须要在循环中调用 for { // 若是在循环中发现IO已经准备好(pg的rg或者wg为pdReady状态) // 则设置rg/wg为0,返回true old := *gpp if old == pdReady { *gpp = 0 return true } // 每次netpollblock执行完毕以后,gpp重置为0 // 非0表示重复wait if old != 0 { gothrow("netpollblock: double wait") } // CAS操做改变gpp为pdWait if casuintptr(gpp, 0, pdWait) { break } } // need to recheck error states after setting gpp to WAIT // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg // // 当设置gpp为pdWait状态后,从新检查gpp的状态 // 这是必要的,由于runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl会作相反的操做 // 若是状态正常则挂起当前的goroutine // // 当netpollcheckerr检查io出现超时或者错误,waitio为true可用于等待ioReady // 不然当waitio为false, 且io不出现错误或者超时才会挂起当前goroutine if waitio || netpollcheckerr(pd, mode) == 0 { // 解锁函数,设置gpp为pdWait,若是设置不成功 // 说明已是发生其余事件,可让g继续运行,而不是挂起当前g f := netpollblockcommit // 尝试挂起当前g gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait") } // be careful to not lose concurrent READY notification old := xchguintptr(gpp, 0) if old > pdWait { gothrow("netpollblock: corrupted state") } return old == pdReady }
runtime/proc.go: gopark():
// Puts the current goroutine into a waiting state and calls unlockf. // If unlockf returns false, the goroutine is resumed. // 将当前goroutine置为waiting状态,而后调用unlockf func gopark(unlockf unsafe.Pointer, lock unsafe.Pointer, reason string) { // 获取当前M mp := acquirem() // 获取当前G gp := mp.curg // 获取G的状态 status := readgstatus(gp) // 若是不是_Grunning或者_Gscanrunning,则报错 if status != _Grunning && status != _Gscanrunning { gothrow("gopark: bad g status") } // 设置lock和unlockf mp.waitlock = lock mp.waitunlockf = unlockf gp.waitreason = reason releasem(mp) // can't do anything that might move the G between Ms here. // 在m->g0这个栈上调用park_m,而不是当前g的栈 mcall(park_m) }
mcall函数是一段汇编,在m->g0的栈上调用park_m,而不是在当前goroutine的栈上。mcall的功能分两部分,第一部分保存当前G的PC/SP到G的gobuf的pc/sp字段,第二部分调用park_m函数:
// func mcall(fn func(*g)) // Switch to m->g0's stack, call fn(g). // Fn must never return. It should gogo(&g->sched) // to keep running g. TEXT runtime·mcall(SB), NOSPLIT, $0-8 // 将须要执行的函数保存在DI MOVQ fn+0(FP), DI // 将M的TLS存放在CX get_tls(CX) // 将G对象存放在AX MOVQ g(CX), AX // save state in g->sched // 将调用者的PC存放在BX MOVQ 0(SP), BX // caller's PC // 将调用者的PC保存到g->sched.pc MOVQ BX, (g_sched+gobuf_pc)(AX) // 第一个参数的地址,即栈顶的地址,保存到BX LEAQ fn+0(FP), BX // caller's SP // 保存SP的地址到g->sched.sp MOVQ BX, (g_sched+gobuf_sp)(AX) // 将g对象保存到g->sched->g MOVQ AX, (g_sched+gobuf_g)(AX) // switch to m->g0 & its stack, call fn // 将g对象指针保存到BX MOVQ g(CX), BX // 将g->m保存到BX MOVQ g_m(BX), BX // 将m->g0保存到SI MOVQ m_g0(BX), SI CMPQ SI, AX // if g == m->g0 call badmcall JNE 3(PC) MOVQ $runtime·badmcall(SB), AX JMP AX // 将m->g0保存到g MOVQ SI, g(CX) // g = m->g0 // 将g->sched.sp恢复到SP寄存器 // 即便用g0的栈 MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp // AX进栈 PUSHQ AX MOVQ DI, DX // 将fn的地址复制到DI MOVQ 0(DI), DI // 调用函数 CALL DI // AX出栈 POPQ AX MOVQ $runtime·badmcall2(SB), AX JMP AX RET
park_m函数的功能分为三部分,第一部分让当前G和当前M脱离关系,第二部分是调用解锁函数,这里是调用netpoll.go源文件中的netpollblockcommit函数:
// runtime·park continuation on g0. void runtime·park_m(G *gp) { bool ok; // 设置当前g为Gwaiting状态 runtime·casgstatus(gp, Grunning, Gwaiting); // 让当前g和m脱离关系 dropg(); if(g->m->waitunlockf) { ok = g->m->waitunlockf(gp, g->m->waitlock); g->m->waitunlockf = nil; g->m->waitlock = nil; // 返回0为false,非0为true // 0说明g->m->waitlock发生了变化,即不是在gopark是设置的(pdWait) // 说明了脱离了WAIT状态,应该设置为Grunnable,并执行g if(!ok) { runtime·casgstatus(gp, Gwaiting, Grunnable); execute(gp); // Schedule it back, never returns. } } // 这里是调度当前m继续执行其余g // 而不是上面执行execute schedule(); }
netpollblockcommit函数,设置gpp为pdWait,设置成功返回1,不然返回0。1为true,0为false:
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool { return casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) }
到这里当前goroutine对socket文件描述符的等待IO继续的行为已经完成。过程当中首先尽早尝试判断IO是否已经就绪,若是未就绪则挂起当前goroutine,挂起以后再次判断IO是否就绪,若是还未就绪则调度当前M
运行其余G
。若是是在调度goroutine以前IO已经就绪,则不会使当前goroutine进入调度队列,会直接运行刚才挂起的G。不然当前goroutine会进入调度队列。
接下来是等待runtime将其唤醒。runtime在执行findrunnablequeue
、starttheworld
,sysmon
函数时,都会调用netpoll_epoll.go中的netpoll
函数,寻找到IO就绪的socket文件描述符,并找到这些socket文件描述符对应的轮询器中附带的信息,根据这些信息将以前等待这些socket文件描述符就绪的goroutine状态修改成Grunnable。在以上函数中,执行完netpoll以后,会找到一个就绪的goroutine列表,接下来将就绪的goroutine加入到调度队列中,等待调度运行。
在netpoll_epoll.go中的netpoll
函数中,epoll_wait
函数返回N个发生事件的文件描述符对应的epollevent,接着对于每一个event使用其data属性,将event.data
转换为*pollDesc
类型,再调用netpoll.go中的netpollready函数,将*pollDesc
类型中的G
数据类型去除,并附加到netpoll
函数的调用者传递的G链表中:
// 将ev.data转换为*pollDesc类型 pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) // 调用netpollready将取出pd中保存的G,并添加到链表中 netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
因此runtime在执行findrunnablequeue
、starttheworld
,sysmon
函数中会执行netpoll
函数,并返回N个goroutine。这些goroutine期待的网络事件已经发生,runtime会将这些goroutine放入到当前P
的可运行队列中,接下来调度它们并运行。
http://ju.outofmemory.cn/entry/168649