更简的并发代码,更强的并发控制

有没感受 Gosync 包不够用?有没遇到类型没有 sync/atomic 支持?git

咱们一块儿看看 go-zerosyncx 包对标准库的一些增值补充。github

https://github.com/tal-tech/g...数据库

name 做用
AtomicBool bool类型 原子类
AtomicDuration Duration有关 原子类
AtomicFloat64 float64类型 原子类
Barrier 栏栅【将加锁解锁包装】
Cond 条件变量
DoneChan 优雅通知关闭
ImmutableResource 建立后不会修改的资源
Limit 控制请求数
LockedCalls 确保方法的串行调用
ManagedResource 资源管理
Once 提供 once func
OnceGuard 一次性使用的资源管理
Pool pool,简单的池
RefResource 引用计数的资源
ResourceManager 资源管理器
SharedCalls 相似 singflight 的功能
SpinLock 自旋锁:自旋+CAS
TimeoutLimit Limit + timeout 控制

下面开始对以上库组件作分别介绍。缓存

atomic

由于没有 泛型 支持,因此才会出现多种类型的原子类支持。如下采用 float64 做为例子:微信

func (f *AtomicFloat64) Add(val float64) float64 {
    for {
        old := f.Load()
        nv := old + val
        if f.CompareAndSwap(old, nv) {
            return nv
        }
    }
}

func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool {
    return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val))
}

func (f *AtomicFloat64) Load() float64 {
    return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))
}

func (f *AtomicFloat64) Set(val float64) {
    atomic.StoreUint64((*uint64)(f), math.Float64bits(val))
}
  • Add(val):若是 CAS 失败,不断for循环重试,获取 old val,并set old+val;
  • CompareAndSwap(old, new):调用底层 atomicCAS
  • Load():调用 atomic.LoadUint64 ,而后转换
  • Set(val):调用 atomic.StoreUint64

至于其余类型,开发者想本身扩展本身想要的类型,能够依照上述,基本上调用原始 atomic 操做,而后转换为须要的类型,好比:遇到 bool 能够借助 0, 1 来分辨对应的 false, true网络

Barrier

这里 Barrier 只是将业务函数操做封装,做为闭包传入,内部将 lock 操做的加锁解锁自行解决了【防止开发者加锁了忘记解锁】session

func (b *Barrier) Guard(fn func()) {
    b.lock.Lock()
    defer b.lock.Unlock()
  // 本身的业务逻辑
    fn()
}

Cond/Limit/TimeoutLimit

这个数据结构和 Limit 一块儿组成了 TimeoutLimit ,这里将这3个一块儿讲:数据结构

func NewTimeoutLimit(n int) TimeoutLimit {
    return TimeoutLimit{
        limit: NewLimit(n),
        cond:  NewCond(),
    }
}

func NewLimit(n int) Limit {
    return Limit{
        pool: make(chan lang.PlaceholderType, n),
    }
}
  • limit 这里是有缓冲的 channel
  • cond 是无缓冲的;

因此这里结合名字来理解:由于 Limit 是限制某一种资源的使用,因此须要预先在资源池中放入预置数量的资源;Cond 相似阀门,须要两边都准备好,才能进行数据交换,因此使用无缓冲,同步控制。闭包

这里咱们看看 stores/mongo 中关于 session 的管理,来理解 资源控制:并发

func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
  // 选项参数注入
    ...
  // 看 limit 中是否还能取出资源
    if err := cs.limit.Borrow(o.timeout); err != nil {
        return nil, err
    } else {
        return cs.Copy(), nil
    }
}

func (l TimeoutLimit) Borrow(timeout time.Duration) error {
  // 1. 若是还有 limit 中还有资源,取出一个,返回
    if l.TryBorrow() {
        return nil
    }
    // 2. 若是 limit 中资源已经用完了
    var ok bool
    for {
    // 只有 cond 能够取出一个【无缓存,也只有 cond <- 此条才能经过】
        timeout, ok = l.cond.WaitWithTimeout(timeout)
    // 尝试取出一个【上面 cond 经过时,就有一个资源返回了】
    // 看 `Return()`
        if ok && l.TryBorrow() {
            return nil
        }
        // 超时控制
        if timeout <= 0 {
            return ErrTimeout
        }
    }
}

func (l TimeoutLimit) Return() error {
  // 返回去一个资源
    if err := l.limit.Return(); err != nil {
        return err
    }
    // 同步通知另外一个须要资源的协程【实现了阀门,两方交换】
    l.cond.Signal()
    return nil
}

资源管理

同文件夹中还有 ResourceManager,从名字上相似,这里将两个组件放在一块儿讲解。

先从结构上:

type ManagedResource struct {
  // 资源
    resource interface{}
    lock     sync.RWMutex
  // 生成资源的逻辑,由开发者本身控制
    generate func() interface{}
  // 对比资源
    equals   func(a, b interface{}) bool
}

type ResourceManager struct {
  // 资源:这里看得出来是 I/O,
    resources   map[string]io.Closer
    sharedCalls SharedCalls
  // 对资源map互斥访问
    lock        sync.RWMutex
}

而后来看获取资源的方法签名:

func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error)

// 获取一个资源(有就直接获取,没有生成一个)
func (mr *ManagedResource) Take() interface{}
// 判断这个资源是否不符合传入的判断要求,不符合则重置
func (mr *ManagedResource) MarkBroken(resource interface{})
  1. ResourceManager 使用 SharedCalls 作防重复请求,并将资源缓存在内部的 sourMap;另外传入的 create funcIO 操做有关,常见用在网络资源的缓存;
  2. ManagedResource 缓存资源没有 map 而是单一的 interface ,说明只有一份,可是它提供了 Take() 和传入 generate()说明可让开发者自行更新 resource

因此在用途上:

  • ResourceManager:用在网络资源的管理。如:数据库链接管理;
  • ManagedResource:用在一些变化资源,能够作资源先后对比,达到更新资源。如:token 管理和验证

RefResource

这个就和 GC 中引用计数相似:

  • Use() -> ref++
  • Clean() -> ref--; if ref == 0 -> ref clean
func (r *RefResource) Use() error {
  // 互斥访问
    r.lock.Lock()
    defer r.lock.Unlock()
    // 清除标记
    if r.cleaned {
        return ErrUseOfCleaned
    }
    // 引用 +1
    r.ref++
    return nil
}

SharedCalls

一句话形容:使用SharedCalls能够使得同时多个请求只须要发起一次拿结果的调用,其余请求"不劳而获",这种设计有效减小了资源服务的并发压力,能够有效防止缓存击穿

这个组件被反复应用在其余组件中,上面说的 ResourceManager

相似当须要高频并发访问一个资源时,就能够使用 SharedCalls 缓存。

// 当多个请求同时使用Do方法请求资源时
func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  // 先申请加锁
  g.lock.Lock()

  // 根据key,获取对应的call结果,并用变量c保存
  if c, ok := g.calls[key]; ok {
    // 拿到call之后,释放锁,此处call可能尚未实际数据,只是一个空的内存占位
    g.lock.Unlock()
    // 调用wg.Wait,判断是否有其余goroutine正在申请资源,若是阻塞,说明有其余goroutine正在获取资源
    c.wg.Wait()
    // 当wg.Wait再也不阻塞,表示资源获取已经结束,能够直接返回结果
    return c.val, c.err
  }

  // 没有拿到结果,则调用makeCall方法去获取资源,注意此处仍然是锁住的,能够保证只有一个goroutine能够调用makecall
  c := g.makeCall(key, fn)
  // 返回调用结果
  return c.val, c.err
}

总结

不重复造轮子,一直是 go-zero 设计主旨之一;也同时将平时业务沉淀到组件中,这才是框架和组件的意义。

关于 go-zero 更多的设计和实现文章,能够持续关注咱们。欢迎你们去关注和使用。

项目地址

https://github.com/tal-tech/go-zero

欢迎使用 go-zero 并 star 支持咱们!

微信交流群

关注『微服务实践』公众号并回复 进群 获取社区群二维码。

go-zero 系列文章见『微服务实践』公众号
相关文章
相关标签/搜索