环形无锁队列
Table of Contents
1 环形无锁队列的实现
数据结构定义:数组
template class LockFreeQueue { private: ElementT *mArray; int mCapacity; int mFront; int mTail; }
因为出队操做是在队首进行,入队操做是在队尾进行,所以,咱们能够尝试用mFront和mTail来实现多个线程之间的协调。这其中会用到CAS操做:安全
入队操做伪码:数据结构
…… do { 获取当前的mTail的值:curTailIndex; 计算新的mTail的值:newTailIndex = (newTailIndex + 1) % size; } while(!CAS(mTail, curTailIndex, newTailIndex)); 插入元素到curTailIndex;
其中的do-while循环实现的是一个忙式等待:线程试图获取当前的队列尾部空间的控制权;一旦获取成功,则向其中插入元素。多线程
可是这样出队的时候就出现了问题:如何判断队首的位置里是否有相应元素呢?仅使用mFront来判断是不行的,这只能保证出队进程不会对同一个索引位置进行出队操做,而不能保证mFront的位置中必定有有效的元素。所以,为了保证出队队列与入队队列之间的协调,须要在LockFreeQueue中添加一个标志数组:less
char *mFlagArray;
mFlagArray中的元素标记mArray中与之对应的元素位置是否有效。mFlagArray中的元素有4个取值:函数
- 0表示对应的mArray中的槽位为空;
- 1表示对应槽位已被申请,正在写入;
- 2表示对应槽位中为有效的元素,能够对其进行出对操做;
- 3则表示正在弹出操做。
修改后的无锁队列的代码以下:post
template class LockFreeQueue { public: LockFreeQueue(int s = 0) { mCapacity = s; mFront = 0; mTail = 0; mSize = 0; } ~LockFreeQueue() {} /** * 初始化queue。分配内存,设定size * 非线程安全,需在单线程环境下使用 */ bool initialize() { mFlagArray = new char[mCapacity]; if (NULL == mFlagArray) return false; memset(mFlagArray, 0, mCapacity); mArray = reinterpret_cast(new char[mCapacity * sizeof(ElementT)]); if (mArray == NULL) return false; memset(mArray, 0, mCapacity * sizeof(ElementT)); return true; } const int capacity(void) const { return mCapacity; } const int size(void) const { return mSize; } /** * 入队函数,线程安全 */ bool push(const ElementT & ele) { if (mSize >= mCapacity) return false; int curTailIndex = mTail; char *cur_tail_flag_index = mFlagArray + curTailIndex; //// 忙式等待 // while中的原子操做:若是当前tail的标记为已占用(1),则更新cur_tail_flag_index,继续循环;不然,将tail标记设为已经占用 while (!__sync_bool_compare_and_swap(cur_tail_flag_index, 0, 1)) { curTailIndex = mTail; cur_tail_flag_index = mFlagArray + curTailIndex; } //// 两个入队线程之间的同步 int update_tail_index = (curTailIndex + 1) % mCapacity; // 若是已经被其余的线程更新过,则不须要更新; // 不然,更新为 (curTailIndex+1) % mCapacity; __sync_bool_compare_and_swap(&mTail, curTailIndex, update_tail_index); // 申请到可用的存储空间 *(mArray + curTailIndex) = ele; // 写入完毕 __sync_fetch_and_add(cur_tail_flag_index, 1); // 更新size;入队线程与出队线程之间的协做 __sync_fetch_and_add(&mSize, 1); return true; } /** * 出队函数,线程安全 */ bool pop(ElementT *ele) { if (mSize <= 0) return false; int cur_head_index = mFront; char *cur_head_flag_index = mFlagArray + cur_head_index; while (!__sync_bool_compare_and_swap(cur_head_flag_index, 2, 3)) { cur_head_index = mFront; cur_head_flag_index = mFlagArray + cur_head_index; } // 取模操做能够优化 int update_head_index = (cur_head_index + 1) % mCapacity; __sync_bool_compare_and_swap(&mFront, cur_head_index, update_head_index); *ele = *(mArray + cur_head_index); // 弹出完毕 __sync_fetch_and_sub(cur_head_flag_index, 3); // 更新size __sync_fetch_and_sub(&mSize, 1); return true; } private: ElementT *mArray; int mCapacity; // 环形数组的大小 int mSize; //队列中元素的个数 int mFront; int mTail; char *mFlagArray; // 标记位,标记某个位置的元素是否被占用 };
2 死锁及饥饿
LockFreeQueue实现了基本的多线程之间的协调,不会存在多个线程同时对同一个资源进行操做的状况,也就不会产生数据竞跑,这保证了对于这个队列而言,基本的访问操做(出队、入队)的执行都是安全的,其结果是可预期的。fetch
在多线程环境下,LockFreeQueue会不会出现死锁的状况呢?死锁有四个必要条件:优化
- 对资源的访问是互斥的;
- 请求和保持请求;
- 资源不可剥夺;
- 循环等待。
在LockFreeQueue中,全部的线程都是对资源进行申请后再使用,一个线程若申请到了资源(这里的资源主要指环形队列中的内存槽位),就会当即使用,而且在使用完后释放掉该资源。不存在一个线程使用A资源的同时去申请B资源的状况,所以并不会出现死锁。
但LockFreeQueue可能出现饥饿状态。例如,对两个出队线程A、B,二者都循环进行出队操做。当队列中有元素时,A总能申请到这个元素而且执行到弹出操做,而B则只能在DeQueue函数的while循环中一直循环下去。
3 一些优化
对LockFreeQueue能够进行一些优化。好比:
- 对于环形数组大小,能够设定为2的整数倍,如1024。这样取模的操做便可以简化为与mCapacity-1的按位与操做。
- 忙式等待的时候可能会出现某个线程一直占用cpu的状况。此时可使用sleep(0),让该线程让出CPU时间片,从就绪态转为挂起态。