1资源同步算法
1.1 解决方案编程
2 信号量缓存
2.1 共享变量数据结构
2.2 信号量多线程
3 锁并发
3.1 死锁app
3.2 活锁编程语言
3.3 饥饿ide
4 Golang sync 包函数
4.4.1 数据结构
4.4.2 NewCond函数
4.4.3 Wait方法
4.4.4 Singal方法
4.4.5 Broadcast方法
4.3.1 数据结构
4.3.2 Add和Done方法
4.3.3 Wait方法
4.2.1 常量和结构
4.2.1 RLock和RUnlock方法
4.2.2 Lock和Unlock方法
4.1.1 接口和结构
4.1.2 Lock 方法
4.1.3 Unlock方法
4.1 sync.mutex.go
4.2 sync.rwmutex.go
4.3 sync.waitgroup.go
4.4 sync.cond.go
并发已经成为现代程序设计中的重要考虑内容,可是并发涉及到一个很重要的内容就是资源同步。当两个或者多个线程访问一样的资源的时候,运行的结果取决于线程运行时精确的时序。这样致使结果与指望的结果截然不同,所以咱们须要对资源的访问顺序进行控制,已达到资源同步的目的。
将共享资源(也就是共享内存)的程序片断成为临界区域
,经过适当安排,使得两个者线程同时位于临界区域。对于临界区域
访问解决方案,须要知足以下4个条件
任何两个线程不能同时位于临界区
不对CPU执行速度和时间作任何假设
临界区外运行的线程不阻塞其余线程
不能使线程无限期等待进入临界区
常见的互斥解决方案:
屏蔽中断线程的切换是由CPU中断机制提供的,若是一个线程进入临界区域后,CPU关闭中断响应;在离开临界区域后,再打开中断机制。那么在临界区域将不会有其余线程来竞争资源。 当时将屏蔽中断权利交给用户空间执行是不明智的,并且对于多核CPU而言没有效果。
锁变量几乎每个编程语言都提供了资源同步方式:锁机制。该机制经过对资源进行Lock
和Unlock
,以达到对关键资源有序访问。
严格轮换法线程不停的执行CPU时间,连续测试某一个值是否出现。可是若是认为等待的时间很是短,可使用该方式浪费CPU时间,用于等待的锁也成为自旋锁
。
在理解信号量以前,先了解采用共享变量使用多线程会出现什么问题。下面是一个C代码片断
1for (i=0; i<niters; i++){
2 cnt ++;
3}
cnt
为全局变量,一个线程执行该代码片断的时候的汇编代码以下:
1 movq (%rdi), %rcx
2 testq %rcx, %rcx
3 jle .L2
4 movl $0, %eax
5.L3:
6 movq cnt(%rip), %rdx
7 addq %eax
8 movq %eax, cnt(%rip)
9 addq $1, %rax
10 cmpq %rcx, %rax
11 jne .L3
12.L2
2.2 信号量其中6-8
行分别对应对应着加载cnt
,更新cnt
和存储cnt
。将cnt
变量从内存位置读出,加载到CPU寄存器中,在CPU运算器中加1,而后存储到cnt
的内存位置。虽然代码中cnt++
只有一行,可是转换为汇编代码的时候不仅有一个操做,也就是说该语句不是原子操做。若是多个线程同时执行代码,按照以前的条件,不对CPU的执行顺序作任何假设,若是其中线程a
在执行7
行汇编代码,而线程b
执行6
行汇编代码,那么b
将"看不到"线程a
对全局变量cnt
加1的操做,那么每次执行的结果cnt
也不彻底一致。
计算机领域先驱Dijkstra
提出经典的解决上述问题的方法:信号量(semaphore)。它是一个非负整数的全局变量。并且该变量只能有两个特殊操做来处理: P
和V
。
P(s): 若是s
非零,那么P
将s
减1
,而且当即返回。若是s
为零,那么就挂起这个线程,知道s
为非零。
V(s): V
操做将s
加1
。若是有任何线程阻塞在P
操做等待s
非零,那么V
将重启其中线程中的一个。
Posix
标准定义须要操做信号量的函数
1#include <semaphore.h>
2int sem_init(sem_t *sem, 0, unsigned int value);
3int sem_wait(sem_t *s); /*P(s)*/
4int sem_post(sem_t *s); /*P(s)*/
那么如何使用信号量是的2.1小节出现同步问题解决呢?首先定义全局信号量
1volatile long cnt = 0; /* global variable */
2sem_t mutex; /*global semaphore*/
初始化信号量,在这里初始值为1
1sem_init(&mutex, 0, 1);
最后使用信号量操做函数将临界区域代码包含起来
1for (i =0; i<niters; i++){3 锁
2 sem_wait(&mutex);
3 cnt++;
4 sem_post(&mutex);
5}
首先看一下死锁的规范定义:
若是一个线程(进程)集合中的每个线程(进程)都在等待只能由该线程(进程)集合中的其余线程(进程)才能引起的事件,那么该线程(进程)集合是死锁的。
举一个例子,若是线程 a
和线程 b
同是执行,线程a
获取了资源r1
,等待获取资源r2
;而线程b
获取了资源r2
,等待获取资源r1
。那么线程a
和线程b
组成的集合是死锁的。
预防死锁
破坏占有等待条件
对于须要获取多个资源的线程,一次性获取所有资源,而不是依次获取各个资源。
破坏环路等待条件
死锁集合的线程按照等占有线程和等待线程能够组成有向环图。那么若是对全部资源进行排序,全部线程按照资源顺序获取资源。
在某些状况下,当线程意识它不能获下一个资源的时候,它会“礼貌性”地释放已经得到的资源,而后等待1ms
,在尝试一次。若是另外一个线程也在相同的时候作了相同的操做, 那么同步的步调将致使两个线程都没法前进。
在信号量小节中,当执行V
操做后,将恢复挂起线程中的一个,那么问题出现了:若是有多个线程被挂起,那么选择哪一个线程恢复呢?若是随机选择一个线程恢复,若是源源不断的线程到达临界区域而且挂起,那么颇有可能出现某一个线程一直等待资源,而致使"饥饿"。固然也有好的FILO
调度策略来解决调用问题。当时问题在于刚刚到达的线程有很好的局部性,也就是CPU的寄存器、缓存等包含了该线程的局部变量,若是程得到资源锁,很好的避免了线程上下文切换,对性能提升颇有帮助。
在go
语言的互斥锁中采用结合上述两种策略,接下来小节中,将会仔细分析源码。
包含了Locker
接口和Mutex
结构:
1type Locker interface {
2 Lock()
3 Unlock()
4}
5type Mutex struct {
6 state int32
7 sema uint32
8}
Mutex
实现了Locker
接口,该结构包含了state
的字段,用来表示该锁当前状态;sema
则为一个信号量。state
是一个32位的整数,不一样比特位包含了不一样的意义,其中源码中的有很详细的注释,该注释很好解释mutex
如何工做:
互斥锁有两种状态:正常状态和饥饿状态。在正常状态下,全部挂起等待的goroutine按照
FIFO
顺序等待。唤醒的goroutine将会和刚刚到达的goroutine竞争互斥锁的拥有权,由于刚刚到达的goroutine具备优点:它刚刚正在CPU上执行,因此刚刚唤醒的goroutine有很大可能在锁竞争中失败。若是一个等待的goroutine超过1ms没有获取互斥锁,那么它将会把互斥锁转变为饥饿模式。在饥饿模式下,互斥锁的全部权将移交给等待队列中的第一个。新来的goroutine将不会尝试去得到互斥锁,也不会去尝试自旋操做,而是放在队列的最后一个。若是一个等待的goroutine获取的互斥锁,如何它知足一下其中的任何一个条件:(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将互斥锁的转台转换为正常状态。正常状态有很好的性能表现,饥饿模式也是很是重要的的,由于它能阻止尾部延迟的现象。
1const (
2 mutexLocked = 1 << iota // mutex is locked
3 mutexWoken
4 mutexStarving
5 mutexWaiterShift = iota
6 starvationThresholdNs = 1e6
7)
mutexLocked
该值为1
, 第一位比特位1
,表明了该是否该互斥锁已经被锁住。mutex.state
与它进行&
操做,若是为1
表示已经锁住,0
则表示未被锁住。
mutexWoken
该值为2
,第二位比特位1
,表明了该互斥锁是否被唤醒,mutex.state
与它进行&
操做,若是为1
表示已经被唤醒,0
表明未被唤醒
mutexStarving
该值为4
,第三位比特为1
,表明了该互斥锁是否处于饥饿状态,mutex.state
与它进行&
操做,若是为1
表示处于饥饿转态,0
表示处于正常状态。
mutexWaiterShift
该值为3
,表示mutex.state
右移3位后为等待的goroutine
的数量。
starvationThresholdNs
goroutine
将互斥锁转换状态的时间等待的临界值:一百万纳秒,也就是1ms。
1if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
2 if race.Enabled {
3 race.Acquire(unsafe.Pointer(m))
4 }
5 return
6}
CompareAndSwapInt32
是一个原子操做,它判断是一个参数的值是否等于第二个参数,若是相等,则将第一个参数设置为第三个参数,并返回true
;不然对一个参数不作任何操做而且返回false
。这一段是代码是处理第一次goroutine
进行尝试Lock
操做,若是一切都是初始状态,则m.state
为.....0000001
而且返回,进入临界区域代码,不然代码继续往下走。
1var waitStartTime int64
2starving := false
3awoke := false
4iter := 0
5old := m.state
首先定义了一下变量:goroutine
等待时间,是否饥饿转台,是否唤醒和自旋迭代次数和保存当前互斥锁状态。接下来是一个for
循环,只有退出循环才能进入临界区域代码,纵观代码只有两处使用break
来退出循环。
1for {
2 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
3 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
4 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
5 awoke = true
6 }
7 runtime_doSpin()
8 iter++
9 old = m.state
10 continue
11 }
12}
首先判断锁状态是被锁并且不是处于饥饿模式,加上还能自旋额次数,进入下一层判断。若是当前goroutine没有被唤醒
,
其余goroutine也没有被唤醒,
等待的goroutine超过1和
能够将m.state设置为唤醒转态四个条件同时知足,将
awoke设置
true`。而后进行自旋操做,进行一轮循环。
1new := old
2if old&mutexStarving == 0 {
3 new |= mutexLocked
4}
5if old&(mutexLocked|mutexStarving) != 0 {
6 new += 1 << mutexWaiterShift
7}
8if starving && old&mutexLocked != 0 {
9 new |= mutexStarving
10}
这三个判断条件作了以下工做:若是当前的mutex.state
处于正常模式,则将new
的锁位设置为1,若是当前锁锁状态为锁定状态或者处于饥饿模式,则将等待的线程数量+1。若是starving
变量为true
而且处于锁定状态,则new
的饥饿状态位打开。
1if awoke {
2 if new&mutexWoken == 0 {
3 throw("sync: inconsistent mutex state")
4 }
5 new &^= mutexWoken
6}
若是 goroutine
已经被唤醒,则清空new
的唤醒位。
1if atomic.CompareAndSawpInt32(&m.state, old, new){
2 //...
3}else{
4 //...
5}
若是更新m.state
成功
1if old&(mutexLocked|mutexStarving) == 0 {
2 break
3}
若是未被锁定而且并非出于饥饿状态,到达第一个break
,进入代码临界区域。
1queueLifo := waitStartTime != 0
2if waitStartTime == 0 {
3 waitStartTime = runtime_nanotime()
4}
5runtime_SemacquireMutex(&m.sema, queueLifo)
runtime_SemacquireMutex(s *uint32, lifo bool)
函数相似P
操做,若是lifo
为true
则将等待goroutine
插入到队列的前面。在这里,对于每个到达的goroutine
,若是CompareAndSawpInt32
成功,而且到达时候若是锁出于锁定状态,那么将该goroutine
插入到等待队列的最后,不然插入到最前面。此时goroutine
将会被挂起,等待Unlock
的V
操做,将唤醒goroutines
1starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
2old = m.state
3if old&mutexStarving != 0 {
4 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
5 throw("sync: inconsistent mutex state")
6 }
7 delta := int32(mutexLocked - 1<<mutexWaiterShift)
8 if !starving || old>>mutexWaiterShift == 1 {
9 delta -= mutexStarving
10 }
11 atomic.AddInt32(&m.state, delta)
12 break
13}
14
判断被唤醒的线程是否为达到饥饿状态,也就是等待时间超过1ms
,若是以前的m.state
不是饥饿状态,继续循环,给新到来goroutine
让出互斥锁。若是已经饥饿状态,则修改等待goroutine
数量和饥饿状态位,并返回进入临界代码区域。
1new := atomic.AddInt32(&m.state, -mutexLocked)
首先建立变量new
,该变量的锁位为0
。接下来是饥饿状态判断
1if new&mutexStarving == 0 {
2 old := new
3 for {
4 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
5 return
6 }
7 new = (old - 1<<mutexWaiterShift) | mutexWoken
8 if atomic.CompareAndSwapInt32(&m.state, old, new) {
9 runtime_Semrelease(&m.sema, false)
10 return
11 }
12 old = m.state
13 }
14 } else {
15 runtime_Semrelease(&m.sema, true)
16 }
若是是正常状态,则判断若是等待的goroutine
为零,或者已经被锁定、唤醒、或者已经变成饥饿状态,返回,不须要唤醒任何其余被挂起的goroutine
,由于互斥锁已经被其余goroutine
抢占。不然更新new
值(修改等待的goroutine数量)并设置唤醒为,若是CompareAndSwapInt32
成功,则经过runtime_Semrelease(&m.sema, false)
恢复挂起的goroutine.r若是为 true
代表将唤醒第一个阻塞的goroutine
,这第一点在else
饥饿的分支中体现。
读写锁也是一种常见的锁机制,它容许多个线程读共享资源,只有一个线程写共享资源,接下来看看go中如何实现读写锁。
1type RWMutex struct {
2 w Mutex
3 writerSem uint32
4 readerSem uint32
5 readerCount int32
6 readerWait int32
7}
8const rwmutexMaxReaders = 1 << 30
RWMutex
结构包含了以下的字段
goroutine
数量。
1func (rw *RWMutex) RLock() {
2 // [...]
3 if atomic.AddInt32(&rw.readerCount, 1) < 0 {
4 runtime_Semacquire(&rw.readerSem)
5 }
6//[...]
7}
首先是readerCount
值+1, 若是小于零,则挂起goroutine
等待readerSem
。是否是很奇怪,为何会小于零判断呢?在这里先卖一个关子,接下来会看到为何是这样的设计逻辑。
1func (rw *RWMutex) RUnlock() {
2 //[...]
3 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
4 if r+1 == 0 || r+1 == -rwmutexMaxReaders {
5 race.Enable()
6 throw("sync: RUnlock of unlocked RWMutex")
7 }
8 if atomic.AddInt32(&rw.readerWait, -1) == 0 {
9 runtime_Semrelease(&rw.writerSem, false)
10 }
11 }
12 //[...]
13}
首先将readerCount
减去1,若是小于零,再讲readWait
减去1,若是是离开读的goroutine
数量为零,则对writerSem
信号量进行V
操做。
1func (rw *RWMutex) Lock() {
2 //[...]
3 rw.w.Lock()
4 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
5 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
6 runtime_Semacquire(&rw.writerSem)
7 }
8 //[...]
9}
首先rw.w.Lock
操做,来防止其余goroutine
对共享资源的写访问。而后将readerCount
减去rwmutexMaxReaders
,代表还剩多少goroutine
能够进行读访问,这也解释在RLock
中小于零的判断,若是还能够还能够进行读访问,则必须得到readerSem
信号量。在Lock
中接下来是对readWait
判断,若是该数量不为零,则须要对writerSem
进行P
操做,而V
操做只在RUnlock
方法中,若是最后一个读goroutine
离开,则进行V
操做。
1func (rw *RWMutex) Unlock() {
2 //[...]
3 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
4 if r >= rwmutexMaxReaders {
5 race.Enable()
6 throw("sync: Unlock of unlocked RWMutex")
7 }
8 for i := 0; i < int(r); i++ {
9 runtime_Semrelease(&rw.readerSem, false)
10 }
11 //[...]
12}
首先恢复readCounter
为正数,而后对readerSem
信号量进行r
次V
操做,唤醒在RLock
中被挂起的goroutine
。
WaitGroup
一般用在等待一组goroutine
所有完成。调用Add
方法指明要等待的goroutine
的数量,调用Done
方法说明该goroutine
已经完成,而Wait
方法是阻塞等待的goroutine
。
1type WaitGroup struct {
2 noCopy noCopy
3 state1 [12]byte
4 sema uint32
5}
noCopy
字段说明WaitGroup
不容许拷贝,而state1
字段是一个很是tricky
的方法,用其中的8
个字节(64bit)来保存一些状态。高位的32bit用来表示须要等待的goroutine
的数量,地位的32
bit用来表示被挂起的goroutine
的数量。至于为何不直接使用64bit
的数据主要是为了考虑32为编译器没法保证64位对齐。sema
则是一个信号量。
1func (wg *WaitGroup) state() *uint64 {
2 if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
3 return (*uint64)(unsafe.Pointer(&wg.state1))
4 } else {
5 return (*uint64)(unsafe.Pointer(&wg.state1[4]))
6 }
7}
该方法是一个辅助方法,用来获取state
,一个64为的无符号整数。
1func (wg *WaitGroup) Done() {
2 wg.Add(-1)
3}
Done方法其实调用了Add(-1)
方法,因此咱们着重讨论Add
方法。
1func (wg *WaitGroup) Add(delta int) {
2 statep := wg.state()
3 //[...]
4 state := atomic.AddUint64(statep, uint64(delta)<<32)
5 v := int32(state >> 32)
6 w := uint32(state)
7 if race.Enabled && delta > 0 && v == int32(delta) {
8 race.Read(unsafe.Pointer(&wg.sema))
9 }
10 if v < 0 {
11 panic("sync: negative WaitGroup counter")
12 }
13 if w != 0 && delta > 0 && v == int32(delta) {
14 panic("sync: WaitGroup misuse: Add called concurrently with Wait")
15 }
16 if v > 0 || w == 0 {
17 return
18 }
19 if *statep != state {
20 panic("sync: WaitGroup misuse: Add called concurrently with Wait")
21 }
22 // Reset waiters count to 0.
23 *statep = 0
24 for ; w != 0; w-- {
25 runtime_Semrelease(&wg.sema, false)
26 }
27}
首先是获取state
,而后将delta
右移32位,加上等待的goroutine
数量。v
和w
分别表明了须要等待的goroutine
和被阻塞的goroutine
的数量。接下来v==int32(delta)
判断条件代表若是是第一次Add
操做,则必须与等待的goroutine
同步,在Wait
方法中能够看到一样的操做。接下来是一些抛异常操做,若是等待的数量为负数,如何第一次Add
操做没有同步。if >0 || w==0
条件代表如何v
没有降到零,或者被阻塞的goroutine
数量为零,直接返回。如何v
为零,则按照w
的数量,依次对信号量ws.sema
进行V
操做。
1func (wg *WaitGroup) Wait() {
2 //[...]
3 for {
4 state := atomic.LoadUint64(statep)
5 v := int32(state >> 32)
6 w := uint32(state)
7 //[...]
8 // Increment waiters count.
9 if atomic.CompareAndSwapUint64(statep, state, state+1) {
10 if race.Enabled && w == 0 {
11 race.Write(unsafe.Pointer(&wg.sema))
12 }
13 runtime_Semacquire(&wg.sema)
14 if *statep != 0 {
15 panic("sync: WaitGroup is reused before previous Wait has returned")
16 }
17 //[...]
18 return
19 }
20 }
21}
Wait
方法一样也是CAS算法,首先获取须要等待的goroutine
的数量v
和阻塞的goroutine
数量w
, 而后将阻塞的goroutine
数量+1,若是以前的w
为零,表示是第一次等待,则与Add
操做进行同步,最后后对信号量wg.sema
进行P
操做。
在编程中使用Cond
也叫管程(monitor)
,它能够用来使不一样线程完成互斥条件,也可使某个线程等待某个条件的发生。常见的使用模式以下:
1var locker = new(sync.Mutex)
2var cond = sync.NewCond(locker)
3var condition = true
4// goroutine A
5cond.L.Lock()
6for condition {
7 cond.Wait()
8}
9// ...
10cond.L.Unlock()
11
12//goroutine B
13condiiton = false
14cond.Signal()
为何使用for
循环做为判断进入Wait
的条件而不是if
呢?主要是防止为被唤醒的goroutine
在返回Wait
调用的时候,刚好有别的goroutine
修改了conditon
的值,因此须要使用for
循环做为条件判断。
1type Cond struct {
2 noCopy noCopy
3 L Locker
4 notify notifyList
5 checker copyChecker
6}
Cond
结构不容许拷贝,包含了Locker
的接口字段,和一个notifyList
的集合字段。
1func NewCond(l Locker) *Cond {
2 return &Cond{L: l}
3}
实现Locker
接口的类型均可以,通常为Mutex
和RWMutex
1func (c *Cond) Wait() {
2 c.checker.check()
3 t := runtime_notifyListAdd(&c.notify)
4 c.L.Unlock()
5 runtime_notifyListWait(&c.notify, t)
6 c.L.Lock()
7}
在使用Wait
方法以前,要调用c.L.Lock
来进入临界区域,将当前等待的goroutine
加入到通知队列中,而后调用c.L.Unlock()
来退出临界区域,以便让其余goroutine
能够进入等待区域。紧接着挂起goroutine
,等待消息。
1func (c *Cond) Signal() {
2 c.checker.check()
3 runtime_notifyListNotifyOne(&c.notify)}
runtime_notifyListNotifyOne
唤起其中的等待的goroutine
。
1func (c *Cond) Broadcast() {
2 c.checker.check()
3 runtime_notifyListNotifyAll(&c.notify)
4}
runtime_notifyListNotifyAll
唤起所有等待的goroutine
。