goroutine并发实践(协程池+超时+错误快返回)

当咱们使用goroutine的时候让函数并发执行的时候,能够借助着sync.WaitGroup{}的能力,其中代码以下:golang

func testGoroutine() {
	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			wg.Done()
			fmt.Println("hello world")
		}()
	}
	wg.Wait()
}
复制代码

看完上述代码此时咱们须要考虑的是,假设goroutine由于一些rpc请求过慢致使hang住,此时goroutine会一直卡住在wg.Wait(),最终致使请求失败bash

除非你使用的框架提供了一个超时的能力,或者你go出去的rpc请求存在超时断开的能力并发

那么咱们如何让代码不被hang住呢?

最简单的解法就是增长超时!框架

实际上超时也有不少解法函数

  • 基于ctxcontext.WithTimeOut()实现
  • 基于select实现

这里我选择基于select实现超时来给你们看下代码如何实现post

func testWithGoroutineTimeOut() {
	var wg sync.WaitGroup
	done := make(chan struct{})
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
		}()
	}
	// wg.Wait()此时也要go出去,防止在wg.Wait()出堵住
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	// 正常结束完成
	case <-done:
	// 超时	
	case <-time.After(500 * time.Millisecond):
	}
}
复制代码

能够看到上述代码,已经基于select实现了超时,是否是很是简单呢~ui

可是咱们对于这个接口会有更高的要求。spa

  • goroutine没有错误处理,
  • 此时go出去的goroutine数量是依赖for循环的数量,假设for循环100w次,形成goroutine过多的问题

能够写一个协程池解决goroutine过多,,那么协程池如何实现呢?code

咱们能够使用sync waitGroup+ 非阻塞channel实现 代码以下:协程

package ezgopool

import "sync"

// goroutine pool
type GoroutinePool struct {
	c  chan struct{}
	wg *sync.WaitGroup
}

// 采用有缓冲channel实现,当channel满的时候阻塞
func NewGoroutinePool(maxSize int) *GoroutinePool {
	if maxSize <= 0 {
		panic("max size too small")
	}
	return &GoroutinePool{
		c:  make(chan struct{}, maxSize),
		wg: new(sync.WaitGroup),
	}
}

// add
func (g *GoroutinePool) Add(delta int) {
	g.wg.Add(delta)
	for i := 0; i < delta; i++ {
		g.c <- struct{}{}
	}

}

// done
func (g *GoroutinePool) Done() {
	<-g.c
	g.wg.Done()
}

// wait
func (g *GoroutinePool) Wait() {
	g.wg.Wait()
}

复制代码

以上就是协程池的实现,实际是很是简单的,个人博客也记录了另外一个golang协程池的开源实现,具体见 juejin.im/post/5d4f9f…

而后最后咱们的超时+错误快返回+协程池模型就完成了~

func testGoroutineWithTimeOut() {
	 wg :=sync.WaitGroup{}
	done := make(chan struct{})
	// 新增阻塞chan
	errChan := make(chan error)

	pool.NewGoroutinePool(10)
	for i := 0; i < 10; i++ {
		pool.Add(1)
		go func() {
			pool.Done()
			if err!=nil{
				errChan<-errors.New("error")
			}
		}()
	}

	go func() {
		pool.Wait()
		close(done)
	}()

	select {
	// 错误快返回,适用于get接口
	case err := <-errChan:
		return nil, err
	case <-done:
	case <-time.After(500 * time.Millisecond):
	}
}

复制代码

谢谢

相关文章
相关标签/搜索