当咱们使用goroutine
的时候让函数并发执行的时候,能够借助着sync.WaitGroup{}
的能力,其中代码以下:golang
func testGoroutine() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
wg.Done()
fmt.Println("hello world")
}()
}
wg.Wait()
}
复制代码
看完上述代码此时咱们须要考虑的是,假设goroutine
由于一些rpc
请求过慢致使hang
住,此时goroutine
会一直卡住在wg.Wait()
,最终致使请求失败bash
除非你使用的框架提供了一个超时的能力,或者你go
出去的rpc
请求存在超时断开的能力并发
hang
住呢?最简单的解法就是增长超时!框架
实际上超时也有不少解法函数
ctx
的context.WithTimeOut()
实现select
实现这里我选择基于select
实现超时来给你们看下代码如何实现post
func testWithGoroutineTimeOut() {
var wg sync.WaitGroup
done := make(chan struct{})
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
}()
}
// wg.Wait()此时也要go出去,防止在wg.Wait()出堵住
go func() {
wg.Wait()
close(done)
}()
select {
// 正常结束完成
case <-done:
// 超时
case <-time.After(500 * time.Millisecond):
}
}
复制代码
能够看到上述代码,已经基于select
实现了超时,是否是很是简单呢~ui
可是咱们对于这个接口会有更高的要求。spa
goroutine
没有错误处理,go
出去的goroutine
数量是依赖for
循环的数量,假设for
循环100w次,形成goroutine
过多的问题能够写一个协程池解决goroutine
过多,,那么协程池如何实现呢?code
咱们能够使用sync waitGroup
+ 非阻塞channel
实现 代码以下:协程
package ezgopool
import "sync"
// goroutine pool
type GoroutinePool struct {
c chan struct{}
wg *sync.WaitGroup
}
// 采用有缓冲channel实现,当channel满的时候阻塞
func NewGoroutinePool(maxSize int) *GoroutinePool {
if maxSize <= 0 {
panic("max size too small")
}
return &GoroutinePool{
c: make(chan struct{}, maxSize),
wg: new(sync.WaitGroup),
}
}
// add
func (g *GoroutinePool) Add(delta int) {
g.wg.Add(delta)
for i := 0; i < delta; i++ {
g.c <- struct{}{}
}
}
// done
func (g *GoroutinePool) Done() {
<-g.c
g.wg.Done()
}
// wait
func (g *GoroutinePool) Wait() {
g.wg.Wait()
}
复制代码
以上就是协程池的实现,实际是很是简单的,个人博客也记录了另外一个golang
协程池的开源实现,具体见 juejin.im/post/5d4f9f…
而后最后咱们的超时+错误快返回+协程池模型就完成了~
func testGoroutineWithTimeOut() {
wg :=sync.WaitGroup{}
done := make(chan struct{})
// 新增阻塞chan
errChan := make(chan error)
pool.NewGoroutinePool(10)
for i := 0; i < 10; i++ {
pool.Add(1)
go func() {
pool.Done()
if err!=nil{
errChan<-errors.New("error")
}
}()
}
go func() {
pool.Wait()
close(done)
}()
select {
// 错误快返回,适用于get接口
case err := <-errChan:
return nil, err
case <-done:
case <-time.After(500 * time.Millisecond):
}
}
复制代码
谢谢