有没感受
Go
的sync
包不够用?有没遇到类型没有sync/atomic
支持?git咱们一块儿看看
go-zero
的syncx
包对标准库的一些增值补充。github
name | 做用 |
---|---|
AtomicBool | bool类型 原子类 |
AtomicDuration | Duration有关 原子类 |
AtomicFloat64 | float64类型 原子类 |
Barrier | 栏栅【将加锁解锁包装】 |
Cond | 条件变量 |
DoneChan | 优雅通知关闭 |
ImmutableResource | 建立后不会修改的资源 |
Limit | 控制请求数 |
LockedCalls | 确保方法的串行调用 |
ManagedResource | 资源管理 |
Once | 提供 once func |
OnceGuard | 一次性使用的资源管理 |
Pool | pool,简单的池 |
RefResource | 引用计数的资源 |
ResourceManager | 资源管理器 |
SharedCalls | 相似 singflight 的功能 |
SpinLock | 自旋锁:自旋+CAS |
TimeoutLimit | Limit + timeout 控制 |
下面开始对以上库组件作分别介绍。缓存
由于没有 泛型 支持,因此才会出现多种类型的原子类支持。如下采用 float64
做为例子:微信
func (f *AtomicFloat64) Add(val float64) float64 { for { old := f.Load() nv := old + val if f.CompareAndSwap(old, nv) { return nv } } } func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool { return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val)) } func (f *AtomicFloat64) Load() float64 { return math.Float64frombits(atomic.LoadUint64((*uint64)(f))) } func (f *AtomicFloat64) Set(val float64) { atomic.StoreUint64((*uint64)(f), math.Float64bits(val)) }
Add(val)
:若是 CAS
失败,不断for循环重试,获取 old val,并set old+val;CompareAndSwap(old, new)
:调用底层 atomic
的 CAS
;Load()
:调用 atomic.LoadUint64
,而后转换Set(val)
:调用 atomic.StoreUint64
至于其余类型,开发者想本身扩展本身想要的类型,能够依照上述,基本上调用原始 atomic
操做,而后转换为须要的类型,好比:遇到 bool
能够借助 0, 1
来分辨对应的 false, true
。网络
这里 Barrier
只是将业务函数操做封装,做为闭包传入,内部将 lock
操做的加锁解锁自行解决了【防止开发者加锁了忘记解锁】session
func (b *Barrier) Guard(fn func()) { b.lock.Lock() defer b.lock.Unlock() // 本身的业务逻辑 fn() }
这个数据结构和 Limit
一块儿组成了 TimeoutLimit
,这里将这3个一块儿讲:数据结构
func NewTimeoutLimit(n int) TimeoutLimit { return TimeoutLimit{ limit: NewLimit(n), cond: NewCond(), } } func NewLimit(n int) Limit { return Limit{ pool: make(chan lang.PlaceholderType, n), } }
limit
这里是有缓冲的 channel
;cond
是无缓冲的;因此这里结合名字来理解:由于 Limit
是限制某一种资源的使用,因此须要预先在资源池中放入预置数量的资源;Cond
相似阀门,须要两边都准备好,才能进行数据交换,因此使用无缓冲,同步控制。闭包
这里咱们看看 stores/mongo
中关于 session
的管理,来理解 资源控制:并发
func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) { // 选项参数注入 ... // 看 limit 中是否还能取出资源 if err := cs.limit.Borrow(o.timeout); err != nil { return nil, err } else { return cs.Copy(), nil } } func (l TimeoutLimit) Borrow(timeout time.Duration) error { // 1. 若是还有 limit 中还有资源,取出一个,返回 if l.TryBorrow() { return nil } // 2. 若是 limit 中资源已经用完了 var ok bool for { // 只有 cond 能够取出一个【无缓存,也只有 cond <- 此条才能经过】 timeout, ok = l.cond.WaitWithTimeout(timeout) // 尝试取出一个【上面 cond 经过时,就有一个资源返回了】 // 看 `Return()` if ok && l.TryBorrow() { return nil } // 超时控制 if timeout <= 0 { return ErrTimeout } } } func (l TimeoutLimit) Return() error { // 返回去一个资源 if err := l.limit.Return(); err != nil { return err } // 同步通知另外一个须要资源的协程【实现了阀门,两方交换】 l.cond.Signal() return nil }
同文件夹中还有 ResourceManager
,从名字上相似,这里将两个组件放在一块儿讲解。
先从结构上:
type ManagedResource struct { // 资源 resource interface{} lock sync.RWMutex // 生成资源的逻辑,由开发者本身控制 generate func() interface{} // 对比资源 equals func(a, b interface{}) bool } type ResourceManager struct { // 资源:这里看得出来是 I/O, resources map[string]io.Closer sharedCalls SharedCalls // 对资源map互斥访问 lock sync.RWMutex }
而后来看获取资源的方法签名:
func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error) // 获取一个资源(有就直接获取,没有生成一个) func (mr *ManagedResource) Take() interface{} // 判断这个资源是否不符合传入的判断要求,不符合则重置 func (mr *ManagedResource) MarkBroken(resource interface{})
ResourceManager
使用 SharedCalls
作防重复请求,并将资源缓存在内部的 sourMap
;另外传入的 create func
和 IO
操做有关,常见用在网络资源的缓存;ManagedResource
缓存资源没有 map
而是单一的 interface
,说明只有一份,可是它提供了 Take()
和传入 generate()
说明可让开发者自行更新 resource
;因此在用途上:
ResourceManager
:用在网络资源的管理。如:数据库链接管理;ManagedResource
:用在一些变化资源,能够作资源先后对比,达到更新资源。如:token
管理和验证这个就和 GC
中引用计数相似:
Use() -> ref++
Clean() -> ref--; if ref == 0 -> ref clean
func (r *RefResource) Use() error { // 互斥访问 r.lock.Lock() defer r.lock.Unlock() // 清除标记 if r.cleaned { return ErrUseOfCleaned } // 引用 +1 r.ref++ return nil }
一句话形容:使用SharedCalls能够使得同时多个请求只须要发起一次拿结果的调用,其余请求"不劳而获",这种设计有效减小了资源服务的并发压力,能够有效防止缓存击穿。
这个组件被反复应用在其余组件中,上面说的 ResourceManager
。
相似当须要高频并发访问一个资源时,就能够使用 SharedCalls
缓存。
// 当多个请求同时使用Do方法请求资源时 func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) { // 先申请加锁 g.lock.Lock() // 根据key,获取对应的call结果,并用变量c保存 if c, ok := g.calls[key]; ok { // 拿到call之后,释放锁,此处call可能尚未实际数据,只是一个空的内存占位 g.lock.Unlock() // 调用wg.Wait,判断是否有其余goroutine正在申请资源,若是阻塞,说明有其余goroutine正在获取资源 c.wg.Wait() // 当wg.Wait再也不阻塞,表示资源获取已经结束,能够直接返回结果 return c.val, c.err } // 没有拿到结果,则调用makeCall方法去获取资源,注意此处仍然是锁住的,能够保证只有一个goroutine能够调用makecall c := g.makeCall(key, fn) // 返回调用结果 return c.val, c.err }
不重复造轮子,一直是 go-zero
设计主旨之一;也同时将平时业务沉淀到组件中,这才是框架和组件的意义。
关于 go-zero
更多的设计和实现文章,能够持续关注咱们。欢迎你们去关注和使用。
https://github.com/tal-tech/go-zero
欢迎使用 go-zero 并 star 支持咱们!
关注『微服务实践』公众号并回复 进群 获取社区群二维码。
go-zero 系列文章见『微服务实践』公众号