Golang 并发编程与同步原语

浅谈 Go 语言实现原理

原文连接:https://draveness.me/golang/c...html

当提到并发编程、多线程编程时,咱们每每都离不开『锁』这一律念,Go 语言做为一个原生支持用户态进程 Goroutine 的语言,也必定会为开发者提供这一功能,锁的主要做用就是保证多个线程或者 Goroutine 在访问同一片内存时不会出现混乱的问题,锁实际上是一种并发编程中的同步原语(Synchronization Primitives)。git

在这一节中咱们就会介绍 Go 语言中常见的同步原语 MutexRWMutexWaitGroupOnceCond 以及扩展原语 ErrGroupSemaphoreSingleFlight 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。github

基本原语

Go 语言在 sync 包中提供了用于同步的一些基本原语,包括常见的互斥锁 Mutex 与读写互斥锁 RWMutex 以及 OnceWaitGroupgolang

golang-basic-sync-primitives

这些基本原语的主要做用是提供较为基础的同步功能,咱们应该使用 Channel 和通讯来实现更加高级的同步机制,咱们在这一节中并不会介绍标准库中所有的原语,而是会介绍其中比较常见的 MutexRWMutexOnceWaitGroupCond,咱们并不会涉及剩下两个用于存取数据的结构体 MapPool数据库

Mutex

Go 语言中的互斥锁在 sync 包中,它由两个字段 statesema 组成,state 表示当前互斥锁的状态,而 sema 真正用于控制锁状态的信号量,这两个加起来只占 8 个字节空间的结构体就表示了 Go 语言中的互斥锁。编程

type Mutex struct {
    state int32
    sema  uint32
}

状态

互斥锁的状态是用 int32 来表示的,可是锁的状态并非互斥的,它的最低三位分别表示 mutexLockedmutexWokenmutexStarving,剩下的位置都用来表示当前有多少个 Goroutine 等待互斥锁被释放:数组

golang-mutex-state

互斥锁在被建立出来时,全部的状态位的默认值都是 0,当互斥锁被锁定时 mutexLocked 就会被置成 1、当互斥锁被在正常模式下被唤醒时 mutexWoken 就会被被置成 1mutexStarving 用于表示当前的互斥锁进入了状态,最后的几位是在当前互斥锁上等待的 Goroutine 个数。缓存

饥饿模式

在了解具体的加锁和解锁过程以前,咱们须要先简单了解一下 Mutex 在使用过程当中可能会进入的饥饿模式,饥饿模式是在 Go 语言 1.9 版本引入的特性,它的主要功能就是保证互斥锁的获取的『公平性』(Fairness)。数据结构

互斥锁能够同时处于两种不一样的模式,也就是正常模式和饥饿模式,在正常模式下,全部锁的等待者都会按照先进先出的顺序获取锁,可是若是一个刚刚被唤起的 Goroutine 遇到了新的 Goroutine 进程也调用了 Lock 方法时,大几率会获取不到锁,为了减小这种状况的出现,防止 Goroutine 被『饿死』,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式。多线程

golang-mutex-mode

在饥饿模式中,互斥锁会被直接交给等待队列最前面的 Goroutine,新的 Goroutine 在这时不能获取锁、也不会进入自旋的状态,它们只会在队列的末尾等待,若是一个 Goroutine 得到了互斥锁而且它是队列中最末尾的协程或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。

相比于饥饿模式,正常模式下的互斥锁可以提供更好地性能,饥饿模式的主要做用就是避免一些 Goroutine 因为陷入等待没法获取锁而形成较高的尾延时,这也是对 Mutex 的一个优化。

加锁

互斥锁 Mutex 的加锁是靠 Lock 方法完成的,最新的 Go 语言源代码中已经将 Lock 方法进行了简化,方法的主干只保留了最多见、简单而且快速的状况;当锁的状态是 0 时直接将 mutexLocked 位置成 1

func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    m.lockSlow()
}

可是当 Lock 方法被调用时 Mutex 的状态不是 0 时就会进入 lockSlow 方法尝试经过自旋或者其余的方法等待锁的释放并获取互斥锁,该方法的主体是一个很是大 for 循环,咱们会将该方法分红几个部分介绍获取锁的过程:

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }

在这段方法的第一部分会判断当前方法可否进入自旋来等待锁的释放,自旋(Spinnig)实际上是在多线程同步的过程当中使用的一种机制,当前的进程在进入自旋的过程当中会一直保持 CPU 的占用,持续检查某个条件是否为真,在多核的 CPU 上,自旋的优势是避免了 Goroutine 的切换,因此若是使用恰当会对性能带来很是大的增益。

在 Go 语言的 Mutex 互斥锁中,只有在普通模式下才可能进入自旋,除了模式的限制以外,runtime_canSpin 方法中会判断当前方法是否能够进入自旋,进入自旋的条件很是苛刻:

  1. 运行在多 CPU 的机器上;
  2. 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
  3. 当前机器上至少存在一个正在运行的处理器 P 而且处理的运行队列是空的;

一旦当前 Goroutine 可以进入自旋就会调用 runtime_doSpin,它最终调用汇编语言编写的方法 procyield 并执行指定次数的 PAUSE 指令,PAUSE 指令什么都不会作,可是会消耗 CPU 时间,每次自旋都会调用 30PAUSE,下面是该方法在 386 架构的机器上的实现:

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ    again
    RET

处理了自旋相关的特殊逻辑以后,互斥锁接下来就根据上下文计算当前互斥锁最新的状态了,几个不一样的条件分别会更新 state 中存储的不一样信息 mutexLockedmutexStarvingmutexWokenmutexWaiterShift

new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }

计算了新的互斥锁状态以后,咱们就会使用 atomic 包提供的 CAS 函数修改互斥锁的状态,若是当前的互斥锁已经处于饥饿和锁定的状态,就会跳过当前步骤,调用 runtime_SemacquireMutex 方法:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

runtime_SemacquireMutex 方法的主要做用就是经过 Mutex 的使用互斥锁中的信号量保证资源不会被两个 Goroutine 获取,从这里咱们就能看出 Mutex 其实就是对更底层的信号量进行封装,对外提供更加易用的 API,runtime_SemacquireMutex 会在方法中不断调用 goparkunlock 将当前 Goroutine 陷入休眠等待信号量能够被获取。

一旦当前 Goroutine 能够获取信号量,就证实互斥锁已经被解锁,该方法就会马上返回,Lock 方法的剩余代码也会继续执行下去了,当前互斥锁处于饥饿模式时,若是该 Goroutine 是队列中最后的一个 Goroutine 或者等待锁的时间小于 starvationThresholdNs(1ms),当前 Goroutine 就会直接得到互斥锁而且从饥饿模式中退出并得到锁。

解锁

互斥锁的解锁过程相比之下就很是简单,Unlock 方法会直接使用 atomic 包提供的 AddInt32,若是返回的新状态不等于 0 就会进入 unlockSlow 方法:

func (m *Mutex) Unlock() {
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}

unlockSlow 方法首先会对锁的状态进行校验,若是当前互斥锁已经被解锁过了就会直接抛出异常 sync: unlock of unlocked mutex 停止当前程序,在正常状况下会根据当前互斥锁的状态是正常模式仍是饥饿模式进入不一样的分支:

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 1)
    }
}

若是当前互斥锁的状态是饥饿模式就会直接调用 runtime_Semrelease 方法直接将当前锁交给下一个正在尝试获取锁的等待者,等待者会在被唤醒以后设置 mutexLocked 状态,因为此时仍然处于 mutexStarving,因此新的 Goroutine 也没法得到锁。

在正常模式下,若是当前互斥锁不存在等待者或者最低三位表示的状态都为 0,那么当前方法就不须要唤醒其余 Goroutine 能够直接返回,当有 Goroutine 正在处于等待状态时,仍是会经过 runtime_Semrelease 唤醒对应的 Goroutine 并移交锁的全部权。

小结

经过对互斥锁 Mutex 加锁和解锁过程的分析,咱们可以得出如下的一些结论,它们可以帮助咱们更好地理解互斥锁的工做原理,互斥锁的加锁的过程比较复杂,涉及自旋、信号量以及 Goroutine 调度等概念:

  • 若是互斥锁处于初始化状态,就会直接经过置位 mutexLocked 加锁;
  • 若是互斥锁处于 mutexLocked 而且在普通模式下工做,就会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
  • 若是当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会被切换到饥饿模式;
  • 互斥锁在正常状况下会经过 runtime_SemacquireMutex 方法将调用 Lock 的 Goroutine 切换至休眠状态,等待持有信号量的 Goroutine 唤醒当前协程;
  • 若是当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,当前 Goroutine 会将互斥锁切换回正常模式;

互斥锁的解锁过程相对来讲就比较简单,虽然对于普通模式和饥饿模式的处理有一些不一样,可是因为代码行数很少,因此逻辑清晰,也很是容易理解:

  • 若是互斥锁已经被解锁,那么调用 Unlock 会直接抛出异常;
  • 若是互斥锁处于饥饿模式,会直接将锁的全部权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 若是互斥锁处于普通模式,而且没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 得到了锁就会直接返回,在其余状况下回经过 runtime_Semrelease 唤醒对应的 Goroutine;

RWMutex

读写互斥锁也是 Go 语言 sync 包为咱们提供的接口之一,一个常见的服务对资源的读写比例会很是高,若是大多数的请求都是读请求,它们之间不会相互影响,那么咱们为何不能将对资源读和写操做分离呢?这也就是 RWMutex 读写互斥锁解决的问题,不限制对资源的并发读,可是读写、写写操做没法并行执行。

Y N
N N

读写互斥锁在 Go 语言中的实现是 RWMutex,其中不只包含一个互斥锁,还持有两个信号量,分别用于写等待读和读等待写:

type RWMutex struct {
    w           Mutex
    writerSem   uint32
    readerSem   uint32
    readerCount int32
    readerWait  int32
}

readerCount 存储了当前正在执行的读操做的数量,最后的 readerWait 表示当写操做被阻塞时等待的读操做个数。

读锁

读锁的加锁很是简单,咱们经过 atomic.AddInt32 方法为 readerCount 加一,若是该方法返回了负数说明当前有 Goroutine 得到了写锁,当前 Goroutine 就会调用 runtime_SemacquireMutex 陷入休眠等待唤醒:

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}

若是没有写操做获取当前互斥锁,当前方法就会在 readerCount 加一后返回;当 Goroutine 想要释放读锁时会调用 RUnlock 方法:

func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        rw.rUnlockSlow(r)
    }
}

该方法会在减小正在读资源的 readerCount,当前方法若是遇到了返回值小于零的状况,说明有一个正在进行的写操做,在这时就应该经过 rUnlockSlow 方法减小当前写操做等待的读操做数 readerWait 并在全部读操做都被释放以后触发写操做的信号量 writerSem

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        throw("sync: RUnlock of unlocked RWMutex")
    }
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

writerSem 在被触发以后,尝试获取读写锁的进程就会被唤醒并得到锁。

读写锁

当资源的使用者想要获取读写锁时,就须要经过 Lock 方法了,在 Lock 方法中首先调用了读写互斥锁持有的 MutexLock 方法保证其余获取读写锁的 Goroutine 进入等待状态,随后的 atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 实际上是为了阻塞后续的读操做:

func (rw *RWMutex) Lock() {
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

若是当前仍然有其余 Goroutine 持有互斥锁的读锁,该 Goroutine 就会调用 runtime_SemacquireMutex 进入休眠状态,等待读锁释放时触发 writerSem 信号量将当前协程唤醒。

对资源的读写操做完成以后就会将经过 atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) 变回正数并经过 for 循环触发全部因为获取读锁而陷入等待的 Goroutine:

func (rw *RWMutex) Unlock() {
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        throw("sync: Unlock of unlocked RWMutex")
    }
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    rw.w.Unlock()
}

在方法的最后,RWMutex 会释放持有的互斥锁让其余的协程可以从新获取读写锁。

小结

相比状态复杂的互斥锁 Mutex 来讲,读写互斥锁 RWMutex 虽然提供的功能很是复杂,可是因为站在了 Mutex 的『肩膀』上,因此总体的实现上会简单不少。

  1. readerSem — 读写锁释放时通知因为获取读锁等待的 Goroutine;
  2. writerSem — 读锁释放时通知因为获取读写锁等待的 Goroutine;
  3. w 互斥锁 — 保证写操做之间的互斥;
  4. readerCount — 统计当前进行读操做的协程数,触发写锁时会将其减小 rwmutexMaxReaders 阻塞后续的读操做;
  5. readerWait — 当前读写锁等待的进行读操做的协程数,在触发 Lock 以后的每次 RUnlock 都会将其减一,当它归零时该 Goroutine 就会得到读写锁;
  6. 当读写锁被释放 Unlock 时首先会通知全部的读操做,而后才会释放持有的互斥锁,这样可以保证读操做不会被连续的写操做『饿死』;

RWMutexMutex 之上提供了额外的读写分离功能,可以在读请求远远多于写请求时提供性能上的提高,咱们也能够在场景合适时选择读写互斥锁。

WaitGroup

WaitGroup 是 Go 语言 sync 包中比较常见的同步机制,它能够用于等待一系列的 Goroutine 的返回,一个比较常见的使用场景是批量执行 RPC 或者调用外部服务:

requests := []*Request{...}

wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        
        // res, err := service.call(r)
    }(request)
}

wg.Wait()

经过 WaitGroup 咱们能够在多个 Goroutine 之间很是轻松地同步信息,本来顺序执行的代码也能够在多个 Goroutine 中并发执行,加快了程序处理的速度,在上述代码中只有在全部的 Goroutine 都执行完毕以后 Wait 方法才会返回,程序能够继续执行其余的逻辑。

golang-syncgroup

总而言之,它的做用就像它的名字同样,,经过 Done 来传递任务完成的信号,比较经常使用于等待一组 Goroutine 中并发执行的任务所有结束。

结构体

WaitGroup 结构体中的成员变量很是简单,其中的 noCopy 的主要做用就是保证 WaitGroup 不会被开发者经过再赋值的方式进行拷贝,进而致使一些诡异的行为:

type WaitGroup struct {
    noCopy noCopy

    state1 [3]uint32
}

copylock 包就是一个用于检查相似错误的分析器,它的原理就是在 编译期间 检查被拷贝的变量中是否包含 noCopy 或者 sync 关键字,若是包含当前关键字就会报出如下的错误:

package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := sync.Mutex{}
    yawg := wg
    fmt.Println(wg, yawg)
}

$ go run proc.go
./prog.go:10:10: assignment copies lock value to yawg: sync.Mutex
./prog.go:11:14: call of fmt.Println copies lock value: sync.Mutex
./prog.go:11:18: call of fmt.Println copies lock value: sync.Mutex

这段代码会在赋值和调用 fmt.Println 时发生值拷贝致使分析器报错,你能够经过访问 连接 尝试运行这段代码。

除了 noCopy 以外,WaitGroup 结构体中还包含一个总共占用 12 字节大小的数组,这个数组中会存储当前结构体持有的状态和信号量,在 64 位与 32 位的机器上表现也很是不一样。

golang-waitgroup-state

WaitGroup 提供了私有方法 state 可以帮助咱们从 state1 字段中取出它的状态和信号量。

操做

WaitGroup 对外暴露的接口只有三个 AddWaitDone,其中 Done 方法只是调用了 wg.Add(-1) 自己并无什么特殊的逻辑,咱们来了解一下剩余的两个方法:

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if v > 0 || w == 0 {
        return
    }
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

Add 方法的主要做用就是更新 WaitGroup 中持有的计数器 counter,64 位状态的高 32 位,虽然 Add 方法传入的参数能够为负数,可是一个 WaitGroup 的计数器只能是非负数,当调用 Add 方法致使计数器归零而且还有等待的 Goroutine 时,就会经过 runtime_Semrelease 唤醒处于等待状态的全部 Goroutine。

另外一个 WaitGroup 的方法 Wait 就会在当前计数器中保存的数据大于 0 时修改等待 Goroutine 的个数 waiter 并调用 runtime_Semacquire 陷入睡眠状态。

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        if v == 0 {
            return
        }
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap)
            if +statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

陷入睡眠的 Goroutine 就会等待 Add 方法在计数器为 0 时唤醒。

小结

经过对 WaitGroup 的分析和研究,咱们可以得出如下的一些结论:

  • Add 不能在和 Wait 方法在 Goroutine 中并发调用,一旦出现就会形成程序崩溃;
  • WaitGroup 必须在 Wait 方法返回以后才能被从新使用;
  • Done 只是对 Add 方法的简单封装,咱们能够向 Add 方法传入任意负数(须要保证计数器非负)快速将计数器归零以唤醒其余等待的 Goroutine;
  • 能够同时有多个 Goroutine 等待当前 WaitGroup 计数器的归零,这些 Goroutine 也会被『同时』唤醒;

Once

Go 语言在标准库的 sync 同步包中还提供了 Once 语义,它的主要功能其实也很好理解,保证在 Go 程序运行期间 Once 对应的某段代码只会执行一次。

在以下所示的代码中,Do 方法中传入的函数只会被执行一次,也就是咱们在运行以下所示的代码时只会看见一次 only once 的输出结果:

func main() {
    o := &sync.Once{}
    for i := 0; i < 10; i++ {
        o.Do(func() {
            fmt.Println("only once")
        })
    }
}

$ go run main.go
only once

做为 sync 包中的结构体,Once 有着很是简单的数据结构,每个 Once 结构体中都只包含一个用于标识代码块是否被执行过的 done 以及一个互斥锁 Mutex

type Once struct {
    done uint32
    m    Mutex
}

Once 结构体对外惟一暴露的方法就是 Do,该方法会接受一个入参为空的函数,若是使用 atomic.LoadUint32 检查到已经执行过函数了,就会直接返回,不然就会进入 doSlow 运行传入的函数:

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

doSlow 的实现也很是简单,咱们先为当前的 Goroutine 获取互斥锁,而后经过 defer 关键字将 done 成员变量设置成 1 并运行传入的函数,不管当前函数是正常运行仍是抛出 panic,当前方法都会将 done 设置成 1 保证函数不会执行第二次。

小结

做为用于保证函数执行次数的 Once 结构体,它使用互斥锁和 atomic 提供的方法实现了某个函数在程序运行期间只能执行一次的语义,在使用的过程当中咱们也须要注意如下的内容:

  • Do 方法中传入的函数只会被执行一次,哪怕函数中发生了 panic
  • 两次调用 Do 方法传入不一样的函数时只会执行第一次调用的函数;

Cond

Go 语言在标准库中提供的 Cond 实际上是一个条件变量,经过 Cond 咱们可让一系列的 Goroutine 都在触发某个事件或者条件时才被唤醒,每个 Cond 结构体都包含一个互斥锁 L,咱们先来看一下 Cond 是如何使用的:

func main() {
    c := sync.NewCond(&sync.Mutex{})

    for i := 0; i < 10; i++ {
        go listen(c)
    }

    go broadcast(c)

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
}

func broadcast(c *sync.Cond) {
    c.L.Lock()
    c.Broadcast()
    c.L.Unlock()
}

func listen(c *sync.Cond) {
    c.L.Lock()
    c.Wait()
    fmt.Println("listen")
    c.L.Unlock()
}

$ go run main.go
listen
listen
...
listen

在上述代码中咱们同时运行了 11 个 Goroutine,其中的 10 个 Goroutine 会经过 Wait 等待指望的信号或者事件,而剩下的一个 Goroutine 会调用 Broadcast 方法通知全部陷入等待的 Goroutine,当调用 Boardcast 方法以后,就会打印出 10 次 "listen" 并结束调用。

golang-cond-broadcast

结构体

Cond 的结构体中包含 noCopycopyChecker 两个字段,前者用于保证 Cond 不会再编译期间拷贝,后者保证在运行期间发生拷贝会直接 panic,持有的另外一个锁 L 实际上是一个接口 Locker,任意实现 LockUnlock 方法的结构体均可以做为 NewCond 方法的参数:

type Cond struct {
    noCopy noCopy

    L Locker

    notify  notifyList
    checker copyChecker
}

结构体中最后的变量 notifyList 其实也就是为了实现 Cond 同步机制,该结构体其实就是一个 Goroutine 的链表:

type notifyList struct {
    wait uint32
    notify uint32

    lock mutex
    head *sudog
    tail *sudog
}

在这个结构体中,headtail 分别指向的就是整个链表的头和尾,而 waitnotify 分别表示当前正在等待的 Goroutine 和已经通知到的 Goroutine,咱们经过这两个变量就能确认当前待通知和已通知的 Goroutine。

操做

Cond 对外暴露的 Wait 方法会将当前 Goroutine 陷入休眠状态,它会先调用 runtime_notifyListAdd 将等待计数器 +1,而后解锁并调用 runtime_notifyListWait 等待其余 Goroutine 的唤醒:

func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
    return atomic.Xadd(&l.wait, 1) - 1
}

notifyListWait 方法的主要做用就是获取当前的 Goroutine 并将它追加到 notifyList 链表的最末端:

func notifyListWait(l *notifyList, t uint32) {
    lock(&l.lock)

    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    releaseSudog(s)
}

除了将当前 Goroutine 追加到链表的末端以外,咱们还会调用 goparkunlock 陷入休眠状态,该函数也是在 Go 语言切换 Goroutine 时常常会使用的方法,它会直接让出当前处理器的使用权并等待调度器的唤醒。

golang-cond-notifylist

Cond 对外提供的 SignalBroadcast 方法就是用来唤醒调用 Wait 陷入休眠的 Goroutine,从两个方法的名字来看,前者会唤醒队列最前面的 Goroutine,后者会唤醒队列中所有的 Goroutine:

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

notifyListNotifyAll 方法会从链表中取出所有的 Goroutine 并为它们依次调用 readyWithTime,该方法会经过 goready 将目标的 Goroutine 唤醒:

func notifyListNotifyAll(l *notifyList) {
    s := l.head
    l.head = nil
    l.tail = nil

    atomic.Store(&l.notify, atomic.Load(&l.wait))

    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

虽然它会依次唤醒所有的 Goroutine,可是这里唤醒的顺序其实也是按照加入队列的前后顺序,先加入的会先被 goready 唤醒,后加入的 Goroutine 可能就须要等待调度器的调度。

notifyListNotifyOne 函数就只会从 sudog 构成的链表中知足 sudog.ticket == l.notify 的 Goroutine 并经过 readyWithTime 唤醒:

func notifyListNotifyOne(l *notifyList) {
    t := l.notify
    atomic.Store(&l.notify, t+1)

    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
}

在通常状况下咱们都会选择在不知足特定条件时调用 Wait 陷入休眠,当某些 Goroutine 检测到当前知足了唤醒的条件,就能够选择使用 Signal 通知一个或者 Broadcast 通知所有的 Goroutine 当前条件已经知足,能够继续完成工做了。

小结

Mutex 相比,Cond 仍是一个不被全部人都清楚和理解的同步机制,它提供了相似队列的 FIFO 的等待机制,同时也提供了 SignalBroadcast 两种不一样的唤醒方法,相比于使用 for {} 忙碌等待,使用 Cond 可以在遇到长时间条件没法知足时将当前处理器让出的功能,若是咱们合理使用仍是可以在一些状况下提高性能,在使用的过程当中咱们须要注意:

  • Wait 方法在调用以前必定要使用 L.Lock 持有该资源,不然会发生 panic 致使程序崩溃;
  • Signal 方法唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;
  • Broadcast 虽然是广播通知所有等待的 Goroutine,可是真正被唤醒时也是按照必定顺序的;

扩展原语

除了这些标准库中提供的同步原语以外,Go 语言还在子仓库 x/sync 中提供了额外的四种同步原语,ErrGroupSemaphoreSingleFlightSyncMap,其中的 SyncMap 其实就是 sync 包中的 sync.Map,它在 1.9 版本的 Go 语言中被引入了 x/sync 包,随着 API 的成熟和稳定最后被移到了标准库 sync 包中。

golang-extension-sync-primitives

咱们在这一节中就会介绍 Go 语言目前在扩展包中提供的三种原语,也就是 ErrGroupSemaphoreSingleFlight

ErrGroup

子仓库 x/sync 中的包 errgroup 其实就为咱们在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能,咱们可使用以下所示的方式并行获取网页的数据:

var g errgroup.Group
var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",
}
for i := range urls {
    url := urls[i]
    g.Go(func() error {
        resp, err := http.Get(url)
        if err == nil {
            resp.Body.Close()
        }
        return err
    })
}
if err := g.Wait(); err == nil {
    fmt.Println("Successfully fetched all URLs.")
}

Go 方法可以建立一个 Goroutine 并在其中执行传入的函数,而 Wait 方法会等待 Go 方法建立的 Goroutine 所有返回后返回第一个非空的错误,若是全部的 Goroutine 都没有返回错误,该函数就会返回 nil

结构体

errgroup 包中的 Group 结构体同时由三个比较重要的部分组成:

  1. 建立 Context 时返回的 cancel 函数,主要用于通知使用 context 的 Goroutine 因为某些子任务出错,能够中止工做让出资源了;
  2. 用于等待一组 Goroutine 完成子任务的 WaitGroup 同步原语;
  3. 用于接受子任务返回错误的 err 和保证 err 只会被赋值一次的 errOnce
type Group struct {
    cancel func()

    wg sync.WaitGroup

    errOnce sync.Once
    err     error
}

这些字段共同组成了 Group 结构体并为咱们提供同步、错误传播以及上下文取消等功能。

操做

errgroup 对外惟一暴露的构造器就是 WithContext 方法,咱们只能从一个 Context 中建立一个新的 Group 变量,WithCancel 返回的取消函数也仅会在 Group 结构体内部使用:

func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    return &Group{cancel: cancel}, ctx
}

建立新的并行子任务须要使用 Go 方法,这个方法内部会对 WaitGroup 加一并建立一个新的 Goroutine,在 Goroutine 内部运行子任务并在返回错误时及时调用 cancel 并对 err 赋值,只有最先返回的错误才会被上游感知到,后续的错误都会被舍弃:

func (g *Group) Go(f func() error) {
    g.wg.Add(1)

    go func() {
        defer g.wg.Done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel()
                }
            })
        }
    }()
}

func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel()
    }
    return g.err
}

Wait 方法其实就只是调用了 WaitGroup 的同步方法,在子任务所有完成时取消 Context 并返回可能出现的错误。

小结

errgroup 包中的 Group 同步原语的实现原理仍是很是简单的,它没有涉及很是底层和运行时包中的 API,只是对基本同步语义进行了简单的封装提供了更加复杂的功能,在使用时咱们也须要注意如下的几个问题:

  • 出现错误或者等待结束后都会调用 Contextcancel 方法取消上下文;
  • 只有第一个出现的错误才会被返回,剩余的错误都会被直接抛弃;

Semaphore

信号量是在并发编程中比较常见的一种同步机制,它会保证持有的计数器在 0 到初始化的权重之间,每次获取资源时都会将信号量中的计数器减去对应的数值,在释放时从新加回来,当遇到计数器大于信号量大小时就会进入休眠等待其余进程释放信号,咱们经常会在控制访问资源的进程数量时用到。

Golang 的扩展包中就提供了带权重的信号量,咱们能够按照不一样的权重对资源的访问进行管理,这个包对外也只提供了四个方法:

  • NewWeighted 用于建立新的信号量;
  • Acquire 获取了指定权重的资源,若是当前没有『空闲资源』,就会陷入休眠等待;
  • TryAcquire 也用于获取指定权重的资源,可是若是当前没有『空闲资源』,就会直接返回 false
  • Release 用于释放指定权重的资源;

结构体

NewWeighted 方法的主要做用建立一个新的权重信号量,传入信号量最大的权重就会返回一个新的 Weighted 结构体指针:

func NewWeighted(n int64) *Weighted {
    w := &Weighted{size: n}
    return w
}

type Weighted struct {
    size    int64
    cur     int64
    mu      sync.Mutex
    waiters list.List
}

Weighted 结构体中包含一个 waiters 列表其中存储着等待获取资源的『用户』,除此以外它还包含当前信号量的上限以及一个计数器 cur,这个计数器的范围就是 [0, size]

golang-semaphore

信号量中的计数器会随着用户对资源的访问和释放进行改变,引入的权重概念可以帮助咱们更好地对资源的访问粒度进行控制,尽量知足全部常见的用例。

获取

在上面咱们已经提到过 Acquire 方法就是用于获取指定权重资源的方法,这个方法总共由三个不一样的状况组成:

  1. 当信号量中剩余的资源大于获取的资源而且没有等待的 Goroutine 时就会直接获取信号量;
  2. 当须要获取的信号量大于 Weighted 的大小时,因为不可能知足条件就会直接返回;
  3. 遇到其余状况时会将当前 Goroutine 加入到等待列表并经过 select 等待当前 Goroutine 被唤醒,被唤醒后就会获取信号量;
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
    }

    if n > s.size {
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
    }

    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()

    select {
    case <-ctx.Done():
        err := ctx.Err()
        s.mu.Lock()
        select {
        case <-ready:
            err = nil
        default:
            s.waiters.Remove(elem)
        }
        s.mu.Unlock()
        return err

    case <-ready:
        return nil
    }
}

另外一个用于获取信号量的方法 TryAcquire 相比之下就很是简单,它只会判断当前信号量是否有充足的资源获取,若是有充足的资源就会直接马上返回 true 不然就会返回 false

func (s *Weighted) TryAcquire(n int64) bool {
    s.mu.Lock()
    success := s.size-s.cur >= n && s.waiters.Len() == 0
    if success {
        s.cur += n
    }
    s.mu.Unlock()
    return success
}

Acquire 相比,TryAcquire 因为不会等待资源的释放因此可能更适用于一些延时敏感、用户须要马上感知结果的场景。

释放

最后要介绍的 Release 方法其实也很是简单,当咱们对信号量进行释放时,Release 方法会从头至尾遍历 waiters 列表中所有的等待者,若是释放资源后的信号量有充足的剩余资源就会经过 Channel 唤起指定的 Goroutine:

func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    s.cur -= n
    for {
        next := s.waiters.Front()
        if next == nil {
            break
        }

        w := next.Value.(waiter)
        if s.size-s.cur < w.n {
            break
        }

        s.cur += w.n
        s.waiters.Remove(next)
        close(w.ready)
    }
    s.mu.Unlock()
}

固然也可能会出现剩余资源没法唤起 Goroutine 的状况,在这时当前方法就会释放锁后直接返回,经过对这段代码的分析咱们也能发现,若是一个信号量须要的占用的资源很是多,他可能会长时间没法获取锁,这可能也是 Acquire 方法引入另外一个参数 Context 的缘由,为信号量的获取设置一个超时时间。

小结

带权重的信号量确实有着更多的应用场景,这也是 Go 语言对外提供的惟一一种信号量实现,在使用的过程当中咱们须要注意如下的几个问题:

  • AcquireTryAcquire 方法均可以用于获取资源,前者用于同步获取会等待锁的释放,后者会在没法获取锁时直接返回;
  • Release 方法会按照 FIFO 的顺序唤醒能够被唤醒的 Goroutine;
  • 若是一个 Goroutine 获取了较多地资源,因为 Release 的释放策略可能会等待比较长的时间;

SingleFlight

singleflight 是 Go 语言扩展包中提供了另外一种同步原语,这其实也是做者最喜欢的一种同步扩展机制,它可以在一个服务中抑制对下游的屡次重复请求,一个比较常见的使用场景是 — 咱们在使用 Redis 对数据库中的一些热门数据进行了缓存并设置了超时时间,缓存超时的一瞬间可能有很是多的并行请求发现了 Redis 中已经不包含任何缓存因此大量的流量会打到数据库上影响服务的延时和质量。

golang-query-without-single-flight

可是 singleflight 就能有效地解决这个问题,它的主要做用就是对于同一个 Key 最终只会进行一次函数调用,在这个上下文中就是只会进行一次数据库查询,查询的结果会写回 Redis 并同步给全部请求对应 Key 的用户:

golang-extension-single-flight

这其实就减小了对下游的瞬时流量,在获取下游资源很是耗时,例如:访问缓存、数据库等场景下就很是适合使用 singleflight 对服务进行优化,在上述的这个例子中咱们就能够在想 Redis 和数据库中获取数据时都使用 singleflight 提供的这一功能减小下游的压力;它的使用其实也很是简单,咱们能够直接使用 singleflight.Group{} 建立一个新的 Group 结构体,而后经过调用 Do 方法就能对相同的请求进行抑制:

type service struct {
    requestGroup singleflight.Group
}

func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
    v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
        rows, err := // select * from tables
        if err != nil {
            return nil, err
        }
        return rows, nil
    })
    if err != nil {
        return nil, err
    }
    
    return Response{
        rows: rows,
    }, nil
}

上述代码使用请求的哈希做为抑制相同请求的键,咱们也能够选择一些比较关键或者重要的字段做为 Do 方法的第一个参数避免对下游的瞬时大量请求。

结构体

Group 结构体自己由一个互斥锁 Mutex 和一个从 Keycall 结构体指针的映射表组成,每个 call 结构体都保存了当前此次调用对应的信息:

type Group struct {
    mu sync.Mutex
    m  map[string]*call
}

type call struct {
    wg sync.WaitGroup

    val interface{}
    err error

    dups  int
    chans []chan<- Result
}

call 结构体中的 valerr 字段都是在执行传入的函数时只会被赋值一次,它们也只会在 WaitGroup 等待结束都被读取,而 dupschans 字段分别用于存储当前 singleflight 抑制的请求数量以及在结果返回时将信息传递给调用方。

操做

singleflight 包提供了两个用于抑制相同请求的方法,其中一个是同步等待的方法 Do,另外一个是返回 Channel 的 DoChan,这两个方法在功能上没有太多的区别,只是在接口的表现上稍有不一样。

每次 Do 方法的调用时都会获取互斥锁并尝试对 Group 持有的映射表进行懒加载,随后判断是否已经存在 key 对应的函数调用:

  1. 当不存在对应的 call 结构体时:

    1. 初始化一个新的 call 结构体指针;
    2. 增长 WaitGroup 持有的计数器;
    3. call 结构体指针添加到映射表;
    4. 释放持有的互斥锁 Mutex
    5. 阻塞地调用 doCall 方法等待结果的返回;
  2. 当已经存在对应的 call 结构体时;

    1. 增长 dups 计数器,它表示当前重复的调用次数;
    2. 释放持有的互斥锁 Mutex
    3. 经过 WaitGroup.Wait 等待请求的返回;
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        g.mu.Unlock()
        c.wg.Wait()
        return c.val, c.err, true
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    g.doCall(c, key, fn)
    return c.val, c.err, c.dups > 0
}

由于 valerr 两个字段都只会在 doCall 方法中被赋值,因此当 doCall 方法和 WaitGroup.Wait 方法返回时,这两个值就会返回给 Do 函数的调用者。

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    c.val, c.err = fn()
    c.wg.Done()

    g.mu.Lock()
    delete(g.m, key)
    for _, ch := range c.chans {
        ch <- Result{c.val, c.err, c.dups > 0}
    }
    g.mu.Unlock()
}

doCall 中会运行传入的函数 fn,该函数的返回值就会赋值给 c.valc.err,函数执行结束后就会调用 WaitGroup.Done 方法通知全部被抑制的请求,当前函数已经执行完成,能够从 call 结构体中取出返回值并返回了;在这以后,doCall 方法会获取持有的互斥锁并经过管道将信息同步给使用 DoChan 方法的调用方。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    ch := make(chan Result, 1)
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        c.chans = append(c.chans, ch)
        g.mu.Unlock()
        return ch
    }
    c := &call{chans: []chan<- Result{ch}}
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    go g.doCall(c, key, fn)

    return ch
}

DoChan 方法和 Do的区别就是,它使用 Goroutine 异步执行 doCall 并向 call 持有的 chans 切片中追加 chan Result 变量,这也是它可以提供异步传值的缘由。

小结

singleflight 包提供的 Group 接口确实很是好用,当咱们须要这种抑制对下游的相同请求时就能够经过这个方法来增长吞吐量和服务质量,在使用的过程当中咱们也须要注意如下的几个问题:

  • DoDoChan 一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并经过 Channel 接受函数的返回值;
  • Forget 方法能够通知 singleflight 在持有的映射表中删除某个键,接下来对该键的调用就会直接执行方法而不是等待前面的函数返回;
  • 一旦调用的函数返回了错误,全部在等待的 Goroutine 也都会接收到一样的错误;

总结

咱们在这一节中介绍了 Go 语言标准库中提供的基本原语以及扩展包中的扩展原语,这些并发编程的原语可以帮助咱们更好地利用 Go 语言的特性构建高吞吐量、低延时的服务,并解决因为并发带来的错误,到这里咱们再从新回顾一下这一节介绍的内容:

  • Mutex 互斥锁

    • 若是互斥锁处于初始化状态,就会直接经过置位 mutexLocked 加锁;
    • 若是互斥锁处于 mutexLocked 而且在普通模式下工做,就会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
    • 若是当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会被切换到饥饿模式;
    • 互斥锁在正常状况下会经过 runtime_SemacquireMutex 方法将调用 Lock 的 Goroutine 切换至休眠状态,等待持有信号量的 Goroutine 唤醒当前协程;
    • 若是当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,当前 Goroutine 会将互斥锁切换回正常模式;
    • 若是互斥锁已经被解锁,那么调用 Unlock 会直接抛出异常;
    • 若是互斥锁处于饥饿模式,会直接将锁的全部权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
    • 若是互斥锁处于普通模式,而且没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 得到了锁就会直接返回,在其余状况下回经过 runtime_Semrelease 唤醒对应的 Goroutine;
  • RWMutex 读写互斥锁

    • readerSem — 读写锁释放时通知因为获取读锁等待的 Goroutine;
    • writerSem — 读锁释放时通知因为获取读写锁等待的 Goroutine;
    • w 互斥锁 — 保证写操做之间的互斥;
    • readerCount — 统计当前进行读操做的协程数,触发写锁时会将其减小 rwmutexMaxReaders 阻塞后续的读操做;
    • readerWait — 当前读写锁等待的进行读操做的协程数,在触发 Lock 以后的每次 RUnlock 都会将其减一,当它归零时该 Goroutine 就会得到读写锁;
    • 当读写锁被释放 Unlock 时首先会通知全部的读操做,而后才会释放持有的互斥锁,这样可以保证读操做不会被连续的写操做『饿死』;
  • WaitGroup 等待一组 Goroutine 结束

    • Add 不能在和 Wait 方法在 Goroutine 中并发调用,一旦出现就会形成程序崩溃;
    • WaitGroup 必须在 Wait 方法返回以后才能被从新使用;
    • Done 只是对 Add 方法的简单封装,咱们能够向 Add 方法传入任意负数(须要保证计数器非负)快速将计数器归零以唤醒其余等待的 Goroutine;
    • 能够同时有多个 Goroutine 等待当前 WaitGroup 计数器的归零,这些 Goroutine 也会被『同时』唤醒;
  • Once 程序运行期间仅执行一次

    • Do 方法中传入的函数只会被执行一次,哪怕函数中发生了 panic
    • 两次调用 Do 方法传入不一样的函数时只会执行第一次调用的函数;
  • Cond 发生指定事件时唤醒

    • Wait 方法在调用以前必定要使用 L.Lock 持有该资源,不然会发生 panic 致使程序崩溃;
    • Signal 方法唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;
    • Broadcast 虽然是广播通知所有等待的 Goroutine,可是真正被唤醒时也是按照必定顺序的;
  • ErrGroup 为一组 Goroutine 提供同步、错误传播以及上下文取消的功能

    • 出现错误或者等待结束后都会调用 Contextcancel 方法取消上下文;
    • 只有第一个出现的错误才会被返回,剩余的错误都会被直接抛弃;
  • Semaphore 带权重的信号量

    • AcquireTryAcquire 方法均可以用于获取资源,前者用于同步获取会等待锁的释放,后者会在没法获取锁时直接返回;
    • Release 方法会按照 FIFO 的顺序唤醒能够被唤醒的 Goroutine;
    • 若是一个 Goroutine 获取了较多地资源,因为 Release 的释放策略可能会等待比较长的时间;
  • SingleFlight 用于抑制对下游的重复请求

    • DoDoChan 一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并经过 Channel 接受函数的返回值;
    • Forget 方法能够通知 singleflight 在持有的映射表中删除某个键,接下来对该键的调用就会直接执行方法而不是等待前面的函数返回;
    • 一旦调用的函数返回了错误,全部在等待的 Goroutine 也都会接收到一样的错误;

这些同步原语的实现不只要考虑 API 接口的易用、解决并发编程中可能遇到的线程竞争问题,还须要对尾延时进行优化避免某些 Goroutine 没法获取锁或者资源而被饿死,对同步原语的学习也可以加强咱们队并发编程的理解和认识,也是了解并发编程没法跨越的一个步骤。

Reference

相关文章
相关标签/搜索