方便的并发,是Golang的一大特点优点,而使用并发,对sync包的WaitGroup不会陌生。WaitGroup主要用来作Golang并发实例即Goroutine的等待,当使用go启动多个并发程序,经过waitgroup能够等待全部go程序结束后再执行后面的代码逻辑,好比:bash
func Main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(10 * time.Second)
}()
}
wg.Wait() // 等待在此,等全部go func里都执行了Done()才会退出
}
复制代码
WaitGroup对外提供三个方法,Add(int),Done()和Wait(), 其中Done()是调用了Add(-1),通常使用方法是,先统一Add,在goroutine里并发的Done,而后Wait。并发
WaitGroup主要维护了2个计数器,一个是请求计数器 v,一个是等待计数器 w,两者组成一个64bit的值,请求计数器占高32bit,等待计数器占低32bit。app
那么等待计数器拿来干吗?是由于同一个实例的Wait()方法支持多处调用,每一次Wait()方法执行,等待计数器 w 就会加1,而当请求计数器v为0触发Wait()时,要根据w的数量发送w份的信号量,正确的触发全部的Wait(),这虽然不是经常使用的一个特性,可是在一些特殊场合是有用处的(好比多个并发都依赖于WaitGroup的实例的结束信号来进行下一个action),演示代码以下:ui
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
}()
}
time.Sleep(2 * time.Second)
for j := 0; j < 3; j++ {
go func(i int) {
// 3个地方调用Wait(),经过等待j计时器,每一个Wati都会被hu唤醒
wg.Wait()
fmt.Println("wait done now ", i)
}(j)
}
time.Sleep(10 * time.Second)
return
}
/*
输出以下,数字出现的顺序随机
wait done now 1
wait done now 0
wait done now 2
*/
复制代码
同时,WaitGroup里还对使用逻辑进行了严格的检查,好比Wait()一旦开始不能Add().this
下面是带注释的代码,去掉了不影响代码逻辑的trace部分:atom
func (wg *WaitGroup) Add(delta int) {
statep := wg.state()
// 更新statep,statep将在wait和add中经过原子操做一块儿使用
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
// wait不等于0说明已经执行了Wait,此时不允许Add
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 正常状况,Add会让v增长,Done会让v减小,若是没有所有Done掉,此处v老是会大于0的,直到v为0才往下走
// 而w表明是有多少个goruntine在等待done的信号,wait中经过compareAndSwap对这个w进行加1
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state: // - Adds must not happen concurrently with Wait, // - Wait does not increment waiters if it sees counter == 0. // Still do a cheap sanity check to detect WaitGroup misuse. // 当v为0(Done掉了全部)或者w不为0(已经开始等待)才会到这里,可是在这个过程当中又有一次Add,致使statep变化,panic if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // Reset waiters count to 0. // 将statep清0,在Wait中经过这个值来保护信号量发出后还对这个Waitgroup进行操做 *statep = 0 // 将信号量发出,触发wait结束 for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false) } } // Done decrements the WaitGroup counter by one. func (wg *WaitGroup) Done() { wg.Add(-1) } // Wait blocks until the WaitGroup counter is zero. func (wg *WaitGroup) Wait() { statep := wg.state() for { state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state) if v == 0 { // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // Increment waiters count. // 若是statep和state相等,则增长等待计数,同时进入if等待信号量 // 此处作CAS,主要是防止多个goroutine里进行Wait()操做,每有一个goroutine进行了wait,等待计数就加1 // 若是这里不相等,说明statep,在 从读出来 到 CAS比较 的这个时间区间内,被别的goroutine改写了,那么不进入if,回去再读一次,这样写避免用锁,更高效些 if atomic.CompareAndSwapUint64(statep, state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(&wg.sema)) } // 等待信号量 runtime_Semacquire(&wg.sema) // 信号量来了,表明全部Add都已经Done if *statep != 0 { // 走到这里,说明在全部Add都已经Done后,触发信号量后,又被执行了Add panic("sync: WaitGroup is reused before previous Wait has returned") } return } } } 复制代码