多核处理器日益普及的如今不少代码都得和并发/并行打交道,对于内置了并发支持(goroutine)的golang来讲并发编程是必不可少的一环。node
链表是咱们再熟悉不过的数据结构,在并发编程中咱们也时长须要用到,今天咱们就来看两种带锁的并发安全的单项链表。golang
方案一:粗粒度锁,彻底锁住链表编程
方案一的作法是将全部操做用锁——Mutex串行化处理。串行化处理是指锁和链表相关联,当须要修改或读取链表时就获取锁,只要该goroutine持有锁,那么其余goroutine就没法修改或读取链表,直到锁的持有者将其释放。安全
这样能够保证任什么时候间都只有一个goroutine能处理链表,如此一来也就避免了数据竞争。下面是链表结构的定义:数据结构
type MutexList struct { locker *sync.Mutex head, tail *Node size int64 }
size表示当前list的长度,head是一个哨兵节点,不存储实际的数值。并发
下面是节点的定义:ide
type Node struct { Value interface{} Next *Node } func NewNode(v interface{}) *Node { n := &Node{Value: v} n.Next = nil return n }
节点和它的初始化不用多说,由于数据访问经过list来控制,因此节点里不须要再有mutex的存在。函数
好了咱们进入正题,在粗粒度的解决方案里Enq方法负责将数据插入list的末尾,这是O(1)时间的操做,将list锁住而后更新tail便可,注意咱们不容许插入nil:性能
func (l *MutexList) Enq(v interface{}) bool { if v == nil { return false } l.locker.Lock() node := NewNode(v) l.tail.Next = node l.tail = node l.size++ l.locker.Unlock() return true }
而后是insert,它将数据插入在给出的index处,index从0开始,一样咱们不容许插入nil,同时会检查index,index不能超过size,当list只有size-1个节点时,新数据会插入在list的末尾:测试
func (l *MutexList) Insert(index int64, v interface{}) bool { l.locker.Lock() defer l.locker.Unlock() if index > l.size || index < 0 || v == nil { return false } current := l.head for i := int64(0); i <= index-1; i++ { // index - 1是最后一个节点时 if current.Next == nil { break } current = current.Next } node := NewNode(v) node.Next = current.Next current.Next = node l.size++ return true }
这里咱们使用了defer,那是由于只有一个mutex,并且函数有多个出口,容易在编码过程当中漏掉对锁的释放。
节点的删除和查找也是相似的步骤,先给列表上锁,而后修改/读取,最后解锁,这里就很少讲解了。
而后是获取size的函数,后面的测试中要用,虽然咱们以原子操做来获取了长度,可是仍然可能存在得到size以后其余goroutine进行了remove致使size改变进而引起Insert返回false,所幸的是咱们的测试里并不会让remove和Insert同时出现,所以不会出现insert返回失败的问题,在实际使用时须要注意Insert的返回值,这一点在第二种方案中也是同样的:
func (l *MutexList) Length() int64 { return atomic.LoadInt64(&l.size) }
由于方案二的该函数并没有什么变化,所以就省略了。
如你所见,方案一的优势在于实现起来简单,确点在于一次只有一个goroutine能处理list,几乎全部对list的操做都被串行化了。
方案一无疑能很好地工做,可是它的性能十分有限,因此咱们来看看方案二。
方案二:细粒度锁,锁住须要修改的节点
方案二的作法是将锁放到node里,每次须要修改的仅仅是部分节点,而不用把整个list锁住,这样能保证互不干扰的goroutine们能够同时处理list,而会互相干扰的goroutine则会被节点的mutex阻塞,以保证不存在竟态数据。
固然,为了保证不会有多个goroutine同时处理一个节点,咱们须要在取得要修改节点的锁以前先取得前项节点的锁,而后才能取得修改节点的锁。这个步骤很像交叉手,它被称为锁耦合。
另一个须要注意的地方是加锁的顺序,全部操做的加锁顺序/方向必须相同,好比从head开始锁定到tail,若是不按统一的顺序加锁将会出现死锁。考虑以下状况,goroutine A锁住了节点1,正准备锁定节点2,这时goroutine B沿反方向加锁,它要锁住节点2而后再锁住节点1,若是B运气很好先于A取得了节点2的锁,那么它将一直等待锁住节点1,而A则会始终等待锁住节点2,出现了A等B,B等A的死锁。可是只要统一了加锁的顺序/方向,那么这种问题就不复存在了。
这是list和node的定义,能够看见锁已经移动到node结构里了:
type List struct { head, tail *MutexNode size int64 } type MutexNode struct { Locker *sync.Mutex Value interface{} Next *MutexNode } func NewMutexNode(v interface{}) *MutexNode { n := &MutexNode{Value: v} n.Locker = &sync.Mutex{} n.Next = nil return n } func NewList() *List { l := &List{} l.head = NewMutexNode(nil) l.tail = l.head return l }
下面咱们来看Enq,功能与方案一一致,只是在处理锁的地方有所不一样,由于tail节点老是在list末尾的元素,符合咱们从head开始的加锁顺序,又由于l.tail的位置始终是肯定的,因此能够省略锁住前项节点的步骤;然而l.tail会在咱们等待锁的时间段里被更新,因此咱们须要处理l.tail被更新的状况:
func (l *List) Enq(v interface{}) bool { if v == nil { return false } tail := l.tail tail.Locker.Lock() for tail.Next != nil { next := tail.Next next.Locker.Lock() tail.Locker.Unlock() tail = next } node := NewMutexNode(v) tail.Next = node l.tail = node l.size++ tail.Locker.Unlock() return true }
若是tail的next在取得锁时不为nil,说明tail被更新,在tail被更新以后咱们须要找到当前的末尾节点,这时不能直接使用l.tail,有两点缘由,一是由于这时的l.tail可能也已经被更新,二是在临时变量tail多是非前驱节点时给l.tail加锁不能保证其一致性,并且如此一来会破坏加锁的顺序,会形成意想不到的问题。因此咱们遵循加锁的顺序原则不断后推,直到找到真正的末尾节点。因而可知方案二的Enq操做最坏状况下是O(n)最好状况下是O(1),而只要仔细想想,在并发压力较大时这个操做几乎老是O(n)的时间开销(不过实际状况是方案二花费的时间与方案一差很少,缘由在于方案一要锁住整个list开销实在太大了)
Insert的功能也与方案一同样,由于不是锁住整个list,因此光判断size是无心义的,须要处理list被中途修改的状况;并且由于是从head开始加锁,而后锁住节点1再解锁head,以此类推,因此不会有竞争,但一样存在remove和insert一块儿使用时insert会失败的状况,须要注意其返回值:
func (l *List) Insert(index int64, v interface{}) bool { if index < 0 || v == nil { return false } current := l.head current.Locker.Lock() for i := int64(0); i <= index-1; i++ { next := current.Next if next == nil { // 若是index前的某个节点为nil,那么说明链表可能被修改了,没有index个节点,insert失败 if index < index - 1 { current.Locker.Unlock() return false } break } next.Locker.Lock() current.Locker.Unlock() current = next } node := NewMutexNode(v) node.Next = current.Next current.Next = node l.size++ current.Locker.Unlock() return true }
remove的作法和insert相似,再也不赘述。
咱们能够看到方案二锁的粒度确实变小了,可是实现变得十分复杂,并且须要同时考虑多个边界状况,对开发增长了很大的难度,并且分散的锁也会对调试带来必定的负面影响。
方案二的另外一个缺点是每一个节点都带有本身的mutex,当节点增多时内存的开销也会增大。
性能对比
说了这么多,方案一粗粒度锁和方案二细粒度锁在性能上孰优孰劣呢?毕竟方案二须要屡次获取和释放锁并且须要额外处理不少边界状况,仔细想一下的话可能也是一笔不菲的开销,感谢golang自带的测试套件,咱们能够方便的测试。
测试咱们采用一组单goroutine+一组多goroutine测试一个功能的作法,测试机器是intel i5 6500 4核,为了模拟通常的工做负载,在多goroutine组我统一使用6个goroutine来并发操做list。
测试代码:
import ( "math/rand" "sync" "testing" "time" ) func init() { rand.Seed(time.Now().Unix()) } func BenchmarkEnq(b *testing.B) { list := NewMutexList() b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Enq(rand.Int()); !done { b.Error("Enq failed") } } } func BenchmarkGoroutineEnq(b *testing.B) { list := NewMutexList() wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Enq(rand.Int()); !done { b.Error("Enq by goroutines failed") } } wg.Done() }(b.N) } wg.Wait() } func BenchmarkInsert(b *testing.B) { list := NewMutexList() for i := 0; i < 5; i++ { list.Enq(i) } b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("Insert failed") } } } func BenchmarkGoroutineInsert(b *testing.B) { list := NewMutexList() for i := 0; i < 5; i++ { list.Enq(i) } wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("insert by goroutine failed") } } wg.Done() }(b.N) } wg.Wait() }
import ( "math/rand" "sync" "testing" ) func BenchmarkMutexNodeEnq(b *testing.B) { list := NewList() b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Enq(rand.Int()); !done { b.Error("MutexNode Enq failed") } } } func BenchmarkGoroutineMutexNodeEnq(b *testing.B) { list := NewList() wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Enq(rand.Int()); !done { b.Error("MutexNode Enq by goroutines failed") } } wg.Done() }(b.N) } wg.Wait() } func BenchmarkMutexNodeInsert(b *testing.B) { list := NewList() for i := 0; i < 5; i++ { list.Enq(i) } b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("MutexNode Insert failed") } } } func BenchmarkGoroutineMutexNodeInsert(b *testing.B) { list := NewList() for i := 0; i < 5; i++ { list.Enq(i) } wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("MutexNode Insert by goroutine failed") } } wg.Done() }(b.N) } wg.Wait() }
测试内容是将随机数插入两种不一样的链表,而后对比插入性能。
这是测试结果,由于testing不能跟踪goroutine内部的操做,因此多goroutine组的单个op看上去比较吓人,其实这是一个goroutine运行完全部插入调用的时间:
go test -bench=. -benchmem
能够看到,在Enq也就是在末尾插入上二者相差很少,方案二在全部多goroutine测试用例的表现都优于方案一;
对于有大量随机访问发生的Inser操做,方案二在性能上能够说是碾压的存在,这多是方案二能够运行多个goroutine同时修改list而方案一只能同时有一个goroutine修改的缘由;
而在并发的状况下方案二仍然比方案一快很多,可是差距缩小了,缘由极可能是频繁的加锁加上goroutine之间互相干扰增多致使了性能的部分降低。
总结
若是追求性能,能够考虑方案二,或者使用第三方的无锁队列,不建议本身去实现无锁数据结构,由于 太 复 杂 !若是你以为方案二已经想不明白了,那么无锁编程将会是天书通常的存在,不如复用大神们的劳动成果吧。若是链表并非你程序的性能热点,那么就能够考虑方案一,稳定且易于开发和维护的代码永远都是好东西。
最后若是有疑问和建议或者勘误,欢迎评论指出,祝玩得愉快!