针对Golang 1.9的sync.WaitGroup进行分析,与Golang 1.10基本同样除了将panic
改成了throw
以外其余的都同样。
源代码位置:sync\waitgroup.go
。golang
type WaitGroup struct { noCopy noCopy // noCopy能够嵌入到结构中,在第一次使用后不可复制,使用go vet做为检测使用 // 位值:高32位是计数器,低32位是goroution等待计数。 // 64位的原子操做须要64位的对齐,可是32位。编译器不能确保它,因此分配了12个byte对齐的8个byte做为状态。 state1 [12]byte // byte=uint8范围:0~255,只取前8个元素。转为2进制:0000 0000,0000 0000... ...0000 0000 sema uint32 // 信号量,用于唤醒goroution }
不知道你们是否和我同样,不管是使用Java的CountDownLatch仍是Golang的WaitGroup,都会疑问,能够装下多个线程|协程等待呢?看了源码后能够回答了,能够装下算法
1111 1111 1111 ... 1111 \________32___________/
2^32个辣么多!因此不须要担忧单机状况下会被撑爆了。数组
如下代码已经去掉了与核心代码无关的race代码。并发
添加或者减小等待goroutine的数量。函数
添加的delta,多是负的,到WaitGroup计数器。ui
func (wg *WaitGroup) Add(delta int) { // 获取到wg.state1数组中元素组成的二进制对应的十进制的值 statep := wg.state() // 高32位是计数器 state := atomic.AddUint64(statep, uint64(delta)<<32) // 获取计数器 v := int32(state >> 32) w := uint32(state) // 计数器为负数,报panic if v < 0 { panic("sync: negative WaitGroup counter") } // 添加与等待并发调用,报panic if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 计数器添加成功 if v > 0 || w == 0 { return } // 当等待计数器> 0时,而goroutine设置为0。 // 此时不可能有同时发生的状态突变: // - 增长不能与等待同时发生, // - 若是计数器counter == 0,再也不增长等待计数器 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // Reset waiters count to 0. *statep = 0 for ; w != 0; w-- { // 目的是做为一个简单的wakeup原语,以供同步使用。true为唤醒排在等待队列的第一个goroutine runtime_Semrelease(&wg.sema, false) } }
// unsafe.Pointer其实就是相似C的void *,在golang中是用于各类指针相互转换的桥梁。 // uintptr是golang的内置类型,是能存储指针的整型,uintptr的底层类型是int,它和unsafe.Pointer可相互转换。 // uintptr和unsafe.Pointer的区别就是:unsafe.Pointer只是单纯的通用指针类型,用于转换不一样类型指针,它不能够参与指针运算; // 而uintptr是用于指针运算的,GC 不把 uintptr 当指针,也就是说 uintptr 没法持有对象,uintptr类型的目标会被回收。 // state()函数能够获取到wg.state1数组中元素组成的二进制对应的十进制的值 func (wg *WaitGroup) state() *uint64 { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)) } else { return (*uint64)(unsafe.Pointer(&wg.state1[4])) } }
至关于Add(-1)。atom
func (wg *WaitGroup) Done() { // 计数器减一 wg.Add(-1) }
执行阻塞,直到全部的WaitGroup数量变成0。线程
func (wg *WaitGroup) Wait() { // 获取到wg.state1数组中元素组成的二进制对应的十进制的值 statep := wg.state() // cas算法 for { state := atomic.LoadUint64(statep) // 高32位是计数器 v := int32(state >> 32) w := uint32(state) // 计数器为0,结束等待 if v == 0 { // Counter is 0, no need to wait. return } // 增长等待goroution计数,对低32位加1,不须要移位 if atomic.CompareAndSwapUint64(statep, state, state+1) { // 目的是做为一个简单的sleep原语,以供同步使用 runtime_Semacquire(&wg.sema) if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }