前段时间由于一个爬虫项目,最开始作的时候是无脑的一个下载任务就开一个协程,可是后期出现了比较大的内存问题,而且下载的效果也不是那么的好,后面发现是由于协程开的太多了,而且下行的带宽就只有那么的大,因此并不能和想象中的那样。哎,仍是由于too young,too simple,sometimes naive.git
这篇主要是讲解的tunny是如何是如何实现并保持一个goroutine pool的。github
由于本人是小菜,加上时间仓促,因此要是有什么问题的话但愿大佬指正。安全
tunny地址:https://github.com/Jeffail/tunny
这是一个goroutine pool包,能够设置或者动态改变goroutine pool中goroutine的数量,生成一个固定的数量的pool,实现goroutine的重复使用,而且能在必定程度上控制goroutineapp
1.基本的数据类型函数
经过tunny的源码包文件数量并很少,只有3个文件,tonny.go和worker.go,没有那么多的文件层次结构,因此阅读起来特别的方便。这也是我比较喜欢阅读go语言代码的缘由。this
tunny.go中spa
Pool结构
主要是用于对整个pool的管理,其中包括poolcode
type Pool struct { ctor func() Worker //goroutine中用户的业务逻辑函数 workers []*workerWrapper //目前已经存在的goroutine信息,workerWrapper结构定义在worker.go的中, reqChan chan workRequest //任务调度管道,主要是用户管理当前goroutine是否执行任务,它和workerWrapper中的reqChan 实际上是一个,可是workerWrapper的reqChan只是一个发送管道,这个后面会继续讲解 workerMut sync.Mutex //锁 queuedJobs int64 计数,表示当前已经在运行的任务 }
worker接口主要用户包装用户的业务逻辑的funcorm
type Worker interface { // Process will synchronously perform a job and return the result. // Process(interface{}) interface{} // BlockUntilReady is called before each job is processed and must block the // calling goroutine until the Worker is ready to process the next job. BlockUntilReady() // Interrupt is called when a job is cancelled. The worker is responsible // for unblocking the Process implementation. Interrupt() // Terminate is called when a Worker is removed from the processing pool // and is responsible for cleaning up any held resources. Terminate() }
closureWorker 顾明思议,主要是用于包装用户的业务逻辑,
而且是Worker的彻底接收者协程
type closureWorker struct { processor func(interface{}) interface{} }
在worker.go中
type workerWrapper struct { worker Worker //用户存放用户定义的业务逻辑函数 interruptChan chan struct{} //用于外部干预,使当前goroutine提早终止 // reqChan is NOT owned by this type, it is used to send requests for work. reqChan chan<- workRequest //这个和pool.go中Pool类型中的reqChan是一个,只不过当前这个是一个发送管道 // closeChan can be closed in order to cleanly shutdown this worker. closeChan chan struct{} //这个是用于传递关闭当前goroutine的消息 // closedChan is closed by the run() goroutine when it exits. closedChan chan struct{} //这个我感受并无太大的实际意义 }
这个主要是用于传递任务参数。以及返回任务执行结果的类型
type workRequest struct { // jobChan is used to send the payload to this worker. jobChan chan<- interface{} // retChan is used to read the result from this worker. retChan <-chan interface{} // interruptFunc can be called to cancel a running job. When called it is no // longer necessary to read from retChan. interruptFunc func() }
2.如何建立一个goroutine pool
根据代码的调用步骤,
首先是实例化一个Pool类型的数据,并将用户用户的业务func包装成closureWorker类型并存储在Pool类型实例中的ctor字段中
使用外部调用建立一个Pool对象:
包中建立一个Pool的逻辑
逻辑很简单,一眼就能看明白。
那么在哪里启动一个goroutine,请看下面
注意这里的参数传递,这里传递了一个channel类型的参数,众所周知,在go中,分为两种类型,一种是值类型,一种是引用类型(map,slice,channel),说这么多有什么用呢,怎么扯到引用类型上面去了呢,但这个很重要
咱们接下咱们看在newWorkerWrapper中的逻辑
上面说到,咱们传递过去了两个参数,其中一个是一个channel类型的,由于channel引用类型,因此他的传递是地址,因此在最后newWorkerWrapper中赋值的时候workerWrapper.reqChan和pool.reqChan实际指向的是同一个地址,区别就是workerWrapper.reqChan是一个发送管道罢了
咱们能够输出看看
下面是run函数中的代码
run函数中的代码算是是整个包中最重要的代码了。
他的实现原理是比较简单的,就是采用的是一个for+select+channel来实现的,而且select采用是嵌套的形式,可是其中仍是有些比较难以理解的(固然对我小白我来讲哈,2333333)
我感受主要是这两段
这两段的代码,须要结合到下一个小姐来讲,请看下一个。
2.调用goroutine pool
这里调用很简单,只须要ret := pool.Process(参数)就ok了
咱们来看看Process中是怎么样的
Process中逻辑很简单,上一个小姐咱们知道,pool的reqChan 和 pool.workers.reqChan 是指向的同一个地址,可是后者为一个发送管道因此,在这样来使用时安全的,数据是不会错误的 。
在前面个人run函数中,有两段代码还没说明意思,如今我就说明一下,第一个就是这段,
(1)在咱们定义reqChan管道的时候,咱们定义的是一个没有缓冲区的管道,因此在没有接受操做的状况下,咱们向管道里面推送数据是会被阻塞住的。
(2)在go中select是在有IO操做的状况下会被触发,因此要是咱们没有在Process函数中调用reqChan接收数据,当前goroutine是会被阻塞住的这样当前select内层的select也会被阻塞住。
而后咱们在来看经过reqChan传递过来的值
上面讲到,channel是引用类型,因此它在传递的时候是传递的地址,而不是值,因此,咱们接收到的jobChan和retChan和传递过来指向的是一样的地址,这样咱们就能实现共享通讯了。咱们能够输出里面两边的地址看看,这里我开了一个容量为2的pool,而后我调用pool里面的其中一个goroutine,咱们看打印的地址
看。。。。没错吧。。。。。
有一个问题,就是当咱们的pool有2个goroutine的时候,可是咱们有200个任务须要完成,也就是须要调用200测goroutine,Tunny是怎么样实现调度的呢,这个后面的文章补充吧,下班。。。。。。。。
算了仍是在这里写吧。。。。。
对于前面的问题其实很简单。由于我在建立了一个Pool的时候,就只开了2个goroutine,而且使用的是一个双层的select,第一层是reqChan发送管道阻塞住的
因此就算你这时候同时执行了100个pool.Process(10),可是每次同时也只能有两个消息从reqChan发送管道发出,其余的98个reqChan接收管道都会阻塞住。