func get(data chan int){ for v,ok := range chan{ if !ok{ //channel 已经关闭 break } // do something with v } }
若是须要中止使用channel,须要手动将channel关闭缓存
close(data)
关闭后的channel还能获取其中存在的数据,可是不能再增长数据。数据取完后ok
值为false。并发
ch = make(chan int, 10) //....some code select{ case r,ok := <- ch: if !ok { //通道已空 而且已经关闭 } }
ch = make(chan int, 10) Fill: //为循环设置tag for{ select { case ch <- 1: default: break Fill } }
for循环必须设置tag,否则select中的break没法中止外部循环,会一直执行default,陷入死循环。app
//这段代码会陷入死循环中,每次都执行default for{ select{ case <- time.After(10*time.Second): default: break } }
select{ case job <- jobList case <- time.After(10 * time.Second): //10秒后作超时处理 }
前些日子写的限制请求次数,结果用的时候发现能够更简单实现。ide
需求:抓数据的网站限定1秒只能有10次请求函数
因为发起并发请求几乎是0耗时的,因此能够选择同时发完全部的请求,而后等到下一个周期。这样控制周期内请求次数只须要一个ticker
就能搞定:发完请求就阻塞一个周期;网站
而控制同时最大并发只须要一个channel用来计数。计数不能用互斥锁计数器,由于互斥锁不能实现阻塞atom
package main import ( "fmt" "sync" "time" ) var ( working chan int //goroutine计数器 用于限制最大并发数 wg sync.WaitGroup ) func main() { jobList := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} //要工做的任务清单 //每秒3个请求 最大同时4个请求 duration := time.Second concurrency := 3 concurrencyMax := 4 ticker := time.NewTicker(duration) working = make(chan int, concurrencyMax) //经过限定1个周期派发3个任务来实现限制请求次数 k := 0 //用于控制周期内发送次数 for c, job := range jobList { working <- c //计数器+1 可能会发生阻塞 wg.Add(1) go work(job) k++ if k == concurrency { <-ticker.C //等待一个周期 可能会白等 k = 0 } } wg.Wait() } func work(j int64) { defer wg.Done() fmt.Println("doing work#", j) <-time.After(5 * time.Second) //假设5秒完成 //工做完成后计数器减1 <-working }
上面这个相对就省事不少了。
可是,若是计数器+1的时候发生阻塞,那么下一个等待周期多是白等的。
一样的缘由,若是发起请求的操做也有耗时,极可能这一批请求发完就已经进入下一个周期,因而不等就有超发的风险,等待有白等的风险。设计
所以上面的方法仅限于发起并发请求几乎0耗时的操做。code
若是要避免白等,就还须要一个精确的周期计数器。两种方案:token
不管哪一种方案都须要加锁。
第一种方案加锁是为了不在发放令牌的时候遭遇通道关闭(会引起panic)。
第二种在+1和-1甚至比对的时候都要加锁。
抓数据的网站限定1秒只能有10次请求,所以设计了一个令牌管理机制来控制请求数量。
设计思路以下:
package main import ( "errors" "fmt" "sync" "sync/atomic" "time" ) //节流器 type throttle struct { D time.Duration //周期是D, C int64 //限制一个周期最多操做C次 Mu sync.Mutex Token chan bool //令牌池 num int64 //当前的goroutine数量 maxNum int64 //容许工做goroutine最大数量 } //若是两个周期后尚未申请到令牌,就报错超时 //目前用不到,若是限制routine最大数量须要靠这来监控 var ErrApplyTimeout = errors.New("apply token time out") func NewThrottle(D time.Duration, C, maxNum int64) *throttle { instance := &throttle{ D: D, C: C, Token: make(chan bool, C), maxNum: maxNum, } go instance.reset() return instance } //每周期从新填充一次令牌池 func (t *throttle) reset() { ticker := time.NewTicker(t.D) for _ = range ticker.C { //goroutine数量不超过最大数量时再填充令牌池 if t.num >= t.maxNum { continue } t.Mu.Lock() supply := t.C - int64(len(t.Token)) fmt.Printf("reset token:%d\n", supply) for supply > 0 { t.Token <- true supply-- } t.Mu.Unlock() } } //申请令牌,若是过两个周期还没申请到就报超时退出 func (t *throttle) ApplyToken() (bool, error) { select { case <-t.Token: return true, nil case <-time.After(t.D * 2): return false, ErrApplyTimeout } } func (t *throttle) Work(job func()) { if ok, err := t.ApplyToken(); !ok { fmt.Println(err) return } go func() { atomic.AddInt64(&t.num, 1) defer atomic.AddInt64(&t.num, -1) job() }() } func main() { t := NewThrottle(time.Second, 10, 20) //每秒10次,同时最多20个routine存在 for { t.Work(doWork) } } //真正的工做函数 假设每一个须要执行5秒 func doWork() { fmt.Println(time.Now()) <-time.After(5 * time.Second) }