互斥锁是用来保护一个临界区,即保护一个访问共用资源的程序片断,而这些共用资源又没法同时被多个线程访问的特性。当有线程进入临界区段时,其余线程或是进程必须等待。html
在谈及锁的性能开销,通常都会说锁的开销很大,那锁的开销有多大,主要耗在哪,怎么提升锁的性能。linux
如今锁的机制通常使用 futex(fast Userspace mutexes),内核态和用户态的混合机制。尚未futex的时候,内核是如何维护同步与互斥的呢?系统内核维护一个对象,这个对象对全部进程可见,这个对象是用来管理互斥锁而且通知阻塞的进程。若是进程A要进入临界区,先去内核查看这个对象,有没有别的进程在占用这个临界区,出临界区的时候,也去内核查看这个对象,有没有别的进程在等待进入临界区,而后根据必定的策略唤醒等待的进程。这些没必要要的系统调用(或者说内核陷入)形成了大量的性能开销。为了解决这个问题,Futex就应运而生。git
Futex是一种用户态和内核态混合的同步机制。首先,同步的进程间经过mmap共享一段内存,futex变量就位于这段共享的内存中且操做是原子的,当进程尝试进入互斥区或者退出互斥区的时候,先去查看共享内存中的futex变量,若是没有竞争发生,则只修改futex,而不用再执行系统调用了。当经过访问futex变量告诉进程有竞争发生,则仍是得执行系统调用去完成相应的处理(wait 或者 wake up)。简单的说,futex就是经过在用户态的检查,(motivation)若是了解到没有竞争就不用陷入内核了,大大提升了low-contention时候的效率。github
mutex 是在 futex 的基础上用的内存共享变量来实现的,若是共享变量创建在进程内,它就是一个线程锁,若是它创建在进程间共享内存上,那么它是一个进程锁。pthread_mutex_t 中的 _lock 字段用于标记占用状况,先使用CAS判断_lock是否占用,若未占用,直接返回。不然,经过__lll_lock_wait_private 调用SYS_futex 系统调用迫使线程进入沉睡。 CAS是用户态的 CPU 指令,若无竞争,简单修改锁状态即返回,很是高效,只有发现竞争,才经过系统调用陷入内核态。因此,FUTEX是一种用户态和内核态混合的同步机制,它保证了低竞争状况下的锁获取效率。redis
因此若是锁不存在冲突,每次得到锁和释放锁的处理器开销仅仅是CAS指令的开销。算法
肯定一件事情最好的方法是实际测试和观测它,让咱们写一段代码来测试无冲突时锁的开销:shell
#include <pthread.h> #include <stdlib.h> #include <stdio.h> #include <time.h> static inline long long unsigned time_ns(struct timespec* const ts) { if (clock_gettime(CLOCK_REALTIME, ts)) { exit(1); } return ((long long unsigned) ts->tv_sec) * 1000000000LLU + (long long unsigned) ts->tv_nsec; } int main() { int res = -1; pthread_mutex_t mutex; //初始化互斥量,使用默认的互斥量属性 res = pthread_mutex_init(&mutex, NULL); if(res != 0) { perror("pthread_mutex_init failed\n"); exit(EXIT_FAILURE); } long MAX = 1000000000; long c = 0; struct timespec ts; const long long unsigned start_ns = time_ns(&ts); while(c < MAX) { pthread_mutex_lock(&mutex); c = c + 1; pthread_mutex_unlock(&mutex); } const long long unsigned delta = time_ns(&ts) - start_ns; printf("%f\n", delta/(double)MAX); return 0; }
说明:如下性能测试在腾讯云 Intel(R) Xeon(R) CPU E5-26xx v4 1核 2399.996MHz 下进行。数据库
运行了 10 亿次,平摊到每次加锁/解锁操做大概是 2.2ns 每次加锁/解锁(扣除了循环耗时 2.7ns)编程
在锁冲突的状况下,开销就没有这么小了。windows
首先pthread_mutex_lock会真正的调用sys_futex来进入内核来试图加锁,被锁住之后线程会进入睡眠,这带来了上下文切换和线程调度的开销。
能够写两个互相解锁的线程来测试这个过程的开销:
// Copyright (C) 2010 Benoit Sigoure // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>. #include <pthread.h> #include <sched.h> #include <stdio.h> #include <stdlib.h> #include <sys/ipc.h> #include <sys/shm.h> #include <sys/syscall.h> #include <sys/wait.h> #include <time.h> #include <unistd.h> #include <linux/futex.h> static inline long long unsigned time_ns(struct timespec* const ts) { if (clock_gettime(CLOCK_REALTIME, ts)) { exit(1); } return ((long long unsigned) ts->tv_sec) * 1000000000LLU + (long long unsigned) ts->tv_nsec; } static const int iterations = 500000; static void* thread(void* restrict ftx) { int* futex = (int*) ftx; for (int i = 0; i < iterations; i++) { sched_yield(); while (syscall(SYS_futex, futex, FUTEX_WAIT, 0xA, NULL, NULL, 42)) { // retry sched_yield(); } *futex = 0xB; while (!syscall(SYS_futex, futex, FUTEX_WAKE, 1, NULL, NULL, 42)) { // retry sched_yield(); } } return NULL; } int main(void) { struct timespec ts; const int shm_id = shmget(IPC_PRIVATE, sizeof (int), IPC_CREAT | 0666); int* futex = shmat(shm_id, NULL, 0); pthread_t thd; if (pthread_create(&thd, NULL, thread, futex)) { return 1; } *futex = 0xA; const long long unsigned start_ns = time_ns(&ts); for (int i = 0; i < iterations; i++) { *futex = 0xA; while (!syscall(SYS_futex, futex, FUTEX_WAKE, 1, NULL, NULL, 42)) { // retry sched_yield(); } sched_yield(); while (syscall(SYS_futex, futex, FUTEX_WAIT, 0xB, NULL, NULL, 42)) { // retry sched_yield(); } } const long long unsigned delta = time_ns(&ts) - start_ns; const int nswitches = iterations << 2; printf("%i thread context switches in %lluns (%.1fns/ctxsw)\n", nswitches, delta, (delta / (float) nswitches)); wait(futex); return 0; }
编译使用 gcc -std=gnu99 -pthread context_switch.c。
运行的结果是 2003.4ns/ctxsw,因此锁冲突的开销大概是不冲突开销的 910 倍了,相差出乎意料的大。
另一个c程序能够用来测试“纯上下文切换”的开销,线程只是使用sched_yield来放弃处理器,并不进入睡眠。
// Copyright (C) 2010 Benoit Sigoure // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>. #include <sched.h> #include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <time.h> #include <string.h> #include <errno.h> static inline long long unsigned time_ns(struct timespec* const ts) { if (clock_gettime(CLOCK_REALTIME, ts)) { exit(1); } return ((long long unsigned) ts->tv_sec) * 1000000000LLU + (long long unsigned) ts->tv_nsec; } static const int iterations = 500000; static void* thread(void*ctx) { (void)ctx; for (int i = 0; i < iterations; i++) sched_yield(); return NULL; } int main(void) { struct sched_param param; param.sched_priority = 1; if (sched_setscheduler(getpid(), SCHED_FIFO, ¶m)) fprintf(stderr, "sched_setscheduler(): %s\n", strerror(errno)); struct timespec ts; pthread_t thd; if (pthread_create(&thd, NULL, thread, NULL)) { return 1; } long long unsigned start_ns = time_ns(&ts); for (int i = 0; i < iterations; i++) sched_yield(); long long unsigned delta = time_ns(&ts) - start_ns; const int nswitches = iterations << 2; printf("%i thread context switches in %lluns (%.1fns/ctxsw)\n", nswitches, delta, (delta / (float) nswitches)); return 0; }
“纯上下文切换” 消耗了大概381.2ns/ctxsw。
这样咱们大体能够把锁冲突的开销分红三部分,“纯上下文切换”开销,大概是 381.2ns,调度器开销(把线程从睡眠变成就绪或者反过来)大概是1622.2ns,在多核系统上,还存在跨处理器调度的开销,那部分开销很大。在真实的应用场景里,还要考虑上下文切换带来的cache不命中和TLB不命中的开销,开销只会进一步加大。
从上面能够知道,真正消耗时间的不是上锁的次数,而是锁冲突的次数。减小锁冲突的次数才是提高性能的关键。使用更细粒度的锁,能够减小锁冲突。这里说的粒度包括时间和空间,好比哈希表包含一系列哈希桶,为每一个桶设置一把锁,空间粒度就会小不少--哈希值相互不冲突的访问不会致使锁冲突,这比为整个哈希表维护一把锁的冲突机率低不少。减小时间粒度也很容易理解,加锁的范围只包含必要的代码段,尽可能缩短得到锁到释放锁之间的时间,最重要的是,绝对不要在锁中进行任何可能会阻塞的操做。使用读写锁也是一个很好的减小冲突的方式,读操做之间不互斥,大大减小了冲突。
假设单向链表中的插入/删除操做不多,主要操做是搜索,那么基于单一锁的方法性能会不好。在这种状况下,应该考虑使用读写锁,即 pthread_rwlock_t,这么作就容许多个线程同时搜索链表。插入和删除操做仍然会锁住整个链表。假设执行的插入和搜索操做数量差很少相同,可是删除操做不多,那么在插入期间锁住整个链表是不合适的,在这种状况下,最好容许在链表中的分离点(disjoint point)上执行并发插入,一样使用基于读写锁的方式。在两个级别上执行锁定,链表有一个读写锁,各个节点包含一个互斥锁,在插入期间,写线程在链表上创建读锁,而后继续处理。在插入数据以前,锁住要在其后添加新数据的节点,插入以后释放此节点,而后释放读写锁。删除操做在链表上创建写锁。不须要得到与节点相关的锁;互斥锁只创建在某一个操做节点之上,大大减小锁冲突的次数。
锁自己的行为也存在进一步优化的可能性,sys_futex系统调用的做用在于让被锁住的当前线程睡眠,让出处理器供其它线程使用,既然这个过程的消耗很高,也就是说若是被锁定的时间不超过这个数值的话,根本没有必要进内核加锁——释放的处理器时间还不够消耗的。sys_futex的时间消耗够跑不少次 CAS 的,也就是说,对于一个锁冲突比较频繁并且平均锁定时间比较短的系统,一个值得考虑的优化方式是先循环调用 CAS 来尝试得到锁(这个操做也被称做自旋锁),在若干次失败后再进入内核真正加锁。固然这个优化只能在多处理器的系统里起做用(得有另外一个处理器来解锁,不然自旋锁无心义)。在glibc的pthread实现里,经过对pthread_mutex设置PTHREAD_MUTEX_ADAPTIVE_NP属性就可使用这个机制。
锁产生的一些问题:
无锁编程的好处之一是一个线程被挂起,不会影响到另外一个线程的执行,避免锁护送;在锁冲突频繁且平均锁定时间较短的系统,避免上下文切换和调度开销。
CAS (comapre and swap 或者 check and set),比较并替换,引用 wiki,它是一种用于线程数据同步的原子指令。
CAS 核心算法涉及到三个参数,即内存值,更新值和指望值;CAS 指令会先检查一个内存位置是否包含预期的值;若是是这样,就把新的值复制到这个位置,返回 true;若是不是则返回 false。
CAS 对应一条汇编指令 CMPXCHG,所以是原子性的。
bool compare_and_swap (int *accum, int *dest, int newval) { if ( *accum == *dest ) { *dest = newval; return true; } return false; }
通常,程序会在循环里使用 CAS 不断去完成一个事务性的操做,通常包含拷贝一个共享的变量到一个局部变量,而后再使用这个局部变量执行任务计算获得新的值,最后再使用 CAS 比较保存再局部变量的旧值和内存值来尝试提交你的修改,若是尝试失败,会从新读取一遍内存值,再从新计算,最后再使用 CAS 尝试提交修改,如此循环。好比:
void LockFreeQueue::push(Node* newHead) { for (;;) { // 拷贝共享变量(m_Head) 到一个局部变量 Node* oldHead = m_Head; // 执行任务,能够不用关注其余线程 newHead->next = oldHead; // 下一步尝试提交更改到共享变量 // 若是共享变量没有被其余线程修改过,仍为 oldHead,则 CAS 将 newHead 赋值给共享变量 m_Head 并返回 // 不然继续循环重试 if (_InterlockedCompareExchange(&m_Head, newHead, oldHead)) return; } }
上面的数据结构设置了一个共享的头节点 m_Head,当 push 一个新的节点时,会把新节点加在头节点后面;不要相信程序的执行是连续的,CPU 的执行是多线程并发。在 _InterlockedCompareExchange 即 CAS 以前,线程可能由于时间片用完被调度出去,新调度进来的线程执行完了 push 操做,多个线程共享了 m_Head 变量,此时 m_Head 已被修改了,若是原来线程继续执行,把 oldHead 覆盖到 m_Head,就会丢失其余线程 push 进来的节点。因此须要比较 m_Head 是否是还等于 oldHead,若是是,说明头节点不变,可使用 newHead 覆盖 m_Head;若是不是,说明有其余线程 push 了新的节点,那么须要使用最新的 m_Head 更新 oldHead 的值从新走一下循环,_InterlockedCompareExchange 会自动把 m_Head 赋值给 oldHead。
由于 CAS 须要在提交修改时检查指望值和内存值有没有发生变化,若是没有则进行更新,可是若是原来一个值从 A 变成 B 又变成 A,那么使用 CAS 检查的时候发现值没有发生变化,但实际上已经发生了一系列变化。
内存的回收利用会致使 CAS 出现严重的问题:
T* ptr1 = new T(8, 18); T* old = ptr1; delete ptr1; T* ptr2 = new T(0, 1); // 咱们不能保证操做系统不会从新使用 ptr1 内存地址,通常的内存管理器都会这样子作 if (old1 == ptr2) { // 这里表示,刚刚回收的 ptr1 指向的内存被用于后面申请的 ptr2了 }
ABA问题是无锁结构实现中常见的一种问题,可基本表述为:
对于P1来讲,数值A未发生过改变,但实际上A已经被变化过了,继续使用可能会出现问题。在CAS操做中,因为比较的可能是指针,这个问题将会变得更加严重。试想以下状况:
有一个堆(先入后出)中有top和节点A,节点A目前位于堆顶top指针指向A。如今有一个进程P1想要pop一个节点,所以按照以下无锁操做进行
pop() { do{ ptr = top; // ptr = top = NodeA next_prt = top->next; // next_ptr = NodeX } while(CAS(top, ptr, next_ptr) != true); return ptr; }
而进程P2在执行CAS操做以前打断了P1,并对堆进行了一系列的pop和push操做,使堆变为以下结构:
进程P2首先pop出NodeA,以后又Push了两个NodeB和C,因为内存管理机制中普遍使用的内存重用机制,致使NodeC的地址与以前的NodeA一致。
这时P1又开始继续运行,在执行CAS操做时,因为top依旧指向的是NodeA的地址(实际上已经变为NodeC),所以将top的值修改成了NodeX,这时堆结构以下:
通过CAS操做后,top指针错误的指向了NodeX而不是NodeB。
Tagged state reference,增长额外的 tag bits 位,它像一个版本号;好比,其中一种算法是在内存地址的低位记录指针的修改次数,在指针修改时,下一次 CAS 会返回失败,即便由于内存重用机制致使地址同样。有时咱们称这种机制位 ABA‘,由于咱们使第二个 A 稍微有点不一样于第一个。tag 的位数长度会影响记录修改的次数,在现有的 CPU 下,使用 60 bit tag,在不重启程序10年才会产生溢出问题;在 X64 CPU,趋向于支持 128 bit 的 CAS 指令,这样更能保证避免出现 ABA 问题。
下面参考 liblfds 库代码说明下 Tagged state reference 的实现过程。
咱们想要避免 ABA 问题的方法之一是使用更长的指针,这样便须要一个支持 dword 长度的 CAS 指令。liblfds 是怎么跨平台实现 128 bit 指令的呢?
在 liblfds 下,CAS 指令为 LFDS710_PAL_ATOMIC_DWCAS 宏,它的完整形式是:
LFDS710_PAL_ATOMIC_DWCAS( pointer_to_destination, pointer_to_compare, pointer_to_new_destination, cas_strength, result)
从上面能够看出,liblfds 库使用一个由两个元素组成的一维数组来表示 128 bit 指针。
Linux 提供了 cmpxchg16b 用于实现 128 bit 的 CAS 指令,而在 Windows,使用 _InterlockedCompareExchange128。只有 128 位指针彻底相等的状况下,才视为相等。
参考 liblfds/liblfds7.1.0/liblfds710/inc/liblfds710/lfds710_porting_abstraction_layer_compiler.h 下关于 CAS 的 windows 实现:
#define LFDS710_PAL_ATOMIC_DWCAS( pointer_to_destination, pointer_to_compare, pointer_to_new_destination, cas_strength, result ) \ { \ LFDS710_PAL_BARRIER_COMPILER_FULL; \ (result) = (char unsigned) _InterlockedCompareExchange128( (__int64 volatile *) (pointer_to_destination), (__int64) (pointer_to_new_destination[1]), (__int64) (pointer_to_new_destination[0]), (__int64 *) (pointer_to_compare) ); \ LFDS710_PAL_BARRIER_COMPILER_FULL; \ }
再重点研究 new_top 的定义和提交修改过程。
new_top 是一个具备两个元素的一维数组,元素是 struct lfds710_stack_element 指针,两个元素分别使用 POINTER 0 和 COUNTER 1 标记。COUNTER 至关于前面说的 tag 标记,POINTER 保存的时真正的节点指针。在 X64 下,指针长度是 64 bit,因此这里使用的是 64 bit tag 记录 pointer 修改记录。
liblfds 用原 top 的 COUNTER + 1来初始化 new top COUNTER,即便用 COUNTER 标记 ss->top 的更换次数,这样每一次更换 top,top 里的 COUNTER 都会变。
只有在 ss->top 和 original_top 的 POINTER 和 COUNTER 彻底相等的状况下,new_top 才会覆盖到 ss->top,不然会使用 ss->top 覆盖 original_top,下次循环用最新的 original_top 再次操做和比较。
参考 liblfds/liblfds7.1.0/liblfds710/src/lfds710_stack/lfds710_stack_push.c,无锁堆栈的实现:
void lfds710_stack_push( struct lfds710_stack_state *ss, struct lfds710_stack_element *se ) { char unsigned result; lfds710_pal_uint_t backoff_iteration = LFDS710_BACKOFF_INITIAL_VALUE; struct lfds710_stack_element LFDS710_PAL_ALIGN(LFDS710_PAL_ALIGN_DOUBLE_POINTER) *new_top[PAC_SIZE], *volatile original_top[PAC_SIZE]; LFDS710_PAL_ASSERT( ss != NULL ); LFDS710_PAL_ASSERT( se != NULL ); new_top[POINTER] = se; original_top[COUNTER] = ss->top[COUNTER]; original_top[POINTER] = ss->top[POINTER]; do { se->next = original_top[POINTER]; LFDS710_MISC_BARRIER_STORE; new_top[COUNTER] = original_top[COUNTER] + 1; LFDS710_PAL_ATOMIC_DWCAS( ss->top, original_top, new_top, LFDS710_MISC_CAS_STRENGTH_WEAK, result ); if( result == 0 ) LFDS710_BACKOFF_EXPONENTIAL_BACKOFF( ss->push_backoff, backoff_iteration ); } while( result == 0 ); LFDS710_BACKOFF_AUTOTUNE( ss->push_backoff, backoff_iteration ); return; }
[wiki Compare-and-swap] https://en.wikipedia.org/wiki/Compare-and-swap
[wiki ABA problem] https://en.wikipedia.org/wiki/ABA_problem
[左耳朵耗子无锁队列的实现] https://coolshell.cn/articles/8239.html
[IBM 设计不使用互斥锁的并发数据结构] https://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/index.html#artrelatedtopics
[ABA problem] https://lumian2015.github.io/lockFreeProgramming/aba-problem.html
[_InterlockedCompareExchange128] https://docs.microsoft.com/en-us/cpp/intrinsics/interlockedcompareexchange128?view=vs-2019
[Linux 互斥锁的实现原理(pthread_mutex_t)] https://www.bbsmax.com/A/x9J2WXvW56/
[futex机制介绍] https://blog.csdn.net/y33988979/article/details/82252266
[an-introduction-to-lock-free-programming] https://preshing.com/20120612/an-introduction-to-lock-free-programming/
[多进程、多线程与多处理器计算平台的性能问题] https://blog.csdn.net/Jmilk/article/details/81049623
[Implement Lock-Free Queue] http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.53.8674&rep=rep1&type=pdf
[上下文切换和线程调度性能测试] https://github.com/tsuna/contextswitch/blob/master/timetctxsw.c
[纯上下文切换性能测试] https://github.com/tsuna/contextswitch/blob/master/timetctxsw2.c
[锁的开销] http://xbay.github.io/2015/12/31/%E9%94%81%E7%9A%84%E5%BC%80%E9%94%80/
[pthread包的mutex实现分析] https://blog.csdn.net/tlxamulet/article/details/79047717
[IBM通用线程:POSIX 线程详解] https://www.ibm.com/developerworks/cn/linux/thread/posix_thread2/index.html