ypipe_t has a yqueue_t. pipe_t relates two ypipe(s)。pipe_t就是0MQ框架内使用的底层队列。react
yqueue_t的设计目的。算法
yqueue_t 的结构成员编程
这个队列的设计与std::deque有的类似,都用chunk双向链表实现,一样避免FIFO队列对内存过分频繁的分配和释放。yqueue_t不支持随机访问,以及遍历迭代。yqueue_t采用了一个spare_chunk,保存最近一次访问事后释放或分配出来的chunk,提升空闲chunk的cache亲和性。而且spare_chunk使用了lock-free的atomic_ptr_t。数组
begin_chunk指向chunk链表的开始端,back_chunk指向chunk链表的有效尾端。网络
yqueue只支持相似std::deque的pop_front, pop_back, push_back。session
pop_front当消耗完一个chunk,而后空闲出一个chunk,替换spare_chunk。并发
push_back当接近可写chunk的末端,就会分配出一个chunk或取出spare_chunk,链入到链表尾端,end_chunk指向这个新的chunk。框架
pop_back可能会使当前可写chunk空闲,但这个空闲出来的chunk不会放到spare_chunk。socket
接着就是ypipe_t, 其设计目的:atom
ypipe_t 结构成员:
这里的lock-free,一样关键在atomic_ptr_t类型的成员。一样是利用cas(compare_and_swap)替代解决某些状况的锁。这里并无实现RCU这类lock-free读写并发算法,也不支持读之间的并发。设计目的,明确指出,只有一条线程能够在任何指定时刻读,以及一条线程能够在任何指定时刻写,换句话说,读写能够无锁并发,但读之间不可并发,写之间也不能够并发。
这里的读写lock-free,w指向yqueue的back,r指向yqueue的front,f指向yqueue未flush的位置。也就是f在w前,r是不会指向到f和w之间的位置,也就是yqueue未空也是不可读的。读写以前的竞态关键就是当r到达了f的位置。这时使用了一个atomic_ptr_t类型的c去进行lock-free解决。准确来讲,竞态可能会发生在读线程的读和写线程的flush时刻。
若是你要使用这个ypipe做为你的程序的队列的话,你仍是得用同步原语控制读之间以及写之间的同步。最理想的使用就是,同时只有一条线程可读,另外一条线程可写,读写两条线程之间是能够lock-free并发的。若是用锁的话,当发生竞态就有一条线程要进入内核态等待,另外一条线程也要进入内核态去唤醒。详细请看前面关于futex的文章。
最后就是pipe_t。
主要结构成员:
pipe_t关联inpipe的读端,关联outpipe的写端。在这一层并提供一个事件槽。
==================================================================
ypipe_t用于两个小组件, 一个是IO消息的底层upipe_t,上面封装一层pipe_t,用于某些类型的0MQ socket (socket_base_t),另外一个是cmd命令的cpipe_t用于mailbox,而mailbox是向io_thread_t以及rearer_t对象通信的队列。mailbox并非底层io文件,但借用了signaler_t(实现用某种文件系统文件),并实现i_poll_event接口,让mailbox的"io"归入到操做系统的poller。当经过mailbox发送命令与mailbox的拥有者通信时,就会在某个reactor线程中处理事件。参看mailbox::send
==========================================================
整个zmq框架内部,对象之间使用mailbox进行通信。
来看一下zmq context
cxt_t将reaper_t回收线程,io_thread_t 以及 socket_base_t 的mailbox 按放在 slots 指针数组。
slots[0] = &term_mailbox
slots[1] = reaper->get_mailbox()
slots[2 .. 2+io_nr] = io_thread->get_mailbox()
slots[...] = socket_base->get_mailbox()
这里要清楚,
1. 能够向一个io_thread发送command,并让command在目标io_thread上运行。
2. 向socket_base_t发送command,不会唤醒io_thread,由于socket_base_t不负责io,负责io的stream_engine_t,这才是咱们一般理解的stream connection。你zmq_poll这个socket_base_t的控制线程会被唤醒,并执行这个命令。可是通常不会直接向socket_base_t发送command。而是一般继承它的tid(实质是slots的索引)的pipe_t,在写入pipe_t后,向pipe_t发送send_activate_read(send_command),就会最终向它的父对象(不是继承关系,而是建立关系)socket_base_t发送command。
======================================================
在zmq框架内部有这样一张 pipe 网络。
1. 首先0MQ socket 不是咱们理解的传统意义的socket(某个链接的插口对),一个0MQ socket能够有多个链接,而且是透明的。
2. 每一个0MQ socket会一对 pipe 用于编程者的访问。
3. 0MQ socket里面每一个链接(session)都有一对 pipe,0MQ socket会透明地将消息在它的 pipe 和 多个session的pipe之间进行交换。
4. ctx_t包含了slots(mailbox,一种借用了signaler能够被socket poll的pipe),对象间的消息通信使用这些mailbox。
5. zmq_proxy,透明地将一个socket_base_t的pipe和另外一个socket_base_t的pipe的消息进行交换。