本文为转载,原文连接java
在Go 1.6以前, 内置的map类型是部分goroutine安全的,并发的读没有问题,并发的写可能有问题。自go 1.6以后, 并发地读写map会报错,这在一些知名的开源库中都存在这个问题,因此go 1.9以前的解决方案是额外绑定一个锁,封装成一个新的struct或者单独使用锁均可以。程序员
本文带你深刻到sync.Map的具体实现中,看看为了增长一个功能,代码是如何变的复杂的,以及做者在实现sync.Map的一些思想。golang
官方的faq已经提到内建的map不是线程(goroutine)安全的。安全
首先,让咱们看一段并发读写的代码,下列程序中一个goroutine一直读,一个goroutine一只写同一个键值,即即便读写的键不相同,并且map也没有"扩容"等操做,代码仍是会报错。数据结构
package main func main() { m := make(map[int]int) go func() { for { _ = m[1] } }() go func() { for { m[2] = 2 } }() select {} }
错误信息是: fatal error: concurrent map read and map write。并发
若是你查看Go的源代码: hashmap_fast.go#L118,会看到读的时候会检查hashWriting标志, 若是有这个标志,就会报并发错误。ide
写的时候会设置这个标志: hashmap.go#L542性能
h.flags |= hashWriting
hashmap.go#L628设置完以后会取消这个标记。测试
固然,代码中还有好几处并发读写的检查, 好比写的时候也会检查是否是有并发的写,删除键的时候相似写,遍历的时候并发读写问题等。优化
有时候,map的并发问题不是那么容易被发现, 你能够利用-race参数来检查。
可是,不少时候,咱们会并发地使用map对象,尤为是在必定规模的项目中,map总会保存goroutine共享的数据。在Go官方blog的Go maps in action一文中,提供了一种简便的解决方案。
var counter = struct{ sync.RWMutex m map[string]int }{m: make(map[string]int)}
它使用嵌入struct为map增长一个读写锁。
读数据的时候很方便的加锁:
counter.RLock() n := counter.m["some_key"] counter.RUnlock() fmt.Println("some_key:", n)
写数据的时候:
unter.Lock() counter.m["some_key"]++ counter.Unlock()
能够说,上面的解决方案至关简洁,而且利用读写锁而不是Mutex能够进一步减小读写的时候由于锁带来的性能。
可是,它在一些场景下也有问题,若是熟悉Java的同窗,能够对比一下java的ConcurrentHashMap的实现,在map的数据很是大的状况下,一把锁会致使大并发的客户端共争一把锁,Java的解决方案是shard, 内部使用多个锁,每一个区间共享一把锁,这样减小了数据共享一把锁带来的性能影响,orcaman提供了这个思路的一个实现: concurrent-map,他也询问了Go相关的开发人员是否在Go中也实现这种方案,因为实现的复杂性,答案是Yes, we considered it.,可是除非有特别的性能提高和应用场景,不然没有进一步的开发消息。
那么,在Go 1.9中sync.Map是怎么实现的呢?它是如何解决并发提高性能的呢?
sync.Map的实现有几个优化点,这里先列出来,咱们后面慢慢分析。
空间换时间。 经过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。
使用只读数据(read),避免读写冲突。
动态调整,miss次数多了以后,将dirty数据提高为read。
double-checking。
延迟删除。 删除一个键值只是打标记,只有在提高dirty的时候才清理删除的数据。
优先从read读取、更新、删除,由于对read的读取不须要锁。
下面咱们介绍sync.Map的重点代码,以便理解它的实现思想。
首先,咱们看一下sync.Map的数据结构:
type Map struct { // 当涉及到dirty数据的操做的时候,须要使用这个锁 mu Mutex // 一个只读的数据结构,由于只读,因此不会有读写冲突。 // 因此从这个数据中读取老是安全的。 // 实际上,实际也会更新这个数据的entries,若是entry是未删除的(unexpunged), 并不须要加锁。若是entry已经被删除了,须要加锁,以便更新dirty数据。 read atomic.Value // readOnly // dirty数据包含当前的map包含的entries,它包含最新的entries(包括read中未删除的数据,虽有冗余,可是提高dirty字段为read的时候很是快,不用一个一个的复制,而是直接将这个数据结构做为read字段的一部分),有些数据还可能没有移动到read字段中。 // 对于dirty的操做须要加锁,由于对它的操做可能会有读写竞争。 // 当dirty为空的时候, 好比初始化或者刚提高完,下一次的写操做会复制read字段中未删除的数据到这个数据中。 dirty map[interface{}]*entry // 当从Map中读取entry的时候,若是read中不包含这个entry,会尝试从dirty中读取,这个时候会将misses加一, // 当misses累积到 dirty的长度的时候, 就会将dirty提高为read,避免从dirty中miss太屡次。由于操做dirty须要加锁。 misses int }
它的数据结构很简单,值包含四个字段:read、mu、dirty、misses。
它使用了冗余的数据结构read、dirty。dirty中会包含read中为删除的entries,新增长的entries会加入到dirty中。
read的数据结构是:
type readOnly struct { m map[interface{}]*entry amended bool // 若是Map.dirty有些数据不在中的时候,这个值为true }
amended指明Map.dirty中有readOnly.m未包含的数据,因此若是从Map.read找不到数据的话,还要进一步到Map.dirty中查找。
对Map.read的修改是经过原子操做进行的。
虽然read和dirty有冗余数据,但这些数据是经过指针指向同一个数据,因此尽管Map的value会很大,可是冗余的空间占用仍是有限的。
readOnly.m和Map.dirty存储的值类型是*entry,它包含一个指针p, 指向用户存储的value值。
type entry struct { p unsafe.Pointer // *interface{} }
p有三种值:
nil: entry已被删除了,而且m.dirty为nil
expunged: entry已被删除了,而且m.dirty不为nil,并且这个entry不存在于m.dirty中
其它: entry是一个正常的值
以上是sync.Map的数据结构,下面咱们重点看看Load、Store、Delete、Range这四个方法,其它辅助方法能够参考这四个方法来理解。
加载方法,也就是提供一个键key,查找对应的值value,若是不存在,经过ok反映:
func (m *Map) Load(key interface{}) (value interface{}, ok bool) { // 1.首先从m.read中获得只读readOnly,从它的map中查找,不须要加锁 read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 2. 若是没找到,而且m.dirty中有新数据,须要从m.dirty查找,这个时候须要加锁 if !ok && read.amended { m.mu.Lock() // 双检查,避免加锁的时候m.dirty提高为m.read,这个时候m.read可能被替换了。 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] // 若是m.read中仍是不存在,而且m.dirty中有新数据 if !ok && read.amended { // 从m.dirty查找 e, ok = m.dirty[key] // 无论m.dirty中存不存在,都将misses计数加一 // missLocked()中知足条件后就会提高m.dirty m.missLocked() } m.mu.Unlock() } if !ok { return nil, false } return e.load() }
这里有两个值的关注的地方。一个是首先从m.read中加载,不存在的状况下,而且m.dirty中有新数据,加锁,而后从m.dirty中加载。
二是这里使用了双检查的处理,由于在下面的两个语句中,这两行语句并非一个原子操做。
if !ok && read.amended { m.mu.Lock()
虽然第一句执行的时候条件知足,可是在加锁以前,m.dirty可能被提高为m.read,因此加锁后还得再检查m.read,后续的方法中都使用了这个方法。
双检查的技术Java程序员很是熟悉了,单例模式的实现之一就是利用双检查的技术。
能够看到,若是咱们查询的键值正好存在于m.read中,无须加锁,直接返回,理论上性能优异。即便不存在于m.read中,通过miss几回以后,m.dirty会被提高为m.read,又会从m.read中查找。因此对于更新/增长较少,加载存在的key不少的case,性能基本和无锁的map相似。
下面看看m.dirty是如何被提高的。 missLocked方法中可能会将m.dirty提高。
func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } m.read.Store(readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }
上面的最后三行代码就是提高m.dirty的,很简单的将m.dirty做为readOnly的m字段,原子更新m.read。提高后m.dirty、m.misses重置, 而且m.read.amended为false。
这个方法是更新或者新增一个entry。
func (m *Map) Store(key, value interface{}) { // 若是m.read存在这个键,而且这个entry没有被标记删除,尝试直接存储。 // 由于m.dirty也指向这个entry,因此m.dirty也保持最新的entry。 read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok && e.tryStore(&value) { return } // 若是`m.read`不存在或者已经被标记删除 m.mu.Lock() read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { //标记成未被删除 m.dirty[key] = e //m.dirty中不存在这个键,因此加入m.dirty } e.storeLocked(&value) //更新 } else if e, ok := m.dirty[key]; ok { // m.dirty存在这个键,更新 e.storeLocked(&value) } else { //新键值 if !read.amended { //m.dirty中没有新的数据,往m.dirty中增长第一个新键 m.dirtyLocked() //从m.read中复制未删除的数据 m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) //将这个entry加入到m.dirty中 } m.mu.Unlock() } func (m *Map) dirtyLocked() { if m.dirty != nil { return } read, _ := m.read.Load().(readOnly) m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { if !e.tryExpungeLocked() { m.dirty[k] = e } } } func (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { // 将已经删除标记为nil的数据标记为expunged if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged }
你能够看到,以上操做都是先从操做m.read开始的,不知足条件再加锁,而后操做m.dirty。
Store可能会在某种状况下(初始化或者m.dirty刚被提高后)从m.read中复制数据,若是这个时候m.read中数据量很是大,可能会影响性能。
删除一个键值。
func (m *Map) Delete(key interface{}) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { delete(m.dirty, key) } m.mu.Unlock() } if ok { e.delete() } }
一样,删除操做仍是从m.read中开始, 若是这个entry不存在于m.read中,而且m.dirty中有新数据,则加锁尝试从m.dirty中删除。
注意,仍是要双检查的。 从m.dirty中直接删除便可,就当它没存在过,可是若是是从m.read中删除,并不会直接删除,而是打标记:
func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) // 已标记为删除 if p == nil || p == expunged { return false } // 原子操做,e.p标记为nil if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } } }
由于for ... range map是内建的语言特性,因此没有办法使用for range遍历sync.Map, 可是可使用它的Range方法,经过回调的方式遍历。
func (m *Map) Range(f func(key, value interface{}) bool) { read, _ := m.read.Load().(readOnly) // 若是m.dirty中有新数据,则提高m.dirty,而后在遍历 if read.amended { //提高m.dirty m.mu.Lock() read, _ = m.read.Load().(readOnly) //双检查 if read.amended { read = readOnly{m: m.dirty} m.read.Store(read) m.dirty = nil m.misses = 0 } m.mu.Unlock() } // 遍历, for range是安全的 for k, e := range read.m { v, ok := e.load() if !ok { continue } if !f(k, v) { break } } }
Range方法调用前可能会作一个m.dirty的提高,不过提高m.dirty不是一个耗时的操做。
Go 1.9源代码中提供了性能的测试: map_bench_test.go、map_reference_test.go
我也基于这些代码修改了一下,获得下面的测试数据,相比较之前的解决方案,性能多少回有些提高,若是你特别关注性能,能够考虑sync.Map。
BenchmarkHitAll/*sync.RWMutexMap-4 20000000 83.8 ns/op BenchmarkHitAll/*sync.Map-4 30000000 59.9 ns/op BenchmarkHitAll_WithoutPrompting/*sync.RWMutexMap-4 20000000 96.9 ns/op BenchmarkHitAll_WithoutPrompting/*sync.Map-4 20000000 64.1 ns/op BenchmarkHitNone/*sync.RWMutexMap-4 20000000 79.1 ns/op BenchmarkHitNone/*sync.Map-4 30000000 43.3 ns/op BenchmarkHit_WithoutPrompting/*sync.RWMutexMap-4 20000000 81.5 ns/op BenchmarkHit_WithoutPrompting/*sync.Map-4 30000000 44.0 ns/op BenchmarkUpdate/*sync.RWMutexMap-4 5000000 328 ns/op BenchmarkUpdate/*sync.Map-4 10000000 146 ns/op BenchmarkUpdate_WithoutPrompting/*sync.RWMutexMap-4 5000000 336 ns/op BenchmarkUpdate_WithoutPrompting/*sync.Map-4 5000000 324 ns/op BenchmarkDelete/*sync.RWMutexMap-4 10000000 155 ns/op BenchmarkDelete/*sync.Map-4 30000000 55.0 ns/op BenchmarkDelete_WithoutPrompting/*sync.RWMutexMap-4 10000000 173 ns/op BenchmarkDelete_WithoutPrompting/*sync.Map-4 10000000 147 ns/op
sync.Map没有Len方法,而且目前没有迹象要加上 (issue#20680),因此若是想获得当前Map中有效的entries的数量,须要使用Range方法遍历一次, 比较X疼。
LoadOrStore方法若是提供的key存在,则返回已存在的值(Load),不然保存提供的键值(Store)。