一、前言html
队列在计算机中很是重要的一种数据结构,尤为在操做系统中。队列典型的特征是先进先出(FIFO),符合流水线业务流程。在进程间通讯、网络通讯之间常常采用队列作缓存,缓解数据处理压力。结合本身在工做中遇到的队列问题,总结一下对不一样场景下的队列实现。根据操做队列的场景分为:单生产者——单消费者、多生产者——单消费者、单生产者——多消费者、多生产者——多消费者四大模型。其实后面三种的队列,能够概括为一种多对多。根据队列中数据分为:队列中的数据是定长的、队列中的数据是变长的。linux
二、队列操做模型算法
(1)单生产者——单消费者shell

(2)多生产者——单消费者windows

(3)单生产者——多消费者api

(4)多生产者——多消费者数组

三、队列数据定长与变长缓存
(1)队列数据定长网络

(2)队列数据变长数据结构

四、并发无锁处理
(1)单生产者——单消费者模型
此种场景不须要加锁,定长的能够经过读指针和写指针进行控制队列操做,变长的经过读指针、写指针、结束指针控制操做。具体实现能够参考linux内核提供的kfifo的实现。能够参考:
http://blog.csdn.net/linyt/article/details/5764312
(2)(一)多对多(一)模型
正常逻辑操做是要对队列操做进行加锁处理。加锁的性能开销较大,通常采用无锁实现。无锁实现原理是CAS、FAA等机制。定长的能够参考:
http://coolshell.cn/articles/8239.html
变长的能够参考intel dpdk提供的rte_ring的实现。
http://blog.csdn.net/linzhaolover/article/details/9771329
无锁队列的实现
关于无锁队列的实现,网上有不少文章,虽然本文可能和那些文章有所重复,可是我仍是想以我本身的方式把这些文章中的重要的知识点串起来和你们讲一讲这个技术。下面开始正文。
关于CAS等原子操做
在开始说无锁队列以前,咱们须要知道一个很重要的技术就是CAS操做——Compare & Set,或是 Compare & Swap,如今几乎全部的CPU指令都支持CAS的原子操做,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操做,咱们就能够用其来实现各类无锁(lock free)的数据结构。
这个操做用C语言来描述就是下面这个样子:(代码来自Wikipedia的Compare And Swap词条)意思就是说,看一看内存*reg里的值是否是oldval,若是是的话,则对其赋值newval。
int
compare_and_swap (
int
* reg,
int
oldval,
int
newval)
{
int
old_reg_val = *reg;
if
(old_reg_val == oldval)
*reg = newval;
return
old_reg_val;
}
这个操做能够变种为返回bool值的形式(返回 bool值的好处在于,能够调用者知道有没有更新成功):
bool
compare_and_swap (
int
*accum,
int
*dest,
int
newval)
{
if
( *accum == *dest ) {
*dest = newval;
return
true
;
}
return
false
;
}
与CAS类似的还有下面的原子操做:(这些东西你们本身看Wikipedia吧)
注:在实际的C/C++程序中,CAS的各类实现版本以下:
1)GCC的CAS
GCC4.1+版本中支持CAS的原子操做(完整的原子操做可参看 GCC Atomic Builtins)
bool
__sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
2)Windows的CAS
在Windows下,你可使用下面的Windows API来完成CAS:(完整的Windows原子操做可参看MSDN的InterLocked Functions)
InterlockedCompareExchange ( __inout
LONG
volatile
*Target,
__in
LONG
Exchange,
__in
LONG
Comperand);
3) C++11中的CAS
C++11中的STL中的atomic类的函数可让你跨平台。(完整的C++11的原子操做可参看 Atomic Operation Library)
template
<
class
T >
bool
atomic_compare_exchange_weak( std::atomic* obj,
T* expected, T desired );
template
<
class
T >
bool
atomic_compare_exchange_weak(
volatile
std::atomic* obj,
T* expected, T desired );
无锁队列的链表实现
下面的东西主要来自John D. Valois 1994年10月在拉斯维加斯的并行和分布系统系统国际大会上的一篇论文——《Implementing Lock-Free Queues》。
咱们先来看一下进队列用CAS实现的方式:
EnQueue(x)
{
q =
new
record();
q->value = x;
q->next = NULL;
do
{
p = tail;
}
while
( CAS(p->next, NULL, q) != TRUE);
CAS(tail, p, q);
}
咱们能够看到,程序中的那个 do- while 的 Re-Try-Loop。就是说,颇有可能我在准备在队列尾加入结点时,别的线程已经加成功了,因而tail指针就变了,因而个人CAS返回了false,因而程序再试,直到试成功为止。这个很像咱们的抢电话热线的不停重播的状况。
你会看到,为何咱们的“置尾结点”的操做(第12行)不判断是否成功,由于:
- 若是有一个线程T1,它的while中的CAS若是成功的话,那么其它全部的 随后线程的CAS都会失败,而后就会再循环,
- 此时,若是T1 线程尚未更新tail指针,其它的线程继续失败,由于tail->next不是NULL了。
- 直到T1线程更新完tail指针,因而其它的线程中的某个线程就能够获得新的tail指针,继续往下走了。
这里有一个潜在的问题——若是T1线程在用CAS更新tail指针的以前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()
EnQueue(x)
{
q =
new
record();
q->value = x;
q->next = NULL;
p = tail;
oldp = p
do
{
while
(p->next != NULL)
p = p->next;
}
while
( CAS(p.next, NULL, q) != TRUE);
CAS(tail, oldp, q);
}
咱们让每一个线程,本身fetch 指针 p 到链表尾。可是这样的fetch会很影响性能。而通实际状况看下来,99.9%的状况不会有线程停转的状况,因此,更好的作法是,你能够接合上述的这两个版本,若是retry的次数超了一个值的话(好比说3次),那么,就本身fetch指针。
好了,咱们解决了EnQueue,咱们再来看看DeQueue的代码:(很简单,我就不解释了)
DeQueue()
{
do
{
p = head;
if
(p->next == NULL){
return
ERR_EMPTY_QUEUE;
}
while
( CAS(head, p, p->next) != TRUE );
return
p->next->value;
}
咱们能够看到,DeQueue的代码操做的是 head->next,而不是head自己。这样考虑是由于一个边界条件,咱们须要一个dummy的头指针来解决链表中若是只有一个元素,head和tail都指向同一个结点的问题,这样EnQueue和DeQueue要互相排斥了。

注:上图的tail正处于更新以前的装态。
CAS的ABA问题
所谓ABA(见维基百科的ABA词条),问题基本是这个样子:
- 进程P1在共享变量中读到值为A
- P1被抢占了,进程P2执行
- P2把共享变量里的值从A改为了B,再改回到A,此时被P1抢占。
- P1回来看到共享变量里的值没有被改变,因而继续执行。
虽然P1觉得变量值没有改变,继续执行了,可是这个会引起一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,由于CAS判断的是指针的地址。若是这个地址被重用了呢,问题就很大了。(地址被重用是很常常发生的,一个内存分配后释放了,再分配,颇有可能仍是原来的地址)
好比上述的DeQueue()函数,由于咱们要让head和tail分开,因此咱们引入了一个dummy指针给head,当咱们作CAS的以前,若是head的那块内存被回收并被重用了,而重用的内存又被EnQueue()进来了,这会有很大的问题。(内存管理中重用内存基本上是一种很常见的行为)
这个例子你可能没有看懂,维基百科上给了一个活生生的例子——
你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,而后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个如出一辙的手提箱和你那装满钱的箱子调了个包,而后就离开了,你看到你的手提箱还在那,因而就提着手提箱去赶飞机去了。
这就是ABA的问题。
解决ABA的问题
维基百科上给了一个解——使用double-CAS(双保险的CAS),例如,在32位系统上,咱们要检查64位的内容
1)一次用CAS检查双倍长度的值,前半部是指针,后半部分是一个计数器。
2)只有这两个都同样,才算经过检查,要吧赋新的值。并把计数器累加1。
这样一来,ABA发生时,虽然值同样,可是计数器就不同(可是在32位的系统上,这个计数器会溢出回来又从1开始的,这仍是会有ABA的问题)
固然,咱们这个队列的问题就是不想让那个内存重用,这样明确的业务问题比较好解决,论文《Implementing Lock-Free Queues》给出一这么一个方法——使用结点内存引用计数refcnt!
SafeRead(q)
{
loop:
p = q->next;
if
(p == NULL){
return
p;
}
Fetch&Add(p->refcnt, 1);
if
(p == q->next){
return
p;
}
else
{
Release(p);
}
goto
loop;
}
其中的 Fetch&Add和Release分是是加引用计数和减引用计数,都是原子操做,这样就能够阻止内存被回收了。
用数组实现无锁队列
本实现来自论文《Implementing Lock-Free Queues》
使用数组来实现队列是很常见的方法,由于没有内存的分部和释放,一切都会变得简单,实现的思路以下:
1)数组队列应该是一个ring buffer形式的数组(环形数组)
2)数组的元素应该有三个可能的值:HEAD,TAIL,EMPTY(固然,还有实际的数据)
3)数组一开始所有初始化成EMPTY,有两个相邻的元素要初始化成HEAD和TAIL,这表明空队列。
4)EnQueue操做。假设数据x要入队列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。须要注意,若是找不到(TAIL, EMPTY),则说明队列满了。
5)DeQueue操做。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。一样须要注意,若是x是TAIL,则说明队列为空。
算法的一个关键是——如何定位HEAD或TAIL?
1)咱们能够声明两个计数器,一个用来计数EnQueue的次数,一个用来计数DeQueue的次数。
2)这两个计算器使用使用Fetch&ADD来进行原子累加,在EnQueue或DeQueue完成的时候累加就行了。
3)累加后求个模什么的就能够知道TAIL和HEAD的位置了。
以下图所示:

小结
以上基本上就是全部的无锁队列的技术细节,这些技术均可以用在其它的无锁数据结构上。
1)无锁队列主要是经过CAS、FAA这些原子操做,和Retry-Loop实现。
2)对于Retry-Loop,我我的感受其实和锁什么什么两样。只是这种“锁”的粒度变小了,主要是“锁”HEAD和TAIL这两个关键资源。而不是整个数据结构。
还有一些和Lock Free的文章你能够去看看:
【
注:我配了一张look-free的自行车,寓意为——若是不用专门的车锁,那么自行得本身锁本身!】
intel dpdk api ring 模块源码详解
摘要
intel dpdk 提供了一套ring 队列管理代码,支持单生产者产品入列,单消费者产品出列;多名生产者产品入列,多产品消费这产品出列操做;
咱们以app/test/test_ring.c文件中的代码进行讲解,test_ring_basic_ex()函数完成一个基本功能测试函数;
一、ring的建立
- rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY,
- RING_F_SP_ENQ | RING_F_SC_DEQ);
调用rte_ring_create函数去建立一个ring,
第一参数"test_ring_basic_ex"是这个ring的名字,
第二个参数RING_SIZE是ring的大小;
第三个参数是在哪一个socket id上建立 ,这指定的是任意;
第四个参数是指定此ring支持单入单出;
我看一下rte_ring_create函数主要完成了哪些操做;
- rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
执行读写锁的加锁操做;
- mz = rte_memzone_reserve(mz_name, ring_size, socket_id, mz_flags);
预留一部份内存空间给ring,其大小就是RING_SIZE个sizeof(struct rte_ring)的尺寸;
- r = mz->addr;
-
- memset(r, 0, sizeof(*r));
- rte_snprintf(r->name, sizeof(r->name), "%s", name);
- r->flags = flags;
- r->prod.watermark = count;
- r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ);
- r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ);
- r->prod.size = r->cons.size = count;
- r->prod.mask = r->cons.mask = count-1;
- r->prod.head = r->cons.head = 0;
- r->prod.tail = r->cons.tail = 0;
-
- TAILQ_INSERT_TAIL(ring_list, r, next);
将获取到的虚拟地址给了ring,而后初始化她,prod 表明生成者,cons表明消费者;
生产者最大能够生产count个,其取模的掩码是 count-1; 目前是0个产品,因此将生产者的头和消费者头都设置为0;其尾也设置未0;
- rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
执行读写锁的写锁解锁操做;
二、ring的单生产者产品入列
- rte_ring_enqueue(rp, obj[i])
ring的单个入列;
最终会调用到上面这个函数,进行单次入列,咱们看一下它的实现;
- prod_head = r->prod.head;
- cons_tail = r->cons.tail;
暂时将生产者的头索引和消费者的尾部索引交给临时变量;
- free_entries = mask + cons_tail - prod_head;
计算还有多少剩余的存储空间;
- prod_next = prod_head + n;
- r->prod.head = prod_next;
若是有足够的剩余空间,咱们先将临时变量prod_next 进行后移,同事将生产者的头索引后移n个;
- for (i = 0; likely(i < n); i++)
- r->ring[(prod_head + i) & mask] = obj_table[i];
- rte_wmb();
执行写操做,将目标进行入队操做,它并无任何大数据量的内存拷贝操做,只是进行指针的赋值操做,所以dpdk的内存操做很快,应该算是零拷贝;
- r->prod.tail = prod_next;
成功写入以后,将生产者的尾部索引赋值为prox_next ,也就是将其日后挪到n个索引;咱们成功插入了n个产品;目前是单个操做,索引目前n=1;
三、ring的单消费者产品出列
- rte_ring_dequeue(rp, &obj[i]);
一样出队也包含了好几层的调用,最终定位到__rte_ring_sc_do_dequeue函数;
- cons_head = r->cons.head;
- prod_tail = r->prod.tail;
先将消费者的头索引和生产者的头索引赋值给临时变量;
- entries = prod_tail - cons_head;
计算目前ring中有多少产品;
- cons_next = cons_head + n;
- r->cons.head = cons_next;
若是有足够的产品,就将临时变量cons_next日后挪到n个值,指向你想取出几个产品的位置;同时将消费者的头索引日后挪到n个;这目前n=1;由于是单个取出;
- rte_rmb();
- for (i = 0; likely(i < n); i++) {
- obj_table[i] = r->ring[(cons_head + i) & mask];
- }
执行读取操做,一样没有任何的大的数据量拷贝,只是进行指针的赋值;
- r->cons.tail = cons_next;
最后将消费者的尾部索引也像后挪动n个,最终等于消费者的头索引;
四、ring的多生产者产品入列
多生产者入列的实现是在 __rte_ring_mp_do_enqueue()函数中;在dpdk/lib/librte_ring/rte_ring.h 文件中定义;其实这个函数和单入列函数很类似;
-
- do {
-
- n = max;
- .................
-
- prod_next = prod_head + n;
- success = rte_atomic32_cmpset(&r->prod.head, prod_head,
- prod_next);
- } while (unlikely(success == 0));
在单生产者中时将生产者的头部和消费者的尾部直接赋值给临时变量,去求剩余存储空间;最后将生产者的头索引日后移动n个,
但在多生产者中,要判断这个头部是否和其余的生产者发出竞争,
success = rte_atomic32_cmpset(&r->prod.head, prod_head,
prod_next);
是否有其余生产者修改了prod.head,因此这要从新判断一下prod.head是否还等于prod_head,若是等于,就将其日后移动n个,也就是将prod_next值赋值给prod.head;
若是不等于,就会失败,就须要进入do while循环再次循环一次;从新刷新一下prod_head和prod_next 以及prod.head的值 ;
- for (i = 0; likely(i < n); i++)
- r->ring[(prod_head + i) & mask] = obj_table[i];
- rte_wmb();
执行产品写入操做;
写入操做完成以后,如是单生产者应该是直接修改生产者尾部索引,将其日后顺延n个,但目前是多生产者操做;是怎样实现的呢?
- while (unlikely(r->prod.tail != prod_head))
- rte_pause();
-
- r->prod.tail = prod_next;
这也先进行判断,判断当前的生产者尾部索引是否还等于,存储在临时变量中的生产者头索引,
若是不等于,说明,有其余的线程还在执行,并且应该是在它以前进行存储,还没来得及更新prod.tail;等其余的生产者更新tail后,就会使得prod.tail==prod_head;
以后再更新,prod.tail 日后挪动n个,最好实现 prod.tail==prod.head==prod_next==prod_head+n;
五、ring的多消费者产品出列
多个消费者同时取产品是在__rte_ring_mc_do_dequeue()函数中实现;定义在dpdk/lib/librte_ring/rte_ring.h文件中;
-
- do {
-
- n = max;
-
- cons_head = r->cons.head;
- prod_tail = r->prod.tail;
- ...................
-
- cons_next = cons_head + n;
- success = rte_atomic32_cmpset(&r->cons.head, cons_head,
- cons_next);
- } while (unlikely(success == 0));
和多生产者同样,在外面多包含了一次do while循环,防止多消费者操做发生竞争;
在循环中先将消费者的头索引和生产者的为索引赋值给临时变量;让后判断有多少剩余的产品在循环队列,
若有n个产品,就将临时变量cons_next 日后挪动n个,而后判断目前的消费者头索引是否还等于刚才的保存在临时变量cons_head 中的值,如相等,说明没有发生竞争,就将cons_next赋值给
消费者的头索引 r->cons.head,如不相等,就须要从新作一次do while循环;
- rte_rmb();
- for (i = 0; likely(i < n); i++) {
- obj_table[i] = r->ring[(cons_head + i) & mask];
- }
在成功更新消费者头索引后,执行读取产品操做,这并无大的数据拷贝操做,只是进行指针的从新赋值操做;
- while (unlikely(r->cons.tail != cons_head))
- rte_pause();
-
- __RING_STAT_ADD(r, deq_success, n);
- r->cons.tail = cons_next;
读取完成后,就要更新消费者的尾部索引;
为了不竞争,就要判是否有其余的消费者在更新消费者尾部索引;若是目前的消费者尾部索引不等于刚才保存的在临时变量cons_head 的值,就要等待其余消费者修改这个尾部索引;
如相等,机能够将当前消费者的尾部索引日后挪动n个索引值了,
实现 r->cons.tail=r->cons.head=cons_next=cons_head+n;
六、ring的其余断定函数
- rte_ring_lookup("test_ring_basic_ex")
验证以test_ring_basic_ex 为名的ring是否建立成功;
判断ring是否为空;
判断ring是否已经满;
判断当前ring还有多少剩余存储空间;
一、前言
最近工做比较忙,加班较多,天天晚上回到家10点多了。我不知道本身还能坚持多久,既然选择了就要作到最好。写博客的少了。总以为少了点什么,须要继续学习。今天继续上个开篇写,介绍单生产者单消费者模型的队列。根据写入队列的内容是定长仍是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种。单生产者单消费者模型的队列操做过程是不须要进行加锁的。生产者经过写索引控制入队操做,消费者经过读索引控制出队列操做。两者相互之间对索引是独享,不存在竞争关系。以下图所示:

二、单生产者单消费者定长队列
这种队列要求每次入队和出队的内容是定长的,即生产者写入队列和消费者读取队列的内容大小事相同的。linux内核中的kfifo就是这种队列,提供了读和写两个索引。单生产者单消费者队列数据结构定义以下所示:
typedef struct { uint32_t r_index; /*读指针*/ uint32_t w_index; /*写指针*/ uint32_t size; /*缓冲区大小*/ char *buff[0]; /*缓冲区起始地址*/ }ring_buff_st;
为了方便计算位置,设置队列的大小为2的次幂。这样能够将以前的取余操做转换为位操做,即r_index = r_index % size 与 r_index = r_index & (size -1)等价。位操做很是快,充分利用了二进制的特征。
(1)队列初始状态,读写索引相等,此时队列为空。

(2)写入队列
写操做即进行入队操做,入队有三种场景,
2.1 写索引大于等于读索引


2.2写索引小于读索引

2.3.写索引后不够写入一个

(3)读取队列
读队列分为三种场景
3.1写索引大于等于读索引

3.2写索引小于读索引

3.3.读索引后面不够一个

三、单生产者单消费者变长队列
有些时候生产者每次写入的数据长度是不肯定的,致使写入队列的数据时变长的。这样为了充分利用队列,须要增长一个结束索引,保证队列末尾至少可以写入一个数据。变长队列数据结构定义以下:
typedef struct { uint32_t r_index; /*读指针*/ uint32_t w_index; /*写指针*/ uint32_t e_index; /*队列结束指针*/ uint32_t size; /*缓冲区大小*/ char *buff[0]; /*缓冲区起始地址*/ }ring_buff_st;