如何设计并实现一个线程安全的 Map ?(下篇)

在上篇中,咱们已经讨论过如何去实现一个 Map 了,而且也讨论了诸多优化点。在下篇中,咱们将继续讨论如何实现一个线程安全的 Map。说到线程安全,须要从概念开始提及。php

线程安全就是若是你的代码块所在的进程中有多个线程在同时运行,而这些线程可能会同时运行这段代码。若是每次运行结果和单线程运行的结果是同样的,并且其余的变量的值也和预期的是同样的,就是线程安全的。html

若是代码块中包含了对共享数据的更新操做,那么这个代码块就多是非线程安全的。可是若是代码块中相似操做都处于临界区之中,那么这个代码块就是线程安全的。node

一般有如下两类避免竞争条件的方法来实现线程安全:c++

第一类 —— 避免共享状态

  1. 可重入 Re-entrancy)

一般在线程安全的问题中,最多见的代码块就是函数。让函数具备线程安全的最有效的方式就是使其可重入。若是某个进程中全部线程均可以并发的对函数进行调用,而且不管他们调用该函数的实际执行状况怎么样,该函数均可以产生预期的结果,那么就能够说这个函数是可重入的。git

若是一个函数把共享数据做为它的返回结果或者包含在它返回的结果中,那么该函数就确定不是一个可重入的函数。任何内含了操做共享数据的代码的函数都是不可重入的函数。github

为了实现线程安全的函数,把全部代码都置放于临界区中是可行的。可是互斥量的使用总会耗费必定的系统资源和时间,使用互斥量的过程总会存在各类博弈和权衡。因此请合理使用互斥量保护好那些涉及共享数据操做的代码。算法

注意:可重入只是线程安全的充分没必要要条件,并非充要条件。这个反例在下面会讲到。编程

  1. 线程本地存储

若是变量已经被本地化,因此每一个线程都有本身的私有副本。这些变量经过子程序和其余代码边界保留它们的值,而且是线程安全的,由于这些变量都是每一个线程本地存储的,即便访问它们的代码可能被另外一个线程同时执行,依旧是线程安全的。数组

  1. 不可变量

对象一旦初始化之后就不能改变。这意味着只有只读数据被共享,这也实现了固有的线程安全性。可变(不是常量)操做能够经过为它们建立新对象,而不是修改现有对象的方式去实现。 Java,C#和
Python 中的字符串的实现就使用了这种方法。缓存

第二类 —— 线程同步

第一类方法都比较简单,经过代码改造就能够实现。可是若是遇到必定要进行线程中共享数据的状况,第一类方法就解决不了了。这时候就出现了第二类解决方案,利用线程同步的方法来解决线程安全问题。

今天就从线程同步开始提及。


一. 线程同步理论

在多线程的程序中,多以共享数据做为线程之间传递数据的手段。因为一个进程所拥有的至关一部分虚拟内存地址均可以被该进程中全部线程共享,因此这些共享数据大可能是之内存空间做为载体的。若是两个线程同时读取同一块共享内存但获取到的数据却不一样,那么程序很容易出现一些 bug。

为了保证共享数据一致性,最简单而且最完全的方法就是使该数据成为一个不变量。固然这种绝对的方式在大多数状况下都是不可行的。好比函数中会用到一个计数器,记录函数被调用了几回,这个计数器确定就不能被设为常量。那这种必须是变量的状况下,还要保证共享数据的一致性,这就引出了临界区的概念。

临界区的出现就是为了使该区域只能被串行的访问或者执行。临界区能够是某个资源,也能够是某段代码。保证临界区最有效的方式就是利用线程同步机制。

先介绍2种共享数据同步的方法。

1. 互斥量

在同一时刻,只容许一个线程处于临界区以内的约束称为互斥,每一个线程在进入临界区以前,都必须先锁定某个对象,只有成功锁定对象的线程才能容许进入临界区,不然就会阻塞。这个对象称为互斥对象或者互斥量。

通常咱们平常说的互斥锁就能达到这个目的。

互斥量能够有多个,它们所保护的临界区也能够有多个。先从简单的提及,一个互斥量和一个临界区。

(一) 一个互斥量和一个临界区

上图就是一个互斥量和一个临界区的例子。当线程1先进入临界区的时候,当前临界区处于未上锁的状态,因而它便先将临界区上锁。线程1获取到临界区里面的值。

这个时候线程2准备进入临界区,因为线程1把临界区上锁了,因此线程2进入临界区失败,线程2由就绪状态转成睡眠状态。线程1继续对临界区的共享数据进行写入操做。

当线程1完成全部的操做之后,线程1调用解锁操做。当临界区被解锁之后,会尝试唤醒正在睡眠的线程2。线程2被唤醒之后,由睡眠状态再次转换成就绪状态。线程2准备进入临界区,当临界区此到处于未上锁的状态,线程2便将临界区上锁。

通过 read、write 一系列操做之后,最终在离开临界区的时候会解锁。

线程在离开临界区的时候,必定要记得把对应的互斥量解锁。这样其余因临界区被上锁而致使睡眠的线程还有机会被唤醒。因此对同一个互斥变量的锁定和解锁必须成对的出现。既不能够对一个互斥变量进行重复的锁定,也不能对一个互斥变量进行屡次的解锁。

若是对一个互斥变量锁定屡次可能会致使临界区最终永远阻塞。可能有人会问了,对一个未锁定的互斥变成解锁屡次会出现什么问题呢?

在 Go 1.8 以前,虽然对互斥变量解锁屡次不会引发任何 goroutine 的阻塞,可是它可能引发一个运行时的恐慌。Go 1.8 以前的版本,是能够尝试恢复这个恐慌的,可是恢复之后,可能会致使一系列的问题,好比重复解锁操做的 goroutine 会永久的阻塞。因此 Go 1.8 版本之后此类运行时的恐慌就变成了不可恢复的了。因此对互斥变量反复解锁就会致使运行时操做,最终程序异常退出。

(二) 多个互斥量和一个临界区

在这种状况下,极容易产生线程死锁的状况。因此尽可能不要让不一样的互斥量所保护的临界区重叠。

上图这个例子中,一个临界区中存在2个互斥量:互斥量 A 和互斥量
B。

线程1先锁定了互斥量 A ,接着线程2锁定了互斥量 B。当线程1在成功锁定互斥量 B 以前永远不会释放互斥量 A。一样,线程2在成功锁定互斥量 A 以前永远不会释放互斥量 B。那么这个时候线程1和线程2都因没法锁定本身须要锁定的互斥量,都由 ready 就绪状态转换为 sleep 睡眠状态。这是就产生了线程死锁了。

线程死锁的产生缘由有如下几种:

    1. 系统资源竞争
    1. 进程推荐顺序非法
    1. 死锁必要条件(必要条件中任意一个不知足,死锁都不会发生)
      (1). 互斥条件
      (2). 不剥夺条件
      (3). 请求和保持条件
      (4). 循环等待条件

想避免线程死锁的状况发生有如下几种方法能够解决:

    1. 预防死锁
      (1). 资源有序分配法(破坏环路等待条件)
      (2). 资源原子分配法(破坏请求和保持条件)
    1. 避免死锁
      银行家算法
    1. 检测死锁
      死锁定理(资源分配图化简法),这种方法虽然能够检测,可是没法预防,检测出来了死锁还须要配合解除死锁的方法才行。

完全解决死锁有如下几种方法:

    1. 剥夺资源
    1. 撤销进程
    1. 试锁定 — 回退
      若是在执行一个代码块的时候,须要前后(顺序不定)锁定两个变量,那么在成功锁定其中一个互斥量以后应该使用试锁定的方法来锁定另一个变量。若是试锁定第二个互斥量失败,就把已经锁定的第一个互斥量解锁,并从新对这两个互斥量进行锁定和试锁定。

如上图,线程2在锁定互斥量 B 的时候,再试锁定互斥量 A,此时锁定失败,因而就把互斥量 B 也一块儿解锁。接着线程1会来锁定互斥量 A。此时也不会出现死锁的状况。

    1. 固定顺序锁定

这种方式就是让线程1和线程2都按照相同的顺序锁定互斥量,都按成功锁定互斥量1之后才能去锁定互斥量2 。这样就能保证在一个线程彻底离开这些重叠的临界区以前,不会有其余一样须要锁定那些互斥量的线程进入到那里。

(三) 多个互斥量和多个临界区

多个临界区和多个互斥量的状况就要看是否会有冲突的区域,若是出现相互交集的冲突区域,后进临界区的线程就会进入睡眠状态,直到该临界区的线程完成任务之后,再被唤醒。

通常状况下,应该尽可能少的使用互斥量。每一个互斥量保护的临界区应该在合理范围内并尽可能大。可是若是发现多个线程会频繁出入某个较大的临界区,而且它们之间常常存在访问冲突,那么就应该把这个较大的临界区划分的更小一点,并使用不一样的互斥量保护起来。这样作的目的就是为了让等待进入同一个临界区的线程数变少,从而下降线程被阻塞的几率,并减小它们被迫进入睡眠状态的时间,这从必定程度上提升了程序的总体性能。

在说另一个线程同步的方法以前,回答一下文章开头留下的一个疑问:可重入只是线程安全的充分没必要要条件,并非充要条件。这个反例在下面会讲到。

这个问题最关键的一点在于:mutex 是不可重入的

举个例子:

在下面这段代码中,函数 increment_counter 是线程安全的,但不是可重入的。

#include <pthread.h>

int increment_counter () {
    static int counter = 0;
    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

    pthread_mutex_lock(&mutex);

    // only allow one thread to increment at a time
    ++counter;
    // store value before any other threads increment it further
    int result = counter;    

    pthread_mutex_unlock(&mutex);

    return result;
}复制代码

上面的代码中,函数 increment_counter 能够在多个线程中被调用,由于有一个互斥锁 mutex 来同步对共享变量 counter 的访问。可是若是这个函数用在可重入的中断处理程序中,若是在
pthread_mutex_lock(&mutex) 和 pthread_mutex_unlock(&mutex)
之间产生另外一个调用函数 increment_counter 的中断,则会第二次执行此函数,此时因为 mutex 已被 lock,函数会在 pthread_mutex_lock(&mutex) 处阻塞,而且因为 mutex 没有机会被
unlock,阻塞会永远持续下去。简言之,问题在于 pthread 的 mutex 是不可重入的。

解决办法是设定 PTHREAD_MUTEX_RECURSIVE 属性。然而对于给出的问题而言,专门使用一个 mutex 来保护一次简单的增量操做显然过于昂贵,所以 c++11 中的 原子变量&action=edit&redlink=1) 提供了一个可以使此函数既线程安全又可重入(并且还更简洁)的替代方案:

#include <atomic>

int increment_counter () {
    static std::atomic<int> counter(0);

    // increment is guaranteed to be done atomically
    int result = ++counter;

    return result;
}复制代码

在 Go 中,互斥量在标准库代码包 sync 中的 Mutex 结构体表示的。sync.Mutex 类型只有两个公开的指针方法,Lock 和 Unlock。前者用于锁定当前的互斥量,后者则用于对当前的互斥量进行解锁。

2. 条件变量

在线程同步的方法中,还有一个能够与互斥量相提并论的同步方法,条件变量。

条件变量与互斥量不一样,条件变量的做用并非保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其余所以而被阻塞的线程。条件变量老是与互斥变量组合使用的。

这类问题其实很常见。先用生产者消费者的例子来举例。

若是不用条件变量,只用互斥量,来看看会发生什么后果。

生产者线程在完成添加操做以前,其余的生产者线程和消费者线程都没法进行操做。同一个商品也只能被一个消费者消费。

若是只用互斥量,可能会出现2个问题。

    1. 生产者线程得到了互斥量之后,却发现商品已满,没法再添加新的商品了。因而该线程就会一直等待。新的生产者也进入不了临界区,消费者也没法进入。这时候就死锁了。
    1. 消费者线程得到了互斥量之后,却发现商品是空的,没法消费了。这个时候该线程也是会一直等待。新的生产者和消费者也都没法进入。这时候一样也死锁了。

这就是只用互斥量没法解决的问题。在多个线程之间,急需一套同步的机制,能让这些线程都协做起来。

条件变量就是你们熟悉的 P - V 操做了。这块你们应该比较熟悉,因此简单的过一下。

P 操做就是 wait 操做,它的意思就是阻塞当前线程,直到收到该条件变量发来的通知。

V 操做就是 signal 操做,它的意思就是让该条件变量向至少一个正在等待它通知的线程发送通知,以表示某个共享数据的状态已经变化。

Broadcast 广播通知,它的意思就是让条件变量给正在等待它通知的全部线程发送通知,以表示某个共享数据的状态已经发生改变。

signal 能够操做屡次,若是操做3次,就表明发了3次信号通知。如上图。

P - V 操做设计美妙之处在于,P 操做的次数与 V 操做的次数是相同的。wait 多少次,signal 对应的有多少次。看上图,这个循环就是这么的奇妙。

生产者消费者问题

这个问题能够形象的描述成像上图这样,门卫守护着临界区的安全。售票厅记录着当前 semaphone 的值,它也控制着门卫是否打开临界区。

临界区只容许一个线程进入,当已经有一个线程了,再来一个线程,就会被 lock 住。售票厅也会记录当前阻塞的线程数。

当以前的线程离开之后,售票厅就会告诉门卫,容许一个线程进入临界区。

用 P-V 伪代码来描述生产者消费者:

初始变量:

semaphore  mutex = 1; // 临界区互斥信号量
semaphore  empty = n; // 空闲缓冲区个数
semaphore  full = 0; // 缓冲区初始化为空复制代码

生产者线程:

producer()
{
  while(1) {
    produce an item in nextp;
    P(empty);
    P(mutex);
    add nextp to buffer;
    V(mutex);
    V(full);
  }
}复制代码

消费者线程:

consumer()
{
  while(1) {
    P(full);
    P(mutex);
    remove an item from buffer;
    V(mutex);
    V(empty);
    consume the item;
  }
}复制代码

虽然在生产者和消费者单个程序里面 P,V 并非成对的,可是整个程序里面 P,V 仍是成对的。

读者写者问题——读者优先,写者延迟

读者优先,写进程被延迟。只要有读者在读,后来的读者均可以随意进来读。

读者要先进入 rmutex ,查看 readcount,而后修改 readcout 的值,最后再去读数据。对于每一个读进程都是写者,都要进去修改 readcount 的值,因此还要单独设置一个 rmutex 互斥访问。

初始变量:

int readcount = 0;     // 读者数量
semaphore  rmutex = 1; // 保证更新 readcount 互斥
semaphore  wmutex = 1; // 保证读者和写着互斥的访问文件复制代码

读者线程:

reader()
{
  while(1) {
    P(rmutex);              // 准备进入,修改 readcount,“开门”
    if(readcount == 0) {    // 说明是第一个读者
      P(wmutex);            // 拿到”钥匙”,阻止写线程来写
    }
    readcount ++;
    V(rmutex);
    reading;
    P(rmutex);              // 准备离开
    readcount --;
    if(readcount == 0) {    // 说明是最后一个读者
      V(wmutex);            // 交出”钥匙”,让写线程来写
    }
    V(rmutex);              // 离开,“关门”
  }
}复制代码

写者线程:

writer()
{
  while(1) {
    P(wmutex);
    writing;
    V(wmutex);
  }
}复制代码

读者写者问题——写者优先,读者延迟

有写者写,禁止后面的读者来读。在写者前的读者,读完就走。只要有写者在等待,禁止后来的读者进去读。

初始变量:

int readcount = 0;     // 读者数量
semaphore  rmutex = 1; // 保证更新 readcount 互斥
semaphore  wmutex = 1; // 保证读者和写着互斥的访问文件
semaphore  w = 1;      // 用于实现“写者优先”复制代码

读者线程:

reader()
{
  while(1) {
    P(w);                   // 在没有写者的时候才能请求进入
    P(rmutex);              // 准备进入,修改 readcount,“开门”
    if(readcount == 0) {    // 说明是第一个读者
      P(wmutex);            // 拿到”钥匙”,阻止写线程来写
    }
    readcount ++;
    V(rmutex);
    V(w);
    reading;
    P(rmutex);              // 准备离开
    readcount --;
    if(readcount == 0) {    // 说明是最后一个读者
      V(wmutex);            // 交出”钥匙”,让写线程来写
    }
    V(rmutex);              // 离开,“关门”
  }
}复制代码

写者线程:

writer()
{
  while(1) {
    P(w);
    P(wmutex);
    writing;
    V(wmutex);
    V(w);
  }
}复制代码

哲学家进餐问题

假设有五位哲学家围坐在一张圆形餐桌旁,作如下两件事情之一:吃饭,或者思考。吃东西的时候,他们就中止思考,思考的时候也中止吃东西。餐桌中间有一大碗意大利面,每两个哲学家之间有一只餐叉。由于用一只餐叉很难吃到意大利面,因此假设哲学家必须用两只餐叉吃东西。他们只能使用本身左右手边的那两只餐叉。哲学家就餐问题有时也用米饭和筷子而不是意大利面和餐叉来描述,由于很明显,吃米饭必须用两根筷子。

初始变量:

semaphore  chopstick[5] = {1,1,1,1,1}; // 初始化信号量
semaphore  mutex = 1;                  // 设置取筷子的信号量复制代码

哲学家线程:

Pi()
{
  do {
    P(mutex);                     // 得到取筷子的互斥量
    P(chopstick[i]);              // 取左边的筷子
    P(chopstick[ (i + 1) % 5 ]);  // 取右边的筷子
    V(mutex);                     // 释放取筷子的信号量
    eat;
    V(chopstick[i]);              // 放回左边的筷子
    V(chopstick[ (i + 1) % 5 ]);  // 放回右边的筷子
    think;
  }while(1);
}复制代码

综上所述,互斥量能够实现对临界区的保护,并会阻止竞态条件的发生。条件变量做为补充手段,可让多方协做更加有效率。

在 Go 的标准库中,sync 包里面 sync.Cond 类型表明了条件变量。可是和互斥锁和读写锁不一样的是,简单的声明没法建立出一个可用的条件变量,还须要用到 sync.NewCond 函数。

func NewCond( l locker) *Cond复制代码

*sync.Cond 类型的方法集合中有3个方法,即 Wait、Signal 和 Broadcast 。

二. 简单的线程锁方案

实现线程安全的方案最简单的方法就是加锁了。

先看看 OC 中如何实现一个线程安全的字典吧。

在 Weex 的源码中,就实现了一套线程安全的字典。类名叫 WXThreadSafeMutableDictionary。

/** * @abstract Thread safe NSMutableDictionary */
@interface WXThreadSafeMutableDictionary<KeyType, ObjectType> : NSMutableDictionary
@property (nonatomic, strong) dispatch_queue_t queue;
@property (nonatomic, strong) NSMutableDictionary* dict;
@end复制代码

具体实现以下:

- (instancetype)initCommon
{
    self = [super init];
    if (self) {
        NSString* uuid = [NSString stringWithFormat:@"com.taobao.weex.dictionary_%p", self];
        _queue = dispatch_queue_create([uuid UTF8String], DISPATCH_QUEUE_CONCURRENT);
    }
    return self;
}复制代码

该线程安全的字典初始化的时候会新建一个并发的 queue。

- (NSUInteger)count
{
    __block NSUInteger count;
    dispatch_sync(_queue, ^{
        count = _dict.count;
    });
    return count;
}

- (id)objectForKey:(id)aKey
{
    __block id obj;
    dispatch_sync(_queue, ^{
        obj = _dict[aKey];
    });
    return obj;
}

- (NSEnumerator *)keyEnumerator
{
    __block NSEnumerator *enu;
    dispatch_sync(_queue, ^{
        enu = [_dict keyEnumerator];
    });
    return enu;
}

- (id)copy{
    __block id copyInstance;
    dispatch_sync(_queue, ^{
        copyInstance = [_dict copy];
    });
    return copyInstance;
}复制代码

读取的这些方法都用 dispatch_sync 。

- (void)setObject:(id)anObject forKey:(id<NSCopying>)aKey
{
    aKey = [aKey copyWithZone:NULL];
    dispatch_barrier_async(_queue, ^{
        _dict[aKey] = anObject;
    });
}

- (void)removeObjectForKey:(id)aKey
{
    dispatch_barrier_async(_queue, ^{
        [_dict removeObjectForKey:aKey];
    });
}

- (void)removeAllObjects{
    dispatch_barrier_async(_queue, ^{
        [_dict removeAllObjects];
    });
}复制代码

和写入相关的方法都用 dispatch_barrier_async。

再看看 Go 用互斥量如何实现一个简单的线程安全的 Map 吧。

既然要用到互斥量,那么咱们封装一个包含互斥量的 Map 。

type MyMap struct {
    sync.Mutex
    m map[int]int
}

var myMap *MyMap

func init() {
    myMap = &MyMap{
        m: make(map[int]int, 100),
    }
}复制代码

再简单的实现 Map 的基础方法。

func builtinMapStore(k, v int) {
    myMap.Lock()
    defer myMap.Unlock()
    myMap.m[k] = v
}

func builtinMapLookup(k int) int {
    myMap.Lock()
    defer myMap.Unlock()
    if v, ok := myMap.m[k]; !ok {
        return -1
    } else {
        return v
    }
}

func builtinMapDelete(k int) {
    myMap.Lock()
    defer myMap.Unlock()
    if _, ok := myMap.m[k]; !ok {
        return
    } else {
        delete(myMap.m, k)
    }
}复制代码

实现思想比较简单,在每一个操做前都加上 lock,在每一个函数结束 defer 的时候都加上 unlock。

这种加锁的方式实现的线程安全的字典,优势是比较简单,缺点是性能不高。文章最后会进行几种实现方法的性能对比,用数字说话,就知道这种基于互斥量加锁方式实现的性能有多差了。

在语言原生就自带线程安全 Map 的语言中,它们的原生底层实现都不是经过单纯的加锁来实现线程安全的,好比 Java 的 ConcurrentHashMap,Go 1.9 新加的 sync.map。

三. 现代线程安全的 Lock - Free 方案 CAS

在 Java 的 ConcurrentHashMap 底层实现中大量的利用了 volatile,final,CAS 等 Lock-Free 技术来减小锁竞争对于性能的影响。

在 Go 中也大量的使用了原子操做,CAS 是其中之一。比较并交换即 “Compare And Swap”,简称 CAS。

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)复制代码

CAS 会先判断参数 addr 指向的被操做值与参数 old 的值是否相等。若是至关,相应的函数才会用参数 new 表明的新值替换旧值。不然,替换操做就会被忽略。

这一点与互斥锁明显不一样,CAS 老是假设被操做的值不曾改变,并一旦确认这个假设成立,就当即进行值的替换。而互斥锁的作法就更加谨慎,老是先假设会有并发的操做修改被操做的值,并须要使用锁将相关操做放入临界区中加以保护。能够说互斥锁的作法趋于悲观,CAS 的作法趋于乐观,相似乐观锁。

CAS 作法最大的优点在于能够不建立互斥量和临界区的状况下,完成并发安全的值替换操做。这样大大的减小了线程同步操做对程序性能的影响。固然 CAS 也有一些缺点,缺点下一章会提到。

接下来看看源码是如何实现的。如下以64位为例,32位相似。

TEXT ·CompareAndSwapUintptr(SB),NOSPLIT,$0-25
    JMP    ·CompareAndSwapUint64(SB)

TEXT ·CompareAndSwapInt64(SB),NOSPLIT,$0-25
    JMP    ·CompareAndSwapUint64(SB)

TEXT ·CompareAndSwapUint64(SB),NOSPLIT,$0-25
    MOVQ    addr+0(FP), BP
    MOVQ    old+8(FP), AX
    MOVQ    new+16(FP), CX
    LOCK
    CMPXCHGQ    CX, 0(BP)
    SETEQ    swapped+24(FP)
    RET复制代码

上述实现最关键的一步就是 CMPXCHG。

查询 Intel 的文档

文档上说:

比较 eax 和目的操做数(第一个操做数)的值,若是相同,ZF 标志被设置,同时源操做数(第二个操做)的值被写到目的操做数,不然,清
ZF 标志,而且把目的操做数的值写回 eax。

因而也就得出了 CMPXCHG 的工做原理:

比较 _old 和 (*__ptr) 的值,若是相同,ZF 标志被设置,同时
_new 的值被写到 (*__ptr),不然,清 ZF 标志,而且把 (*__ptr) 的值写回 _old。

在 Intel 平台下,会用 LOCK CMPXCHG 来实现,这里的 LOCK 是 CPU 锁。

Intel 的手册对 LOCK 前缀的说明以下:

    1. 确保对内存的读-改-写操做原子执行。在 Pentium 及 Pentium 以前的处理器中,带有 LOCK 前缀的指令在执行期间会锁住总线,使得其余处理器暂时没法经过总线访问内存。很显然,这会带来昂贵的开销。从 Pentium 4,Intel Xeon 及 P6 处理器开始,Intel 在原有总线锁的基础上作了一个颇有意义的优化:若是要访问的内存区域(area of memory)在 LOCK 前缀指令执行期间已经在处理器内部的缓存中被锁定(即包含该内存区域的缓存行当前处于独占或以修改状态),而且该内存区域被彻底包含在单个缓存行(cache line)中,那么处理器将直接执行该指令。因为在指令执行期间该缓存行会一直被锁定,其它处理器没法读/写该指令要访问的内存区域,所以能保证指令执行的原子性。这个操做过程叫作缓存锁定(cache locking),缓存锁定将大大下降 LOCK 前缀指令的执行开销,可是当多处理器之间的竞争程度很高或者指令访问的内存地址未对齐时,仍然会锁住总线。
    1. 禁止该指令与以前和以后的读和写指令重排序。
    1. 把写缓冲区中的全部数据刷新到内存中。

看完描述,能够看出,CPU 锁主要分两种,总线锁和缓存锁。总线锁用在老的 CPU 中,缓存锁用在新的 CPU 中。

所谓总线锁就是使用 CPU 提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其余处理器的请求将被阻塞住,那么该 CPU 能够独占使用共享内存。总线锁的这种方式,在执行期间会锁住总线,使得其余处理器暂时没法经过总线访问内存。因此总线锁定的开销比较大,最新的处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。

所谓“缓存锁定”就是若是缓存在处理器缓存行中内存区域在 LOCK 操做期间被锁定,当它执行锁操做回写内存时,处理器不在总线上产生
LOCK#信号,而是修改内部的内存地址,并容许它的缓存一致性机制来保证操做的原子性,由于缓存一致性机制会阻止同时修改被两个以上处理器缓存的内存区域数据,当其余处理器回写已被锁定的缓存行的数据时会对缓存行无效。

有两种状况处理器没法使用缓存锁。

  • 第一种状况是,当操做的数据不能被缓存在处理器内部,或操做的数据跨多个缓存行(cache line),则处理器会调用总线锁定。

  • 第二种状况是:有些处理器不支持缓存锁定。一些老的 CPU 就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。

虽然缓存锁能够大大下降 CPU 锁的执行开销,可是若是遇到多处理器之间的竞争程度很高或者指令访问的内存地址未对齐时,仍然会锁住总线。因此缓存锁和总线锁相互配合,效果更佳。

综上,用 CAS 方式来保证线程安全的方式就比用互斥锁的方式效率要高不少。

四. CAS 的缺陷

虽然 CAS 的效率高,可是依旧存在3大问题。

1. ABA 问题

线程1准备用 CAS 将变量的值由 A 替换为 B ,在此以前,线程2将变量的值由 A 替换为 C ,又由 C 替换为 A,而后线程1执行 CAS 时发现变量的值仍然为 A,因此 CAS 成功。但实际上这时的现场已经和最初不一样了。图上也为了分开两个 A 不一样,因此用不一样的颜色标记了。最终线程2把 A 替换成了 B 。这就是经典的 ABA 问题。可是这会致使项目出现什么问题呢?

设想存在这样一个链栈,栈里面存储了一个链表,栈顶是 A,A 的 next 指针指向 B。在线程1中,要将栈顶元素 A 用 CAS 把它替换成 B。接着线程2来了,线程2将以前包含 A,B 元素的链表都 pop 出去。而后 push 进来一个 A - C - D 链表,栈顶元素依旧是 A。这时线程1发现 A 没有发生变化,因而替换成 B。这个时候 B 的 next 其实为 nil。替换完成之后,线程2操做的链表 C - D 这里就与表头断开链接了。也就是说线程1 CAS 操做结束,C - D 就被丢失了,再也找不回来了。栈中只剩下 B 一个元素了。这很明显出现了 bug。

那怎么解决这种状况呢?最通用的作法就是加入版本号进行标识。

每次操做都加上版本号,这样就能够完美解决 ABA 的问题了。

2. 循环时间可能过长

自旋 CAS 若是长时间不成功,会给 CPU 带来很是大的执行开销。若是能支持 CPU 提供的 Pause 指令,那么 CAS 的效率能有必定的提高。Pause 指令有两个做用,第一它能够延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它能够避免在退出循环的时候因内存顺序冲突(memory order violation)而引发 CPU 流水线被清空(CPU pipeline flush),从而提升 CPU 的执行效率。

3. 只能保证一个共享变量的原子操做

CAS 操做只能保证一个共享变量的原子操做,可是保证多个共享变量操做的原子性。通常作法可能就考虑利用锁了。

不过也能够利用一个结构体,把两个变量合并成一个变量。这样还能够继续利用 CAS 来保证原子性操做。

五. Lock - Free 方案举例

在 Lock - Free方案举例以前,先来回顾一下互斥量的方案。上面咱们用互斥量实现了 Go 的线程安全的 Map。至于这个 Map 的性能如何,接下来对比的时候能够看看数据。

1. NO Lock - Free 方案

若是不用 Lock - Free 方案也不用简单的互斥量的方案,如何实现一个线程安全的字典呢?答案是利用分段锁的设计,只有在同一个分段内才存在竞态关系,不一样的分段锁之间没有锁竞争。相比于对整个
Map 加锁的设计,分段锁大大的提升了高并发环境下的处理能力。

type ConcurrentMap []*ConcurrentMapShared


type ConcurrentMapShared struct {
    items        map[string]interface{}
    sync.RWMutex // 读写锁,保证进入内部 map 的线程安全
}复制代码

分段锁 Segment 存在一个并发度。并发度能够理解为程序运行时可以同时更新 ConccurentMap 且不产生锁竞争的最大线程数,实际上就是 ConcurrentMap 中的分段锁个数。即数组的长度。

var SHARD_COUNT = 32复制代码

若是并发度设置的太小,会带来严重的锁竞争问题;若是并发度设置的过大,本来位于同一个 Segment 内的访问会扩散到不一样的 Segment 中,CPU cache 命中率会降低,从而引发程序性能降低。

ConcurrentMap 的初始化就是对数组的初始化,而且初始化数组里面每一个字典。

func New() ConcurrentMap {
    m := make(ConcurrentMap, SHARD_COUNT)
    for i := 0; i < SHARD_COUNT; i++ {
        m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
    }
    return m
}复制代码

ConcurrentMap 主要使用 Segment 来实现减少锁粒度,把 Map 分割成若干个 Segment,在 put 的时候须要加读写锁,get 时候只加读锁。

既然分段了,那么针对每一个 key 对应哪个段的逻辑就由一个哈希函数来定。

func fnv32(key string) uint32 {
    hash := uint32(2166136261)
    const prime32 = uint32(16777619)
    for i := 0; i < len(key); i++ {
        hash *= prime32
        hash ^= uint32(key[i])
    }
    return hash
}复制代码

上面这段哈希函数会根据每次传入的 string ,计算出不一样的哈希值。

func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
    return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}复制代码

根据哈希值对数组长度取余,取出 ConcurrentMap 中的 ConcurrentMapShared。在 ConcurrentMapShared 中存储对应这个段的 key - value。

func (m ConcurrentMap) Set(key string, value interface{}) {
    // Get map shard.
    shard := m.GetShard(key)
    shard.Lock()
    shard.items[key] = value
    shard.Unlock()
}复制代码

上面这段就是 ConcurrentMap 的 set 操做。思路很清晰:先取出对应段内的 ConcurrentMapShared,而后再加读写锁锁定,写入 key - value,写入成功之后再释放读写锁。

func (m ConcurrentMap) Get(key string) (interface{}, bool) {
    // Get shard
    shard := m.GetShard(key)
    shard.RLock()
    // Get item from shard.
    val, ok := shard.items[key]
    shard.RUnlock()
    return val, ok
}复制代码

上面这段就是 ConcurrentMap 的 get 操做。思路也很清晰:先取出对应段内的 ConcurrentMapShared,而后再加读锁锁定,读取 key - value,读取成功之后再释放读锁。

这里和 set 操做的区别就在于只须要加读锁便可,不用加读写锁。

func (m ConcurrentMap) Count() int {
    count := 0
    for i := 0; i < SHARD_COUNT; i++ {
        shard := m[i]
        shard.RLock()
        count += len(shard.items)
        shard.RUnlock()
    }
    return count
}复制代码

ConcurrentMap 的 Count 操做就是把 ConcurrentMap 数组的每个分段元素里面的每个元素都遍历一遍,计算出总数。

func (m ConcurrentMap) Keys() []string {
    count := m.Count()
    ch := make(chan string, count)
    go func() {
        // 遍历全部的 shard.
        wg := sync.WaitGroup{}
        wg.Add(SHARD_COUNT)
        for _, shard := range m {
            go func(shard *ConcurrentMapShared) {
                // 遍历全部的 key, value 键值对.
                shard.RLock()
                for key := range shard.items {
                    ch <- key
                }
                shard.RUnlock()
                wg.Done()
            }(shard)
        }
        wg.Wait()
        close(ch)
    }()

    // 生成 keys 数组,存储全部的 key
    keys := make([]string, 0, count)
    for k := range ch {
        keys = append(keys, k)
    }
    return keys
}复制代码

上述是返回 ConcurrentMap 中全部 key ,结果装在字符串数组中。

type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}

func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
    shard := m.GetShard(key)
    shard.Lock()
    v, ok := shard.items[key]
    res = cb(ok, v, value)
    shard.items[key] = res
    shard.Unlock()
    return res
}复制代码

上述代码是 Upsert 操做。若是已经存在了,就更新。若是是一个新元素,就用 UpsertCb 函数插入一个新的。思路也是先根据 string 找到对应的段,而后加读写锁。这里只能加读写锁,由于无论是 update 仍是 insert 操做,都须要写入。读取 key 对应的 value 值,而后调用 UpsertCb 函数,把结果更新到 key 对应的 value 中。最后释放读写锁便可。

UpsertCb 函数在这里值得说明的是,这个函数是回调返回待插入到 map 中的新元素。这个函数当且仅当在读写锁被锁定的时候才会被调用,所以必定不容许再去尝试读取同一个 map 中的其余 key 值。由于这样会致使线程死锁。死锁的缘由是 Go 中 sync.RWLock 是不可重入的。

完整的代码见concurrent_map.go

这种分段的方法虽然比单纯的加互斥量好不少,由于 Segment 把锁住的范围进一步的减小了,可是这个范围依旧比较大,还能再进一步的减小锁么?

还有一点就是并发量的设置,要合理,不能太大也不能过小。

2. Lock - Free 方案

在 Go 1.9 的版本中默认就实现了一种线程安全的 Map,摒弃了Segment(分段锁)的概念,而是启用了一种全新的方式实现,利用了 CAS 算法,即 Lock - Free 方案。

采用 Lock - Free 方案之后,能比上一个分案,分段锁更进一步缩小锁的范围。性能大大提高。

接下来就让咱们来看看如何用 CAS 实现一个线程安全的高性能 Map 。

官方是 sync.map 有以下的描述:

这个 Map 是线程安全的,读取,插入,删除也都保持着常数级的时间复杂度。多个 goroutines 协程同时调用 Map 方法也是线程安全的。该 Map 的零值是有效的,而且零值是一个空的 Map 。线程安全的 Map 在第一次使用以后,不容许被拷贝。

这里解释一下为什么不能被拷贝。由于对结构体的复制不但会生成该值的副本,还会生成其中字段的副本。如此一来,本应施加于此的并发线程安全保护也就失效了。

做为源值赋给别的变量,做为参数值传入函数,做为结果值从函数返回,做为元素值经过通道传递等都会形成值的复制。正确的作法是用指向该类型的指针类型的变量。

Go 1.9 中 sync.map 的数据结构以下:

type Map struct {

    mu Mutex

    // 并发读取 map 中一部分的内容是线程安全的,这是不须要
    // read 这部分自身读取就是线程安全的,由于是原子性的。可是存储的时候仍是须要 Mutex
    // 存储在 read 中的 entry 在并发读取过程当中是容许更新的,即便没有 Mutex 信号量,也是线程安全的。可是更新一个之前删除的 entry 就须要把值拷贝到 dirty Map 中,而且必需要带上 Mutex
    read atomic.Value // readOnly

    // dirty 中包含 map 中必需要互斥量 mu 保护才能线程安全的部分。为了使 dirty 能快速的转化成 read map,dirty 中包含了 read map 中全部没有被删除的 entries
    // 已经删除过的 entries 不存储在 dirty map 中。在 clean map 中一个已经删除的 entry 必定是没有被删除过的,而且当新值将要被存储的时候,它们会被添加到 dirty map 中。
    // 当 dirty map 为 nil 的时候,下一次写入的时候会经过 clean map 忽略掉旧的 entries 之后的浅拷贝副原本初始化 dirty map。
    dirty map[interface{}]*entry

    // misses 记录了 read map 由于须要判断 key 是否存在而锁住了互斥量 mu 进行了 update 操做之后的加载次数。
    // 一旦 misses 值大到足够去复制 dirty map 所需的花费的时候,那么 dirty map 就被提高到未被修改状态下的 read map,下次存储就会建立一个新的 dirty map。
    misses int
}复制代码

在这个 Map 中,包含一个互斥量 mu,一个原子值 read,一个非线程安全的字典 map,这个字典的 key 是 interface{} 类型,value 是 *entry 类型。最后还有一个 int 类型的计数器。

先来讲说原子值。atomic.Value 这个类型有两个公开的指针方法,Load 和 Store 。Load 方法用于原子地的读取原子值实例中存储的值,它会返回一个 interface{} 类型的结果,而且不接受任何参数。Store 方法用于原子地在原子值实例中存储一个值,它接受一个 interface{} 类型的参数而没有任何结果。在不曾经过 Store 方法向原子值实例存储值以前,它的 Load 方法总会返回 nil。

在这个线程安全的字典中,Load 和 Store 的都是一个 readOnly 的数据结构。

// readOnly 是一个不可变的结构体,原子性的存储在 Map.read 中
type readOnly struct {
    m map[interface{}]*entry
    // 标志 dirty map 中是否包含一些不在 m 中的 key 。
    amended bool // true if the dirty map contains some key not in m.
}复制代码

readOnly 中存储了一个非线程安全的字典,这个字典和上面 dirty map 存储的类型彻底一致。key 是 interface{} 类型,value 是 *entry 类型。

// entry 是一个插槽,与 map 中特定的 key 相对应
type entry struct {
    p unsafe.Pointer // *interface{}
}复制代码

p 指针指向 *interface{} 类型,里面存储的是 entry 的地址。若是 p \=\= nil,表明 entry 被删除了,而且 m.dirty \=\= nil。若是 p \=\= expunged,表明 entry 被删除了,而且 m.dirty != nil ,那么 entry 从 m.dirty 中丢失了。

除去以上两种状况外,entry 都是有效的,而且被记录在 m.read.m[key] 中,若是 m.dirty!= nil,entry 被存储在 m.dirty[key] 中。

一个 entry 能够经过原子替换操做成 nil 来删除它。当 m.dirty 在下一次被建立,entry 会被 expunged 指针原子性的替换为 nil,m.dirty[key] 不对应任何 value。只要 p != expunged,那么一个 entry 就能够经过原子替换操做更新关联的 value。若是 p \=\= expunged,那么一个 entry 想要经过原子替换操做更新关联的 value,只能在首次设置 m.dirty[key] = e 之后才能更新 value。这样作是为了能在 dirty map 中查找到它。

总结一下,sync.map 的数据结构如上。

再看看线程安全的 sync.map 的一些操做。

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // 若是 key 对应的 value 不存在,而且 dirty map 包含 read map 中没有的 key,那么开始读取 dirty map 
    if !ok && read.amended {
        // dirty map 不是线程安全的,因此须要加上互斥锁
        m.mu.Lock()
        // 当 m.dirty 被提高的时候,为了防止获得一个虚假的 miss ,因此此时咱们加锁。
        // 若是再次读取相同的 key 不 miss,那么这个 key 值就就不值得拷贝到 dirty map 中。
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // 不管 entry 是否存在,记录此次 miss 。
            // 这个 key 将会缓慢的被取出,直到 dirty map 提高到 read map
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}复制代码

上述代码是 Load 操做。返回的是入参 key 对应的 value 值。若是 value 不存在就返回 nil。dirty map 中会保存一些 read map 里面不存在的 key,那么就要读取出 dirty map 里面 key 对应的 value。注意读取的时候须要加互斥锁,由于 dirty map 是非线程安全的。

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}复制代码

上面这段代码是记录 misses 次数的。只有当 misses 个数大于 dirty map 的长度的时候,会把 dirty map 存储到 read map 中。而且把 dirty 置空,misses 次数也清零。

在看 Store 操做以前,先说一个 expunged 变量。

// expunged 是一个指向任意类型的指针,用来标记从 dirty map 中删除的 entry
var expunged = unsafe.Pointer(new(interface{}))复制代码

expunged 变量是一个指针,用来标记从 dirty map 中删除的 entry。

func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    // 从 read map 中读取 key 失败或者取出的 entry 尝试存储 value 失败,直接返回
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        // e 指向的是非 nil 的
        if e.unexpungeLocked() {
            // entry 先前被删除了,这就意味着存在一个非空的 dirty map 里面并无存储这个 entry
            m.dirty[key] = e
        }
        // 使用 storeLocked 函数以前,必须保证 e 没有被清除
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        // 已经存储在 dirty map 中了,表明 e 没有被清除
        e.storeLocked(&value)
    } else {
        if !read.amended {
            // 到这个 else 中就意味着,当前的 key 是第一次被加到 dirty map 中。
            // store 以前先判断一下 dirty map 是否为空,若是为空,就把 read map 浅拷贝一次。
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        // 在 dirty 中存储 value
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}复制代码

Store 优先从 read map 里面去读取 key ,而后存储它的 value。若是 entry 是被标记为从 dirty map 中删除过的,那么还须要从新存储回 dirty map中。

若是 read map 里面没有相应的 key,就去 dirty map 里面去读取。dirty map 就直接存储对应的 value。

最后如何 read map 和 dirty map 都没有这个 key 值,这就意味着该 key 是第一次被加入到 dirty map 中。在 dirty map 中存储这个 key 以及对应的 value。

// 当 entry 没有被删除的状况下去存储一个 value。
// 若是 entry 被删除了,tryStore 方法返回 false,而且保留 entry 不变
func (e *entry) tryStore(i *interface{}) bool {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}复制代码

tryStore 函数的实现和 CAS 原理差很少,它会反复的循环判断 entry 是否被标记成了 expunged,若是 entry 通过 CAS 操做成功的替换成了 i,那么就返回 true,反之若是被标记成了 expunged,就返回 false。

// unexpungeLocked 函数确保了 entry 没有被标记成已被清除。
// 若是 entry 先前被清除过了,那么在 mutex 解锁以前,它必定要被加入到 dirty map 中
func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}复制代码

若是 entry 的 unexpungeLocked 返回为 true,那么就说明 entry 已经被标记成了 expunged,那么它就会通过 CAS 操做把它置为 nil。

再来看看删除操做的实现。

func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        // 因为 dirty map 是非线程安全的,因此操做前要加锁
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            // 删除 dirty map 中的 key
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()
    }
}复制代码

delete 操做的实现比较简单,若是 read map 中存在 key,就能够直接删除,若是不存在 key 而且 dirty map 中有这个 key,那么就要删除 dirty map 中的这个 key。操做 dirty map 的时候记得先加上锁进行保护。

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}复制代码

删除 entry 具体的实现如上。这个操做里面都是原子性操做。循环判断 entry 是否为 nil 或者已经被标记成了 expunged,若是是这种状况就返回 false,表明删除失败。不然就 CAS 操做,将 entry 的 p 指针置为 nil,并返回 true,表明删除成功。

至此,关于 Go 1.9 中自带的线程安全的 sync.map 的实现就分析完了。官方的实现里面基本没有用到锁,互斥量的 lock 也是基于 CAS的。read map 也是原子性的。因此比以前加锁的实现版本性能有所提高。

究竟 Lock - Free 的性能有多强呢?接下来作一下性能测试。

五. 性能对比

性能测试主要针对3个方面,Insert,Get,Delete。测试对象主要针对简单加互斥锁的原生 Map ,分段加锁的 Map,Lock - Free 的 Map 这三种进行性能测试。

性能测试的全部代码已经放在 github 了,地址在这里,性能测试用的指令是:

go test -v -run=^$ -bench . -benchmem复制代码

1. 插入 Insert 性能测试

// 插入不存在的 key (粗糙的锁)
func BenchmarkSingleInsertAbsentBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        myMap.BuiltinMapStore(strconv.Itoa(i), "value")
    }
}

// 插入不存在的 key (分段锁)
func BenchmarkSingleInsertAbsent(b *testing.B) {
    m := New()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Set(strconv.Itoa(i), "value")
    }
}

// 插入不存在的 key (syncMap)
func BenchmarkSingleInsertAbsentSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        syncMap.Store(strconv.Itoa(i), "value")
    }
}复制代码

测试结果:

BenchmarkSingleInsertAbsentBuiltInMap-4          2000000           857 ns/op         170 B/op           1 allocs/op
BenchmarkSingleInsertAbsent-4                    2000000           651 ns/op         170 B/op           1 allocs/op
BenchmarkSingleInsertAbsentSyncMap-4             1000000          1094 ns/op         187 B/op           5 allocs/op复制代码

实验结果是分段锁的性能最高。这里说明一下测试结果,-4表明测试用了4核 CPU ,2000000 表明循环次数,857 ns/op 表明的是平均每次执行花费的时间,170 B/op 表明的是每次执行堆上分配内存总数,allocs/op 表明的是每次执行堆上分配内存次数。

这样看来,循环次数越多,花费时间越少,分配内存总数越小,分配内存次数越少,性能就越好。下面的性能图表中去除掉了第一列循环次数,只花了剩下的3项,因此条形图越短的性能越好。如下的每张条形图的规则和测试结果表明的意义都和这里同样,下面就再也不赘述了。

// 插入存在 key (粗糙锁)
func BenchmarkSingleInsertPresentBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    myMap.BuiltinMapStore("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        myMap.BuiltinMapStore("key", "value")
    }
}

// 插入存在 key (分段锁)
func BenchmarkSingleInsertPresent(b *testing.B) {
    m := New()
    m.Set("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Set("key", "value")
    }
}

// 插入存在 key (syncMap)
func BenchmarkSingleInsertPresentSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    syncMap.Store("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        syncMap.Store("key", "value")
    }
}复制代码

测试结果:

BenchmarkSingleInsertPresentBuiltInMap-4        20000000            74.6 ns/op           0 B/op           0 allocs/op
BenchmarkSingleInsertPresent-4                  20000000            61.1 ns/op           0 B/op           0 allocs/op
BenchmarkSingleInsertPresentSyncMap-4           20000000           108 ns/op          16 B/op           1 allocs/op复制代码

从图中能够看出,sync.map 在涉及到 Store 这一项的均比其余二者的性能差。无论插入不存在的 Key 仍是存在的 Key,分段锁的性能均是目前最好的。

2. 读取 Get 性能测试

// 读取存在 key (粗糙锁)
func BenchmarkSingleGetPresentBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    myMap.BuiltinMapStore("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        myMap.BuiltinMapLookup("key")
    }
}

// 读取存在 key (分段锁)
func BenchmarkSingleGetPresent(b *testing.B) {
    m := New()
    m.Set("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Get("key")
    }
}

// 读取存在 key (syncMap)
func BenchmarkSingleGetPresentSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    syncMap.Store("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        syncMap.Load("key")
    }
}复制代码

测试结果:

BenchmarkSingleGetPresentBuiltInMap-4           20000000            71.5 ns/op           0 B/op           0 allocs/op
BenchmarkSingleGetPresent-4                     30000000            42.3 ns/op           0 B/op           0 allocs/op
BenchmarkSingleGetPresentSyncMap-4              30000000            40.3 ns/op           0 B/op           0 allocs/op复制代码

从图中能够看出,sync.map 在 Load 这一项的性能很是优秀,远高于其余二者。

3. 并发插入读取混合性能测试

接下来的实现就涉及到了并发插入和读取了。因为分段锁实现的特殊性,分段个数会多多少少影响到性能,那么接下来的实验就会对分段锁分1,16,32,256 这4段进行测试,分别看看性能变化如何,其余两种线程安全的 Map 不变。

因为并发的代码太多了,这里就不贴出来了,感兴趣的同窗能够看这里

下面就直接放出测试结果:

并发插入不存在的 Key 值

BenchmarkMultiInsertDifferentBuiltInMap-4        1000000          2359 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_1_Shard-4          1000000          2039 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_16_Shard-4         1000000          1937 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_32_Shard-4         1000000          1944 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_256_Shard-4        1000000          1991 ns/op         331 B/op          11 allocs/op
BenchmarkMultiInsertDifferentSyncMap-4           1000000          3760 ns/op         635 B/op          33 allocs/op复制代码

从图中能够看出,sync.map 在涉及到 Store 这一项的均比其余二者的性能差。并发插入不存在的 Key,分段锁划分的 Segment 多少与性能没有关系。

并发插入存在的 Key 值

BenchmarkMultiInsertSameBuiltInMap-4             1000000          1182 ns/op         160 B/op          10 allocs/op
BenchmarkMultiInsertSame-4                       1000000          1091 ns/op         160 B/op          10 allocs/op
BenchmarkMultiInsertSameSyncMap-4                1000000          1809 ns/op         480 B/op          30 allocs/op复制代码

从图中能够看出,sync.map 在涉及到 Store 这一项的均比其余二者的性能差。

并发的读取存在的 Key 值

BenchmarkMultiGetSameBuiltInMap-4                2000000           767 ns/op           0 B/op           0 allocs/op
BenchmarkMultiGetSame-4                          3000000           481 ns/op           0 B/op           0 allocs/op
BenchmarkMultiGetSameSyncMap-4                   3000000           464 ns/op           0 B/op           0 allocs/op复制代码

从图中能够看出,sync.map 在 Load 这一项的性能远超多其余二者。

并发插入读取不存在的 Key 值

BenchmarkMultiGetSetDifferentBuiltInMap-4        1000000          3281 ns/op         337 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_1_Shard-4          1000000          3007 ns/op         338 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_16_Shard-4          500000          2662 ns/op         337 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_32_Shard-4         1000000          2732 ns/op         337 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_256_Shard-4        1000000          2788 ns/op         339 B/op          12 allocs/op
BenchmarkMultiGetSetDifferentSyncMap-4            300000          8990 ns/op        1104 B/op          34 allocs/op复制代码

从图中能够看出,sync.map 在涉及到 Store 这一项的均比其余二者的性能差。并发插入读取不存在的 Key,分段锁划分的 Segment 多少与性能没有关系。

并发插入读取存在的 Key 值

BenchmarkMultiGetSetBlockBuiltInMap-4            1000000          2095 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_1_Shard-4              1000000          1712 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_16_Shard-4             1000000          1730 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_32_Shard-4             1000000          1645 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_256_Shard-4            1000000          1619 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlockSyncMap-4                500000          2660 ns/op         480 B/op          30 allocs/op复制代码

从图中能够看出,sync.map 在涉及到 Store 这一项的均比其余二者的性能差。并发插入读取存在的 Key,分段锁划分的 Segment 越小,性能越好!

4. 删除 Delete 性能测试

// 删除存在 key (粗糙锁)
func BenchmarkDeleteBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    b.RunParallel(func(pb *testing.PB) {
        r := rand.New(rand.NewSource(time.Now().Unix()))
        for pb.Next() {
            // The loop body is executed b.N times total across all goroutines.
            k := r.Intn(100000000)
            myMap.BuiltinMapDelete(strconv.Itoa(k))
        }
    })
}

// 删除存在 key (分段锁)
func BenchmarkDelete(b *testing.B) {
    m := New()
    b.RunParallel(func(pb *testing.PB) {
        r := rand.New(rand.NewSource(time.Now().Unix()))
        for pb.Next() {
            // The loop body is executed b.N times total across all goroutines.
            k := r.Intn(100000000)
            m.Remove(strconv.Itoa(k))
        }
    })
}

// 删除存在 key (syncMap)
func BenchmarkDeleteSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    b.RunParallel(func(pb *testing.PB) {
        r := rand.New(rand.NewSource(time.Now().Unix()))
        for pb.Next() {
            // The loop body is executed b.N times total across all goroutines.
            k := r.Intn(100000000)
            syncMap.Delete(strconv.Itoa(k))
        }
    })
}复制代码

测试结果:

BenchmarkDeleteBuiltInMap-4                     10000000           130 ns/op           8 B/op           1 allocs/op
BenchmarkDelete-4                               20000000            76.7 ns/op           8 B/op           1 allocs/op
BenchmarkDeleteSyncMap-4                        30000000            45.4 ns/op           8 B/op           0 allocs/op复制代码

从图中能够看出,sync.map 在 Delete 这一项是完美的超过其余二者的。

六. 总结

本文从线程安全理论基础开始讲了线程安全中一些处理方法。其中涉及到互斥量和条件变量相关知识。从 Lock 的方案谈到了 Lock - Free 的 CAS 相关方案。最后针对 Go 1.9 新加的 sync.map 进行了源码分析和性能测试。

采用了 Lock - Free 方案的 sync.map 测试结果并无想象中的那么出色。除了 Load 和 Delete 这两项远远甩开其余二者,凡是涉及到 Store 相关操做的性能均低于其余二者 Map 的实现。不过这也是有缘由的。

纵观 Java ConcurrentHashmap 一路的变化:

JDK 6,7 中的 ConcurrentHashmap 主要使用 Segment 来实现减少锁粒度,把 HashMap 分割成若干个 Segment,在 put 的时候须要锁住 Segment,get 时候不加锁,使用 volatile 来保证可见性,当要统计全局时(好比size),首先会尝试屡次计算 modcount 来肯定,这几回尝试中,是否有其余线程进行了修改操做,若是没有,则直接返回 size。若是有,则须要依次锁住全部的 Segment 来计算。

JDK 7 中 ConcurrentHashmap 中,当长度过长碰撞会很频繁,链表的增改删查操做都会消耗很长的时间,影响性能,因此 JDK8 中彻底重写了concurrentHashmap,代码量从原来的1000多行变成了 6000多行,实现上也和原来的分段式存储有很大的区别。

JDK 8 的 ConcurrentHashmap 主要设计上的变化有如下几点:

  • 不采用 Segment 而采用 node,锁住 node 来实现减少锁粒度。
  • 设计了 MOVED 状态 当 Resize 的中过程当中线程2还在 put 数据,线程2会帮助 resize。
  • 使用3个 CAS 操做来确保 node 的一些操做的原子性,这种方式代替了锁。
  • sizeCtl 的不一样值来表明不一样含义,起到了控制的做用。

可见 Go 1.9 一上来第一个版本就直接摒弃了 Segment 的作法,采起了 CAS 这种 Lock - Free 的方案提升性能。可是它并无对整个字典进行相似 Java 的 Node 的设计。可是整个 sync.map 在 ns/op ,B/op,allocs/op 这三个性能指标上是普通原生非线程安全 Map 的三倍!

不过相信 Google 应该还会继续优化这部分吧,毕竟源码里面还有几处 TODO 呢,让咱们一块儿其余 Go 将来版本的发展吧,笔者也会一直持续关注的。

(在本篇文章截稿的时候,笔者又忽然发现了一种分段锁的 Map 实现,性能更高,它具备负载均衡等特色,应该是目前笔者见到的性能最好的 Go 语言实现的线程安全的 Map ,关于它的实现源码分析就只能放在下篇博文单独写一篇或者之后有空再分析啦)


Reference:
《Go 并发实战编程》
Split-Ordered Lists: Lock-Free Extensible Hash Tables
Semaphores are Surprisingly Versatile
线程安全
JAVA CAS原理深度分析
Java ConcurrentHashMap 总结

GitHub Repo:Halfrost-Field

Follow: halfrost · GitHub

Source: halfrost.com/go_map_chap…

相关文章
相关标签/搜索