Go并发编程之传统同步—(2)条件变量

前言

回顾上篇文章《Go并发编程之传统同步—(1)互斥锁》其中说到,同步最终是为了达到如下两种目的:linux

  • 维持共享数据一致性,并发安全
  • 控制流程管理,更好的协同工做

示例程序经过使用互斥锁,达到了数据一致性目的,那么流程管理应该怎么作呢?git

传统同步

条件变量

上篇文章的示例程序,仅仅实现了累加功能,但在现实的工做场景中,需求每每不可能这么简单,如今扩展一下这个程序,给它加上累减的功能。github

加上了累减的示例程序,能够抽象的理解为一个固定容量的“储水池”,能够注水、排水。编程

仅用互斥锁

当水注满之后,中止注水,开始排水,当水排空之后,开始注水,反反复复...segmentfault

func TestDemo1(t *testing.T) {
    var mut sync.Mutex
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            mut.Lock()
            if counter == maxSize {
                for i := 0; i < maxSize; i++ {
                    counter--
                    log.Printf("OUTPUT counter = %d", counter)
                }
            }
            mut.Unlock()
            time.Sleep(1 * time.Second)
        }
    }()

    // 注水口
    for {
        mut.Lock()
        if counter == 0 {
            for i := 0; i < maxSize; i++ {
                counter++
                log.Printf(" INPUT counter = %d", counter)
            }
        }
        mut.Unlock()
        time.Sleep(1 * time.Second)
    }
}

结果安全

=== RUN   TestDemo1
                ···
2020/10/06 13:52:50  INPUT counter = 8
2020/10/06 13:52:50  INPUT counter = 9
2020/10/06 13:52:50  INPUT counter = 10
2020/10/06 13:52:50 OUTPUT counter = 9
2020/10/06 13:52:50 OUTPUT counter = 8
2020/10/06 13:52:50 OUTPUT counter = 7
                ···

看着没有什么问题,一切正常,但就是这样工做的策略效率过低。多线程

优化互斥锁

优化策略,不用等注满水再排水,也不用放空以后,再注水,注水口和排水口一块儿工做。并发

func TestDemo2(t *testing.T) {
    var mut sync.Mutex
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            mut.Lock()
            if counter != 0 {
                counter--
            }
            log.Printf("OUTPUT counter = %d", counter)
            mut.Unlock()
            time.Sleep(5 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        mut.Lock()
        if counter != maxSize {
            counter++
        }
        log.Printf(" INPUT counter = %d", counter)
        mut.Unlock()
        time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒
    }
}

结果异步

=== RUN   TestDemo2
                ···
2020/10/06 14:11:46  INPUT counter = 7
2020/10/06 14:11:47  INPUT counter = 8
2020/10/06 14:11:48 OUTPUT counter = 7
2020/10/06 14:11:48  INPUT counter = 8
2020/10/06 14:11:49  INPUT counter = 9
2020/10/06 14:11:50  INPUT counter = 10
2020/10/06 14:11:51  INPUT counter = 10
2020/10/06 14:11:52  INPUT counter = 10
2020/10/06 14:11:53 OUTPUT counter = 9
2020/10/06 14:11:53  INPUT counter = 10
2020/10/06 14:11:54  INPUT counter = 10
2020/10/06 14:11:55  INPUT counter = 10
2020/10/06 14:11:56  INPUT counter = 10
2020/10/06 14:11:57  INPUT counter = 10
2020/10/06 14:11:58 OUTPUT counter = 9
2020/10/06 14:11:58  INPUT counter = 10
2020/10/06 14:11:59  INPUT counter = 10
                ···

经过日志输出,能够看到程序达到了需求,运做正常。优化

可是,经过日志输出发现,当排水口效率低下的时候,注水口一直在轮询,这里频繁的上锁操做形成的开销非常浪费。

条件变量:单发通知

那有没有什么好的办法,省去没必要要的轮询?若是注水口和排水口能互相“通知”就行了!这个功能,条件变量能够作到。

条件变量老是与互斥锁组合使用,除了可使用 Lock、Unlock,还有以下三个方法:

  • Wait 等待通知
  • Signal 单发通知
  • Broadcast 广播通知
func TestDemo3(t *testing.T) {
    cond := sync.NewCond(new(sync.Mutex)) // 初始化条件变量
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            cond.L.Lock() // 上锁
            if counter == 0 { // 没水了
                cond.Wait() // 啥时候来水?等通知!
            }
            counter--
            log.Printf("OUTPUT counter = %d", counter)
            cond.Signal() // 单发通知:已排水
            cond.L.Unlock() // 解锁
            time.Sleep(5 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        cond.L.Lock() // 上锁
        if counter == maxSize { // 水满了
            cond.Wait() // 啥时候排水?等待通知!
        }
        counter++
        log.Printf(" INPUT counter = %d", counter)
        cond.Signal() // 单发通知:已来水
        cond.L.Unlock() // 解锁
        time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒
    }
}

结果

=== RUN   TestDemo3
                ···
2020/10/06 14:51:22  INPUT counter = 7
2020/10/06 14:51:23  INPUT counter = 8
2020/10/06 14:51:24 OUTPUT counter = 7
2020/10/06 14:51:24  INPUT counter = 8
2020/10/06 14:51:25  INPUT counter = 9
2020/10/06 14:51:26  INPUT counter = 10
2020/10/06 14:51:29 OUTPUT counter = 9
2020/10/06 14:51:29  INPUT counter = 10
2020/10/06 14:51:34 OUTPUT counter = 9
2020/10/06 14:51:34  INPUT counter = 10
                ···

经过日志输出,能够看出来,注水口没有一直轮询了,而是等到排水口发通知后,再进行注水,注水口一直再等排水口。那么新的问题又来了,如何提升排水口的效率呢?

条件变量:广播通知

多制造出一个排水口,提升排水效率。

那就不能继续使用单发通知了(Signal),由于单发通知只会通知到一个等待(Wait),针对多等待的这种状况,就须要使用广播通知(Broadcast)。

func TestDemo4(t *testing.T) {
    cond := sync.NewCond(new(sync.Mutex)) // 初始化条件变量
    maxSize := 10
    counter := 0

    // 排水口 1
    go func() {
        for {
            cond.L.Lock() // 上锁
            if counter == 0 { // 没水了
            //for counter == 0 { // 没水了
                cond.Wait() // 啥时候来水?等通知!
            }
            counter--
            log.Printf("OUTPUT A counter = %d", counter)
            cond.Broadcast() // 单发通知:已排水
            cond.L.Unlock() // 解锁
            //time.Sleep(2 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 排水口 2
    go func() {
        for {
            cond.L.Lock() // 上锁
            if counter == 0 { // 没水了
            //for counter == 0 { // 没水了
                cond.Wait() // 啥时候来水?等通知!
            }
            counter--
            log.Printf("OUTPUT B counter = %d", counter)
            cond.Broadcast() // 单发通知:已排水
            cond.L.Unlock() // 解锁
            //time.Sleep(2 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        cond.L.Lock() // 上锁
        if counter == maxSize { // 水满了
        //for counter == maxSize { // 水满了
            cond.Wait() // 啥时候排水?等待通知!
        }
        counter++
        log.Printf(" INPUT   counter = %d", counter)
        cond.Broadcast() // 单发通知:已来水
        cond.L.Unlock() // 解锁
        //time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒
    }
}

结果

=== RUN   TestDemo4
                ···
2020/10/07 20:57:30 OUTPUT B counter = 2
2020/10/07 20:57:30 OUTPUT B counter = 1
2020/10/07 20:57:30 OUTPUT B counter = 0
2020/10/07 20:57:30 OUTPUT A counter = -1
2020/10/07 20:57:30 OUTPUT A counter = -2
2020/10/07 20:57:30 OUTPUT A counter = -3
2020/10/07 20:57:30 OUTPUT A counter = -4
                ···
2020/10/07 20:57:31 OUTPUT B counter = -7605
2020/10/07 20:57:31  INPUT   counter = -7604
2020/10/07 20:57:31 OUTPUT A counter = -7605
2020/10/07 20:57:31 OUTPUT A counter = -7606
                ···

经过日志输出能够看到,刚开始的时候还很正常,到后面的时候就变成负值了,一直在负增加,What?

《Go并发编程之传统同步—(1)互斥锁》文章中,程序由于没有加上互斥锁,出现过 counter 值异常的状况。

但此次程序此次加了互斥锁,按理说造成了一个临界区应该是没有问题了,因此问题应该不是出在临界区上,难道问题出在 Wait 上?

经过IDE追踪一下Wait的源码

func (c *Cond) Wait() {
    // 检查 c 是不是被复制的,若是是就 panic
    c.checker.check()
    // 将当前 goroutine 加入等待队列
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    // 等待当前 goroutine 被唤醒
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

原来 Wait 内部的执行流程是,先执行了解锁,而后进入等待状态,接到通知以后,再执行加锁操做。

那按照这个代码逻辑结合输出日志,走一程序遍流程,看看能不能复现出 counter 为负值的状况:

  1. 注水口将 counter 累加到 10 以后,发送广播通知(Broadcast)。
  2. goroutine A 在“第1步”以前的时候进入了等待通知(Wait),如今接收到了广播通知(Broadcast),从 runtime_notifyListWait() 返回,而且成功执行了加锁(Lock)操做。
  3. goroutine B 在“第1步”以前的时候进入了等待通知(Wait),如今接收到了广播通知(Broadcast),从 runtime_notifyListWait() 返回,在执行加锁(Lock)操做的时候,发现 goroutine A 先抢占了临界区,因此一直阻塞在 c.L.Lock()。
  4. goroutine A 虽然完成任务后会释放锁,可是每次也成功将锁抢占,因此就这样 一直将 counter 减到了 0,而后发送广播通知(Broadcast)、解锁(Unlock)。
  5. goroutine B 在 goroutine A 解锁后,成功得到锁并从 Lock 方法中返回,接下来跳出 Wait 方法、跳出 if 判断,执行 counter--(0--),这时候 counter 的值是 -1

图示

image

问题就出如今第五步,只要 goroutine B 加锁成功的时候,再判断一下 counter 是否为 0 就行了。

因此将 if counter == 0 改为 for counter == 0,这样上面的“第五步”就变成了

5.goroutine B 在 goroutine A 解锁后,成功加锁(Lock)并从阻塞总返回,接下来跳出 Wait 方法、再次进入 for 循环,判断 counter == 0 结果为真,再次进入等待(Wait)。

代码作出相应的修改后,再执行看结果,没有问题了。

延伸

发送通知

等待通知(Wait)确定是要在临界区里面的,那发送通知(Signal、Broadcast)在哪里更好呢?

Luck()
Wait()
Broadcast()// Signal()
Unlock()

// 或者

Luck()
Wait()
Unlock()
Broadcast()// Signal()

// 两种写法都不会报错

在 go 的发送通知方法(Broadcast、Signal)上有这么一段话:

// It is allowed but not required for the caller to hold c.L
// during the call.

在我以往的 C 多线程开发的时候,发送通知老是在锁中的:

pthread_mutex_lock(&thread->mutex);
//              ...
pthread_cond_signal(&thread->cond);
pthread_mutex_unlock(&thread->mutex);

man 手册中有写到:

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().

我的对此并无什么看法,就不乱下定论了,有想法的小伙伴能够在文章下面留言,一块儿讨论。

等待通知

消息通知是有即时性的,若是没有 goroutine 在等待通知,那么此次通知直接被丢弃。

kubernetes

https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/cache/fifo.go

总结

  1. Wait() 内会执行解锁、等待、加锁。
  2. Wait() 必须在 for 循环里面。
  3. Wait() 方法会把当前的 goroutine 添加到通知队列的队尾。
  4. 单发通知,唤醒通知队列第一个排队的 goroutine。
  5. 广播通知,唤醒通知队列里面所有的 goroutine。
  6. 程序示例只是为了演示效果,实际的开发中,生产者和消费者应该是异步消费,不该该使用同一个互斥锁。

文章示例代码

Sown专栏地址:https://segmentfault.com/blog/sown

相关文章
相关标签/搜索