年度最佳【golang】sync.Map详解

工做中,常常会碰到并发读写 map 而形成 panic 的状况,为何在并发读写的时候,会 panic 呢?由于在并发读写的状况下,map 里的数据会被写乱,以后就是 Garbage in, garbage out,还不如直接 panic 了。git

是什么

Go 语言原生 map 并非线程安全的,对它进行并发读写操做的时候,须要加锁。而 sync.map 则是一种并发安全的 map,在 Go 1.9 引入。github

sync.map 是线程安全的,读取,插入,删除也都保持着常数级的时间复杂度。
sync.map 的零值是有效的,而且零值是一个空的 map。在第一次使用以后,不容许被拷贝。

有什么用

通常状况下解决并发读写 map 的思路是加一把大锁,或者把一个 map 分红若干个小 map,对 key 进行哈希,只操做相应的小 map。前者锁的粒度比较大,影响效率;后者实现起来比较复杂,容易出错。golang

而使用 sync.map 以后,对 map 的读写,不须要加锁。而且它经过空间换时间的方式,使用 read 和 dirty 两个 map 来进行读写分离,下降锁时间来提升效率。面试

如何使用

使用很是简单,和普通 map 相比,仅遍历的方式略有区别:编程

package main

import (
    "fmt"
    "sync"
)

func main()  {
    var m sync.Map
    // 1. 写入
    m.Store("qcrao", 18)
    m.Store("stefno", 20)

    // 2. 读取
    age, _ := m.Load("qcrao")
    fmt.Println(age.(int))

    // 3. 遍历
    m.Range(func(key, value interface{}) bool {
        name := key.(string)
        age := value.(int)
        fmt.Println(name, age)
        return true
    })

    // 4. 删除
    m.Delete("qcrao")
    age, ok := m.Load("qcrao")
    fmt.Println(age, ok)

    // 5. 读取或写入
    m.LoadOrStore("stefno", 100)
    age, _ = m.Load("stefno")
    fmt.Println(age)
}

第 1 步,写入两个 k-v 对;segmentfault

第 2 步,使用 Load 方法读取其中的一个 key;缓存

第 3 步,遍历全部的 k-v 对,并打印出来;安全

第 4 步,删除其中的一个 key,再读这个 key,获得的就是 nil;数据结构

第 5 步,使用 LoadOrStore,尝试读取或写入 "Stefno",由于这个 key 已经存在,所以写入不成功,而且读出原值。并发

程序输出:

18
stefno 20
qcrao 18
<nil> false
20

sync.map 适用于读多写少的场景。对于写多的场景,会致使 read map 缓存失效,须要加锁,致使冲突变多;并且因为未命中 read map 次数过多,致使 dirty map 提高为 read map,这是一个 O(N) 的操做,会进一步下降性能。

源码分析

数据结构

先来看下 map 的数据结构。去掉大段的注释后:

type Map struct {
    mu Mutex
    read atomic.Value // readOnly
    dirty map[interface{}]*entry
    misses int
}

互斥量 mu 保护 read 和 dirty。

read 是 atomic.Value 类型,能够并发地读。但若是须要更新 read,则须要加锁保护。对于 read 中存储的 entry 字段,可能会被并发地 CAS 更新。可是若是要更新一个以前已被删除的 entry,则须要先将其状态从 expunged 改成 nil,再拷贝到 dirty 中,而后再更新。

dirty 是一个非线程安全的原始 map。包含新写入的 key,而且包含 read 中的全部未被删除的 key。这样,能够快速地将 dirty 提高为 read 对外提供服务。若是 dirty 为 nil,那么下一次写入时,会新建一个新的 dirty,这个初始的 dirtyread 的一个拷贝,但除掉了其中已被删除的 key。

每当从 read 中读取失败,都会将 misses 的计数值加 1,当加到必定阈值之后,须要将 dirty 提高为 read,以期减小 miss 的情形。

read mapdirty map 的存储方式是不一致的。
前者使用 atomic.Value,后者只是单纯的使用 map。
缘由是 read map 使用 lock free 操做,必须保证 load/store 的原子性;而 dirty map 的 load+store 操做是由 lock(就是 mu)来保护的。

真正存储 key/value 的是 read 和 dirty 字段。read 使用 atomic.Value,这是 lock-free 的基础,保证 load/store 的原子性。dirty 则直接用了一个原始的 map,对于它的 load/store 操做须要加锁。

read 字段里其实是存储的是:

// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
    m       map[interface{}]*entry
    amended bool // true if the dirty map contains some key not in m.
}

注意到 read 和 dirty 里存储的东西都包含 entry,来看一下:

type entry struct {
    p unsafe.Pointer // *interface{}
}

很简单,它是一个指针,指向 value。看来,read 和 dirty 各自维护一套 key,key 指向的都是同一个 value。也就是说,只要修改了这个 entry,对 read 和 dirty 都是可见的。这个指针的状态有三种:

p 的三种状态

p == nil 时,说明这个键值对已被删除,而且 m.dirty == nil,或 m.dirty[k] 指向该 entry。

p == expunged 时,说明这条键值对已被删除,而且 m.dirty != nil,且 m.dirty 中没有这个 key。

其余状况,p 指向一个正常的值,表示实际 interface{} 的地址,而且被记录在 m.read.m[key] 中。若是这时 m.dirty 不为 nil,那么它也被记录在 m.dirty[key] 中。二者实际上指向的是同一个值。

当删除 key 时,并不实际删除。一个 entry 能够经过原子地(CAS 操做)设置 p 为 nil 被删除。若是以后建立 m.dirty,nil 又会被原子地设置为 expunged,且不会拷贝到 dirty 中。

若是 p 不为 expunged,和 entry 相关联的这个 value 能够被原子地更新;若是 p == expunged,那么仅当它初次被设置到 m.dirty 以后,才能够被更新。

总体用一张图来表示:

sync.map 总体结构

Store

先来看 expunged:

var expunged = unsafe.Pointer(new(interface{}))

它是一个指向任意类型的指针,用来标记从 dirty map 中删除的 entry。

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
    // 若是 read map 中存在该 key  则尝试直接更改(因为修改的是 entry 内部的 pointer,所以 dirty map 也可见)
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            // 若是 read map 中存在该 key,但 p == expunged,则说明 m.dirty != nil 而且 m.dirty 中不存在该 key 值 此时:
            //    a. 将 p 的状态由 expunged  更改成 nil
            //    b. dirty map 插入 key
            m.dirty[key] = e
        }
        // 更新 entry.p = value (read map 和 dirty map 指向同一个 entry)
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        // 若是 read map 中不存在该 key,但 dirty map 中存在该 key,直接写入更新 entry(read map 中仍然没有这个 key)
        e.storeLocked(&value)
    } else {
        // 若是 read map 和 dirty map 中都不存在该 key,则:
        //      a. 若是 dirty map 为空,则须要建立 dirty map,并从 read map 中拷贝未删除的元素到新建立的 dirty map
        //    b. 更新 amended 字段,标识 dirty map 中存在 read map 中没有的 key
        //    c. 将 kv 写入 dirty map 中,read 不变
        if !read.amended {
            // 到这里就意味着,当前的 key 是第一次被加到 dirty map 中。
            // store 以前先判断一下 dirty map 是否为空,若是为空,就把 read map 浅拷贝一次。
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        // 写入新 key,在 dirty 中存储 value
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

总体流程:

  1. 若是在 read 里可以找到待存储的 key,而且对应的 entry 的 p 值不为 expunged,也就是没被删除时,直接更新对应的 entry 便可。
  2. 第一步没有成功:要么 read 中没有这个 key,要么 key 被标记为删除。则先加锁,再进行后续的操做。
  3. 再次在 read 中查找是否存在这个 key,也就是 double check 一下,这也是 lock-free 编程里的常见套路。若是 read 中存在该 key,但 p == expunged,说明 m.dirty != nil 而且 m.dirty 中不存在该 key 值 此时: a. 将 p 的状态由 expunged 更改成 nil;b. dirty map 插入 key。而后,直接更新对应的 value。
  4. 若是 read 中没有此 key,那就查看 dirty 中是否有此 key,若是有,则直接更新对应的 value,这时 read 中仍是没有此 key。
  5. 最后一步,若是 read 和 dirty 中都不存在该 key,则:a. 若是 dirty 为空,则须要建立 dirty,并从 read 中拷贝未被删除的元素;b. 更新 amended 字段,标识 dirty map 中存在 read map 中没有的 key;c. 将 k-v 写入 dirty map 中,read.m 不变。最后,更新此 key 对应的 value。

再来看一些子函数:

// 若是 entry 没被删,tryStore 存储值到 entry 中。若是 p == expunged,即 entry 被删,那么返回 false。
func (e *entry) tryStore(i *interface{}) bool {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
    }
}

tryStore 在 Store 函数最开始的时候就会调用,是比较常见的 for 循环加 CAS 操做,尝试更新 entry,让 p 指向新的值。

unexpungeLocked 函数确保了 entry 没有被标记成已被清除:

// unexpungeLocked 函数确保了 entry 没有被标记成已被清除。
// 若是 entry 先前被清除过了,那么在 mutex 解锁以前,它必定要被加入到 dirty map 中
func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

Load

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // 若是没在 read 中找到,而且 amended 为 true,即 dirty 中存在 read 中没有的 key
    if !ok && read.amended {
        m.mu.Lock() // dirty map 不是线程安全的,因此须要加上互斥锁
        // double check。避免在上锁的过程当中 dirty map 提高为 read map。
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        // 仍然没有在 read 中找到这个 key,而且 amended 为 true
        if !ok && read.amended {
            e, ok = m.dirty[key] // 从 dirty 中找
            // 无论 dirty 中有没有找到,都要"记一笔",由于在 dirty 提高为 read 以前,都会进入这条路径
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok { // 若是没找到,返回空,false
        return nil, false
    }
    return e.load()
}

处理路径分为 fast path 和 slow path,总体流程以下:

  1. 首先是 fast path,直接在 read 中找,若是找到了直接调用 entry 的 load 方法,取出其中的值。
  2. 若是 read 中没有这个 key,且 amended 为 fase,说明 dirty 为空,那直接返回 空和 false。
  3. 若是 read 中没有这个 key,且 amended 为 true,说明 dirty 中可能存在咱们要找的 key。固然要先上锁,再尝试去 dirty 中查找。在这以前,仍然有一个 double check 的操做。若仍是没有在 read 中找到,那么就从 dirty 中找。无论 dirty 中有没有找到,都要"记一笔",由于在 dirty 被提高为 read 以前,都会进入这条路径

这里主要看下 missLocked 的函数的实现:

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    // dirty map 晋升
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

直接将 misses 的值加 1,表示一次未命中,若是 misses 值小于 m.dirty 的长度,就直接返回。不然,将 m.dirty 晋升为 read,并清空 dirty,清空 misses 计数值。这样,以前一段时间新加入的 key 都会进入到 read 中,从而可以提高 read 的命中率。

再来看下 entry 的 load 方法:

func (e *entry) load() (value interface{}, ok bool) {
    p := atomic.LoadPointer(&e.p)
    if p == nil || p == expunged {
        return nil, false
    }
    return *(*interface{})(p), true
}

对于 nil 和 expunged 状态的 entry,直接返回 ok=false;不然,将 p 转成 interface{} 返回。

Delete

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // 若是 read 中没有这个 key,且 dirty map 不为空
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            delete(m.dirty, key) // 直接从 dirty 中删除这个 key
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete() // 若是在 read 中找到了这个 key,将 p 置为 nil
    }
}

能够看到,基本套路仍是和 Load,Store 相似,都是先从 read 里查是否有这个 key,若是有则执行 entry.delete 方法,将 p 置为 nil,这样 read 和 dirty 都能看到这个变化。

若是没在 read 中找到这个 key,而且 dirty 不为空,那么就要操做 dirty 了,操做以前,仍是要先上锁。而后进行 double check,若是仍然没有在 read 里找到此 key,则从 dirty 中删掉这个 key。但不是真正地从 dirty 中删除,而是更新 entry 的状态。

来看下 entry.delete 方法:

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

它真正作的事情是将正常状态(指向一个 interface{})的 p 设置成 nil。没有设置成 expunged 的缘由是,当 p 为 expunged 时,表示它已经不在 dirty 中了。这是 p 的状态机决定的,在 tryExpungeLocked 函数中,会将 nil 原子地设置成 expunged。

tryExpungeLocked 是在新建立 dirty 时调用的,会将已被删除的 entry.p 从 nil 改为 expunged,这个 entry 就不会写入 dirty 了。

func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    for p == nil {
        // 若是原来是 nil,说明原 key 已被删除,则将其转为 expunged。
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}

注意到若是 key 同时存在于 read 和 dirty 中时,删除只是作了一个标记,将 p 置为 nil;而若是仅在 dirty 中含有这个 key 时,会直接删除这个 key。缘由在于,若二者都存在这个 key,仅作标记删除,能够在下次查找这个 key 时,命中 read,提高效率。若只有在 dirty 中存在时,read 起不到“缓存”的做用,直接删除。

LoadOrStore

这个函数结合了 Load 和 Store 的功能,若是 map 中存在这个 key,那么返回这个 key 对应的 value;不然,将 key-value 存入 map。这在须要先执行 Load 查看某个 key 是否存在,以后再更新此 key 对应的 value 时颇有效,由于 LoadOrStore 能够并发执行。

具体的过程再也不一一分析了,可参考 Load 和 Store 的源码分析。

Range

Range 的参数是一个函数:

f func(key, value interface{}) bool

由使用者提供实现,Range 将遍历调用时刻 map 中的全部 k-v 对,将它们传给 f 函数,若是 f 返回 false,将中止遍历。

func (m *Map) Range(f func(key, value interface{}) bool) {
    read, _ := m.read.Load().(readOnly)
    if read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if read.amended {
            read = readOnly{m: m.dirty}
            m.read.Store(read)
            m.dirty = nil
            m.misses = 0
        }
        m.mu.Unlock()
    }

    for k, e := range read.m {
        v, ok := e.load()
        if !ok {
            continue
        }
        if !f(k, v) {
            break
        }
    }
}

当 amended 为 true 时,说明 dirty 中含有 read 中没有的 key,由于 Range 会遍历全部的 key,是一个 O(n) 操做。将 dirty 提高为 read,会将开销分摊开来,因此这里直接就提高了。

以后,遍历 read,取出 entry 中的值,调用 f(k, v)。

其余

关于为什么 sync.map 没有 Len 方法,参考资料里给出了 issuebcmills 认为对于并发的数据结构和非并发的数据结构并不必定要有相同的方法。例如,map 有 Len 方法,sync.map 却不必定要有。就像 sync.map 有 LoadOrStore 方法,map 就没有同样。

有些实现增长了一个计数器,并原子地增长或减小它,以此来表示 sync.map 中元素的个数。但 bcmills 提出这会引入竞争:atomic 并非 contention-free 的,它只是把竞争下沉到了 CPU 层级。这会给其余不须要 Len 方法的场景带来负担。

总结

  1. sync.map 是线程安全的,读取,插入,删除也都保持着常数级的时间复杂度。
  2. 经过读写分离,下降锁时间来提升效率,适用于读多写少的场景。
  3. Range 操做须要提供一个函数,参数是 k,v,返回值是一个布尔值:f func(key, value interface{}) bool
  4. 调用 Load 或 LoadOrStore 函数时,若是在 read 中没有找到 key,则会将 misses 值原子地增长 1,当 misses 增长到和 dirty 的长度相等时,会将 dirty 提高为 read。以期减小“读 miss”。
  5. 新写入的 key 会保存到 dirty 中,若是这时 dirty 为 nil,就会先新建立一个 dirty,并将 read 中未被删除的元素拷贝到 dirty。
  6. 当 dirty 为 nil 的时候,read 就表明 map 全部的数据;当 dirty 不为 nil 的时候,dirty 才表明 map 全部的数据。

参考资料

【德志大佬-设计并发安全的 map】https://halfrost.com/go_map_chapter_one/

【德志大佬-设计并发安全的 map】https://halfrost.com/go_map_chapter_two/

【关于 sync.map 为何没有 len 方法的 issue】https://github.com/golang/go/issues/20680

【芮神增长了 len 方法】http://xiaorui.cc/archives/4972

【图解 map 操做】https://wudaijun.com/2018/02/go-sync-map-implement/

【从一道面试题开始】http://www.javashuo.com/article/p-vvbdwsqz-v.html

【源码分析】https://zhuanlan.zhihu.com/p/44585993

【行文通畅,流程图清晰】http://www.javashuo.com/article/p-kdgzgony-gp.html

相关文章
相关标签/搜索