Go 1.13版本中有几个比较大的修改,其中之一是sync.Pool
修改了部分实现,减少某些极端状况下的性能开销。文中内容来源于笔者读完sync.Pool源代码的思考和总结,内容以Go 1.13中的实现为准,少许内容涉及到Go 1.13以前,若有误区请读者多多指教。编程
在本文内容开始以前须要理解几个在Go runtime中的概念,以便于更好的理解sync.Pool
中一些实现。数组
Go中调度器是GMP模型,简单理解G就是goroutine;M能够类比内核线程,是执行G的地方;P是调度G以及为G的执行准备所需资源。通常状况下,P的数量CPU的可用核心数,也可由runtime.GOMAXPROCS
指定。本文的重点并不是goroutine调度器,在此不作详细解释,感兴趣能够翻阅延伸阅读的文章。 缓存
Go有这样的调度规则:某个G不能一直占用M,在某个时刻的时候,runtime(参见sysmon
)会判断当前M是否能够被抢占,即M上正在执行的G让出。P在合理的时刻将G调度到合理的M上执行,在runtime里面,每一个P维护一个本地存放待执行G的队列localq,同时还存在一个全局的待执行G的队列globalq;调度就是P从localq或globalq中取出G到对应的M上执行,所谓抢占,runtime将G抢占移出运行状态,拷贝G的执行栈放入待执行队列中,多是某个P的localq,也多是globalq,等待下一次调度,所以当被抢占的G重回待执行队列时有可能此时的P与前一次运行的P并不是同一个。 数据结构
所谓禁止抢占,即当前执行G不容许被抢占调度,直到禁止抢占标记解除。Go runtime实现了G的禁止抢占与解除禁止抢占。架构
func runtime_procPin() int
禁止抢占,标记当前G在M上不会被抢占,并返回当前所在P的ID。并发
func runtime_procUnpin()
解除G的禁止抢占状态,以后G可被抢占。app
type poolDequeue struct { headTail uint64 vals []eface } type eface struct { typ, val unsafe.Pointer }
poolDequeue
被实现为单生产者多消费者的固定大小的无锁Ring式队列。生产者能够从head插入head删除,而消费者仅可从tail删除。headTail
指向了队列的头和尾,经过位运算将head和tail位置存入headTail
变量中。编程语言
func (d *poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // Ring式队列,头尾相等则队列已满 if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { return false } slot := &d.vals[head&uint32(len(d.vals)-1)] // 原子操做拿到slot.typ typ := atomic.LoadPointer(&slot.typ) if typ != nil { return false } if val == nil { val = dequeueNil(nil) } // slot占位,将val存入vals中 *(*interface{})(unsafe.Pointer(slot)) = val // 更改队列指向头 atomic.AddUint64(&d.headTail, 1<<dequeueBits) return true }
slot
,根据slot.typ
判断当前slot是否已被存放数据,注意这里使用了atomic.LoadPointer
取代锁操做。val
赋值给slot
,这里实现的比较巧妙,slot
是eface
类型,将slot
转为interface{}
类型,这样val
能以interface{}
赋值给slot
让slot.typ
和slot.val
指向其内存块,这样slot.typ
和slot.val
均不为空,这也就是第2点条件判断的来由。插入成功后head加1,指向队头的前一个空位,因为插入删除都涉及到对headTail
的修改,此处使用原子操做取代锁。func (d *poolDequeue) popHead() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 判断队列是否为空 if tail == head { return nil, false } // head位置是队头的前一个位置,因此此处要先退一位 head-- ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[head&uint32(len(d.vals)-1)] break } } // 取出val val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // 重置slot,typ和val均为nil *slot = eface{} return val, true }
popHead
的代码比较简单,流程上无非就是判断队列是否空,拿出slot
转换为interface{}
而后重置slot
。popHead
与pushHead
有一点须要注意,在popHead
中,是先修改了headTail
,而后再取slot
,而在pushHead
中是先插入数据,而后再修改headTail
,至于为何这里先留一个疑问,后面将会详细解释。函数
func (d *poolDequeue) popTail() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 判断队列是否空 if tail == head { return nil, false } ptrs2 := d.pack(head, tail+1) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } // 取出val val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // 重置slot.typ slot.val = nil atomic.StorePointer(&slot.typ, nil) return val, true }
popTail
的代码相似popHead
,只是删除对象从队首变成队尾,注意结尾部分代码,使用了atomic.StorePointer
取代锁操做。性能
poolDequeue
被实现为Ring式队列,而poolChain
则是基于poolDequeue
实现为双向链表。
type poolChain struct { head *poolChainElt tail *poolChainElt } type poolChainElt struct { poolDequeue next, prev *poolChainElt }
同理,poolChain
也实现了pushHead
,popHead
和popTail
。
func (c *poolChain) pushHead(val interface{}) { d := c.head if d == nil { // poolDequeue初始长度为8 const initSize = 8 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } if d.pushHead(val) { return } // 前一个poolDequeue长度的2倍 newSize := len(d.vals) * 2 if newSize >= dequeueLimit { newSize = dequeueLimit } // 首尾相连,构成链表 d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) d2.pushHead(val) }
到这里大概就明白了,poolDequeue
是在poolChain
的pushHead
中建立的,且每次建立的长度都是前一个poolDequeue
长度的2倍,初始长度为8。
func (c *poolChain) popHead() (interface{}, bool) { d := c.head for d != nil { if val, ok := d.popHead(); ok { return val, ok } d = loadPoolChainElt(&d.prev) } return nil, false }
popHead
以队首向队尾为方向遍历链表,对每一个poolDequeue
执行popHead
尝试取出存放的对象。
func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) if d == nil { return nil, false } for { d2 := loadPoolChainElt(&d.next) if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { return nil, false } if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { storePoolChainElt(&d2.prev, nil) } d = d2 } }
popTail
以队尾向队队首为方向遍历链表,对每一个poolDequeue
执行popTail
尝试从尾部取出存放的对象。
宏观上来看,poolChain
的结构以下图:
poolChain
为sync.Pool的底层数据结构,接下来一览Pool的实现。
type Pool struct { // ... // 每一个P的本地队列,实际类型为[P]poolLocal local unsafe.Pointer // [P]poolLocal的大小,<= P localSize uintptr victim unsafe.Pointer victimSize uintptr // 自定义的对象建立回调,当pool中无可用对象时会调用此函数 New func() interface{} } type poolLocalInternal struct { // 每一个P的私有共享,使用时无需加锁 private interface{} // 对象列表,本地P能够pushHead/popHead,其余P仅能popTail shared poolChain } type poolLocal struct { poolLocalInternal // 伪共享,仅占位用,防止在cache line上分配多个poolLocalInternal pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte }
接下来看看Pool的具体实现。
func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() s := atomic.LoadUintptr(&p.localSize) l := p.local if uintptr(pid) < s { return indexLocal(l, pid), pid } return p.pinSlow() }
pin
首先标记了当前G禁止抢占,在runtime_procUnpin
以前,当前G和P不会被抢占。此处之因此标记禁止抢占是由于下文中有使用到P ID,若是被抢占了,有可能接下里使用的P ID与所绑定的P并不是同一个。
在得到P ID以后,当P ID小于p.local
数组长度时在p.local
数组里找到P对应的poolLocal
对象,不然进入pinSlow
函数建立新的poolLocal
。
func (p *Pool) pinSlow() (*poolLocal, int) { runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() s := p.localSize l := p.local if uintptr(pid) < s { return indexLocal(l, pid), pid } if p.local == nil { allPools = append(allPools, p) } // 当前P的数量 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) atomic.StoreUintptr(&p.localSize, uintptr(size)) return &local[pid], pid }
pinSlow
的实现比较简单,即当前P ID在Pool中没有对应的poolLocal
对象时,则建立一个新的poolLocal
对象,旧的poolLocal
将会进入GC。仔细观察pinSlow
函数发现先执行了runtime_procUnpin
,随后有执行了runtime_procPin
,后者的目的在于获取最新的P ID,这里的意图有点难以理解,在仔细阅读和理解以后,发现目的在于尽可能减小[]poolLocal
的建立次数,由于pinSlow
以前和pinSlow
里面可能会由于解除禁止抢占而致使绑定的P不一致,万一新绑定的P里存在可用的poolLocal
呢。
func (p *Pool) Get() interface{} { // ... l, pid := p.pin() x := l.private l.private = nil if x == nil { x, _ = l.shared.popHead() if x == nil { x = p.getSlow(pid) } } // 解除禁止抢占 runtime_procUnpin() // ... if x == nil && p.New != nil { x = p.New() } return x }
poolLocal
和P ID。poolLocal.private
不为空,则表示可复用此对象;若为空,则在poolLocal.shared
队列中获取对象。poolLocal.shared
无可用对象,则进入getSlow
获取对象。nil
。func (p *Pool) getSlow(pid int) interface{} { size := atomic.LoadUintptr(&p.localSize) locals := p.local // 从其余P中窃取对象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 尝试从victim cache中取对象 size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) if x := l.private; x != nil { l.private = nil return x } for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 清空victim cache atomic.StoreUintptr(&p.victimSize, 0) return nil }
进入getSlow
的前提是当前P本地无可用对象,因而转头去其余P的里窃取对象,getSlow
就是作这件事情。
Pool.local
中的其余P,确认其余P的shared
里是否有可用对象,若是有,则从链尾取出。纵观整个Get过程会发现,从当前P的poolLocal
中取对象时使用的时popHead
,而从其余P的poolLocal
中窃取对象时使用的时popTail
,再回到上文中对poolChain
的定义,能够知道,当前P对本地poolLocal是生产者,对其余P的poolLocal而言是消费者。
再次回到poolDequeue
和poolChain
上。咱们知道某一时刻P只会调度一个G,那么对于生产者而言,调用pushHead
和popHead
并不须要加锁,由于当前P操做的是本地poolLocal
;当消费者是其余P,在进行popTail
操做时,则会对pushHead
以及popHead
造成竞争关系,对这种问题,poolDequeue
的实现直指要害。
首先注意eface
这个结构,若插入成功eface
下的两个字段会指向要缓存对象的内存地址,在pushHead
中使用了原子操做判断typ
字段是否为nil
,存在这样一种可能性:pushHead
所取到的slot
正在popTail
里准备重置,这种状况下pushHead
会直接返回失败。
回到竞争问题上,pushHead
的流程能够简化为先取slot
,再判断是否可插入最后修改headTail
,而popTail
的流程能够简化为先修改headTail
再取slot
而后重置slot
,pushHead
修改head位置,popTail
修改tail位置,因此对于headTail
字段使用原子操做避免便可读写冲突。
疑问是为什么popTail
中须要先修改headTail
呢,由于存在其余P都会到当前P上窃取对象,当多个P都调用本地P的popTail
时,竞争现象就会更加明显,因此此时应尽早修改headTail
,一旦某个P窃取到了其余P就没法再窃取此对象。
func (p *Pool) Put(x interface{}) { if x == nil { return } // ... l, _ := p.pin() if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } runtime_procUnpin() // ... }
Put
的实现比较简单,优先将对象存入private
,若private
已存在则放入shared链表中,pin
中会标记禁止抢占,所以须要在pin
结束以及Put逻辑结束后取消禁止抢占。
Victim Cache本是计算机架构里面的一个概念,是CPU硬件处理缓存的一种技术,sync.Pool
引入的意图在于下降GC压力同时提升命中率,本文并不须要详解Victim Cache的原理,分析sync.Pool
便可明白其意图。对于Pool而言有一点须要明白,这个Pool并不是是无限制扩展的,不然会引发内存溢出。几乎全部的池技术中,都会在某个时刻清空或清除部分缓存对象,那么在Go中什么时候清理未使用的对象呢?
在Pool.Get函数中,取不到对象时会尝试从p.victim
中取,用完后放回当前P的本地队列,而p.vcitim
是什么被建立的呢?是在poolCleanup
函数中,该函数会在GC时被调用到,在init
函数里注册。
func poolCleanup() { // 1 for _, p := range oldPools { p.victim = nil p.victimSize = 0 } // 2 for _, p := range allPools { p.victim = p.local p.victimSize = p.localSize p.local = nil p.localSize = 0 } oldPools, allPools = allPools, nil }
poolCleanup
会在STW阶段被调用,函数实现虽然看起来简单,但其意图较为复杂,那么该如何解释呢?
尝试模拟一下实际状况:
oldPools
和allPools
均为nil
p.local
为nil
,将会在pinSlow
中建立p.local
,而后将p
放入allPools
,此时allPools
长度为1,oldPools
为nil
allPools
中全部p.local
将值赋值给victim
并置为nil
,最后allPools
为nil
,oldPools
长度为1p.local
为nil
,此时会从p.victim
里面尝试取对象p.local
为nil
,从新建立p.local
,并将对象放回,此时allPools
长度为1,oldPools
长度为1oldPools
中全部p.victim
置nil
,前一次的cache在本次GC时被回收,allPools
全部p.local
将值赋值给victim
并置为nil
,最后allPools
为nil,oldPools
长度为1再来看看Go 1.13之前poolCleanup
的实现:
func poolCleanup() { for i, p := range allPools { allPools[i] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.shared = nil } p.local = nil p.localSize = 0 } allPools = []*Pool{} }
Go 1.13之前poolCleanup
的实现简单粗暴,每次GC STW阶段遍历allPools
,清空p.local
、poolLocal.shared
。
经过二者的对比发现,新版的实现相比Go 1.13以前,GC的粒度拉大了,因为实际回收的时间线拉长,单位时间内GC的开销减少。
由此基本明白p.victim
的做用,它的定位是次级缓存,GC时将对象放入其中,下一次GC来临以前若是有Get调用则会从p.victim
中取,直到再一次GC来临作回收,同时因为从p.victim
中取出对象使用完毕以后并未放回p.victim
中,在必定程度也减少了下一次GC的开销。原来1次GC的开销被拉长到2次且会有必定程度的开销减少,这就是p.victim
引入的意图。
关于Victim Cache更多的信息能够在延伸阅读中找到。
无锁编程是不少编程语言里逃离不了的话题。sync.Pool
的无锁是在poolDequeue
和poolChain
层面实现的。
纵观整个sync.Pool
的实现,明确了生产者(本地P)访问head,消费者(其余P)访问tail,从P的角度切入操做方向,实现了目标操做对象层面的“解耦”,大部分时候二者的操做互不影响。图文示意以下:
poolDequeue
对一些关键变量采用了CAS操做,好比poolDequeue.headTail
,既可完整保证并发又能下降相比锁而言的开销。
这点与“操做对象隔离”是相辅相成的,一旦设计目标为尽可能减小对同一对象的操做锁,就须要对行为进行隔离,链表能很好的知足这个设计目标:特定的P访问特定的位置。从整个过程来看,链表是减小锁的高效数据结构。
GC的开销已经足够足够小了,但仍不可避免。对于sync.Pool
而言,避免极端状况GC的开销也是重点之一,因此Go 1.13的sync.Pool
引入了Victim Cache机制,有效拉长真正回收的时间线,从而减少单次GC的开销。