Go语言提供了channel和sync包两种并发控制的方法,每种方法都有他们适用的场景,并非全部并发场景都适合应用channel的,有的时候用sync包里提供的同步原语更简单。今天这个话题纯属是为了经过用channel实现同步锁的功能来学习掌握channel拥有的强大能力,并不适合在实际中使用。并且面试中有时候就是会出一些奇奇怪怪的题考应聘者对知识的理解以及灵活运用的应变能力。面试
你们仔细看看文章里用channel实现几种经常使用的同步锁的思路,没准儿哪次面试就碰上这样的面试官了呢。编程
今天,我将深刻探讨Go
语言channel
和select
语句的表达能力。为了演示只用这两个原语就能够实现多少功能,我将从头开始用它们重写sync
包。并发
sync
包提供的同步原语的有哪些以及如何使用咱们已经在以前的文章里介绍过了,因此这里不会再去介绍用channel
实现的这些同步原语应该怎么用。若是对用法有疑问请回看以前的文章: Go语言sync包的应用详解。函数
once是一个简单而强大的原语,可确保在并行程序中一个函数仅执行一次。学习
channel
版的Once
咱们使用带有一个缓冲的通道来实现 第一次调用Do(func ())
的goroutine
从通道中接收到值后,后续的goroutine
将会被阻塞中,直到Do
的参数函数执行完成后关闭通道为止。其余goroutine
判断通道已关闭后将不执行任何操做并当即返回。ui
type Once chan struct{}
func NewOnce() Once {
o := make(Once, 1)
// 只容许一个goroutine接收,其余goroutine会被阻塞住
o <- struct{}{}
return o
}
func (o Once) Do(f func()) {
_, ok := <-o
if !ok {
// Channel已经被关闭
// 证实f已经被执行过了,直接return.
return
}
// 调用f, 由于channel中只有一个值
// 因此只有一个goroutine会到达这里
f()
// 关闭通道,这将释放全部在等待的
// 以及将来会调用Do方法的goroutine
close(o)
}
复制代码
大小为N的信号量最多容许N个goroutine
在任何给定时间保持其锁。互斥锁是大小为1的信号量的特例。spa
信号量(英语:semaphore)又称为信号标,是一个同步对象,用于保持在0至指定最大值之间的一个计数值。当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;当线程完成一次对semaphore对象的释放(release)时,计数值加一。当计数值为0,则线程直至该semaphore对象变成signaled状态才能等待成功。semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态.线程
咱们先用channel
实现信号量的功能code
type Semaphore chan struct{}
func NewSemaphore(size int) Semaphore {
return make(Semaphore, size)
}
func (s Semaphore) Lock() {
// 只有在s还有空间的时候才能发送成功
s <- struct{}{}
}
func (s Semaphore) Unlock() {
// 为其余信号量腾出空间
<-s
}
复制代码
上面也说了互斥锁是大小为1的信号量的特例。那么在刚才实现的信号量的基础上实现互斥锁只须要:cdn
type Mutex Semaphore
func NewMutex() Mutex {
return Mutex(NewSemaphore(1))
}
复制代码
RWMutex
是一个稍微复杂的原语:它容许任意数量的并发读锁,但在任何给定时间仅容许一个写锁。还能够保证,若是有线程持有写锁,则任何线程都不能持有或得到读锁。
sync
标准库里的RWMutex
还容许若是有线程尝试获取写锁,则其余读锁将排队等待,以免饿死尝试获取写锁的线程。为了简洁起见,在用channel
实现的RWMutex
里咱们忽略了这部分逻辑。
RWMutex
具备三种状态:空闲,存在写锁和存在读锁。这意味着咱们须要两个通道分别标记RWMutex
上的读锁和写锁:空闲时,两个通道都为空;当获取到写锁时,标记写锁的通道里将被写入一下空结构体;当获取到读锁时,咱们向两个通道中都写入一个值(避免写锁可以向标记写锁的通道发送值),其中标记读锁的通道里的值表明当前RWMutex
拥有的读锁的数量,读锁释放的时候除了更新通道里存的读锁数量值,也会抽空写锁通道。
type RWMutex struct {
write chan struct{}
readers chan int
}
func NewLock() RWMutex {
return RWMutex{
// 用来作一个普通的互斥锁
write: make(chan struct{}, 1),
// 用来保护读锁的数量,获取读锁时经过接受通道里的值确保
// 其余goroutine不会在同一时间更改读锁的数量。
readers: make(chan int, 1),
}
}
func (l RWMutex) Lock() { l.write <- struct{}{} }
func (l RWMutex) Unlock() { <-l.write }
func (l RWMutex) RLock() {
// 统计当前读锁的数量,默认为0
var rs int
select {
case l.write <- struct{}{}:
// 若是write通道能发送成功,证实如今没有读锁
// 向write通道发送一个值,防止出现并发的读-写
case rs = <-l.readers:
// 能从通道里接收到值,证实RWMutex上已经有读锁了,下面会更新读锁数量
}
// 若是执行了l.write <- struct{}{}, rs的值会是0
rs++
// 更新RWMutex读锁数量
l.readers <- rs
}
func (l RWMutex) RUnlock() {
// 读出读锁数量而后减一
rs := <-l.readers
rs--
// 若是释放后读锁的数量变为0了,抽空write通道,让write通道变为可用
if rs == 0 {
<-l.write
return
}
// 若是释放后读锁的数量减一后不是0,把新的读锁数量发送给readers通道
l.readers <- rs
}
复制代码
WaitGroup
最多见的用途是建立一个组,向其计数器中设置一个计数,生成与该计数同样多的goroutine
,而后等待它们完成。每次goroutine
运行完毕后,它将在组上调用Done
表示已完成工做。能够经过调用WaitGroup
的Done
方法或以负数调用Add
方法减小计数器的计数。当计数器达到0时,被Wait
方法阻塞住的主线程会恢复执行。
WaitGroup
一个不为人知的功能是在计数器达到0后,若是调用Add
方法让计数器变为正数,这将使WaitGroup
重回阻塞状态。 这意味着对于每一个给定的WaitGroup
,都有一点"世代"的意味:
WaitGroup
的一个世代结束。下面是用channel
实现的WaitGroup
同步原语,真正起到阻塞goroutine
做用的是世代里的wait
通道,而后经过用WaitGroup
通道包装generation
结构体实现WaitGroup
的Wait
和Add
等功能。用文字很难描述清楚仍是直接看下面的代码吧,代码里的注释会帮助理解实现原理。
type generation struct {
// 用于让等待者阻塞住的通道
// 这个通道永远不会用于发送,只用于接收和close。
wait chan struct{}
// 计数器,标记须要等待执行完成的job数量
n int
}
func newGeneration() generation {
return generation{ wait: make(chan struct{}) }
}
func (g generation) end() {
// close通道将释放由于接受通道而阻塞住的goroutine
close(g.wait)
}
//这里咱们使用一个通道来保护当前的generation。
//它基本上是WaitGroup状态的互斥量。
type WaitGroup chan generation
func NewWaitGroup() WaitGroup {
wg := make(WaitGroup, 1)
g := newGeneration()
// 在一个新的WaitGroup上Wait, 由于计数器是0,会当即返回不会阻塞住线程
// 它表现跟当前世代已经结束了同样, 因此这里先把世代里的wait通道close掉
// 防止刚建立WaitGroup时调用Wait函数会阻塞线程
g.end()
wg <- g
return wg
}
func (wg WaitGroup) Add(delta int) {
// 获取当前的世代
g := <-wg
if g.n == 0 {
// 计数器是0,建立一个新的世代
g = newGeneration()
}
g.n += delta
if g.n < 0 {
// 跟sync库里的WaitGroup同样,不容许计数器为负数
panic("negative WaitGroup count")
}
if g.n == 0 {
// 计数器回到0了,关闭wait通道,被WaitGroup的Wait方法
// 阻塞住的线程会被释放出来继续往下执行
g.end()
}
// 将更新后的世代发送回WaitGroup通道
wg <- g
}
func (wg WaitGroup) Done() { wg.Add(-1) }
func (wg WaitGroup) Wait() {
// 获取当前的世代
g := <-wg
// 保存一个世代里wait通道的引用
wait := g.wait
// 将世代写回WaitGroup通道
wg <- g
// 接收世代里的wait通道
// 由于wait通道里没有值,会把调用Wait方法的goroutine阻塞住
// 直到WaitGroup的计数器回到0,wait通道被close后才会解除阻塞
<-wait
}
复制代码
今天这篇文章用通道实现了Go语言sync
包里经常使用的几种同步锁,主要的目的是演示通道和select
语句结合后强大的表达能力,并无什么实际应用价值,你们也不要在实际开发中使用这里实现的同步锁。
有关通道和同步锁都适合解决什么种类的问题咱们后面的文章再细说,今天这篇文章,须要充分理解Go语言通道的行为才能理解文章里的代码,若是有哪里看不懂的能够留言,只要时间容许我都会回答。
若是还不了解sync
包里的同步锁的使用方法,请先看这篇文章 Go语言sync包的应用详解。后面的文章我会介绍并发编程里的数据竞争问题以及解决方法,以及考虑给你们留一道思考题,请你们关注公众号里的动态。