goroutine不一样于thread,threads是操做系统中的对于一个独立运行实例的描述,不一样操做系统,对于thread的实现也不尽相同;可是,操做系统并不知道goroutine的存在,goroutine的调度是有Golang运行时进行管理的。启动thread虽然比process所需的资源要少,可是多个thread之间的上下文切换仍然是须要大量的工做的(寄存器/Program Count/Stack Pointer/...),Golang有本身的调度器,许多goroutine的数据都是共享的,所以goroutine之间的切换会快不少,启动goroutine所耗费的资源也不多,一个Golang程序同时存在几百个goroutine是很正常的。前端
channel,即“管道”,是用来传递数据(叫消息更为合适)的一个数据结构,便可以从channel里面塞数据,也能够从中获取数据。channel自己并无什么神奇的地方,可是channel加上了goroutine,就造成了一种既简单又强大的请求处理模型,即N个工做goroutine将处理的中间结果或者最终结果放入一个channel,另外有M个工做goroutine从这个channel拿数据,再进行进一步加工,经过组合这种过程,从而胜任各类复杂的业务模型。golang
本身在实践的过程当中,产生了几种经过goroutine + channel实现的工做模型,本文分别对这些模型进行介绍。json
直接加上go
关键字,就可让一个函数脱离原先的主函数独立运行,即主函数直接继续进行剩下的操做,而不须要等待某个十分耗时的操做完成。好比咱们在写一个服务模块,接收到前端请求以后,而后去作一个比较耗时的任务。好比下面这个:后端
func (m *SomeController) PorcessSomeTask() { var task models.Task if err := task.Parse(m.Ctx.Request); err != nil { m.Data["json"] = err m.ServeJson() return } task.Process() m.ServeJson()
若是Process函数须要耗费大量时间的话,这个请求就会被block住。有时候,前端只须要发出一个请求给后端,而且不须要后端当即所处响应。遇到这样的需求,直接在耗时的函数前面加上go
关键字就能够将请求之间返回给前端了,保证了体验数据结构
func (m *SomeController) PorcessSomeTask() { var task models.Task if err := task.Parse(m.Ctx.Request); err != nil { m.Data["json"] = err m.ServeJson() return } go task.Process() m.ServeJson()
不过,这种作法也是有许多限制的。好比:架构
上一个方案有一个缺点就是没法控制并发,若是这一类请求同一个时间段有不少的话,每个请求都启动一个goroutine,若是每一个goroutine中还须要使用其余系统资源,消耗将是不可控的。并发
遇到这种状况,一个解决方案是:将请求都转发给一个channel,而后初始化多个goroutine读取这个channel中的内容,并进行处理。假设咱们能够新建一个全局的channel函数
var TASK_CHANNEL = make(chan models.Task)
而后,启动多个goroutine:优化
for i := 0; i < WORKER_NUM; i ++ { go func() { for { select { case task := <- TASK_CHANNEL: task.Process() } } } () }
服务端接收到请求以后,将任务传入channel中便可:spa
func (m *SomeController) PorcessSomeTask() { var task models.Task if err := task.Parse(m.Ctx.Request); err != nil { m.Data["json"] = err m.ServeJson() return } //go task.Process() TASK_CHANNEL <- task m.ServeJson() }
这样一来,这个操做的并发度就能够经过WORKER_NUM
来控制了。
不过,上面方案有一个bug:那就是channel初始化时是没有设置长度的,所以当全部WORKER_NUM
个goroutine都正在处理请求时,再有请求过来的话,仍然会出现被block的状况,并且会比没有通过优化的方案还要慢(由于须要等某一个goroutine结束时才能处理它)。所以,须要在channel初始化时增长一个长度:
var TASK_CHANNEL = make(chan models.Task, TASK_CHANNEL_LEN)
这样一来,咱们将TASK_CHANNEL_LEN
设置得足够大,请求就能够同时接收TASK_CHANNEL_LEN
个请求而不用担忧被block。不过,这其实仍是有问题的:那若是真的同时有大于TASK_CHANNEL_LEN
个请求过来呢?一方面,这就应该算是架构方面的问题了,能够经过对模块进行扩容等操做进行解决。另外一方面,模块自己也要考虑如何进行“优雅降级了”。遇到这种状况,咱们应该但愿模块可以及时告知调用方,“我已经达处处理极限了,没法给你处理请求了”。其实,这种需求,能够很简单的在Golang中实现:若是channel发送以及接收操做在select语句中执行而且发生阻塞,default语句就会当即执行。
select { case TASK_CHANNEL <- task: //do nothing default: //warnning! return fmt.Errorf("TASK_CHANNEL is full!") } //...
若是处理程序比较复杂的时候,一般都会出如今一个goroutine中,还会发送一些中间处理的结果发送给其余goroutine去作,通过多道“工序”才能最终将结果产出。
那么,咱们既须要把某一个中间结果发送给某个channel,也要能获取处处理此次请求的结果。解决的方法是:将一个channel实例包含在请求中,goroutine处理完成后将结果写回这个channel。
type TaskResponse struct { //... } type Task struct { TaskParameter SomeStruct ResChan *chan TaskResponse } //... task := Task { TaskParameter : xxx, ResChan : make(chan TaskResponse), } TASK_CHANNEL <- task res := <- task.ResChan //...
(这边可能会有疑问:为何不把一个复杂的任务都放在一个goroutine中依次的执行呢?是由于这里须要考虑到不一样子任务,所消耗的系统资源不尽相同,有些是CPU集中的,有些是IO集中的,因此须要对这些子任务设置不一样的并发数,所以须要经由不一样的channel + goroutine去完成。)
将任务通过分组,交由不一样的goroutine进行处理,最终再将每一个goroutine处理的结果进行合并,这个是比较常见的处理流程。这里须要用到WaitGroup
来对一组goroutine进行同步。通常的处理流程以下:
var wg sync.WaitGroup for i := 0; i < someLen; i ++ { wg.Add(1) go func(t Task) { defer wg.Done() //对某一段子任务进行处理 } (tasks[i]) } wg.Wait() //处理剩下的工做
即便是复杂、耗时的任务,也必须设置超时时间。一方面多是业务对此有时限要求(用户必须在XX分钟内看到结果),另外一方面模块自己也不能都消耗在一直没法结束的任务上,使得其余请求没法获得正常处理。所以,也须要对处理流程增长超时机制。
我通常设置超时的方案是:和以前提到的“接收发送给channel以后返回的结果”结合起来,在等待返回channel的外层添加select
,并在其中经过time.After()
来判断超时。
task := Task { TaskParameter : xxx, ResChan : make(chan TaskResponse), } select { case res := <- task.ResChan: //... case <- time.After(PROCESS_MAX_TIME): //处理超时 }
既然有了超时机制,那也须要一种机制来告知其余goroutine结束手上正在作的事情并退出。很明显,仍是须要利用channel来进行交流,第一个想到的确定就是向某一个chan发送一个struct便可。好比执行任务的goroutine在参数中,增长一个chan struct{}
类型的参数,当接收到该channel的消息时,就退出任务。可是,还须要解决两个问题:
对于第一个问题,比较优雅的做法是:使用另一个channel做为函数d输出,再加上select
,就能够一边输出结果,一边接收退出信号了。
另外一方面,对于同时有未知数目个执行goroutine的状况,一次次调用done <-struct{}{}
,显然没法实现。这时候,就会用到golang对于channel的tricky用法:当关闭一个channel时,全部由于接收该channel而阻塞的语句会当即返回。示例代码以下:
// 执行方 func doTask(done <-chan struct{}, tasks <-chan Task) (chan Result) { out := make(chan Result) go func() { // close 是为了让调用方的range可以正常退出 defer close(out) for t := range tasks { select { case result <-f(task): case <-done: return } } }() return out } // 调用方 func Process(tasks <-chan Task, num int) { done := make(chan struct{}) out := doTask(done, tasks) go func() { <- time.After(MAX_TIME) //done <-struct{}{} //通知全部的执行goroutine退出 close(done) }() // 由于goroutine执行完毕,或者超时,致使out被close,range退出 for res := range out { fmt.Println(res) //... } }