使用Golang能够轻松地为每个TCP链接建立一个协程去服务而不用担忧性能问题,这是由于Go内部使用goroutine结合IO多路复用实现了一个“异步”的IO模型,这使得开发者不用过多的关注底层,而只须要按照需求编写上层业务逻辑。这种异步的IO是如何实现的呢?下面我会针对Linux系统进行分析。linux
在Unix/Linux系统下,一切皆文件,每条TCP链接对应了一个socket句柄,这个句柄也能够看作是一个文件,在socket上收发数据,至关于对一个文件进行读写,因此一个socket句柄,一般也用表示文件描述符fd来表示。能够进入/proc/PID/fd/查看进程占用的fd。golang
系统内核会为每一个socket句柄分配一个读(接收)缓冲区和一个写(发送)缓冲区,发送数据就是在这个fd对应的写缓冲区上写数据,而接收数据就是在读缓冲区上读数据,当程序调用write或者send时,并不表明数据发送出去,仅仅是把数据拷贝到了写缓冲区,在时机恰当时候(积累到必定数量),会将数据发送到目的端。编程
Golang runtime仍是须要频繁去检查是否有fd就绪的,严格说并不算真正的异步,算是一种非阻塞IO复用。
借用教科书中几张图segmentfault
程序想在缓冲区读数据时,缓冲区并不必定会有数据,这会形成陷入系统调用,只能等待数据能够读取,没有数据读取时则会阻塞住进程,这就是阻塞式IO。当须要为多个客户端提供服务时,可使用线程方式,每一个socket句柄使用一个线程来服务,这样阻塞住的则是某个线程。虽然如此能够解决进程阻塞,可是仍是会有至关一部分CPU资源浪费在了等待数据上,同时,使用线程来服务fd有些浪费资源,由于若是要处理的fd较多,则又是一笔资源开销。数组
与之对应的是非阻塞IO,当程序想要读取数据时,若是缓冲区不存在,则直接返回给用户程序,可是须要用户程序去频繁检查,直到有数据准备好。这一样也会形成空耗CPU。网络
而IO多路复用则不一样,他会使用一个线程去管理多个fd,能够将多个fd加入IO多路复用函数中,每次调用该函数,传入要检查的fd,若是有就绪的fd,直接返回就绪的fd,再启动线程处理或者顺序处理就绪的fd。这达到了一个线程管理多个fd任务,相对来讲较为高效。常见的IO多路复用函数有select,poll,epoll。select与poll的最大缺点是每次调用时都须要传入全部要监听的fd集合,内核再遍历这个传入的fd集合,当并发量大时候,用户态与内核态之间的数据拷贝以及内核轮询fd又要浪费一波系统资源(关于select与poll这里不展开)。并发
接下来介绍一下epoll系统调用异步
epoll相比于select与poll相比要灵活且高效,他提供给用户三个系统调用函数。Golang底层就是经过这三个系统调用结合goroutine完成的“异步”IO。socket
//用于建立并返回一个epfd句柄,后续关于fd的添加删除等操做都依据这个句柄。 int epoll_create(int size); //用于向epfd添加,删除,修改要监听的fd。 int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event); //传入建立返回的epfd句柄,以及超时时间,返回就绪的fd句柄。 int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
大体工做原理如图tcp
当用户程序想要读取fd数据时,系统调用直接通知到内核并返回处理其余的事情,内核将数据准备好以后,通知用户程序,用户程序再处理这个fd上的事件。
咱们都知道,协程的资源占有量很小,并且协程也拥有多种状态如阻塞,就绪,运行等,可使用一个协程服务一个fd不用担忧资源问题。将监听fd的事件交由runtime来管理,实现协程调度与依赖fd的事件。当要协程读取fd数据可是没有数据时,park住该协程(改成Gwaiting),调度其余协程执行。
在执行协程调度时候,去检查fd是否就绪,若是就绪时,调度器再通知该park住的协程fd能够处理了(改成Grunnable并加入执行队列),该协程处理fd数据,这样既减小了CPU的空耗,也实现了消息的通知,用户层面上看实现了一个异步的IO模型。
Golang netpoll的大体思想就是这样,接下来看一下具体代码实现,本文基于go1.14。
接下来看下Golang netpoll对其的使用。
跟随一个很简单的demo探索一下。
func main() { fmt.Println("服务端进程id:",os.Getpid()) lister, err := net.Listen("tcp", "0.0.0.0:9009") if err != nil { fmt.Println("链接失败", err) return } for { conn, err := lister.Accept() //等待创建链接 if err != nil { fmt.Println("创建链接失败", err) continue } //开启协程处理 go func() { defer conn.Close() for { buf := make([]byte, 128) n, err := conn.Read(buf) if err != nil{ fmt.Println("读出错",err) return } fmt.Println("读取到的数据:",string(buf[:n])) } }() } }
net.Listen依次调用lc.Listen->sl.listenTCP->internetSocket->socket到fd.listenStream函数建立了一个监听9009的tcp链接的socket接口,也就是建立了socket fd,
接下来为了监听该socket对象就须要把这个socket fd加入到eventpoll中了。
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error { ...... //绑定该socket接口 if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } //监听该socket if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil { return os.NewSyscallError("listen", err) } //初始化fd,也就是把socket放入epoll中,进入 if err = fd.init(); err != nil { return err } lsa, _ = syscall.Getsockname(fd.pfd.Sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil } func (fd *FD) Init(net string, pollable bool) error { ...... //将socket fd加到poll,进入 err := fd.pd.init(fd) ...... return err } //最终跳转到该处,主要关注两个函数runtime_pollServerInit,runtime_pollOpen, //这两个函数都是runtime实现的,将epoll交由runtime来管理 func (pd *pollDesc) init(fd *FD) error { //sync.once方法,调用epoll_create建立eventpoll对象 serverInit.Do(runtime_pollServerInit) //将当前的fd加到epoll中,底层调用epollctl函数 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) //若是出错,处理相应的fd,删除epoll中fd以及解除状态等操做 if errno != 0 { if ctx != 0 { runtime_pollUnblock(ctx) runtime_pollClose(ctx) } return errnoErr(syscall.Errno(errno)) } pd.runtimeCtx = ctx return nil }
查看runtime_pollServerInit,是对epoll_create的封装。
func poll_runtime_pollServerInit() { //初始化全局epoll对象 netpollinit() /全局标志位设置为1 atomic.Store(&netpollInited, 1) } func netpollinit() { //系统调用,建立一个eventpoll对象 epfd = epollcreate1(_EPOLL_CLOEXEC) if epfd >= 0 { return } ...... }
查看一下runtime_pollOpen方法,将当前监听的socket fd加入eventpoll对象中。其实是对epoll_ctl的封装。
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) { //返回一个存储在Go程序中的一个fd对应的结构体,算是用于记录 //goroutine与fd之间的关系,后面会分析到 pd := pollcache.alloc() //加锁,防止并发问题 lock(&pd.lock) if pd.wg != 0 && pd.wg != pdReady { throw("runtime: blocked write on free polldesc") } if pd.rg != 0 && pd.rg != pdReady { throw("runtime: blocked read on free polldesc") } pd.fd = fd pd.closing = false pd.everr = false pd.rseq++ pd.rg = 0 pd.rd = 0 pd.wseq++ pd.wg = 0 pd.wd = 0 unlock(&pd.lock) var errno int32 //epoll_ctl系统调用 errno = netpollopen(fd, pd) return pd, int(errno) } func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent //注册event事件,这里使用了epoll的ET模式,相对于ET,ET须要每次产生事件时候就要处理事件, //不然容易丢失事件。 ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET //events记录上pd的指针 *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd //系统调用将该fd加到eventpoll对象中,交由内核监听 return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) }
接下来返回到主函数。
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { ...... //检查fd状态是否变化 if err := fd.pd.prepareRead(fd.isFile); err != nil { return -1, nil, "", err } for { //accept系统调用,若是有对监听的socket的链接请求,则直接返回发起链接的socket文件描述符 //,不然返回EAGAIN错误,被下面捕获到 s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err { case syscall.EAGAIN: if fd.pd.pollable() { //进入waitRead方法,内部 if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } case syscall.ECONNABORTED: continue } return -1, nil, errcall, err } } func (pd *pollDesc) wait(mode int, isFile bool) error { if pd.runtimeCtx == 0 { return errors.New("waiting for unsupported file type") } //进入runtime_pollWait方法内部,该方法会跳转到runtime包下,条件知足会park住goroutine res := runtime_pollWait(pd.runtimeCtx, mode) return convertErr(res, isFile) } func poll_runtime_pollWait(pd *pollDesc, mode int) int { ...... //进入netpollblock函数,该函数内部会阻塞住该goroutine for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } } return 0 } func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } ...... if waitio || netpollcheckerr(pd, mode) == 0 { //gark住该g,此时传参主要关注前两个,一个netpollblockcommit函数,一个gpp为当前pd的rg或者wg, //用于后面记录fd对应的阻塞的goroutine gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5) } old := atomic.Xchguintptr(gpp, 0) if old > pdWait { throw("runtime: corrupted polldesc") } return old == pdReady } func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { ...... //主要关注两个传参,lock是gpp指针 mp.waitlock = lock //unlockf为netpollblockcommit函数 mp.waitunlockf = unlockf ...... //切换到g0栈去执行park_m mcall(park_m) } func park_m(gp *g) { //获取当前goroutine _g_ := getg() //修改状态为Gwaiting,表明当前的goroutine被park住了 casgstatus(gp, _Grunning, _Gwaiting) //解除m和g关联 dropg() if fn := _g_.m.waitunlockf; fn != nil { //调用刚传入的函数参数,也就是netpollblockcommit ok := fn(gp, _g_.m.waitlock) //调用完清除 _g_.m.waitunlockf = nil _g_.m.waitlock = nil if !ok { if trace.enabled { traceGoUnpark(gp, 2) } casgstatus(gp, _Gwaiting, _Grunnable) execute(gp, true) // Schedule it back, never returns. } } //调度新的g到m上来 schedule() } func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool { //把当前g的指针存为gpp指针,gpp为pd的rg或wg r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) if r { //将全局变量改成1,表明系统有netpoll的等待者 atomic.Xadd(&netpollWaiters, 1) } return r }
到此时,accept函数就被阻塞住了,系统会在这个监听的socket fd事件(0.0.0.0:9009的这个fd)的状态发生变化时候(也就是有新的客户端请求链接的时候),将该park住的goroutine给ready。
//上面提到过的accept函数,根据序号顺序分析 func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { ...... for { //2.使用accept系统调用能获取到新的链接,linux会为新的链接分配一个新的fd, //这个函数会返回新的链接的socket fd对应的进程描述符 s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { //3.返回新的进程描述符 return s, rsa, "", err } switch err { case syscall.EAGAIN: if fd.pd.pollable() { //1.刚才阻塞到了这个goroutine,后来新的链接请求,该goroutine被唤醒 if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } ...... } ...... } } //返回上一层的函数 func (fd *netFD) accept() (netfd *netFD, err error) { //此时获取到了新的fd d, rsa, errcall, err := fd.pfd.Accept() ...... //建立新的fd结构体 if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil { poll.CloseFunc(d) return nil, err } //init函数又会进入func (pd *pollDesc) init(fd *FD) error函数,并将新的socket链接经过epoll_ctl传入 //epoll的监听事件 if err = netfd.init(); err != nil { fd.Close() return nil, err } //系统调用,能够得到客户端的socket的ip信息等 lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd) netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) return netfd, nil }
go会在调度goroutine时候执行epoll_wait系统调用,检查是否有状态发生改变的fd,有的话就把他取出,唤醒对应的goroutine去处理。该部分对应了runtime中的netpoll方法。
源码调用runtime中的schedule() -> findrunnable() -> netpoll()
func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() //分别从本地队列和全局队列寻找可执行的g ...... //判断是否知足条件,初始化netpoll对象,是否等待者,以及上次调用时间 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { //netpoll底层调用epoll_wait,传参表明epoll_wait时候是阻塞等待或者非阻塞直接返回 //这里是非阻塞模式,会当即返回内核eventpoll对象的rdlist列表 if list := netpoll(false); !list.empty() { gp := list.pop() //将可运行G的列表注入调度程序并清除glist injectglist(&list) //修改gp状态 casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } //返回可运行的g return gp, false } } ....... stopm() goto top } //对epoll_wait的进一步封装 func netpoll(block bool) gList { if epfd == -1 { return gList{} } waitms := int32(-1) if !block { waitms = 0 } //声明一个epollevent事件,在epoll_wait系统调用时候,会给该数组赋值并返回一个索引位, /以后能够遍历数组取出就绪的fd事件。 var events [128]epollevent retry: //陷入系统调用,取出内核eventpoll中的rdlist,返回就绪的事件 n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { if n != -_EINTR { println("runtime: epollwait on fd", epfd, "failed with", -n) throw("runtime: netpoll failed") } goto retry } var toRun gList //遍历event事件数组 for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } var mode int32 //是否有就绪的读写事件,放入mode标志位 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { //取出存入的pollDesc的指针 pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) pd.everr = false if ev.events == _EPOLLERR { pd.everr = true } //取出pd中的rg或wg,后面放到运行队列 netpollready(&toRun, pd, mode) } } if block && toRun.empty() { goto retry } return toRun } func netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } //将阻塞的goroutine加入gList返回 if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) } }
回到主函数,咱们使用go func形式使用一个协程去处理一个tcp链接,每一个协程里面会有conn.Read,该函数在读取时候若是缓冲区不可读,该goroutine也会陪park住,等待socket fd可读,调度器经过netpoll函数调度它。
func main() { ...... //开启处理 go func() { defer conn.Close() for { buf := make([]byte, 128) //将缓冲区的数据读出来放到buf中 n, err := conn.Read(buf) ...... } }() } } func (fd *FD) Read(p []byte) (int, error) { ...... for { //系统调用读取缓冲区数据,这里没有可读会直接返回,不会阻塞 n, err := syscall.Read(fd.Sysfd, p) if err != nil { n = 0 if err == syscall.EAGAIN && fd.pd.pollable() { //不可读,进入waitRead方法,park住该goroutine, //并记录goroutine到pd的rg中,等待唤醒 if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } } ...... } }
后面会等待缓冲区可读写,shchedule函数调用netpoll并进一步调用epoll_wait检测到并唤醒该goroutine。能够查看上面netpoll,这里不作重复工做了。
Golang也提供了对于epoll item节点的删除操做,具体封装函数poll_runtime_pollClose
//当发生某些状况,如链接断开,fd销毁等,会调用到此处 func poll_runtime_pollClose(pd *pollDesc) { ....... netpollclose(pd.fd) //释放对应的pd pollcache.free(pd) } //调用epoll_ctl系统调用,删除该fd在eventpoll上对应的epitem func netpollclose(fd uintptr) int32 { var ev epollevent return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev) }
抓了一部分系统调用分析一下上述程序与内核交互的大体过程。
$ strace -f ./server
部分系统调用函数以下。
#....省略内存管理部分以及线程管理部分 #执行到fmt.Println("服务端进程id:",os.Getpid()) [pid 30307] getpid() = 30307 [pid 30307] write(1, "346234215345212241347253257350277233347250213id357274232 30307n", 27服务端进程id:30307 ) = 27 ......因为过多,省略关于socket的系统调用 [pid 30308] <... nanosleep resumed> NULL) = 0 #打开系统文件,该文件定义tcp最大链接数,会被设置成pollable,并加入epoll节点中 [pid 30307] openat(AT_FDCWD, "/proc/sys/net/core/somaxconn", O_RDONLY|O_CLOEXEC <unfinished ...> [pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ...> [pid 30307] <... openat resumed> ) = 4 #调用epoll_ctl,建立一个eventpoll [pid 30307] epoll_create1(EPOLL_CLOEXEC) = 5 #将fd加到epoll事件 [pid 30307] epoll_ctl(5, EPOLL_CTL_ADD, 4, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=2174189320, u64=139635855949576}}) = 0 [pid 30307] fcntl(4, F_GETFL) = 0x8000 (flags O_RDONLY|O_LARGEFILE) [pid 30307] fcntl(4, F_SETFL, O_RDONLY|O_NONBLOCK|O_LARGEFILE) = 0 [pid 30308] <... nanosleep resumed> NULL) = 0 [pid 30307] read(4, <unfinished ...> #执行epoll_wait查看就绪事件 [pid 30308] epoll_pwait(5, <unfinished ...> [pid 30307] <... read resumed> "512n", 65536) = 4 [pid 30308] <... epoll_pwait resumed> [{EPOLLIN|EPOLLOUT, {u32=2174189320, u64=139635855949576}}], 128, 0, NULL, 139635812673280) = 1 [pid 30307] read(4, <unfinished ...> [pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ...> [pid 30307] <... read resumed> "", 65532) = 0 #将/proc/sys/net/core/somaxconn文件的fd从epoll中删除 [pid 30307] epoll_ctl(5, EPOLL_CTL_DEL, 4, 0xc00005e8d4) = 0 #关掉打开的somaxconn描述符 [pid 30307] close(4) = 0 #设置监听的socket描述符 [pid 30307] setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0 [pid 30307] bind(3, {sa_family=AF_INET6, sin6_port=htons(9009), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0 [pid 30307] listen(3, 512 <unfinished ...> [pid 30308] <... nanosleep resumed> NULL) = 0 [pid 30307] <... listen resumed> ) = 0 [pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ...> #将用于监听的socket fd加入到epoll中 [pid 30307] epoll_ctl(5, EPOLL_CTL_ADD, 3, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=2174189320, u64=139635855949576}}) = 0 [pid 30307] getsockname(3, {sa_family=AF_INET6, sin6_port=htons(9009), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28]) = 0 #执行accept4发现没有链接,返回EAGAIN错误 [pid 30307] accept4(3, 0xc00005eb98, [112], SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable) #查看是否有就绪的fd,这次调用是非阻塞,当即返回 [pid 30307] epoll_pwait(5, [], 128, 0, NULL, 0) = 0 [pid 30308] <... nanosleep resumed> NULL) = 0 #查看是否有就绪的fd,这次会阻塞等待,直到有链接进来 [pid 30307] epoll_pwait(5, <unfinished ...> [pid 30308] futex(0x60dc70, FUTEX_WAIT_PRIVATE, 0, {tv_sec=60, tv_nsec=0} <unfinished ...> [pid 30307] <... epoll_pwait resumed> [{EPOLLIN, {u32=2174189320, u64=139635855949576}}], 128, -1, NULL, 0) = 1 [pid 30307] futex(0x60dc70, FUTEX_WAKE_PRIVATE, 1) = 1 [pid 30308] <... futex resumed> ) = 0 #新的链接,表明收到了一个客户端链接,分配了一个fd是4 [pid 30307] accept4(3, <unfinished ...>, <... accept4 resumed> {sa_family=AF_INET6, sin6_port=htons(52082), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28], SOCK_CLOEXEC|SOCK_NONBLOCK) = 4 #把4加入到epoll中管理 [pid 30307] epoll_ctl(5, EPOLL_CTL_ADD, 4, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=2174189112, u64=139635855949368}}) = 0 [pid 30307] getsockname(4, {sa_family=AF_INET6, sin6_port=htons(9009), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28]) = 0 ...... #后来将client端关掉,此时tcp链接断掉了,将epoll中的fd移除 [pid 30309] epoll_ctl(5, EPOLL_CTL_DEL, 4, 0xc00005fdd4 <unfinished ...> [pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ...> [pid 30309] <... epoll_ctl resumed> ) = 0 [pid 30309] close(4) = 0 [pid 30309] epoll_pwait(5, [], 128, 0, NULL, 824634114048) = 0 #阻塞等待 [pid 30309] epoll_pwait(5, <unfinished ...> ........