原文:https://www.cnblogs.com/my_life/articles/5220172.htmlhtml
程序在运行时内存实际的访问顺序和程序代码编写的访问顺序不必定一致,这就是内存乱序访问。内存乱序访问行为出现的理由是为了提高程序运行时的性能。内存乱序访问主要发生在两个阶段:linux
Memory barrier 可以让 CPU 或编译器在内存访问上有序。一个 Memory barrier 以前的内存访问操做一定先于其以后的完成。Memory barrier 包括两类:程序员
不少时候,编译器和 CPU 引发内存乱序访问不会带来什么问题,但一些特殊状况下,程序逻辑的正确性依赖于内存访问顺序,这时候内存乱序访问会带来逻辑上的错误,例如:redis
// thread 1 while (!ok); do(x); // thread 2 x = 42; ok = 1;
此段代码中,ok 初始化为 0,线程 1 等待 ok 被设置为 1 后执行 do 函数。假如说,线程 2 对内存的写操做乱序执行,也就是 x 赋值后于 ok 赋值完成,那么 do 函数接受的实参就极可能出乎程序员的意料,不为 42。安全
在编译时,编译器对代码作出优化时可能改变实际执行指令的顺序(例如 gcc 下 O2 或 O3 都会改变实际执行指令的顺序)数据结构
// test.cpp int x, y, r; void f() { x = r; y = 1; }
编译器优化的结果可能致使 y = 1 在 x = r 以前执行完成。首先直接编译此源文件:多线程
获得相关的汇编代码以下:架构
这里咱们看到,x = r 和 y = 1 并无乱序。现使用优化选项 O2(或 O3)编译上面的代码(g++ -O2 -S test.cpp),生成汇编代码以下:函数
咱们能够清楚的看到通过编译器优化以后 movl $1, y(%rip) 先于 movl %eax, x(%rip) 执行。避免编译时内存乱序访问的办法就是使用编译器 barrier(又叫优化 barrier)。Linux 内核提供函数 barrier() 用于让编译器保证其以前的内存访问先于其以后的完成。内核实现 barrier() 以下(X86-64 架构):性能
如今把此编译器 barrier 加入代码中:
int x, y, r; void f() { x = r; __asm__ __volatile__("" ::: "memory"); y = 1; }
这样就避免了编译器优化带来的内存乱序访问的问题了(若是有兴趣能够再看看编译以后的汇编代码)。本例中,咱们还可使用 volatile 这个关键字来避免编译时内存乱序访问(而没法避免后面要说的运行时内存乱序访问)。volatile 关键字可以让相关的变量之间在内存访问上避免乱序,这里能够修改 x 和 y 的定义来解决问题:
volatile int x, y; int r; void f() { x = r; y = 1; }
现加上了 volatile 关键字,这使得 x 相对于 y、y 相对于 x 在内存访问上有序。在 Linux 内核中,提供了一个宏 ACCESS_ONCE 来避免编译器对于连续的 ACCESS_ONCE 实例进行指令重排。其实 ACCESS_ONCE 实现源码以下:
此代码只是将变量 x 转换为 volatile 的而已。如今咱们就有了第三个修改方案:
int x, y, r; void f() { ACCESS_ONCE(x) = r; ACCESS_ONCE(y) = 1; }
到此基本上就阐述完了咱们的编译时内存乱序访问的问题。下面开始介绍运行时内存乱序访问。
在运行时,CPU 虽然会乱序执行指令,可是在单个 CPU 的上,硬件可以保证程序执行时全部的内存访问操做看起来像是按程序代码编写的顺序执行的,这时候 Memory barrier 没有必要使用(不考虑编译器优化的状况下)。这里咱们了解一下 CPU 乱序执行的行为。在乱序执行时,一个处理器真正执行指令的顺序由可用的输入数据决定,而非程序员编写的顺序。
早期的处理器为有序处理器(In-order processors),有序处理器处理指令一般有如下几步:
相比之下,乱序处理器(Out-of-order processors)处理指令一般有如下几步:
从上面的执行过程能够看出,乱序执行相比有序执行可以避免等待不可用的操做对象(有序执行的第二步)从而提升了效率。现代的机器上,处理器运行的速度比内存快不少,有序处理器花在等待可用数据的时间里已经能够处理大量指令了。
如今思考一下乱序处理器处理指令的过程,咱们能获得几个结论:
由此可知,在单 CPU 上,不考虑编译器优化致使乱序的前提下,多线程执行不存在内存乱序访问的问题。咱们从内核源码也能够获得相似的结论(代码不彻底的摘录):
#ifdef CONFIG_SMP #define smp_mb() mb() #else #define smp_mb() barrier() #endif
这里能够看到,若是是 SMP 则使用 mb,mb 被定义为 CPU Memory barrier(后面会讲到),而非 SMP 时,直接使用编译器 barrier。
在多 CPU 的机器上,问题又不同了。每一个 CPU 都存在 cache(cache 主要是为了弥补 CPU 和内存之间较慢的访问速度),当一个特定数据第一次被特定一个 CPU 获取时,此数据显然不在 CPU 的 cache 中(这就是 cache miss)。此 cache miss 意味着 CPU 须要从内存中获取数据(这个过程须要 CPU 等待数百个周期),此数据将被加载到 CPU 的 cache 中,这样后续就能直接从 cache 上快速访问。当某个 CPU 进行写操做时,它必须确保其余的 CPU 已经将此数据从它们的 cache 中移除(以便保证一致性),只有在移除操做完成后此 CPU 才能安全的修改数据。显然,存在多个 cache 时,咱们必须经过一个 cache 一致性协议来避免数据不一致的问题,而这个通信的过程就可能致使乱序访问的出现,也就是这里说的运行时内存乱序访问。这里再也不深刻讨论整个细节,这是一个比较复杂的问题,有兴趣能够研究 http://www.rdrop.com/users/paulmck/scalability/paper/whymb.2010.06.07c.pdf 一文,其详细的分析了整个过程。
如今经过一个例子来讲明多 CPU 下内存乱序访问:
// test2.cpp #include <pthread.h> #include <assert.h> // ------------------- int cpu_thread1 = 0; int cpu_thread2 = 1; volatile int x, y, r1, r2; void start() { x = y = r1 = r2 = 0; } void end() { assert(!(r1 == 0 && r2 == 0)); } void run1() { x = 1; r1 = y; } void run2() { y = 1; r2 = x; } // ------------------- static pthread_barrier_t barrier_start; static pthread_barrier_t barrier_end; static void* thread1(void*) { while (1) { pthread_barrier_wait(&barrier_start); run1(); pthread_barrier_wait(&barrier_end); } return NULL; } static void* thread2(void*) { while (1) { pthread_barrier_wait(&barrier_start); run2(); pthread_barrier_wait(&barrier_end); } return NULL; } int main() { assert(pthread_barrier_init(&barrier_start, NULL, 3) == 0); assert(pthread_barrier_init(&barrier_end, NULL, 3) == 0); pthread_t t1; pthread_t t2; assert(pthread_create(&t1, NULL, thread1, NULL) == 0); assert(pthread_create(&t2, NULL, thread2, NULL) == 0); cpu_set_t cs; CPU_ZERO(&cs); CPU_SET(cpu_thread1, &cs); assert(pthread_setaffinity_np(t1, sizeof(cs), &cs) == 0); CPU_ZERO(&cs); CPU_SET(cpu_thread2, &cs); assert(pthread_setaffinity_np(t2, sizeof(cs), &cs) == 0); while (1) { start(); pthread_barrier_wait(&barrier_start); pthread_barrier_wait(&barrier_end); end(); } return 0; }
这里建立了两个线程来运行测试代码(须要测试的代码将放置在 run 函数中)。我使用了 pthread barrier(区别于本文讨论的 Memory barrier)主要为了让两个子线程可以同时运行它们的 run 函数。此段代码不停的尝试同时运行两个线程的 run 函数,以便得出咱们指望的结果。在每次运行 run 函数前会调用一次 start 函数(进行数据初始化),run 运行后会调用一次 end 函数(进行结果检查)。run1 和 run2 两个函数运行在哪一个 CPU 上则经过 cpu_thread1 和 cpu_thread2 两个变量控制。
先编译此程序:g++ -lpthread -o test2 test2.cpp(这里未优化,目的是为了不编译器优化的干扰)。须要注意的是,两个线程运行在两个不一样的 CPU 上(CPU 0 和 CPU 1)。只要内存不出现乱序访问,那么 r1 和 r2 不可能同时为 0,所以断言失败表示存在内存乱序访问。编译以后运行此程序,会发现存在必定几率致使断言失败。为了进一步说明问题,咱们把 cpu_thread2 的值改成 0,换而言之就是让两个线程跑在同一个 CPU 下,再运行程序发现断言再也不失败。
最后,咱们使用 CPU Memory barrier 来解决内存乱序访问的问题(X86-64 架构下):
int cpu_thread1 = 0; int cpu_thread2 = 1; void run1() { x = 1; __asm__ __volatile__("mfence" ::: "memory"); r1 = y; } void run2() { y = 1; __asm__ __volatile__("mfence" ::: "memory"); r2 = x; }
Memory barrier 经常使用场合包括:
实际的应用程序开发中,开发者可能彻底不知道 Memory barrier 就能够开发正确的多线程程序,这主要是由于各类同步机制中已经隐含了 Memory barrier(但和实际的 Memory barrier 有细微差异),这就使得不直接使用 Memory barrier 不会存在任何问题。可是若是你但愿编写诸如无锁数据结构,那么 Memory barrier 仍是颇有用的。
一般来讲,在单个 CPU 上,存在依赖的内存访问有序:
这里内存操做有序。然而在 Alpha CPU 上,存在依赖的内存读取操做不必定有序,须要使用数据依赖 barrier(因为 Alpha 不常见,这里就不详细解释了)。
在 Linux 内核中,除了前面说到的编译器 barrier — barrier() 和 ACCESS_ONCE(),还有 CPU Memory barrier:
注意,全部的 CPU Memory barrier(除了数据依赖 barrier 以外)都隐含了编译器 barrier。这里的 smp 开头的 Memory barrier 会根据配置在单处理器上直接使用编译器 barrier,而在 SMP 上才使用 CPU Memory barrier(也就是 mb()、wmb()、rmb(),回忆上面相关内核代码)。
最后须要注意一点的是,CPU Memory barrier 中某些类型的 Memory barrier 须要成对使用,不然会出错,详细来讲就是:一个写操做 barrier 须要和读操做(或数据依赖)barrier 一块儿使用(固然,通用 barrier 也是能够的),反之依然。
读内核代码进一步学习 Memory barrier 的使用。
Linux 内核实现的无锁(只有一个读线程和一个写线程时)环形缓冲区 kfifo 就使用到了 Memory barrier,实现源码以下:
/* * A simple kernel FIFO implementation. * * Copyright (C) 2004 Stelian Pop <stelian@popies.net> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * */ #include <linux/kernel.h> #include <linux/module.h> #include <linux/slab.h> #include <linux/err.h> #include <linux/kfifo.h> #include <linux/log2.h> /** * kfifo_init - allocates a new FIFO using a preallocated buffer * @buffer: the preallocated buffer to be used. * @size: the size of the internal buffer, this have to be a power of 2. * @gfp_mask: get_free_pages mask, passed to kmalloc() * @lock: the lock to be used to protect the fifo buffer * * Do NOT pass the kfifo to kfifo_free() after use! Simply free the * &struct kfifo with kfree(). */ struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock) { struct kfifo *fifo; /* size must be a power of 2 */ BUG_ON(!is_power_of_2(size)); fifo = kmalloc(sizeof(struct kfifo), gfp_mask); if (!fifo) return ERR_PTR(-ENOMEM); fifo->buffer = buffer; fifo->size = size; fifo->in = fifo->out = 0; fifo->lock = lock; return fifo; } EXPORT_SYMBOL(kfifo_init); /** * kfifo_alloc - allocates a new FIFO and its internal buffer * @size: the size of the internal buffer to be allocated. * @gfp_mask: get_free_pages mask, passed to kmalloc() * @lock: the lock to be used to protect the fifo buffer * * The size will be rounded-up to a power of 2. */ struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock) { unsigned char *buffer; struct kfifo *ret; /* * round up to the next power of 2, since our 'let the indices * wrap' technique works only in this case. */ if (!is_power_of_2(size)) { BUG_ON(size > 0x80000000); size = roundup_pow_of_two(size); } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret; } EXPORT_SYMBOL(kfifo_alloc); /** * kfifo_free - frees the FIFO * @fifo: the fifo to be freed. */ void kfifo_free(struct kfifo *fifo) { kfree(fifo->buffer); kfree(fifo); } EXPORT_SYMBOL(kfifo_free); /** * __kfifo_put - puts some data into the FIFO, no locking version * @fifo: the fifo to be used. * @buffer: the data to be added. * @len: the length of the data to be added. * * This function copies at most @len bytes from the @buffer into * the FIFO depending on the free space, and returns the number of * bytes copied. * * Note that with only one concurrent reader and one concurrent * writer, you don't need extra locking to use these functions. */ unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len) { unsigned int l; len = min(len, fifo->size - fifo->in + fifo->out); /* * Ensure that we sample the fifo->out index -before- we * start putting bytes into the kfifo. */ smp_mb(); /* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l); /* * Ensure that we add the bytes to the kfifo -before- * we update the fifo->in index. */ smp_wmb(); fifo->in += len; return len; } EXPORT_SYMBOL(__kfifo_put); /** * __kfifo_get - gets some data from the FIFO, no locking version * @fifo: the fifo to be used. * @buffer: where the data must be copied. * @len: the size of the destination buffer. * * This function copies at most @len bytes from the FIFO into the * @buffer and returns the number of copied bytes. * * Note that with only one concurrent reader and one concurrent * writer, you don't need extra locking to use these functions. */ unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned int l; len = min(len, fifo->in - fifo->out); /* * Ensure that we sample the fifo->in index -before- we * start removing bytes from the kfifo. */ smp_rmb(); /* first get the data from fifo->out until the end of the buffer */ l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); /* then get the rest (if any) from the beginning of the buffer */ memcpy(buffer + l, fifo->buffer, len - l); /* * Ensure that we remove the bytes from the kfifo -before- * we update the fifo->out index. */ smp_mb(); fifo->out += len; return len; } EXPORT_SYMBOL(__kfifo_get);
为了更好的理解上面的源码,这里顺带说一下此实现使用到的一些和本文主题无关的技巧:
这里,索引 in 和 out 被两个线程访问。in 和 out 指明了缓冲区中实际数据的边界,也就是 in 和 out 同缓冲区数据存在访问上的顺序关系,因为未使用同步机制,那么保证顺序关系就须要使用到 Memory barrier 了。索引 in 和 out 都分别只被一个线程修改,而被两个线程读取。__kfifo_put 先经过 in 和 out 来肯定能够向缓冲区中写入数据量的多少,这时,out 索引应该先被读取后才能真正的将用户 buffer 中的数据写入缓冲区,所以这里使用到了 smp_mb(),对应的,__kfifo_get 也使用 smp_mb() 来确保修改 out 索引以前缓冲区中数据已经被成功读取并写入用户 buffer 中了。对于 in 索引,在 __kfifo_put 中,经过 smp_wmb() 保证先向缓冲区写入数据后才修改 in 索引,因为这里只须要保证写入操做有序,故选用写操做 barrier,在 __kfifo_get 中,经过 smp_rmb() 保证先读取了 in 索引(这时候 in 索引用于肯定缓冲区中实际存在多少可读数据)才开始读取缓冲区中数据(并写入用户 buffer 中),因为这里只须要保证读取操做有序,故选用读操做 barrier。
到这里,Memory barrier 就介绍完毕了。