慕课go高级工程师实战营

爱分享 爱生活 加油 2021

 

循环获取channel

func get(data chan int){
    for v,ok := range chan{
        if !ok{
            //channel 已经关闭
            break
        }
        // do something with v
    }
}

若是须要中止使用channel,须要手动将channel关闭缓存

close(data)

关闭后的channel还能获取其中存在的数据,可是不能再增长数据。数据取完后ok值为false。并发

channel关闭的判断

ch = make(chan int, 10)
//....some code
select{
    case r,ok := <- ch:
    if !ok  {
        //通道已空 而且已经关闭
    }
}

向有缓存的channel传数据,满了就中止,不阻塞

ch = make(chan int, 10)

Fill: //为循环设置tag
for{
    select {
        case ch <- 1:
        default:
            break Fill
    }
}

for循环必须设置tag,否则select中的break没法中止外部循环,会一直执行default,陷入死循环。app

//这段代码会陷入死循环中,每次都执行default
for{
    select{
    case <- time.After(10*time.Second):
    default:
        break
    }
}

超时的使用

select{
    case job <- jobList
    case <- time.After(10 * time.Second):
    //10秒后作超时处理
}



前些日子写的限制请求次数,结果用的时候发现能够更简单实现。ide

需求:抓数据的网站限定1秒只能有10次请求函数

因为发起并发请求几乎是0耗时的,因此能够选择同时发完全部的请求,而后等到下一个周期。这样控制周期内请求次数只须要一个ticker就能搞定:发完请求就阻塞一个周期;网站

而控制同时最大并发只须要一个channel用来计数。计数不能用互斥锁计数器,由于互斥锁不能实现阻塞atom

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    working chan int //goroutine计数器 用于限制最大并发数
    wg      sync.WaitGroup
)

func main() {
    jobList := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} //要工做的任务清单

    //每秒3个请求 最大同时4个请求
    duration := time.Second
    concurrency := 3
    concurrencyMax := 4

    ticker := time.NewTicker(duration)
    working = make(chan int, concurrencyMax)

    //经过限定1个周期派发3个任务来实现限制请求次数
    k := 0 //用于控制周期内发送次数
    for c, job := range jobList {
        working <- c //计数器+1 可能会发生阻塞

        wg.Add(1)
        go work(job)

        k++
        if k == concurrency {
            <-ticker.C //等待一个周期 可能会白等
            k = 0
        }
    }
    wg.Wait()
}

func work(j int64) {
    defer wg.Done()

    fmt.Println("doing work#", j)
    <-time.After(5 * time.Second) //假设5秒完成

    //工做完成后计数器减1
    <-working
}

上面这个相对就省事不少了。
可是,若是计数器+1的时候发生阻塞,那么下一个等待周期多是白等的。
一样的缘由,若是发起请求的操做也有耗时,极可能这一批请求发完就已经进入下一个周期,因而不等就有超发的风险,等待有白等的风险。设计

所以上面的方法仅限于发起并发请求几乎0耗时的操做。code

若是要避免白等,就还须要一个精确的周期计数器。两种方案:token

  1. 相似令牌池,维持一个channel来发放令牌,周期性刷新。就像这里令牌池的实现
  2. 维持一个计数器,周期性重置

不管哪一种方案都须要加锁。
第一种方案加锁是为了不在发放令牌的时候遭遇通道关闭(会引起panic)。
第二种在+1和-1甚至比对的时候都要加锁。

 

抓数据的网站限定1秒只能有10次请求,所以设计了一个令牌管理机制来控制请求数量。

设计思路以下:

  • 发请求前须要先获取令牌
  • 限定某时间段内的发放的令牌数量
  • 任务执行完成后不能归还令牌,只能使用定时器不断重置令牌
  • 若是当前goroutine数量过多时也不重置令牌
package main

import (
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

//节流器

type throttle struct {
    D      time.Duration //周期是D,
    C      int64         //限制一个周期最多操做C次
    Mu     sync.Mutex
    Token  chan bool //令牌池
    num    int64     //当前的goroutine数量
    maxNum int64     //容许工做goroutine最大数量
}

//若是两个周期后尚未申请到令牌,就报错超时
//目前用不到,若是限制routine最大数量须要靠这来监控
var ErrApplyTimeout = errors.New("apply token time out")

func NewThrottle(D time.Duration, C, maxNum int64) *throttle {
    instance := &throttle{
        D:      D,
        C:      C,
        Token:  make(chan bool, C),
        maxNum: maxNum,
    }
    go instance.reset()
    return instance
}

//每周期从新填充一次令牌池
func (t *throttle) reset() {
    ticker := time.NewTicker(t.D)
    for _ = range ticker.C {
        //goroutine数量不超过最大数量时再填充令牌池
        if t.num >= t.maxNum {
            continue
        }
        t.Mu.Lock()
        supply := t.C - int64(len(t.Token))
        fmt.Printf("reset token:%d\n", supply)
        for supply > 0 {
            t.Token <- true
            supply--
        }
        t.Mu.Unlock()
    }
}

//申请令牌,若是过两个周期还没申请到就报超时退出
func (t *throttle) ApplyToken() (bool, error) {
    select {
    case <-t.Token:
        return true, nil
    case <-time.After(t.D * 2):
        return false, ErrApplyTimeout
    }
}

func (t *throttle) Work(job func()) {
    if ok, err := t.ApplyToken(); !ok {
        fmt.Println(err)
        return
    }
    go func() {
        atomic.AddInt64(&t.num, 1)
        defer atomic.AddInt64(&t.num, -1)
        job()
    }()
}
func main() {
    t := NewThrottle(time.Second, 10, 20) //每秒10次,同时最多20个routine存在
    for {
        t.Work(doWork)
    }
}

//真正的工做函数 假设每一个须要执行5秒
func doWork() {
    fmt.Println(time.Now())
    <-time.After(5 * time.Second)
}
相关文章
相关标签/搜索