深刻理解Go 1.9 sync.Map

Go官方的faq已经提到内建的map不是线程(goroutine)安全的。在Go 1.6以前, 内置的map类型是部分goroutine安全的,并发的读没有问题,并发的写可能有问题。自go 1.6以后, 并发地读写map会报错,这在一些知名的开源库中都存在这个问题,因此go 1.9以前的解决方案是额外绑定一个锁,封装成一个新的struct或者单独使用锁均可以。另外笔者在go 1.9以前一般是使用concurrent-map来解决这类问题,可是不是全部的第三方库都以此来解决问题。git

咱们先来看看这个代码样例:程序中一个goroutine一直读,一个goroutine一直写同一个键值,即便读写的键不相同,并且map也没有"扩容"等操做,代码仍是会报错的,错误信息是: fatal error: concurrent map read and map write。github

package main
func main() {
	m := make(map[int]int)
	go func() {
		for {
			_ = m[1]
		}
	}()
	go func() {
		for {
			m[2] = 2
		}
	}()
	select {}
}

问题的根源在Go的源代码: hashmap_fast.go#L118,会看到读的时候会检查hashWriting标志, 若是有这个标志,就会报并发错误。golang

写的时候会设置这个标志: hashmap.go#L542安全

h.flags |= hashWriting

hashmap.go#L628设置完以后会取消这个标记。这样并发读写的检查有不少处, 好比写的时候也会检查是否是有并发的写,删除键的时候相似写,遍历的时候并发读写问题等。map的并发问题并非那么容易被发现, 你能够利用-race参数来检查。数据结构

并发地使用map对象是咱们平常开发中一个很常见的需求,特别是在一些大项目中。map总会保存goroutine共享的数据。Go 1.9以前在Go官方blog的Go maps in action一文中,给出了一种简便的解决方案。并发

首先,经过嵌入struct为map增长一个读写锁性能

var counter = struct{
    sync.RWMutex
    m map[string]int
}{m: make(map[string]int)}

读写数据时,能够很方便的加锁测试

counter.RLock()
n := counter.m["some_key"]
counter.RUnlock()
fmt.Println("some_key:", n)

counter.Lock()
counter.m["some_key"]++
counter.Unlock()

固然,你也可使用concurrent-map来解决问题优化

// Create a new map.
map := cmap.New()
	
// Sets item within map, sets "bar" under key "foo"
map.Set("foo", "bar")

// Retrieve item from map.
if tmp, ok := map.Get("foo"); ok {
	bar := tmp.(string)
}

// Removes item under key "foo"
map.Remove("foo")

二者本质上都是使用sync.RWMutex来保障线程(goroutine)安全的。这种解决方案至关简洁,而且利用读写锁而不是Mutex能够进一步减小读写的时候由于锁带来的性能。但在map的数据很是大的状况下,一把锁会致使大并发的客户端共争一把锁,这时,在Go 1.9中sync.Map就很是实用。(除了以上这些以外,还有一个笔者想提到的库,cmap也是一个至关好,安全且性能出色的第三方库)atom

Go 1.9中sync.Map的实现有如下优化点:

  1. 空间换时间。 经过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。
  2. 使用只读数据(read),避免读写冲突。
  3. 动态调整,miss次数多了以后,将dirty数据提高为read。
  4. double-checking。
  5. 延迟删除。 删除一个键值只是打标记,只有在提高dirty的时候才清理删除的数据。
  6. 优先从read读取、更新、删除,由于对read的读取不须要锁。

sync.Map数据结构很简单,包含四个字段:readmudirtymisses

type Map struct {
	// 当涉及到dirty数据的操做的时候,须要使用此锁
	mu Mutex
	// 一个只读的数据结构,由于只读,因此不会有读写冲突。
	// 因此从这个数据中读取老是安全的。
	// 实际上,实际也会更新这个数据的entries,若是entry是未删除的(unexpunged), 并不须要加锁。若是entry已经被删除了,须要加锁,以便更新dirty数据。
	read atomic.Value // readOnly
	// dirty数据包含当前的map包含的entries,它包含最新的entries(包括read中未删除的数据,虽有冗余,可是提高dirty字段为read的时候很是快,不用一个一个的复制,而是直接将这个数据结构做为read字段的一部分),有些数据还可能没有移动到read字段中。
	// 对于dirty的操做须要加锁,由于对它的操做可能会有读写竞争。
	// 当dirty为空的时候, 好比初始化或者刚提高完,下一次的写操做会复制read字段中未删除的数据到这个数据中。
	dirty map[interface{}]*entry
	// 当从Map中读取entry的时候,若是read中不包含这个entry,会尝试从dirty中读取,这个时候会将misses加一,
	// 当misses累积到 dirty的长度的时候, 就会将dirty提高为read,避免从dirty中miss太屡次。由于操做dirty须要加锁。
	misses int
}

read的数据结构

type readOnly struct {
	m       map[interface{}]*entry
	amended bool // 若是Map.dirty有些数据不在其中的时候,这个值为true
}

这里的精髓是,使用了冗余的数据结构readdirtydirty中会包含read中未删除的entries,新增长的entries会加入到dirty中。amended指明Map.dirty中有readOnly.m未包含的数据,因此若是从Map.read找不到数据的话,还要进一步到Map.dirty中查找。而对Map.read的修改是经过原子操做进行的。虽然readdirty有冗余数据,但这些数据是经过指针指向同一个数据,因此尽管Map的value会很大,可是冗余的空间占用仍是有限的。readOnly.mMap.dirty存储的值类型是*entry,它包含一个指针p, 指向用户存储的value值。

type entry struct {
	p unsafe.Pointer // *interface{}
}

p有三种值:

  • nil: entry已被删除了,而且m.dirty为nil
  • expunged: entry已被删除了,而且m.dirty不为nil,并且这个entry不存在于m.dirty中
  • 其它: entry是一个正常的值

理解了sync.Map的数据结构,那么咱们先来看看sync.Map的Load方法实现

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
	// 1.首先从m.read中获得只读readOnly,从它的map中查找,不须要加锁
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	// 2. 若是没找到,而且m.dirty中有新数据,须要从m.dirty查找,这个时候须要加锁
	if !ok && read.amended {
		m.mu.Lock()
		// 双检查,避免加锁的时候m.dirty提高为m.read,这个时候m.read可能被替换了。
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		// 若是m.read中仍是不存在,而且m.dirty中有新数据
		if !ok && read.amended {
			// 从m.dirty查找
			e, ok = m.dirty[key]
			// 无论m.dirty中存不存在,都将misses计数加一
			// missLocked()中知足条件后就会提高m.dirty
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

Load加载方法,提供一个键key,查找对应的值value,若是不存在,经过ok反映。这里的精髓是从m.read中加载,不存在的状况下,而且m.dirty中有新数据,加锁,而后从m.dirty中加载。另一点是这里使用了双检查的处理,由于在下面的两个语句中,这两行语句并非一个原子操做。

if !ok && read.amended {
		m.mu.Lock()

虽然第一句执行的时候条件知足,可是在加锁以前,m.dirty可能被提高为m.read,因此加锁后还得再检查m.read,后续的方法中都使用了这个方法。若是咱们查询的键值正好存在于m.read中,则无须加锁,直接返回,理论上性能优异。即便不存在于m.read中,通过miss几回以后,m.dirty会被提高为m.read,又会从m.read中查找。因此对于更新/增长较少,加载存在的key不少的场景,性能基本和无锁的map相差无几。

通过miss几回以后,m.dirty会被提高为m.read,那么m.dirty又是如何被提高的呢?重点在missLocked方法中。

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

最后三行代码就是提高m.dirty的,很简单的将m.dirty做为readOnlym字段,原子更新m.read。提高后m.dirtym.misses重置, 而且m.read.amended为false。

sync.Map的Store方法实现

func (m *Map) Store(key, value interface{}) {
	// 若是m.read存在这个键,而且这个entry没有被标记删除,尝试直接存储。
	// 由于m.dirty也指向这个entry,因此m.dirty也保持最新的entry。
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}
	// 若是`m.read`不存在或者已经被标记删除
	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() { //标记成未被删除
			m.dirty[key] = e //m.dirty中不存在这个键,因此加入m.dirty
		}
		e.storeLocked(&value) //更新
	} else if e, ok := m.dirty[key]; ok { // m.dirty存在这个键,更新
		e.storeLocked(&value)
	} else { //新键值
		if !read.amended { //m.dirty中没有新的数据,往m.dirty中增长第一个新键
			m.dirtyLocked() //从m.read中复制未删除的数据
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value) //将这个entry加入到m.dirty中
	}
	m.mu.Unlock()
}

func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}
	read, _ := m.read.Load().(readOnly)
	m.dirty = make(map[interface{}]*entry, len(read.m))
	for k, e := range read.m {
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
	p := atomic.LoadPointer(&e.p)
	for p == nil {
		// 将已经删除标记为nil的数据标记为expunged
		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
			return true
		}
		p = atomic.LoadPointer(&e.p)
	}
	return p == expunged
}

Store方法是更新或者新增一个entry。以上操做都是先从操做m.read开始的,不知足条件再加锁,而后操做m.dirty。Store可能会在某种状况下(初始化或者m.dirty刚被提高后)从m.read中复制数据,若是这个时候m.read中数据量很是大,可能会影响性能。

sync.Map的Delete方法实现

func (m *Map) Delete(key interface{}) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			delete(m.dirty, key)
		}
		m.mu.Unlock()
	}
	if ok {
		e.delete()
	}
}

func (e *entry) delete() (hadValue bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		// 已标记为删除
		if p == nil || p == expunged {
			return false
		}
		// 原子操做,e.p标记为nil
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return true
		}
	}
}

Delete方法删除一个键值。和Store方法同样,删除操做仍是从m.read中开始, 若是这个entry不存在于m.read中,而且m.dirty中有新数据,则加锁尝试从m.dirty中删除。注意,仍是要双检查的。 从m.dirty中直接删除便可,就当它没存在过,可是若是是从m.read中删除,并不会直接删除,而是打标记而已。

sync.Map的Range方法实现

func (m *Map) Range(f func(key, value interface{}) bool) {
	read, _ := m.read.Load().(readOnly)
	// 若是m.dirty中有新数据,则提高m.dirty,而后在遍历
	if read.amended {
		//提高m.dirty
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly) //双检查
		if read.amended {
			read = readOnly{m: m.dirty}
			m.read.Store(read)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}
	// 遍历, for range是安全的
	for k, e := range read.m {
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

在Go语言中,for ... range map是内建的语言特性,因此没有办法使用for range遍历sync.Map, 因而变通的有了Range方法,经过回调的方式遍历。Range方法调用前可能会作一个m.dirty的提高,不过提高m.dirty不是一个耗时的操做。

sync.Map的LoadOrStore 方法实现

func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		actual, loaded, ok := e.tryLoadOrStore(value)
		if ok {
			return actual, loaded
		}
	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			m.dirty[key] = e
		}
		actual, loaded, _ = e.tryLoadOrStore(value)
	} else if e, ok := m.dirty[key]; ok {
		actual, loaded, _ = e.tryLoadOrStore(value)
		m.missLocked()
	} else {
		if !read.amended {
			// 给dirty添加一个新key,
			// 标记只读为不完整
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
		actual, loaded = value, false
	}
	m.mu.Unlock()

	return actual, loaded
}

func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == expunged {
		return nil, false, false
	}
	if p != nil {
		return *(*interface{})(p), true, true
	}
	ic := i
	for {
		if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
			return i, false, true
		}
		p = atomic.LoadPointer(&e.p)
		if p == expunged {
			return nil, false, false
		}
		if p != nil {
			return *(*interface{})(p), true, true
		}
	}
}

LoadOrStore方法若是提供的key存在,则返回已存在的值(Load),不然保存提供的键值(Store)。一样是从m.read开始,而后是m.dirty,最后还有双检查机制。

Go 1.9源代码中提供了性能的测试: map_bench_test.gomap_reference_test.go,和之前的解决方案比较,性能会有很多的提高。

最后sync.Map没有Len方法,而且目前没有迹象要加上 (issue#20680),因此若是想获得当前Map中有效的entries的数量,须要使用Range方法遍历一次。

相关文章
相关标签/搜索