在golang中,只须要在函数调用前加上关键字go便可建立一个并发任务单元,而这个新建的任务会被放入队列中,等待调度器安排。相比系统的MB级别线程栈,goroutine的自定义栈只有2KB,这使得咱们可以轻易建立上万个并发任务,如此对性能提高很多。但随之而来的有如下几个问题:html
本文记录了笔者就以上几个问题进行探究的过程,文中给出了大部分问题的解决方案,同时也抛出了未解决的问题,期待与各位交流:pgit
开始以前先定义一个常量const N=100
以及一个HeavyWork
函数,假定该函数具备极其冗长、复杂度高、难以解耦的特性github
func HeavyWork(id int) { rand.Seed(int64(id)) interval := time.Duration(rand.Intn(3)+1) * time.Second time.Sleep(interval) fmt.Printf("HeavyWork %-3d cost %v\n", id, interval) }
以上定义的内容将在以后的代码中直接使用以缩减篇幅,大部分完整代码可在 Github: explore-goroutine 中找到golang
"Do not communicate by sharing memory; instead, share memory by communicating"——GO的一大设计哲学《Share Memory By Communicating》
翻译成中文就是,用通讯来共享内存数据,而不要经过共享内存数据来进行通讯。
Go中的goroutines和channel提供了一种优雅而独特的结构化并发软件的方法,咱们能够利用通道(channel)的特性,来实现当前等待goroutine的操做。可是channel并非当前这个场景的最佳方案,用它来实现的方式是稍显笨拙的,须要知道肯定个数的goroutine,同时稍不注意就极易产生死锁,代码以下:算法
// "talk is cheap, show me the code." func main() { waitChan := make(chan int, 1) for i := 0; i < N; i++ { go func(n int) { HeavyWork(n) waitChan <- 1 }(i) } cnt := 0 for range waitChan { cnt++ if cnt == N { break } } close(waitChan) fmt.Println("finished") }
上述代码使用了一个缓存大小为1的通道(channel),建立N个goroutine用于运行HeavyWork
,每一个任务完成后向waitChan写入一个数据,在收到N个完成信号后退出。
但事实上比较优雅的方式是使用go标准库sync
,其中提供了专门的解决方案sync.WaitGroup
用于等待一个goroutines集合的结束c#
// "talk is cheap, show me the code." func main() { wg := sync.WaitGroup{} for i := 0; i < N; i++ { wg.Add(1) go func(n int) { defer wg.Done() HeavyWork(n) }(i) } wg.Wait() fmt.Println("finished") }
关于sync.WaitGroup
的具体使用请参照官方文档 [GoDoc] sync.WaitGroup ,这里再也不赘述缓存
信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是能够用来保证两个或多个关键代码段不被并发调用。多线程
其中V操做会增长信号量的数值即释放资源,而P操做会减小它即占用资源并发
那么很是容易想到的就是利用channel(通道)缓存有限的特性,它容许咱们能够自实现一个简单的数量控制,就如同使用信号量通常,在这基础再加上前面提到的sync.WaitGroup
,咱们能够打出一套组合拳,提供可阻塞的信号量PV操做,可以实现固定建立goroutine数量而且支持等待当前goroutine的退出。结构体定义以下:app
type Semaphore struct { Threads chan int Wg sync.WaitGroup }
而P操做只需在channel中加入一个元素同时调用WaitGroup.Add便可,这一操做完成对资源的申请
func (sem *Semaphore) P() { sem.Threads <- 1 sem.Wg.Add(1) }
相反则是V操做,进行资源的释放
func (sem *Semaphore) V() { sem.Wg.Done() <-sem.Threads }
Wait则阻塞等待直到当前全部资源都归还,直接调用WaitGroup的方法便可
func (sem *Semaphore) Wait() { sem.Wg.Wait() }
完整代码能够在 Github: semaphore 中查看
利用上面的信号量就能够作到,在一个时刻的goroutines数量不会超过信号量值的大小,而某个goroutine退出后将返还占用的信号量,而正在等待的goroutine就能够当即申请,下图形象地展示了运行时的状态
对于goroutine的主动退出,比较友好的作法就是循环监听一个channel,经过相似信号的方式来告知goroutine的”该退出了“,而后goroutine本身主动退出,这种作法在网上十分常见,也是Golang官方推荐的作法,思想也很简单。
func main() { ok, quit := make(chan int, 1), make(chan int, 1) go func() { i := 0 for { select { case <-quit: ok <- 1 return default: HeavyWork(i) i++ } } }() time.Sleep(5 * time.Second) quit <- 1 <-ok }
运行结果以下图
上面讲了一些关于goroutines和channel的简单使用,接下来终于写到本文的重点了。笔者并无解决如何从外部杀死一个goroutine,但记录了尝试“杀死”中的可行或不可行方法,但愿对各位有所帮助。
由于近期在开发中遇到这样一个问题,当一个函数是极其冗长、复杂度高、难以解耦的顺序结构代码时(例如某个极其复杂无循环结构的加密算法),并且因为数据量巨大,须要反复调用该函数,因为每运行一次,程序都会消耗大量的时间、空间,那么当一个任务已经被用户抛弃时,如何才能抛弃仍在作着无用功的goroutine?
为了达到“杀死goroutine”的目的,笔者作了不少尝试,如
关于“如何杀死goroutine”,网上有一部分答案就是利用select实现的,可是这种方式实现的代码并不适用于服务类的程序,可是对于通常非服务类程序的确可以实现杀死goroutine的效果,代码以下:
func main() { wrapper := func() chan int { c := make(chan int) go func() { HeavyWork(0) c <- 1 }() return c } select { case <-wrapper(): case <-time.After(1 * time.Second): fmt.Println("time limit exceed") } // time.Sleep(3 * time.Second) }
可是一旦主函数没有当即退出,而是做为某种服务而继续运行时,这里删除了main函数的最后一行注释time.Sleep(3 * time.Second)
,延迟三秒后退出。能够看见尽管已经超时并输出"time limit exceed"以后,HeavyWork
在main函数没退出前依旧在运行。效果以下
因此使用select-timeout的方式比较适合实时退出类型的程序,可以实现必定程度上的并发控制,
就目前而言,尚未完美的方案来解决控制goroutine的问题,事实上Go彷佛并不容许和推荐人们直接控制goroutine,因此暂时还没法作到从外部直接控制goroutine的生命周期,因此比较推荐的作法仍是只能经过goroutine主动退出的方法,循环监听channel,在发出退出信号后最多只消耗一轮资源后就退出,但这就要求该代码具备循环结构不然就很难使用。有更好解决方案的朋友,请务必告诉我!