我的博客原文:Goroutine并发调度模型深度解析之手撸一个高性能协程池php
并发(并行),一直以来都是一个编程语言里的核心主题之一,也是被开发者关注最多的话题;Go语言做为一个出道以来就自带 『高并发』光环的富二代编程语言,它的并发(并行)编程确定是值得开发者去探究的,而Go语言中的并发(并行)编程是经由goroutine实现的,goroutine是golang最重要的特性之一,具备使用成本低、消耗资源低、能效高等特色,官方宣称原生goroutine并发成千上万不成问题,因而它也成为Gopher们常用的特性。Goroutine是优秀的,但不是完美的,在极大规模的高并发场景下,也可能会暴露出问题,什么问题呢?又有什么可选的解决方案?本文将经过runtime对goroutine的调度分析,帮助你们理解它的机理和发现一些内存和调度的原理和问题,而且基于此提出一种我的的解决方案 — 一个高性能的Goroutine Pool(协程池)。java
Goroutine,Go语言基于并发(并行)编程给出的自家的解决方案。goroutine是什么?一般goroutine会被当作coroutine(协程)的 golang实现,从比较粗浅的层面来看,这种认知也算是合理,但实际上,goroutine并不是传统意义上的协程,如今主流的线程模型分三种:内核级线程模型、用户级线程模型和两级线程模型(也称混合型线程模型),传统的协程库属于用户级线程模型,而goroutine和它的Go Scheduler
在底层实现上实际上是属于两级线程模型,所以,有时候为了方便理解能够简单把goroutine类比成协程,但内心必定要有个清晰的认知 — goroutine并不等同于协程。python
互联网时代以降,因为在线用户数量的爆炸,单台服务器处理的链接也水涨船高,迫使编程模式由从前的串行模式升级到并发模型,而几十年来,并发模型也是一代代地升级,有IO多路复用、多进程以及多线程,这几种模型都各有长短,现代复杂的高并发架构大可能是几种模型协同使用,不一样场景应用不一样模型,扬长避短,发挥服务器的最大性能,而多线程,由于其轻量和易用,成为并发编程中使用频率最高的并发模型,然后衍生的协程等其余子产品,也都基于它,而咱们今天要分析的 goroutine 也是基于线程,所以,咱们先来聊聊线程的三大模型:git
线程的实现模型主要有3种:内核级线程模型、用户级线程模型和两级线程模型(也称混合型线程模型),它们之间最大的差别就在于用户线程与内核调度实体(KSE,Kernel Scheduling Entity)之间的对应关系上。而所谓的内核调度实体 KSE 就是指能够被操做系统内核调度器调度的对象实体(这说的啥玩意儿,敢不敢通俗易懂一点?)。简单来讲 KSE 就是内核级线程,是操做系统内核的最小调度单元,也就是咱们写代码的时候通俗理解上的线程了(这么说不就懂了嘛!装什么13)。github
用户线程与内核线程KSE是多对一(N : 1)的映射模型,多个用户线程的通常从属于单个进程而且多线程的调度是由用户本身的线程库来完成,线程的建立、销毁以及多线程之间的协调等操做都是由用户本身的线程库来负责而无须借助系统调用来实现。一个进程中全部建立的线程都只和同一个KSE在运行时动态绑定,也就是说,操做系统只知道用户进程而对其中的线程是无感知的,内核的全部调度都是基于用户进程。许多语言实现的 协程库 基本上都属于这种方式(好比python的gevent)。因为线程调度是在用户层面完成的,也就是相较于内核调度不须要让CPU在用户态和内核态之间切换,这种实现方式相比内核级线程能够作的很轻量级,对系统资源的消耗会小不少,所以能够建立的线程数量与上下文切换所花费的代价也会小得多。但该模型有个原罪:并不能作到真正意义上的并发,假设在某个用户进程上的某个用户线程由于一个阻塞调用(好比I/O阻塞)而被CPU给中断(抢占式调度)了,那么该进程内的全部线程都被阻塞(由于单个用户进程内的线程自调度是没有CPU时钟中断的,从而没有轮转调度),整个进程被挂起。即使是多CPU的机器,也无济于事,由于在用户级线程模型下,一个CPU关联运行的是整个用户进程,进程内的子线程绑定到CPU执行是由用户进程调度的,内部线程对CPU是不可见的,此时能够理解为CPU的调度单位是用户进程。因此不少的协程库会把本身一些阻塞的操做从新封装为彻底的非阻塞形式,而后在之前要阻塞的点上,主动让出本身,并经过某种方式通知或唤醒其余待执行的用户线程在该KSE上运行,从而避免了内核调度器因为KSE阻塞而作上下文切换,这样整个进程也不会被阻塞了。golang
用户线程与内核线程KSE是一对一(1 : 1)的映射模型,也就是每个用户线程绑定一个实际的内核线程,而线程的调度则彻底交付给操做系统内核去作,应用程序对线程的建立、终止以及同步都基于内核提供的系统调用来完成,大部分编程语言的线程库(好比Java的java.lang.Thread、C++11的std::thread等等)都是对操做系统的线程(内核级线程)的一层封装,建立出来的每一个线程与一个独立的KSE静态绑定,所以其调度彻底由操做系统内核调度器去作。这种模型的优点和劣势一样明显:优点是实现简单,直接借助操做系统内核的线程以及调度器,因此CPU能够快速切换调度线程,因而多个线程能够同时运行,所以相较于用户级线程模型它真正作到了并行处理;但它的劣势是,因为直接借助了操做系统内核来建立、销毁和以及多个线程之间的上下文切换和调度,所以资源成本大幅上涨,且对性能影响很大。web
两级线程模型是博采众长以后的产物,充分吸取前两种线程模型的优势且尽可能规避它们的缺点。在此模型下,用户线程与内核KSE是多对多(N : M)的映射模型:首先,区别于用户级线程模型,两级线程模型中的一个进程能够与多个内核线程KSE关联,因而进程内的多个线程能够绑定不一样的KSE,这点和内核级线程模型类似;其次,又区别于内核级线程模型,它的进程里的全部线程并不与KSE一一绑定,而是能够动态绑定同一个KSE, 当某个KSE由于其绑定的线程的阻塞操做被内核调度出CPU时,其关联的进程中其他用户线程能够从新与其余KSE绑定运行。因此,两级线程模型既不是用户级线程模型那种彻底靠本身调度的也不是内核级线程模型彻底靠操做系统调度的,而是中间态(自身调度与系统调度协同工做),也就是 — 『薛定谔的模型』(误),由于这种模型的高度复杂性,操做系统内核开发者通常不会使用,因此更多时候是做为第三方库的形式出现,而Go语言中的runtime调度器就是采用的这种实现方案,实现了Goroutine与KSE之间的动态关联,不过Go语言的实现更加高级和优雅;该模型为什么被称为两级?即用户调度器实现用户线程到KSE的『调度』,内核调度器实现KSE到CPU上的『调度』。算法
每个OS线程都有一个固定大小的内存块(通常会是2MB)来作栈,这个栈会用来存储当前正在被调用或挂起(指在调用其它函数时)的函数的内部变量。这个固定大小的栈同时很大又很小。由于2MB的栈对于一个小小的goroutine来讲是很大的内存浪费,而对于一些复杂的任务(如深度嵌套的递归)来讲又显得过小。所以,Go语言作了它本身的『线程』。编程
在Go语言中,每个goroutine是一个独立的执行单元,相较于每一个OS线程固定分配2M内存的模式,goroutine的栈采起了动态扩容方式, 初始时仅为2KB,随着任务执行按需增加,最大可达1GB(64位机器最大是1G,32位机器最大是256M),且彻底由golang本身的调度器 Go Scheduler 来调度。此外,GC还会周期性地将再也不使用的内存回收,收缩栈空间。 所以,Go程序能够同时并发成千上万个goroutine是得益于它强劲的调度器和高效的内存模型。Go的创造者大概对goroutine的定位就是屠龙刀,由于他们不只让goroutine做为golang并发编程的最核心组件(开发者的程序都是基于goroutine运行的)并且golang中的许多标准库的实现也处处能见到goroutine的身影,好比net/http这个包,甚至语言自己的组件runtime运行时和GC垃圾回收器都是运行在goroutine上的,做者对goroutine的厚望可见一斑。浏览器
任何用户线程最终确定都是要交由OS线程来执行的,goroutine(称为G)也不例外,可是G并不直接绑定OS线程运行,而是由Goroutine Scheduler中的 P - Logical Processor (逻辑处理器)来做为二者的『中介』,P能够看做是一个抽象的资源或者一个上下文,一个P绑定一个OS线程,在golang的实现里把OS线程抽象成一个数据结构:M,G其实是由M经过P来进行调度运行的,可是在G的层面来看,P提供了G运行所需的一切资源和环境,所以在G看来P就是运行它的 “CPU”,由 G、P、M 这三种由Go抽象出来的实现,最终造成了Go调度器的基本结构:
关于P,咱们须要再絮叨几句,在Go 1.0发布的时候,它的调度器其实G-M模型,也就是没有P的,调度过程全由G和M完成,这个模型暴露出一些问题:
这些问题实在太扎眼了,致使Go1.0虽然号称原生支持并发,却在并发性能上一直饱受诟病,而后,Go语言委员会中一个核心开发大佬看不下了,亲自下场从新设计和实现了Go调度器(在原有的G-M模型中引入了P)而且实现了一个叫作 work-stealing 的调度算法:
该算法避免了在goroutine调度时使用全局锁。
至此,Go调度器的基本模型确立:
Go调度器工做时会维护两种用来保存G的任务队列:一种是一个Global任务队列,一种是每一个P维护的Local任务队列。
当经过go
关键字建立一个新的goroutine的时候,它会优先被放入P的本地队列。为了运行goroutine,M须要持有(绑定)一个P,接着M会启动一个OS线程,循环从P的本地队列里取出一个goroutine并执行。固然还有上文说起的 work-stealing
调度算法:当M执行完了当前P的Local队列里的全部G后,P也不会就这么在那躺尸啥都不干,它会先尝试从Global队列寻找G来执行,若是Global队列为空,它会随机挑选另一个P,从它的队列里中拿走一半的G到本身的队列中执行。
若是一切正常,调度器会以上述的那种方式顺畅地运行,但这个世界没这么美好,总有意外发生,如下分析goroutine在两种例外状况下的行为。
Go runtime会在下面的goroutine被阻塞的状况下运行另一个goroutine:
这四种场景又可归类为两种类型:
当goroutine由于channel操做或者network I/O而阻塞时(实际上golang已经用netpoller实现了goroutine网络I/O阻塞不会致使M被阻塞,仅阻塞G,这里仅仅是举个栗子),对应的G会被放置到某个wait队列(如channel的waitq),该G的状态由_Gruning
变为_Gwaitting
,而M会跳过该G尝试获取并执行下一个G,若是此时没有runnable的G供M运行,那么M将解绑P,并进入sleep状态;当阻塞的G被另外一端的G2唤醒时(好比channel的可读/写通知),G被标记为runnable,尝试加入G2所在P的runnext,而后再是P的Local队列和Global队列。
当G被阻塞在某个系统调用上时,此时G会阻塞在_Gsyscall
状态,M也处于 block on syscall 状态,此时的M可被抢占调度:执行该G的M会与P解绑,而P则尝试与其它idle的M绑定,继续执行其它G。若是没有其它idle的M,但P的Local队列中仍然有G须要执行,则建立一个新的M;当系统调用完成后,G会从新尝试获取一个idle的P进入它的Local队列恢复执行,若是没有idle的P,G会被标记为runnable加入到Global队列。
以上就是从宏观的角度对Goroutine和它的调度器进行的一些概要性的介绍,固然,Go的调度中更复杂的抢占式调度、阻塞调度的更多细节,你们能够自行去找相关资料深刻理解,本文只讲到Go调度器的基本调度过程,为后面本身实现一个Goroutine Pool提供理论基础,这里便再也不继续深刻上述说的那几个调度了,事实上若是要彻底讲清楚Go调度器,一篇文章的篇幅也实在是捉襟见肘,因此想了解更多细节的同窗能够去看看Go调度器 G-P-M 模型的设计者 Dmitry Vyukov 写的该模型的设计文档《Go Preemptive Scheduler Design》以及直接去看源码,G-P-M模型的定义放在src/runtime/runtime2.go
里面,而调度过程则放在了src/runtime/proc.go
里。
既然Go调度器已经这么牛逼优秀了,咱们为何还要本身去实现一个golang的 Goroutine Pool 呢?事实上,优秀不表明完美,任何不考虑具体应用场景的编程模式都是耍流氓!有基于G-P-M的Go调度器背书,go程序的并发编程中,能够任性地起大规模的goroutine来执行任务,官方也宣称用golang写并发程序的时候随便起个成千上万的goroutine毫无压力。
然而,你起1000个goroutine没有问题,10000也没有问题,10w个可能也没问题;那,100w个呢?1000w个呢?(这里只是举个极端的例子,实际编程起这么大规模的goroutine的例子极少)这里就会出问题,什么问题呢?
我想,做为golang拥趸的Gopher们必定都使用过它的net/http标准库,不少人都说用golang写web server彻底能够不用借助第三方的web framework,仅用net/http标准库就能写一个高性能的web server,的确,我也用过它写过web server,简洁高效,性能表现也至关不错,除非有比较特殊的需求不然通常的确不用借助第三方web framework,可是天下没有白吃的午饭,net/http为啥这么快?要搞清这个问题,从源码入手是最好的途径。孔子曾经曰过:源码面前,如同裸奔。因此,高清无码是阻碍程序猿发展大大滴绊脚石啊,源码才是咱们进步阶梯,切记切记!
接下来咱们就来先看看net/http内部是怎么实现的。
net/http接收请求且开始处理的源码放在src/net/http/server.go
里,先从入口函数ListenAndServe
进去:
func (srv *Server) ListenAndServe() error { addr := srv.Addr if addr == "" { addr = ":http" } ln, err := net.Listen("tcp", addr) if err != nil { return err } return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)}) }
看到最后那个srv.Serve调用了吗?没错,这个Serve
方法里面就是实际处理http请求的逻辑,咱们再进入这个方法内部:
func (srv *Server) Serve(l net.Listener) error { defer l.Close() ... // 不断循环取出TCP链接 for { // 看我看我!!! rw, e := l.Accept() ... // 再看我再看我!!! go c.serve(ctx) } }
首先,这个方法的参数(l net.Listener)
,是一个TCP监听的封装,负责监听网络端口,rw, e := l.Accept()
则是一个阻塞操做,从网络端口取出一个新的TCP链接进行处理,最后go c.serve(ctx)
就是最后真正去处理这个http请求的逻辑了,看到前面的go关键字了吗?没错,这里启动了一个新的goroutine去执行处理逻辑,并且这是在一个无限循环体里面,因此意味着,每来一个请求它就会开一个goroutine去处理,至关任性粗暴啊…,不过有Go调度器背书,通常来讲也没啥压力,然而,若是,我是说若是哈,忽然一大波请求涌进来了(比方说黑客搞了成千上万的肉鸡DDOS你,没错!就这么倒霉!),这时候,就很成问题了,他来10w个请求你就要开给他10w个goroutine,来100w个你就要老老实实开给他100w个,线程调度压力陡升,内存爆满,再而后,你就跪了…
有问题,就必定有解决的办法,那么,有什么方案能够减缓大规模goroutine对系统的调度和内存压力?要想解决问题,最重要的是找到形成问题的根源,这个问题根源是什么?goroutine的数量过多致使资源侵占,那要解决这个问题就要限制运行的goroutine数量,合理复用,节省资源,具体就是 — goroutine池化。
超大规模并发的场景下,不加限制的大规模的goroutine可能形成内存暴涨,给机器带来极大的压力,吞吐量降低和处理速度变慢仍是其次,更危险的是可能使得程序crash。因此,goroutine池化是有其现实意义的。
首先,100w个任务,是否是真的须要100w个goroutine来处理?未必!用1w个goroutine也同样能够处理,让一个goroutine多处理几个任务就是了嘛,池化的核心优点就在于对goroutine的复用。此举首先极大减轻了runtime调度goroutine的压力,其次,即是下降了对内存的消耗。
有一个商场,来了1000个顾客买东西,那么该如何安排导购员服务这1000人呢?有两种方案:
第一,我雇1000个导购员实行一对一服务,这种固然是最高效的,可是太浪费资源了,雇1000我的的成本极高且管理困难,这些能够先按下不表,可是每一个顾客到商场买东西也不是一进来就立刻买,通常都得逛一逛,选一选,也就是得花时间挑,1000个导购员一对一盯着,效率极低;这就引出第二种方案:我只雇10个导购员,就在商场里待命,有顾客须要咨询的时候招呼导购员过去进行处理,导购员处理完以后就回来,等下一个顾客须要咨询的时候再去,如此往返反复...
第二种方案有没有以为很眼熟?没错,其基本思路就是模拟一个I/O多路复用,经过一种机制,能够监视多个描述符,一旦某个描述符就绪(通常是读就绪或者写就绪),可以通知程序进行相应的读写操做。关于多路复用,不在本文的讨论范围以内,便再也不赘述,详细原理能够参考 I/O多路复用。
第一种方案就是net/http标准库采用的:来一个请求开一个goroutine处理;第二种方案就是Goroutine Pool(I/O多路复用)。
由于上述陈列的一些因为goroutine规模过大而可能引起的问题,须要有方案来解决这些问题,上文已经分析过,把goroutine池化是一种行之有效的方案,基于此,能够实现一个Goroutine Pool,复用goroutine,减轻runtime的调度压力以及缓解内存压力,依托这些优化,在大规模goroutine并发的场景下能够极大地提升并发性能。
哎玛!前面絮絮不休了这么多,终于进入正题了,接下来就开始讲解如何实现一个高性能的Goroutine Pool,秒杀原生并发的goroutine,在执行速度和占用内存上提升并发程序的性能。好了,话很少说,开始装逼分析。
Goroutine Pool 的实现思路大体以下:
启动服务之时先初始化一个 Goroutine Pool 池,这个Pool维护了一个相似栈的LIFO队列 ,里面存放负责处理任务的Worker,而后在client端提交task到Pool中以后,在Pool内部,接收task以后的核心操做是:
- 检查当前Worker队列中是否有空闲的Worker,若是有,取出执行当前的task;
- 没有空闲Worker,判断当前在运行的Worker是否已超过该Pool的容量,是 — 阻塞等待直至有Worker被放回Pool;否 — 新开一个Worker(goroutine)处理;
- 每一个Worker执行完任务以后,放回Pool的队列中等待。
调度过程以下:
按照这个设计思路,我实现了一个高性能的Goroutine Pool,较好地解决了上述的大规模调度和资源占用的问题,在执行速度和内存占用方面相较于原生goroutine并发占有明显的优点,尤为是内存占用,由于复用,因此规避了无脑启动大规模goroutine的弊端,能够节省大量的内存。
此外,该调度系统还有一个清理过时Worker的定时任务,该任务在初始化一个Pool之时启动,每隔必定的时间间隔去检查空闲Worker队列中是否有已通过期的Worker,有则清理掉,经过定时清理过时worker,进一步节省系统资源。
完整的项目代码能够在个人github上获取:传送门,也欢迎提意见和交流。
Goroutine Pool的设计原理前面已经讲过了,整个调度过程相信你们应该能够理解了,可是有一句老话说得好,空谈误国,实干兴邦,设计思路有了,具体实现的时候确定会有不少细节、难点,接下来咱们经过分析这个Goroutine Pool的几个核心实现以及它们的联动来引导你们过一遍Goroutine Pool的原理。
Pool struct
:type sig struct{} type f func() error // Pool accept the tasks from client,it limits the total // of goroutines to a given number by recycling goroutines. type Pool struct { // capacity of the pool. capacity int32 // running is the number of the currently running goroutines. running int32 // expiryDuration set the expired time (second) of every worker. expiryDuration time.Duration // freeSignal is used to notice pool there are available // workers which can be sent to work. freeSignal chan sig // workers is a slice that store the available workers. workers []*Worker // release is used to notice the pool to closed itself. release chan sig // lock for synchronous operation lock sync.Mutex once sync.Once }
Pool
是一个通用的协程池,支持不一样类型的任务,亦即每个任务绑定一个函数提交到池中,批量执行不一样类型任务,是一种广义的协程池;本项目中还实现了另外一种协程池 — 批量执行同类任务的协程池PoolWithFunc
,每个PoolWithFunc
只会绑定一个任务函数pf
,这种Pool适用于大批量相同任务的场景,由于每一个Pool只绑定一个任务函数,所以PoolWithFunc
相较于Pool
会更加节省内存,但通用性就不如前者了,为了让你们更好地理解协程池的原理,这里咱们用通用的Pool
来分析。
capacity
是该Pool的容量,也就是开启worker数量的上限,每个worker绑定一个goroutine;running
是当前正在执行任务的worker数量;expiryDuration
是worker的过时时长,在空闲队列中的worker的最新一次运行时间与当前时间之差若是大于这个值则表示已过时,定时清理任务会清理掉这个worker;freeSignal
是一个信号,由于Pool开启的worker数量有上限,所以当所有worker都在执行任务的时候,新进来的请求就须要阻塞等待,那当执行完任务的worker被放回Pool之时,如何通知阻塞的请求绑定一个空闲的worker运行呢?freeSignal
就是来作这个事情的;workers
是一个slice,用来存放空闲worker,请求进入Pool以后会首先检查workers
中是否有空闲worker,如有则取出绑定任务执行,不然判断当前运行的worker是否已经达到容量上限,是—阻塞等待,否—新开一个worker执行任务;release
是当关闭该Pool支持通知全部worker退出运行以防goroutine泄露;lock
是一个锁,用以支持Pool的同步操做;once
用在确保Pool关闭操做只会执行一次。
// NewPool generates a instance of ants pool func NewPool(size, expiry int) (*Pool, error) { if size <= 0 { return nil, ErrPoolSizeInvalid } p := &Pool{ capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), expiryDuration: time.Duration(expiry) * time.Second, } // 启动按期清理过时worker任务,独立goroutine运行, // 进一步节省系统资源 p.monitorAndClear() return p, nil }
p.Submit(task f)
以下:
// Submit submit a task to pool func (p *Pool) Submit(task f) error { if len(p.release) > 0 { return ErrPoolClosed } w := p.getWorker() w.sendTask(task) return nil }
第一个if判断当前Pool是否已被关闭,如果则再也不接受新任务,不然获取一个Pool中可用的worker,绑定该task
执行。
p.getWorker()
源码:
// getWorker returns a available worker to run the tasks. func (p *Pool) getWorker() *Worker { var w *Worker // 标志,表示当前运行的worker数量是否已达容量上限 waiting := false // 涉及从workers队列取可用worker,须要加锁 p.lock.Lock() workers := p.workers n := len(workers) - 1 // 当前worker队列为空(无空闲worker) if n < 0 { // 运行worker数目已达到该Pool的容量上限,置等待标志 if p.running >= p.capacity { waiting = true // 不然,运行数目加1 } else { p.running++ } // 有空闲worker,从队列尾部取出一个使用 } else { <-p.freeSignal w = workers[n] workers[n] = nil p.workers = workers[:n] } // 判断是否有worker可用结束,解锁 p.lock.Unlock() if waiting { // 阻塞等待直到有空闲worker <-p.freeSignal p.lock.Lock() workers = p.workers l := len(workers) - 1 w = workers[l] workers[l] = nil p.workers = workers[:l] p.lock.Unlock() // 当前无空闲worker可是Pool尚未满, // 则能够直接新开一个worker执行任务 } else if w == nil { w = &Worker{ pool: p, task: make(chan f), } w.run() } return w }
上面的源码中加了较为详细的注释,结合前面的设计思路,相信你们应该能理解获取可用worker绑定任务执行这个协程池的核心操做,主要就是实现一个LIFO队列用来存取可用worker达到资源复用的效果,之因此采用LIFO后进先出队列是由于后进先出能够保证空闲worker队列是按照每一个worker的最后运行时间从远到近的顺序排列,方便在后续按期清理过时worker时排序以及清理完以后从新分配空闲worker队列,这里还要关注一个地方:达到Pool容量限制以后,额外的任务请求须要阻塞等待idle worker,这里是为了防止无节制地建立goroutine,事实上Go调度器有一个复用机制,每次使用go
关键字的时候它会检查当前结构体M中的P中,是否有可用的结构体G。若是有,则直接从中取一个,不然,须要分配一个新的结构体G。若是分配了新的G,须要将它挂到runtime的相关队列中,可是调度器却没有限制goroutine的数量,这在瞬时性goroutine爆发的场景下就可能来不及复用G而依然建立了大量的goroutine,因此ants
除了复用还作了限制goroutine数量。
其余部分能够依照注释理解,这里再也不赘述。
// Worker is the actual executor who runs the tasks, // it starts a goroutine that accepts tasks and // performs function calls. type Worker struct { // pool who owns this worker. pool *Pool // task is a job should be done. task chan f // recycleTime will be update when putting a worker back into queue. recycleTime time.Time } // run starts a goroutine to repeat the process // that performs the function calls. func (w *Worker) run() { //atomic.AddInt32(&w.pool.running, 1) go func() { //监放任务列表,一旦有任务立马取出运行 for f := range w.task { if f == nil { atomic.AddInt32(&w.pool.running, -1) return } f() //回收复用 w.pool.putWorker(w) } }() } // stop this worker. func (w *Worker) stop() { w.sendTask(nil) } // sendTask sends a task to this worker. func (w *Worker) sendTask(task f) { w.task <- task }
结合前面的p.Submit(task f)
和p.getWorker()
,提交任务到Pool以后,获取一个可用worker,每新建一个worker实例之时都须要调用w.run()
启动一个goroutine监听worker的任务列表task
,一有任务提交进来就执行;因此,当调用worker的sendTask(task f)
方法提交任务到worker的任务队列以后,立刻就能够被接收并执行,当任务执行完以后,会调用w.pool.putWorker(w *Worker)
方法将这个已经执行完任务的worker从当前任务解绑放回Pool中,以供下个任务可使用,至此,一个任务从提交到完成的过程就此结束,Pool调度将进入下一个循环。
// putWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) putWorker(worker *Worker) { // 写入回收时间,亦即该worker的最后运行时间 worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() p.freeSignal <- sig{} }
// ReSize change the capacity of this pool func (p *Pool) ReSize(size int) { if size < p.Cap() { diff := p.Cap() - size for i := 0; i < diff; i++ { p.getWorker().stop() } } else if size == p.Cap() { return } atomic.StoreInt32(&p.capacity, int32(size)) }
func (p *Pool) monitorAndClear() { heartbeat := time.NewTicker(p.expiryDuration) go func() { for range heartbeat.C { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers n := 0 for i, w := range idleWorkers { if currentTime.Sub(w.recycleTime) <= p.expiryDuration { break } n = i w.stop() idleWorkers[i] = nil p.running-- } if n > 0 { n++ p.workers = idleWorkers[n:] } p.lock.Unlock() } }() }
按期检查空闲worker队列中是否有已过时的worker并清理:由于采用了LIFO后进先出队列存放空闲worker,因此该队列默认已是按照worker的最后运行时间由远及近排序,能够方便地按顺序取出空闲队列中的每一个worker并判断它们的最后运行时间与当前时间之差是否超过设置的过时时长,如果,则清理掉该goroutine,释放该worker,而且将剩下的未过时worker从新分配到当前Pool的空闲worker队列中,进一步节省系统资源。
归纳起来,ants
Goroutine Pool的调度过程图示以下:
还记得前面我说除了通用的Pool struct
以外,本项目还实现了一个PoolWithFunc struct
—一个执行批量同类任务的协程池,PoolWithFunc
相较于Pool
,由于一个池只绑定一个任务函数,省去了每一次task都须要传送一个任务函数的代价,所以其性能优点比起Pool
更明显,这里咱们稍微讲一下一个协程池只绑定一个任务函数的细节:
上码!
type pf func(interface{}) error // PoolWithFunc accept the tasks from client,it limits the total // of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { // capacity of the pool. capacity int32 // running is the number of the currently running goroutines. running int32 // expiryDuration set the expired time (second) of every worker. expiryDuration time.Duration // freeSignal is used to notice pool there are available // workers which can be sent to work. freeSignal chan sig // workers is a slice that store the available workers. workers []*WorkerWithFunc // release is used to notice the pool to closed itself. release chan sig // lock for synchronous operation lock sync.Mutex // pf is the function for processing tasks poolFunc pf once sync.Once }
PoolWithFunc struct
中的大部分字段和Pool struct
基本一致,重点关注poolFunc pf
,这是一个函数类型,也就是该Pool绑定的指定任务函数,而client提交到这种类型的Pool的数据就再也不是一个任务函数task f
了,而是poolFunc pf
任务函数的形参,而后交由WorkerWithFunc
处理:
// WorkerWithFunc is the actual executor who runs the tasks, // it starts a goroutine that accepts tasks and // performs function calls. type WorkerWithFunc struct { // pool who owns this worker. pool *PoolWithFunc // args is a job should be done. args chan interface{} // recycleTime will be update when putting a worker back into queue. recycleTime time.Time } // run starts a goroutine to repeat the process // that performs the function calls. func (w *WorkerWithFunc) run() { go func() { for args := range w.args { if args == nil || len(w.pool.release) > 0 { atomic.AddInt32(&w.pool.running, -1) return } w.pool.poolFunc(args) w.pool.putWorker(w) } }() } // stop this worker. func (w *WorkerWithFunc) stop() { w.sendTask(nil) } // sendTask sends a task to this worker. func (w *WorkerWithFunc) sendTask(args interface{}) { w.args <- args }
上面的源码能够看到WorkerWithFunc
是一个相似Worker
的结构,只不过监听的是函数的参数队列,每接收到一个参数包,就直接调用PoolWithFunc
绑定好的任务函数poolFunc pf
任务函数执行任务,接下来的流程就和Worker
是一致的了,执行完任务后就把worker放回协程池,等待下次使用。
至于其余逻辑如提交task
、获取Worker
绑定任务等基本复用自Pool struct
,具体细节有细微差异,但原理一致,万变不离其宗,有兴趣的同窗能够看我在github上的源码:Goroutine Pool协程池 ants 。
吹了这么久的Goroutine Pool,那都是虚的,理论上池化能够复用goroutine,提高性能节省内存,没有benchmark数据以前,好像也不能服众哈!因此,本章就来进行一次实测,验证一下再大规模goroutine并发的场景下,Goroutine Pool的表现是否是真的比原生Goroutine并发更好!
测试机器参数:
OS : macOS High Sierra Processor : 2.7 GHz Intel Core i5 Memory : 8 GB 1867 MHz DDR3 Go1.9
测试结果:
这里为了模拟大规模goroutine的场景,两次测试的并发次数分别是100w和1000w,前两个测试分别是执行100w个并发任务不使用Pool和使用了ants
的Goroutine Pool的性能,后两个则是1000w个任务下的表现,能够直观的看出在执行速度和内存使用上,ants
的Pool都占有明显的优点。100w的任务量,使用ants
,执行速度与原生goroutine至关甚至略快,但只实际使用了不到5w个goroutine完成了所有任务,且内存消耗仅为原生并发的40%;而当任务量达到1000w,优点则更加明显了:用了70w左右的goroutine完成所有任务,执行速度比原生goroutine提升了100%,且内存消耗依旧保持在不使用Pool的40%左右。
测试结果:
基准测试函数名-GOMAXPROCS
,后面的-4表明测试函数运行时对应的CPU核数由于PoolWithFunc
这个Pool只绑定一个任务函数,也即全部任务都是运行同一个函数,因此相较于Pool
对原生goroutine在执行速度和内存消耗的优点更大,上面的结果能够看出,执行速度能够达到原生goroutine的300%,而内存消耗的优点已经达到了两位数的差距,原生goroutine的内存消耗达到了ants
的35倍且原生goroutine的每次执行的内存分配次数也达到了ants
45倍,1000w的任务量,ants
的初始分配容量是5w,所以它完成了全部的任务依旧只使用了5w个goroutine!事实上,ants
的Goroutine Pool的容量是能够自定义的,也就是说使用者能够根据不一样场景对这个参数进行调优直至达到最高性能。
上面的benchmarks出来之后,我当时的心里是这样的:
可是太顺利反而让我疑惑,由于结合我过去这20几年的坎坷人生来看,事情应该不会这么美好才对,果不其然,细细一想,虽然ants
Groutine Pool能在大规模并发下执行速度和内存消耗都对原生goroutine占有明显优点,但前面的测试demo相信你们注意到了,里面使用了WaitGroup,也就是用来对goroutine同步的工具,因此上面的benchmarks中主进程会等待全部子goroutine完成任务后才算完成一次性能测试,然而又有多少场景是单台机器须要扛100w甚至1000w同步任务的?基本没有啊!结果就是造出了屠龙刀,但是世界上没有龙啊!也是无情...
彼时,我心里变成了这样:
幸亏,ants
在同步批量任务方面有点曲高和寡,可是若是是异步批量任务的场景下,就有用武之地了,也就是说,在大批量的任务无须同步等待完成的状况下,能够再测一下ants
和原生goroutine并发的性能对比,这个时候的性能对比也便是吞吐量对比了,就是在相同大规模数量的请求涌进来的时候,ants
和原生goroutine谁能用更快的速度、更少的内存『吞』完这些请求。
测试结果:
由于在个人电脑上测试1000w吞吐量的时候原生goroutine已经到了极限,所以程序直接把电脑拖垮了,没法正常测试了,因此1000w吞吐的测试数据只有ants
Pool的。
从该demo测试吞吐性能对比能够看出,使用ants
的吞吐性能相较于原生goroutine能够保持在2~6倍的性能压制,而内存消耗则能够达到10~20倍的节省优点。
至此,一个高性能的 Goroutine Pool 开发就完成了,事实上,原理不难理解,总结起来就是一个『复用』,具体落实到代码细节就是锁同步、原子操做、channel通讯等这些技巧的使用,ant
这整个项目没有借助任何第三方的库,用golang的标准库就完成了全部功能,由于自己golang的语言原生库已经足够优秀,不少时候开发golang项目的时候是能够保持轻量且高性能的,未必事事须要借助第三方库。
关于ants
的价值,其实前文也说起过了,ants
在大规模的异步&同步批量任务处理都有着明显的性能优点(特别是异步批量任务),而单机上百万上千万的同步批量任务处理现实意义不大,可是在异步批量任务处理方面有很大的应用价值,因此我我的以为,Goroutine Pool真正的价值仍是在:
Go语言的三位最初的缔造者 — Rob Pike、Robert Griesemer 和 Ken Thompson 中,Robert Griesemer 参与设计了Java的HotSpot虚拟机和Chrome浏览器的JavaScript V8引擎,Rob Pike 在大名鼎鼎的bell lab侵淫多年,参与了Plan9操做系统、C编译器以及多种语言编译器的设计和实现,Ken Thompson 更是图灵奖得主、Unix之父、C语言之父。这三人在计算机史上但是元老级别的人物,特别是 Ken Thompson ,是一手缔造了Unix和C语言计算机领域的上古大神,因此Go语言的设计哲学有着深深的Unix烙印:简单、模块化、正交、组合、pipe、功能短小且聚焦等;而令许多开发者青睐于Go的简洁、高效编程模式的缘由,也正在于此。
本文从三大线程模型到Go并发调度器再到自定制的 Goroutine Pool,算是较为完整的窥探了整个Go语言并发模型的前世此生,咱们也能够看到,Go的设计固然不完美,好比一直被诟病的error处理模式、不支持泛型、差强人意的包管理以及面向对象模式的过分抽象化等等,实际上没有任何一门编程语言敢说本身是完美的,仍是那句话,任何不考虑应用场景和语言定位的争执都毫无心义,而Go的定位从出道开始就是系统编程语言&云计算编程语言(这个有点模糊),而Go的做者们也一直坚持的是用最简单抽象的工程化设计完成最复杂的功能,因此若是从这个层面去看Go的并发模型,就能够看出其实除了G-P-M模型中引入的 P ,并无太多革新的原创理论,两级线程模型是早已成熟的理论,抢占式调度更不是什么新鲜的调度模式,Go的伟大之处是在于它诞生之初就是依照Go在谷歌:以软件工程为目的的语言设计而设计的,Go其实就是将这些经典的理论和技术以一种优雅高效的工程化方式组合了起来,并用简单抽象的API或语法糖开放给使用者,Go一直致力于找寻一个高性能&开发效率的共赢点,目前为止,它作得远不够完美,但足够优秀。另外Go经过引入channel与goroutine协同工做,将一种区别于锁&原子操做的并发编程模式 — CSP 带入了Go语言,对开发人员在并发编程模式上的思考有很大的启发。
从本文中对Go调度器的分析以及ants
Goroutine Pool 的设计与实现过程,对Go的并发模型作了一次解构和优化思考,在ants
中的代码实现对锁同步、原子操做、channel通讯的使用也作了一次较为全面的实践,但愿对Gopher们在Go语言并发模型与并发编程的理解上能有所裨益。
感谢阅读。