go - 更为安全的使用 sync.Map 组件

go 内置了协程安全的 sync 包来方便咱们同步各协程之间的执行状态,使用起来也很是方便。安全

最近在排查解决一个线下服务的数据同步问题,review 核心代码后,发现这么一段流程控制代码。并发

错误示例ui

package main

import (
    "log"
    "runtime"
    "sync"
)

func main() {
    // 可并行也是重点,生产场景没几个单核的吧?? 
    runtime.GOMAXPROCS(runtime.NumCPU())
    waitGrp := &sync.WaitGroup{}
    waitGrp.Add(1)

    syncTaskProcessMap := &sync.Map{}
    for i := 0; i < 100; i++ {
        syncTaskProcessMap.Store(i, i)
    }

    for j := 0; j < 100; j++ {
        go func(j int) {
            // 协程可能并行抢占一轮开始
            syncTaskProcessMap.Delete(j)
            // 协程可能并行抢占一轮结束
            // 在当前协程 Delete 后 Range 前 又被其余协程 Delete 操做了
            
            syncTaskProcessCount := 0
            syncTaskProcessMap.Range(func(key, value interface{}) bool {
                syncTaskProcessCount++
                return true
            })
            
            if syncTaskProcessCount == 0 {
                log.Println(GetGoroutineID(), "syncTaskProcessMap empty, start syncOnline", syncTaskProcessCount)
            }
        }(j)
    }
    
    waitGrp.Wait()
}

func GetGoroutineID() uint64 {
    b := make([]byte, 64)
    runtime.Stack(b, false)
    b = bytes.TrimPrefix(b, []byte("goroutine "))
    b = b[:bytes.IndexByte(b, ' ')]
    n, _ := strconv.ParseUint(string(b), 10, 64)
    return n
}

代码的本意,是在 i 个协程并发的执行完成后,启动一次 nextProcess 任务,代码使用了 sync.Map 来维护和同步 i 个协程的执行进度,防止多协程并发形成的 map 不安全读写。当最后一个协程执行完毕,sync.Map 为空,启动一次 nextProcess。但能读到状态值 syncTaskProcessCount0 的协程,只会是 最后一个 执行完成的协程吗?日志

sync.Map::Store\Load\Delete\Range 都是协程安全的操做,在调用期间只会被当前 协程 抢占访问,但它们的组合操做并非 独占 的,上面的代码认为,Delete && Range 两项操做期间 不会 夹带其余协程对 sync.Map 读写操做,致使能读到 syncTaskProcessCount0 的协程可能不止最后一个执行完毕的。code

多执行几回,可能获得一下输出:协程

sqrtcat:demo$ go run test.go 
2021/04/20 14:30:27 114 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:30 117 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:30 116 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:33 117 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:35 117 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:35 118 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:35 115 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:38 131 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:38 132 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt

能够看到,syncTaskProcessMap empty 的状态被多个协程读到了。
G117,G118,G115 在多核场景下肯能 并行 执行。队列

  1. SyncMapG117 抢占,Delete 后 2,SyncMap 被释放。
  2. SyncMapG118 抢占,Delete 后 1,SyncMap 被释放。
  3. SyncMapG115 抢占,Delete 后 0,SyncMap 被释放。
  4. 这时的 syncMap 已然为空,G11七、G11八、G115 继续 Range 获得的 syncTaskProcessCount 都为 0,这样就致使了代码执行与指望不一样了。

因此,虽然 sync.Map 的单一操做是自动加锁的排他操做,但组合在一块儿就不是了,咱们要自行在 code section 上加锁。同步

正确示例string

package main

import (
    "log"
    "runtime"
    "sync"
)

// 错误代码示例
func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    
    syncMutex := &sync.Mutex{}
    
    waitGrp := &sync.WaitGroup{}
    waitGrp.Add(1)

    syncTaskProcessMap := &sync.Map{}
    for i := 0; i < 100; i++ {
        syncTaskProcessMap.Store(i, i)
    }

    for j := 0; j < 100; j++ {
        go func(j int) {
            // 保证协程对 syncMap 的组合操做也是独占的
            // 将可能的并行操做顺序化
            syncMutex.Lock()
            defer syncMutex.Unlock()
            
            syncTaskProcessMap.Delete(j)
            
            syncTaskProcessCount := 0
            syncTaskProcessMap.Range(func(key, value interface{}) bool {
                syncTaskProcessCount++
                return true
            })
            
            if syncTaskProcessCount == 0 {
                log.Println(GetGoroutineID(), "syncTaskProcessMap empty, start syncOnline", syncTaskProcessCount)
            }
        }(j)
    }
    
    waitGrp.Wait()
}

func GetGoroutineID() uint64 {
    b := make([]byte, 64)
    runtime.Stack(b, false)
    b = bytes.TrimPrefix(b, []byte("goroutine "))
    b = b[:bytes.IndexByte(b, ' ')]
    n, _ := strconv.ParseUint(string(b), 10, 64)
    return n
}

协程并行it

多核 的平台上,分配在不一样 时间片队列 上的协程是能够 并行 执行的,相同 时间片队列 上的协程是 并发 执行的

func main() {
    // 这行代码将会影响子协程里的日志输出量
    runtime.GOMAXPROCS(runtime.NumCPU())
    waitChan := make(chan int)

    go func() {
        defer func() {
            log.Println(GetGoroutineID(), "sub defer")
        }()
        log.Println(GetGoroutineID(), "sub start")
        waitChan <- 1
        log.Println(GetGoroutineID(), "sub finish")
    }()

    log.Println(GetGoroutineID(), "main start")
    log.Println(<-waitChan)
    log.Println(GetGoroutineID(), "main finish")
}
  1. 若是 mainsub 分配在了同一个 cpu 上 或只有一个 cpumain startwaitChan 读阻塞了 mainsub 开始执行,sub start,写入 waitChan,后续也没有触发协程切换的代码段,继续执行 sub finish sub defer 退出,交出 时间片main 继续执行 main finish
  2. 若是 mainsub 分配在了不一样 cpu 上,当 waitChan 阻塞了 cpu1 上的 main,而 subcpu2 执行了 写入waitChan 后,main 可能会被 cpu1 当即继续执行,主协程 main 退出,sub 也会被终止执行,后面的日志打印可能就执行不到了。
sqrtcat:demo$ go run test.go 
2021/04/20 15:26:42 5 sub start
2021/04/20 15:26:42 1 main start
2021/04/20 15:26:42 1
2021/04/20 15:26:42 1 main finish
2021/04/20 15:26:42 5 sub finish
相关文章
相关标签/搜索