当咱们在编写多线程程序时,经常会涉及到多个线程对共享数据的访问。若是不对这种访问加以限制,每每会致使程序运行结果与预期不符
编写代码时,咱们以及习惯了用锁去保护数据。那么,这里的锁是什么?为何它能知足咱们的要求?它存在于哪里?html
让咱们从一个最简单的例子出发---多个线程并发修改一个全局变量:node
/* 全局变量 */ int g_sum = 0; /* 每一个线程入口 */ void *thread(void* arg) { for(int i = 0; i < 100; i++) { g_sum++; } return NULL; }
在多核
处理器上,若是有两个线程同时执行上面的累加操做,最终的g_sum
几乎不多是预期的200
(每一个线程累加100
次),而更倾向因而一个接近200
的随机值。算法
这是由于CPU
对g_sum
进行累加时,它们都会:1
.从内存中读取 2
.修改它的值 3
.将新值写回内存。因为CPU
之间是独立的,而内存是共享的,因此就有可能存在一种时序:两个CPU
前后从内存中读取了g_sum
的值,并各自对它进行了递增,最终将新的值写入g_sum
,这时。两个线程的两次累加最终只让g_sum
增长了1
shell
要解决上面的问题,一个很天然的想法同一时间段内,要想办法只让一个线程对全局变量进行读-修改-写。咱们能够用锁
去保护临界区
编程
这里引入了临界区
的概念。临界区是指访问共用资源的程序片断(好比上面的例子中的"g_sum++")。线程在进入临界区时加锁,退出临界区时解锁。也就是说,锁将临界区"保护"了起来。数组
临界区
是人们为一段代码片断强加上的概念,但加锁
和解锁
不同,它必须实打实地存在于代码中。那么问题来了,锁
应该如何实现 ? 数据结构
为了回答这个问题,咱们先将锁
须要具备的特性列出来:多线程
1
. 它须要支持加锁(lock
)和解锁(unlock
)两种操做。2
. 它须要是有状态(State
)的,它须要记录当前这把锁处于Locked
仍是Unlocked
状态。3
. 锁的状态变化必须是原子(Atomic)的4
. 当它处于Locked
状态时,对其进行加锁(lock
)的操做,不会成功。并发
第1
条,对实现者来讲,一是要提供两个API
分别对应这两种操做。post
第2
条,须要一个地方能记录锁的状态,对计算机系统来讲,这个地方只能是内存
。
第3
条,将锁
的状态记录在内存中有个和全局变量同样的问题,那就是如何避免多个线程同时去改变锁的状态 ? 总不能用锁
去保护锁
吧 ? 好在各个体系的CPU
都提供了这种原子操做的原语, 对x86
来讲,就是指令的LOCK
前缀, 它能够在执行指令时控制住总线,直到指令执行完成。这也就保证了锁
的状态修改是经过原子
操做完成的。
第4
条,加锁操做成功的前提是锁
的状态是处于"Unlocked",若是该条件不知足,则本次加锁操做失败,那么失败之后的行为呢?不一样的锁有不一样的实现,通常来讲有三种可选择的行为:1
.当即返回失败 2
.不断尝试再加锁,直到成功. 3
. 睡眠线程本身,直到能够得到锁。
固然,咱们并不须要去重复造锁
的轮子。
在用户空间,glibc
提供了诸如spinlock
、semaphore
、rwlock
、mutex
类型的锁的实现,咱们只要使用API
就行。
int sem_wait(sem_t *sem); int sem_post(sem_t *sem); int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex); .......
在内核空间,Linux
也有相似的实现.
在刚才的例子中,若是咱们使用了锁
去保护g_sum
,那么最终必定能获得200
。可是,咱们在获得准确结果的同时也会付出性能的代价。
若是把临界区
比做成一个独木桥,那么线程就是须要过独木桥的人。 显然,若是过桥的人(并发访问临界区的线程)越多,独木桥越长(锁保护的临界区的范围越大),那么其余人等地就越久(性能就降低地越厉害)。
下面这是在一台8
核CPU
虚拟机环境下,测试程序的运行结果。
横坐标是并发运行的线程的数目,纵坐标是完成相同任务(累加必定次数)时的运行时间。越多的线程会带来越多的冲突,所以,总的运行时间会逐渐增大。
若是增长临界区的长度呢(在每次循环中增长一些额外指令),则会获得下面的结果:
横坐标表示额外的指令,纵坐标依然表示时间。
可见,线程的并发越多、临界区越大都会形成程序性能降低。这也是为何追求性能的程序会选择使用每cpu变量
(或者每线程变量),而且尽可能减少锁保护的粒度。
前面说过,锁
是有状态的,而且这个状态须要保存在内存中。那么?具体到Linux
平台,锁
对象是保存在内核空间仍是用户空间呢? 在比较早的内核(2.5.7)中,这个对象是保存在内核中的,这是很天然的作法。由于当一个线程(task
)去等待得到一个互斥锁时,若是获取不到,那么它须要将积极睡眠,直到锁
可用后再被唤醒。
这个过程具体来讲,就是将本身的task_struct
挂到锁
对象的等待链表上。当锁
的持有者unlock
时,内核就能够从该等待列表上找到并唤醒链表上全部task
。
可见,每次用户的加锁解锁操做都必须陷入内核(即便如今没有其余线程持有这把锁)。陷入内核意味着几百个时钟就消耗了。在冲突不大的场景中,这种消耗就白白浪费了。
所以,从2.5.7
版本开始,Linux
引入了Futex
(Fast Userspace muTEXes
),即快速的用户态互斥机制,这个机制是用户态和内核态共同协做完成的,它将保存锁
状态的对象放在用户态。若是用户在加锁时发现锁
处于(Unlocked
)状态,那么就直接修改状态就行了(fast path
),不须要陷入内核。固然,若是此时锁处于(Locked
)状态,仍是须要陷入内核(slow path
)。
那么咱们如何使用Futex
机制呢?答案是咱们彻底不须要显示地使用,glibc
库中的semaphore
、mutex
底层就是使用的Futex
。
锁
是经过一个状态的原子操做来保证共享数据的访问互斥。而无锁
的意思就是不须要这样一个状态。
说到无锁
,必须提到的就是CAS
指令(也能够叫CSW
)。CAS
是CompareAndSwap
的缩写,即比较-交换。不一样体系的CPU
有不一样的CAS
的指令实现。在x86
上,就是带LOCK
前缀的CMPXCHG
指令。因此,CAS
操做是原子的
它的功能用伪代码描述就是下面这样(仅为理解,实际是一条原子指令):
bool compare_and_swap(int *src, int *dest, int newval) { if (*src == *dest) { *src = newval; return true; } else { return false; } }
第一个操做数的内容与第二个操做数的内容相比较, 若是相同,则将第三个操做数赋值给第一个操做数,返回TRUE
, 不然返回FALSE
。
较新版本的gcc
已经内置了CAS
操做的API
(以下)。其余编译器也提供了相似的API
,不过这不是本文的重点。
bool __sync_bool_comware_and_swap(type *ptr, type oldval, type newval);
无锁
一般构建无锁队列(Lock-Free Queue
)。顾名思义,无锁队列就是指不使用锁结构
来控制多线程并发互斥的队列。
咱们知道,队列是一个典型的先入先出(FIFO
)的数据结构,具备入队(Enqueue
)和出队(Dequeue
)两种操做。并发条件下,多个线程可能在入队或出队时会产生竞争。
以单向链表为基础实现的队列以下图所示(有一个Dummy
链表头),线程1和线程2都但愿本身能完成入队操做
一般来讲,入队要完成两件事:
Next
指向新节点Tail
指向的节点到新入队的节点若是可使用锁
,咱们能够经过将以上两件事放到一个锁
的保护范围内就能完成线程的互斥,那么对于无锁呢?
John D.Valois
在《Implemeting Lock-Free Queues》中提出的无锁队列的入队列算法以下(伪代码):
EnQueue(x) { /* 建立新的节点 n */ n = new node(); n->value = x; n->next = NULL; do { t = tail; // 取得尾节点 succ = CAS(t->next, NULL, n) // 尝试更新尾节点的Next指向新的节点 if succ != TRUE CAS(tail, t, t->next) // 更新失败,尝试将tail向后走 }while(succ != TRUE); CAS(tail, t, n); // 更新队列的Tail指针,使它指向新的节点 }
这里的Enqueue
算法中使用了三次CAS
操做。
1
. 第一次CAS
操做更新尾节点的Next指向新的节点。若是在单线程环境中,这个操做一定成功。但在多线程环境,若是有多个线程都在进行Enqueue
操做,那么在线程T1取得尾节点后,线程T2可能已经完成了新节点的入队,此时T1的CAS
操做就会失败,由于此时t->Next
已经不为NULL
了,而变成了T2新插入的节点。
再强调一遍,CAS
操做会锁住总线!所以T1和T2只有一个线程会成功,成功的线程会更新尾节点的Next
,另外一个线程会由于CAS
失败而从新循环。
若是CAS
操做成功,链表会变成下面这样,此时的Tail
指针尚未更新
2
. 若是第一个CAS
失败,说明有其余线程在坏事(进行了元素入队),这个时候第二个CAS
操做会尝试推动Tail
指针。这样作是为了防止第一个CAS
成功的线程忽然挂掉而致使不更新Tail
指针
3
. 第三个CAS
操做更新尾节点的Next
论文中还给出了另外一个版本的入队算法,以下所示
EnQueue2(x) { /* 建立新的节点 n */ n = new node(); n->value = x; n->next = NULL; oldt = t = tail do { while(t->next != NULL) // 不断向后到达队列尾部 t = t->next }while(CAS(t->next, NULL, n) != TRUE); // 更新尾节点的Next指向新的节点 CAS(tail, oldt, n); // 更新队列的Tail指针,使它指向新的节点 }
与前一个的版本相比,新版本在循环内部增长了不断向后遍历的过程,也就是若是Tail
指针后面已经有被其余线程添加了节点,本线程并不会等待Tail
更新,而是直接向后遍历。
再来看出队,论文中给出的出队算法以下:
DeQueue() { do { h = head; if h->next = NULL error queue_empty; }while (CAS(head, h, h->next)!= TRUE) return h->next->value; }
须要特别注意,该出队算法不是返回队首的元素,而是返回Head->Next
节点。完成出队后,移动Head
指针到刚出队的元素。算法中使用了一个CAS
操做来控制竞争下的Head
指针更新。另外,算法中并无描述队列元素的资源释放。
以链表为基础的无锁队列有一个缺点就是内存的频繁申请和释放,在一些语言实现中,这种申请释放自己就是带锁的。包含有锁操做的行为天然称不上是无锁。所以,更通用的无锁队列是基于数组实现的。论文中描述了一种基于数组的无锁队列算法,它具备如下一些特性:
1
. 数组预先分配好,也就是能容纳的元素个数优先
2
. 使用者能够将值填入数组,除此以外,数组有三个特殊值:HEAD
, TAIL
和EMPTY
。队列初始化时(下图),除了有两个相邻的位置是填入HEAD
, TAIL
以外,其余位置都是EMPTY
。显然,用户数据不能再使用这三个值了。。
x
入队,它会找到TAIL
的位置,而后对该位置和以后的位置执行一次Double-Word CAS
。该操做将<TAIL
, EMPTY
>原子地替换为<x
, TAIL
>。固然,若是TAIL
后面不是EMPTY
(而是HEAD`),就说明队列满了,入队失败。HEAD
的位置,一样利用Double-Word CAS
,将<HEAD
,x
>替换为<EMPTY
, HEAD
>。固然若是HEAD
后面是EMPTY
,则出队失败(此时队列是空的)。HEAD
和TAIL
的位置,算法使用两个变量记录入队和出队发生的次数,显然这两个变量的改变都是原子递增的。在某个时刻,队列多是下面这个样子
我也用CAS
操做实现了一个队列,可是没有用论文中的算法。而更偏向于DPDK
的实现。
struct headtail{ volatile uint32_t head; volatile uint32_t tail; }; struct Queue{ struct headtail prod; struct headtail cons; int array[QUEUE_SIZE]; int capacity; }; int CAS_EnQueue(struct Queue* queue, int val) { uint32_t head; uint32_t idx; bool succ; do{ head = queue->prod.head; if (queue->capacity + queue->cons.tail - head < 1) { /* queue is full */ return -1; } /* move queue->prod.head */ succ = CAS(&queue->prod.head, head, head + 1); }while(!succ); idx = head & queue->capacity; /* set val */ queue->array[idx] = val; /* wait */ while(unlikely(queue->prod.tail != head)) { _mm_pause(); } queue->prod.tail = head + 1; return 0; } int CAS_DeQueue(struct Queue* queue, int* pval) { uint32_t head; uint32_t idx; bool succ; do { head = queue->cons.head; if (queue->prod.tail - head < 1) { /* Queue is Empty */ return -1; } /* forward queue->head */ succ = CAS(&queue->cons.head, head, head + 1); }while(!succ); idx = head & queue->capacity; *pval = queue->array[idx]; /* wait */ while(unlikely(queue->cons.tail != head)) { _mm_pause(); } /* move cons tail */ queue->cons.tail = head + 1; return 0; }
不管是锁
仍是无锁
,其实都是一种多线程环境下的同步方式,锁
的应用更为普遍,而无锁
更有一种自旋的味道在里面,在特定场景下的确能提升性能,好比DPDK
中ring
实际就是无锁队列的应用