锁html
锁以及信号量对大部分人来讲都是很是熟悉的,特别是经常使用的mutex。锁有不少种,互斥锁,自旋锁,读写锁,顺序锁,等等,这里就只介绍常见到的,java
互斥锁linux
这个是最经常使用的,win32:CreateMutex-WaitForSingleObject-ReleaseMutex,linux的pthread_mutex_lock-pthread_mutex_unlock,c#的lock和Monitor,java的lock,这些都是互斥锁。互斥锁的做用你们都知道,是让一段代码同时只能有一个线程运行,编程
自旋锁c#
不经常使用,linux的pthread_spin系列函数就是自旋锁,(网上不少用原子操做写的自旋锁),做用和互斥锁大同小异。windows
信号量缓存
win下的CreateSemaphore、OpenSemaphore、ReleaseSemaphore、WaitForSingleObject,linux也有一样的semaphore系列,还有c#的AutoResetEvent或者semaphore。这个用的也不少,信号两个状态,阻塞和经过,做用是保证多线程代码的业务顺序!多线程
先唠一唠这些锁的原理,(为何我把信号量也归结于锁?)
架构
首先互斥锁,互斥锁其实是由原子操做来实现的,函数
好比,当变量A为0的时候为非锁,为1的时候为锁,当第一个线程将变量A从0变为1(原子操做)成功的时候,就至关于获取锁成功了,另外的线程再次获取锁的时候发现A为1了,(或者说两个线程同时获取锁->原子操做,某一个会失败),表示获取锁失败,当第一个线程用完了,就释放锁,将A=0(原子操做)。
互斥锁的特色是,当锁获取失败了,当前代码上下文(线程)会休眠,而且把当前线程添加到这个内核维护的互斥锁的链表里,当后面的锁再次获取失败,也是将当前线程和执行信息放到这个链表里。当前占用的互斥锁的人用完了锁,内核会抽取互斥锁等待链表上的下一个线程开始唤醒继续执行,当内核链表上为空,就是没人抢锁了,就将锁状态设置为非锁,以次类推~
而后呢,咱们讲自旋锁,自旋锁很简单,他和互斥锁大同小异,区别就是不休眠,当获取锁失败了,就一直while(获取),一直到成功,因此,自旋锁在大部分场景都是不适用的,由于获取锁的时间里,cpu一直是100%的!!
最后讲信号量,上面问为何我将信号量也归结于锁这一类?
由于信号量也是原子操做来实现的!道理和互斥锁同样的,信号量也有一个链表,当等待信号的时候,系统也是把当前线程休眠,把线程和代码执行信息存储到这个信号量的链表里,当内核接受到信号的时候,就把这个信号量上的全部等待线程激活运行,这就是信号量!
原子操做
到底什么是原子操做?
百度百科 所谓原子操做是指不会被线程调度机制打断的操做;这种操做一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另外一个线程)。
因此,原子操做保证了多个线程对内存操做某个值得准确性!那么原子操做具体如何实现的?
首先是inter cpu,熟悉汇编的人都知道,inter指令集有个lock,若是某个指令集前面加个lock,那么在多核状态下,某个核执行到这个前面加lock的指令的时候,inter会让总线锁住,当这个核把这个指令执行完了,再开启总线!这是一种最最底层的锁!!
好比 lock cmpxchg dword ptr [rcx],edx cmpxchg这个指令就被加锁了!
inter指令参考可查阅http://www.intel.cn/content/www/cn/zh/processors/architectures-software-developer-manuals.html
来自IA-32券3:
HLT 指令(中止处理器)中止处理器直至接收到一个启用中断(好比 NMI 或 SMI,正 常状况下这些都是开启的)、调试异常、BINIT#信号、INIT#信号或 RESET#信号。处理 器产生一个特殊的总线周期以代表进入中止模式。 硬件对这个信号的响应有好几个方面。前面板上的指示灯会打亮,产生一个记录 诊断信息的 NMI 中断,调用复位初始化过程(注意 BINIT#引脚是在 Pentium Pro 处理器 引入的)。若是停机过程当中有非唤醒事件(好比 A20M#中断)未处理,它们将在唤醒停 机事件处理以后的进行处理。
在修改内存操做时,使用 LOCK 前缀去调用加锁的读-修改-写操做(原子的)。这种 机制用于多处理器系统中处理器之间进行可靠的通信,具体描述以下: 在 Pentium 和早期的 IA-32 处理器中,LOCK 前缀会使处理器执行当前指令时产生 一个 LOCK#信号,这老是引发显式总线锁定出现。 在 Pentium 四、Intel Xeon 和 P6 系列处理器中,加锁操做是由高速缓存锁或总线 锁来处理。若是内存访问有高速缓存且只影响一个单独的高速缓存线,那么操做中就 会调用高速缓存锁,而系统总线和系统内存中的实际内存区域不会被锁定。同时,这 条总线上的其它 Pentium 四、Intel Xeon 或者 P6 系列处理器就回写全部的已修改数据 并使它们的高速缓存失效,以保证系统内存的一致性。若是内存访问没有高速缓存且/ 或它跨越了高速缓存线的边界,那么这个处理器就会产生 LOCK#信号,并在锁定操做期 间不会响应总线控制请求。
IA-32 处理器提供有一个 LOCK#信号,会在某些关键内存操做期间被自动激活,去锁定系统总线。当这个输出信号发出的时候,来自其它处理器或总线代理的总线控制请求将被阻塞。软件可以经过预先在指令前添加 LOCK 前缀来指定须要 LOCK 语义的其它场合。在 Intel38六、Intel48六、Pentium 处理器中,明确地对指令加锁会致使 LOCK#信号的产生。由硬件设计人员来保证系统硬件中 LOCK#信号的可用性,以控制处理器间的内IA-32 架构软件开发人员指南 卷 3:系统编程指南170存访问。对于 Pentium 四、Intel Xeon 以及 P6 系列处理器,若是被访问的内存区域是在处理器内部进行高速缓存的,那么一般不发出 LOCK#信号;相反,加锁只应用于处理器的高速缓存(参见 7.1.4.LOCK 操做对处理器内部高速缓存的影响) 。
可参考inter的 IA-32券3 第七章第一小节!
固然inter还有其余方式保证原子操做!
而后是ARM cpu, arm主要是靠两个指令来保证原子操做的,LDREX 和 STREX
LDREX
LDREX 可从内存加载数据。
若是物理地址有共享 TLB 属性,则 LDREX 会将该物理地址标记为由当前处理器独占访问,而且会清除该处理器对其余任何物理地址的任何独占访问标记。
不然,会标记:执行处理器已经标记了一个物理地址,但访问还没有完毕。
STREX
STREX 可在必定条件下向内存存储数据。 条件具体以下:
若是物理地址没有共享 TLB 属性,且执行处理器有一个已标记但还没有访问完毕的物理地址,那么将会进行存储,清除该标记,并在Rd 中返回值 0。
若是物理地址没有共享 TLB 属性,且执行处理器也没有已标记但还没有访问完毕的物理地址,那么将不会进行存储,而会在Rd 中返回值 1。
若是物理地址有共享 TLB 属性,且已被标记为由执行处理器独占访问,那么将进行存储,清除该标记,并在Rd 中返回值 0。
若是物理地址有共享 TLB 属性,但没有标记为由执行处理器独占访问,那么不会进行存储,且会在Rd 中返回值 1。
参考:http://blog.csdn.net/duanlove/article/details/8212123
原子CAS操做
原子操做指令里,有原子加,原子减,cas究竟是什么呢?
首先看一段代码,
bool compare_and_swap(int *accum, int *dest, int newval) { if (*accum == *dest) { *dest = newval; return true; } else { *accum = *dest; return false; } }
cas便是Compare-and-swap,先比较再互换,即修改,意思就是,当reg等oldvalue的时候,将reg设置为newval,这段代码在非原子状况下(多线程)是没用的,可是若是这段代码是原子操做,那么他的威力就很是大, 互斥锁就和这个cas有关,
上面咱们也看到inter这个指令了,lock cmpxchg,cmpxchg做用就是cas这个函数的做用,比较并交换操做数,这就是cas原子操做,神奇吧,上面一个函数的做用,被inter一个指令搞定了,再cmpxchg前面加一个lock,那么这就是一个真正发挥威力的cas!
在win32内核中有个InterlockedCompareExchange函数,这个函数就是cas功能,在inter cpu上的实现就是这段指令=》lock cmpxchg!
linux下有__sync_bool_compare_and_swap 和 __sync_val_compare_and_swap 。
在dotnet下有 interlocked.compareexchange。java参考sun.misc.Unsafe类。
CAS操做,到底有什么威力?
若是要修改一个变量,在多线程下,应该要加锁,代码是这样的
int num = 0; void add() { lock(); num = num + 123; unlock(); }
可是若是不要锁,cas来操做??
int num = 0; void add() { int temp; do { temp = num; } while (cas(num, temp, temp+123)==true) }
咱们看到用一个do while来无限判断cas的修改结果,若是修改完成,那就成功+1,若是cas没有修改为功,继续while,temp将获取最新的num,再次cas操做!
当一个线程的时候,num一我的操做,不会出现差错,当两我的的时候,某我的先进行cas原子操做,num+1,第二个线程拿着旧值去加操做,返现返回的就是false,因而从新复制temp获取最新的num,这就是cas的核心价值!无锁!
cas其实这也算一种锁,乐观锁!相同于自旋锁也循环!
贴下cas互斥锁的代码(本身写的),固然也能够去用原子+-来判断,反正都是原子操做~~
int i = 0;//0非锁,1锁住 //尝试获取锁,当cas返回失败,获取锁失败,返回true,获取锁成功 获取失败就休眠,等待系统唤醒 bool lock() { return cas(i, 0, 1); } bool unlock() { return cas(i, 1, 0); }
CAS无锁Queue
简单发下我写的cas环形队列,很简单的!
// .h #pragma once #ifndef _cas_queue #define _cas_queue #ifndef C_BOOL #define C_BOOL typedef int cbool; #define false 0 #define true 1 #endif // //typedef struct _cas_queue //{ // int size; //} cas_queue; #define QUEUE_SIZE 65536 #ifdef __cplusplus extern "C" { #endif /* compare and swap: CAS(*ptr,outvalue,newvalue); return bool */ cbool compare_and_swap(void ** ptr,long outvalue,long newvalue); void cas_queue_init(int queue_size); void cas_queue_free(); cbool cas_queue_try_enqueue(void * p); cbool cas_queue_try_dequeue(void ** p); #ifdef __cplusplus } #endif #endif //.c #include "cas_queue.h" #ifdef _MSC_VER #include <windows.h> #else #endif volatile unsigned long read_index = 0; volatile unsigned long write_index = 0; long* read_index_p = &read_index; long* write_index_p = &write_index; void** ring_queue_buffer_head; int ring_queue_size = QUEUE_SIZE; cbool is_load = 0; cbool compare_and_swap(void * ptr, long outvalue, long newvalue) { #ifdef _MSC_VER // vs long return_outvalue = InterlockedCompareExchange(ptr, newvalue, outvalue); return return_outvalue == outvalue; /*InterlockedCompareExchange64 No success!!*/ //#ifndef _WIN64 // //32 bit // long return_outvalue = InterlockedCompareExchange(ptr, newvalue, outvalue); // return return_outvalue == outvalue; //#else // //64 bit // long return_outvalue = InterlockedCompareExchange64(ptr, newvalue, outvalue); // return return_outvalue == outvalue; //#endif #else //linux #endif } void cas_queue_init(int queue_size) { if (queue_size > 0) ring_queue_size = queue_size; int size = sizeof(void**)*ring_queue_size; ring_queue_buffer_head = malloc(size); memset(ring_queue_buffer_head, 0, size); is_load = 1; read_index = 0; write_index = 0; } void cas_queue_free() { is_load = 0; free(ring_queue_buffer_head); } cbool cas_queue_try_enqueue(void * p) { if (!is_load) return false; unsigned long index; do { //queue full if (read_index != write_index && read_index%ring_queue_size == write_index%ring_queue_size) return false; index = write_index; } while (compare_and_swap(&write_index, index, index + 1) != true); ring_queue_buffer_head[index%ring_queue_size] = p; return true; } cbool cas_queue_try_dequeue(void ** p) { if (!is_load) return false; unsigned long index; do { //queue empty if (read_index == write_index) return false; index = read_index; } while (compare_and_swap(read_index_p, index, index + 1) != true); *p = ring_queue_buffer_head[index%ring_queue_size]; return true; }
具体我测试过,在4个线程状况下,80万个消息,同时入和出,出完只须要150毫秒左右!固然线程过多并且集火的话确定会慢的。
这个demo不是很实用,看下篇:CAS原子锁 高效自旋无锁的正确用法