因与工做相关,因此本文中的数据都进行了更改,但逻辑是同样的。闭包
笔者的服务ServerA会请求服务ServerH获取一些数据,但ServerH的接口有个N秒内只能请求M次的限制,并返回false。而笔者的服务瞬时请求量远超M次,因此采用了协程池在收到103错误时,中止worker的运行N秒,而后再启动。函数
协程池的相关概念:要有一个必定数量大小的池子(pool),池子里存储须要执行的任务(task),还要有若干个工做协程(worker)。测试
协程池要有启动,中止,睡眠的功能。atom
下面是从零开始记录一下思想过程和遇到的问题。code
在此版本里,除了睡眠的功能,已经实现了一个基本的协程池。server
// workpool.go package workpool import ( "context" "sync" ) type TaskFunc func() type Task struct { f TaskFunc } type WorkPool struct { pool chan *Task workerCount int stopCtx context.Context stopCancelFunc context.CancelFunc wg sync.WaitGroup } func (t *Task) Execute() { t.f() } func New(workerCount, poolLen int) *WorkPool { return &WorkPool{ workerCount: workerCount, pool: make(chan *Task, poolLen), } } func (w *WorkPool) PushTask(t *Task) { w.pool <- t } func (w *WorkPool) PushTaskFunc(f TaskFunc) { w.pool <- &Task{ f: f, } } func (w *WorkPool) work() { for { select { case <-w.stopCtx.Done(): w.wg.Done() return case t := <-w.pool: t.Execute() } } } func (w *WorkPool) Start() *WorkPool { w.wg.Add(w.workerCount) w.stopCtx, w.stopCancelFunc = context.WithCancel(context.Background()) for i := 0; i < w.workerCount; i++ { go w.work() } return w } func (w *WorkPool) Stop() { w.stopCancelFunc() w.wg.Wait() }
看起来没什么毛病,还挺简洁。其实否则...协程
下面的程序是建立一个容量为50的workpool,并将经过3个worker输出100个数字。接口
// workpool_test.go package workpool import ( "fmt" "sync" "testing" ) func TestWorkPool_Start(t *testing.T) { wg := sync.WaitGroup{} wp := New(3, 50).Start() lenth := 100 wg.Add(lenth) for i := 0; i < lenth; i++ { wp.PushTaskFunc(func() { defer wg.Done() fmt.Print(i, " ") }) } wg.Wait() }
运行后输出结果以下:it
50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 51 51 51 51 69 72 78 78 80 81 81 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 84 84 84 84 50 84 100 100 100 100 100 100 100 100 100 100 50 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 84 100 100 100
这和想象中的输出 0-99 相差甚远。io
其缘由在于闭包函数对于外部变量是引用的,因此在函数执行的时候,i的值早就已经改变了。下面是一个关于闭包的简单例子。
x := 1 f := func() { println(x) } x = 2 x = 3 f() // 3
能够将 f() 的调用时机对应为协程池中的 t.Execute()。
既然是由于闭包引用致使的问题,那就不使用闭包了呗。
能够把参数传到函数内,可是由于并不知道将要执行的函数须要的参数个数及类型,因此只能是使用不定长的interface{}
TaskFunc,在使用的时候进行断言。
如下仅列出改动部分:
// workpool.go type TaskFunc func(args ...interface{}) type Task struct { f TaskFunc args []interface{} } func (t *Task) Execute() { t.f(t.args...) } func (w *WorkPool) PushTaskFunc(f TaskFunc, args ...interface{}) { w.pool <- &Task{ f: f, args: args, } }
如下是测试程序:
// workpool_test.go package workpool import ( "fmt" "sync" "testing" ) func TestWorkPool_Start(t *testing.T) { wg := sync.WaitGroup{} wp := New(3, 50).Start() lenth := 100 wg.Add(lenth) for i := 0; i < lenth; i++ { wp.PushTaskFunc(func(args ...interface{}) { defer wg.Done() fmt.Print(args[0].(int), " ") }, i) } wg.Wait() }
输出内容以下:
0 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 2 1 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 26 48 49 51 52 53 54 55 56 50 58 59 57 61 62 63 64 65 66 25 68 6 9 70 71 72 73 67 75 76 77 74 79 78 81 82 83 84 60 86 87 88 89 90 91 92 85 94 95 96 97 98 99 80 93
虽然顺序是错乱的,但这是正常状况,闭包引用问题已解决。
基于开头的应用场景,在任意一个被worker执行的任务收到ServerH的103错误后,要中止全部worker一段时间,由于再一直请求也没有意义。
这个版本已经与笔者正在使用的相差无几了
// workpool.go package workpool import ( "context" "fmt" "sync" "sync/atomic" "time" ) type Flag int64 const ( FLAG_OK Flag = 1 << iota FLAG_RETRY Flag = 1 << iota ) type TaskFunc func(w *WorkPool, args ...interface{}) Flag type Task struct { f TaskFunc args []interface{} } type WorkPool struct { pool chan *Task workerCount int // stop相关 stopCtx context.Context stopCancelFunc context.CancelFunc wg sync.WaitGroup // sleep相关 sleepCtx context.Context sleepCancelFunc context.CancelFunc sleepSeconds int64 sleepNotify chan bool } func (t *Task) Execute(w *WorkPool) Flag { return t.f(w, t.args...) } func New(workerCount, poolLen int) *WorkPool { return &WorkPool{ workerCount: workerCount, pool: make(chan *Task, poolLen), sleepNotify: make(chan bool), } } func (w *WorkPool) PushTask(t *Task) { w.pool <- t } func (w *WorkPool) PushTaskFunc(f TaskFunc, args ...interface{}) { w.pool <- &Task{ f: f, args: args, } } func (w *WorkPool) work(i int) { for { select { case <-w.stopCtx.Done(): w.wg.Done() return case <-w.sleepCtx.Done(): time.Sleep(time.Duration(w.sleepSeconds) * time.Second) case t := <-w.pool: flag := t.Execute(w) if flag&FLAG_RETRY != 0 { w.PushTask(t) fmt.Printf("work %v PushTask,pool length %v\n", i, len(w.pool)) } } } } func (w *WorkPool) Start() *WorkPool { fmt.Printf("workpool run %d worker\n", w.workerCount) w.wg.Add(w.workerCount + 1) w.stopCtx, w.stopCancelFunc = context.WithCancel(context.Background()) w.sleepCtx, w.sleepCancelFunc = context.WithCancel(context.Background()) go w.sleepControl() for i := 0; i < w.workerCount; i++ { go w.work(i) } return w } func (w *WorkPool) Stop() { w.stopCancelFunc() w.wg.Wait() } func (w *WorkPool) sleepControl() { fmt.Println("sleepControl start...") for { select { case <-w.stopCtx.Done(): w.wg.Done() return case <-w.sleepNotify: fmt.Printf("receive sleep notify start...\n") w.sleepCtx, w.sleepCancelFunc = context.WithCancel(context.Background()) w.sleepCancelFunc() fmt.Printf("sleepControl will star sleep %v s\n", w.sleepSeconds) time.Sleep(time.Duration(w.sleepSeconds) * time.Second) w.sleepSeconds = 0 fmt.Println("sleepControl was end sleep") } } } func (w *WorkPool) SleepNotify(seconds int64) { // 由于须要CAS操做,因此sleepSeconds没有采用time.Duration类型 // 成功设置后才发出通知 if atomic.CompareAndSwapInt64(&w.sleepSeconds, 0, seconds) { fmt.Printf("sleepSeconds set %v\n", seconds) w.sleepNotify <- true } }
下面的测试程序中,模拟了一下ServerH,其使用场景与笔者工做中大同小异。
// workpool_test.go package workpool import ( "fmt" "sync" "testing" "time" ) // 这里模拟ServerH服务的限流操做 var serverh = &server{max: 10, interval: 5} type server struct { count int max int lasttime time.Time interval int64 mu sync.Mutex } func (s *server) Access(i int) bool { now := time.Now() s.mu.Lock() defer s.mu.Unlock() time.Sleep(100 * time.Millisecond) if s.lasttime.Unix() <= 0 || s.count >= s.max { if now.After(s.lasttime) { s.count = 1 s.lasttime = time.Unix(now.Unix()+s.interval, 0) return true } fmt.Printf("Access false,i=%d \n", i) return false } else { s.count++ fmt.Printf("Access true,i=%d s.count %d\n", i, s.count) return true } } // 这里是笔者服务的逻辑 func TestWorkPool_Start(t *testing.T) { wp := New(3, 100).Start() for i := 0; i < 100; i++ { time.Sleep(100 * time.Millisecond) wp.PushTaskFunc(func(w *WorkPool, args ...interface{}) Flag { if !serverh.Access(args[0].(int)) { // 发送睡眠5秒的通知 w.SleepNotify(5) // 这次未执行成功,要将该任务放回协程池 return FLAG_RETRY } return FLAG_OK }, i) } time.Sleep(100 * time.Second) }
输出内容以下:
workpool run 3 worker sleepControl start... Access true,i=1 s.count 2 Access true,i=2 s.count 3 Access true,i=3 s.count 4 Access true,i=4 s.count 5 Access true,i=5 s.count 6 Access true,i=6 s.count 7 Access true,i=7 s.count 8 Access true,i=8 s.count 9 Access true,i=9 s.count 10 Access false,i=10 sleepSeconds set 5 work 1 PushTask,pool length 0 receive sleep notify start... sleepControl will star sleep 5 s Access false,i=10 work 0 PushTask,pool length 1 Access false,i=10 work 0 PushTask,pool length 2 Access false,i=11 work 2 PushTask,pool length 3 Access false,i=12 work 1 PushTask,pool length 5 Access false,i=13 work 0 PushTask,pool length 6 Access false,i=14 work 0 PushTask,pool length 7 Access false,i=10 work 1 PushTask,pool length 8 Access false,i=15 work 1 PushTask,pool length 9 Access false,i=11 work 0 PushTask,pool length 11 Access false,i=12 work 0 PushTask,pool length 11 Access false,i=16 work 0 PushTask,pool length 12 sleepControl was end sleep Access true,i=17 s.count 2 Access true,i=14 s.count 3 Access true,i=18 s.count 4 Access true,i=10 s.count 5 Access true,i=15 s.count 6 Access true,i=20 s.count 7 Access true,i=19 s.count 8 Access true,i=12 s.count 9 Access true,i=11 s.count 10 Access false,i=21 sleepSeconds set 5 work 0 PushTask,pool length 53 receive sleep notify start... sleepControl will star sleep 5 s Access false,i=16 work 1 PushTask,pool length 54 Access false,i=22 work 2 PushTask,pool length 55 Access false,i=23 work 0 PushTask,pool length 57 Access false,i=24 ...........
重试次数的逻辑