go
内置了协程安全的 sync
包来方便咱们同步各协程之间的执行状态,使用起来也很是方便。安全
最近在排查解决一个线下服务的数据同步问题,review
核心代码后,发现这么一段流程控制代码。并发
错误示例
ui
package main import ( "log" "runtime" "sync" ) func main() { // 可并行也是重点,生产场景没几个单核的吧?? runtime.GOMAXPROCS(runtime.NumCPU()) waitGrp := &sync.WaitGroup{} waitGrp.Add(1) syncTaskProcessMap := &sync.Map{} for i := 0; i < 100; i++ { syncTaskProcessMap.Store(i, i) } for j := 0; j < 100; j++ { go func(j int) { // 协程可能并行抢占一轮开始 syncTaskProcessMap.Delete(j) // 协程可能并行抢占一轮结束 // 在当前协程 Delete 后 Range 前 又被其余协程 Delete 操做了 syncTaskProcessCount := 0 syncTaskProcessMap.Range(func(key, value interface{}) bool { syncTaskProcessCount++ return true }) if syncTaskProcessCount == 0 { log.Println(GetGoroutineID(), "syncTaskProcessMap empty, start syncOnline", syncTaskProcessCount) } }(j) } waitGrp.Wait() } func GetGoroutineID() uint64 { b := make([]byte, 64) runtime.Stack(b, false) b = bytes.TrimPrefix(b, []byte("goroutine ")) b = b[:bytes.IndexByte(b, ' ')] n, _ := strconv.ParseUint(string(b), 10, 64) return n }
代码的本意,是在 i
个协程并发的执行完成后,启动一次 nextProcess
任务,代码使用了 sync.Map
来维护和同步 i
个协程的执行进度,防止多协程并发形成的 map
不安全读写。当最后一个协程执行完毕,sync.Map
为空,启动一次 nextProcess
。但能读到状态值 syncTaskProcessCount
为 0
的协程,只会是 最后一个
执行完成的协程吗?日志
sync.Map::Store\Load\Delete\Range
都是协程安全的操做,在调用期间只会被当前 协程
抢占访问,但它们的组合操做并非 独占
的,上面的代码认为,Delete && Range
两项操做期间 不会
夹带其余协程对 sync.Map
读写操做,致使能读到 syncTaskProcessCount
为 0
的协程可能不止最后一个执行完毕的。code
多执行几回,可能获得一下输出:协程
sqrtcat:demo$ go run test.go 2021/04/20 14:30:27 114 syncTaskProcessMap empty, start syncOnline 0 ^Csignal: interrupt sqrtcat:demo$ go run test.go 2021/04/20 14:30:30 117 syncTaskProcessMap empty, start syncOnline 0 2021/04/20 14:30:30 116 syncTaskProcessMap empty, start syncOnline 0 ^Csignal: interrupt sqrtcat:demo$ go run test.go 2021/04/20 14:30:33 117 syncTaskProcessMap empty, start syncOnline 0 ^Csignal: interrupt sqrtcat:demo$ go run test.go 2021/04/20 14:30:35 117 syncTaskProcessMap empty, start syncOnline 0 2021/04/20 14:30:35 118 syncTaskProcessMap empty, start syncOnline 0 2021/04/20 14:30:35 115 syncTaskProcessMap empty, start syncOnline 0 ^Csignal: interrupt sqrtcat:demo$ go run test.go 2021/04/20 14:30:38 131 syncTaskProcessMap empty, start syncOnline 0 2021/04/20 14:30:38 132 syncTaskProcessMap empty, start syncOnline 0 ^Csignal: interrupt
能够看到,syncTaskProcessMap empty
的状态被多个协程读到了。G117
,G118
,G115
在多核场景下肯能 并行
执行。队列
SyncMap
被 G117
抢占,Delete
后 2,SyncMap
被释放。SyncMap
被 G118
抢占,Delete
后 1,SyncMap
被释放。SyncMap
被 G115
抢占,Delete
后 0,SyncMap
被释放。syncMap
已然为空,G11七、G11八、G115
继续 Range
获得的 syncTaskProcessCount
都为 0
,这样就致使了代码执行与指望不一样了。因此,虽然 sync.Map
的单一操做是自动加锁的排他操做,但组合在一块儿就不是了,咱们要自行在 code section
上加锁。同步
正确示例
string
package main import ( "log" "runtime" "sync" ) // 错误代码示例 func main() { runtime.GOMAXPROCS(runtime.NumCPU()) syncMutex := &sync.Mutex{} waitGrp := &sync.WaitGroup{} waitGrp.Add(1) syncTaskProcessMap := &sync.Map{} for i := 0; i < 100; i++ { syncTaskProcessMap.Store(i, i) } for j := 0; j < 100; j++ { go func(j int) { // 保证协程对 syncMap 的组合操做也是独占的 // 将可能的并行操做顺序化 syncMutex.Lock() defer syncMutex.Unlock() syncTaskProcessMap.Delete(j) syncTaskProcessCount := 0 syncTaskProcessMap.Range(func(key, value interface{}) bool { syncTaskProcessCount++ return true }) if syncTaskProcessCount == 0 { log.Println(GetGoroutineID(), "syncTaskProcessMap empty, start syncOnline", syncTaskProcessCount) } }(j) } waitGrp.Wait() } func GetGoroutineID() uint64 { b := make([]byte, 64) runtime.Stack(b, false) b = bytes.TrimPrefix(b, []byte("goroutine ")) b = b[:bytes.IndexByte(b, ' ')] n, _ := strconv.ParseUint(string(b), 10, 64) return n }
协程并行it
在 多核
的平台上,分配在不一样 时间片队列
上的协程是能够 并行
执行的,相同 时间片队列
上的协程是 并发
执行的
func main() { // 这行代码将会影响子协程里的日志输出量 runtime.GOMAXPROCS(runtime.NumCPU()) waitChan := make(chan int) go func() { defer func() { log.Println(GetGoroutineID(), "sub defer") }() log.Println(GetGoroutineID(), "sub start") waitChan <- 1 log.Println(GetGoroutineID(), "sub finish") }() log.Println(GetGoroutineID(), "main start") log.Println(<-waitChan) log.Println(GetGoroutineID(), "main finish") }
main
和 sub
分配在了同一个 cpu
上 或只有一个 cpu
,main start
,waitChan
读阻塞了 main
,sub
开始执行,sub start
,写入 waitChan
,后续也没有触发协程切换的代码段,继续执行 sub finish
sub defer
退出,交出 时间片
,main
继续执行 main finish
。main
和 sub
分配在了不一样 cpu
上,当 waitChan
阻塞了 cpu1
上的 main
,而 sub
被 cpu2
执行了 写入waitChan
后,main
可能会被 cpu1
当即继续执行,主协程 main
退出,sub
也会被终止执行,后面的日志打印可能就执行不到了。sqrtcat:demo$ go run test.go 2021/04/20 15:26:42 5 sub start 2021/04/20 15:26:42 1 main start 2021/04/20 15:26:42 1 2021/04/20 15:26:42 1 main finish 2021/04/20 15:26:42 5 sub finish