template <typename T, typename Allocator = AlignedAllocator<Slot<T>>> class Queue { private: static_assert(std::is_nothrow_copy_assignable<T>::value || std::is_nothrow_move_assignable<T>::value, "T must be nothrow copy or move assignable"); static_assert(std::is_nothrow_destructible<T>::value, "T must be nothrow destructible");
第一个模板参数是队列存储的对象类型,第二个模板参数为内存分配器,默认使用AlignedAllocator,即上文定义的内存分配器。c++
要求T类型的拷贝赋值,移动赋值函数和析构函数都要是noexcept的。并发
public: explicit Queue(const size_t capacity, const Allocator &allocator = Allocator()) : capacity_(capacity), allocator_(allocator), head_(0), tail_(0) { if (capacity_ < 1) { throw std::invalid_argument("capacity < 1"); } // Allocate one extra slot to prevent false sharing on the last slot slots_ = allocator_.allocate(capacity_ + 1); // Allocators are not required to honor alignment for over-aligned types (see http://eel.is/c++draft/allocator.requirements#10) so we verify alignment here if (reinterpret_cast<size_t>(slots_) % alignof(Slot<T>) != 0) { allocator_.deallocate(slots_, capacity_ + 1); throw std::bad_alloc(); } for (size_t i = 0; i < capacity_; ++i) { new (&slots_[i]) Slot<T>(); } static_assert( alignof(Slot<T>) == hardwareInterferenceSize, "Slot must be aligned to cache line boundary to prevent false sharing"); static_assert(sizeof(Slot<T>) % hardwareInterferenceSize == 0, "Slot size must be a multiple of cache line size to prevent false sharing between adjacent slots"); static_assert(sizeof(Queue) % hardwareInterferenceSize == 0, "Queue size must be a multiple of cache line size to prevent false sharing between adjacent queues"); static_assert( offsetof(Queue, tail_) - offsetof(Queue, head_) == static_cast<std::ptrdiff_t>(hardwareInterferenceSize), "head and tail must be a cache line apart to prevent false sharing"); }
上面代码是Queue的构造函数,为何要多申请一个Slot避免最后一个Slot的伪共享(不懂)?
而后检查了分配的内存起始地址是否是以Slot<T>的对齐数对齐,由于分配器并不被要求对over-aligned(过分对齐)的类型进行对齐,若是不知足对齐要求,则避免伪共享的要求达不到,释放内存抛出异常。
接着在capacity_个内存块上构造Slot<T>对象,最后进行一系列的静态断言,确保各个对象的内存分布与大小符合设计要求。函数
~Queue() noexcept { for (size_t i = 0; i < capacity_; ++i) { slots_[i].~Slot(); } allocator_.deallocate(slots_, capacity_ + 1); } // non-copyable and non-movable Queue(const Queue &) = delete; Queue &operator=(const Queue &) = delete;
析构函数没有什么意外之处.fetch
template <typename... Args> void emplace(Args &&... args) noexcept { static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&..."); auto const head = head_.fetch_add(1); auto &slot = slots_[idx(head)]; while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire)) ; slot.construct(std::forward<Args>(args)...); slot.turn.store(turn(head) * 2 + 1, std::memory_order_release); }
向队列首部插入一个元素,这个函数涉及到MPMCQueue核心的设计思路,所以对源码的分析先暂停,研究下MPMCQueue入队出队的操做实现。ui
MPMCQueue类使用head_和tail_两个数据成员做为队列的首元素和尾元素的索引标识 head_为队列首元素的下一个元素索引,即下一个插入位置的索引值,tail_为队列尾元素的索引,可是这两个数据不会有减少的操做,而是一直fetch_add(1),取元素的时候使用idx(head)得到真正的索引值,这里idx辅助函数就是head % capacity_,而turn函数的实现为head / capacity_,能够这么理解,turn的返回值表明了head遍历当前队列的趟数,假设capacity_ = 5,则:设计
head = 0,turn(head) = 0,当前head遍历了队列0趟。
head = 1, turn(head) = 0, head前进了一个单位,但仍是0趟。
...
head = 5,turn(head) = 1, head又指向了队列的第一个Slot(由于idx(head) = 0),而已是第1趟遍历队列了。code
所以,emplace函数中首先递增head_,这样就经过idx(head)得到了队首Slot的前一个Slot索引,也就是本次构造T类型对象的Slot的位置,
调用head_.fetch_add函数,这个函数首先修改head_保存的值而后返回修改以前的值,这样原子的更新了下一个插入操做的位置并获得本次插入位置的索引值,经过auto& slot = slots_[idx(head)]得到该Slot的引用。
接下来是一个while循环,经过不断比较turn(head) * 2 和slot.turn的值,相等的时候认为该Slot是空的,不然在这里无限循环,等待slot.turn的值改变。以后就调用construct函数在Slot对象中构造T类型对象,并给slot.turn赋值为turn(head) * 2 + 1。对象
暂且忽略原子操做的内存一致性选项(以后分析),能够分析每一个Slot对象turn的值表明了该Slot对象中是否存在T类型对象,当slot.turn = turn(head) * 2时不存在,当slot.turn = turn(head) * 2 + 1时存在。
head_第0趟遍历到该Slot对象的时候,slot.turn = 0, while判断成功,构造对象,slot.turn被赋值为1。假设一直没有pop操做而不断插入数据,head_不断增长直到又找到了这个Slot对象(这个时候队列已经满了),这时候head_的趟数变为1,因此while判断(1 * 2 != 1)失败,表示这个Slot对象中已经含有T类型对象,不能插入。分析到这里能够知道,pop函数中也在不断修改slot.turn值,当tail_第0趟遍历队列的时候,会把slot.turn从1变为2,这时emplace操做的while判断就会成功,便可以插入T类型对象。
所以对于每一个slot.turn其实在不断经历以下过程:
slot.turn = 0 // init.索引
//emplace
wait slot.turn == 0 :
slot.turn = 1
construct object.接口
//pop
wait slot.turn == 1 :
slot.turn = 2
destruct object.
//emplace
wait slot.turn == 2 :
slot.turn = 3
construct object
...
当slot.turn为奇数的时候Slot中存在对象,当slot.turn为偶数的时候Slot中不存在对象,这时候咱们回顾下Slot的析构函数:
~Slot() noexcept { if (turn & 1) { destroy(); } }
当turn为奇数的时候turn & 1的结果为真,调用destroy函数。
为了验证这个猜测下面看下pop函数的代码:
void pop(T &v) noexcept { auto const tail = tail_.fetch_add(1); auto &slot = slots_[idx(tail)]; while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire)) ; v = slot.move(); slot.destroy(); slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release); }
能够看到,pop函数中在等待slot.turn变为turn(tail) * 2 + 1,而后move出对象,并修改slot.turn为turn(tail) * 2 + 2。
Queue类中还有try_emplace,push,try_push,try_pop接口函数,核心逻辑与emplace和pop大同小异,预计和原子操做的内存一致性选项一块儿分析吧。
template <typename... Args> bool try_emplace(Args &&... args) noexcept { static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&..."); //获取当前时刻的插入位置索引值 auto head = head_.load(std::memory_order_acquire); for (;;) { //获取idx(head)对应的元素,注意此刻slot已经不必定是插入位置索引了。 auto &slot = slots_[idx(head)]; //判断插入位置是否是空的,若是是空的的话 if (turn(head) * 2 == slot.turn.load(std::memory_order_acquire)) { //判断head_是否被更新过,若是没有的话cas操做成功,构造对象,更新turn,最后返回true便可,cas操做失败,head会被更新为head_的新值,从新进入循环。 if (head_.compare_exchange_strong(head, head + 1)) { slot.construct(std::forward<Args>(args)...); slot.turn.store(turn(head) * 2 + 1, std::memory_order_release); return true; } } else { //插入位置不是空的,此时判断head_节点是否被更新过,若是没有更新过就意味着队列已经满了,插入节点已是队尾节点了,所以返回false,若是被更新过,则更新了head,从新进入循环判断。 auto const prevHead = head; head = head_.load(std::memory_order_acquire); if (head == prevHead) { return false; } } } }
上面是try_emplace函数的代码,分析以注释的方式写在源代码里,属于比较精巧的部分了,由于要处理时刻可能存在的并发问题,所以须要引入关键的cas判断与更新操做。原子读写采用Require-Release模型,见:https://zhuanlan.zhihu.com/p/...
其他的接口函数,try_pop的实现思路与try_emplace大同小异,push,try_push只是emplace版本的套壳实习,故不作赘述。