本文主要译自 zguide - chapter two. 但并非照本翻译.linux
上一章咱们简单的介绍了一个ZMQ, 并给出了三个套路的例子: 请求-回应, 订阅-发布, 流水线(分治). 这一章, 咱们将深刻的探索一下ZMQ中的socket, 以及"套路"程序员
若是熟悉linux socket编程的同窗阅读完了第一章, 必定有一种说不上来的别扭感受.由于一般状况下, 当咱们讨论socket的时候, 咱们通常指的是操做系统提供的网络编程接口里的那个socket概念. 而在ZMQ中, 只是借用了这个概念的名字, 在ZMQ中, 咱们讨论到socket的时候, 通常指代的是调用zmq_socket()
接口返回的那个socket, 具体一点: zmq socket.编程
zmq socket比起linux socket来讲, 逻辑理解起来比较相似, 虽然二者内部彻底就不是同一种东西.windows
zmq_socket()
, zmq_close()
zmq_setsockopt()
, zmq_getsockopt()
zmq_bind()
, zmq_connect()
zmq_msg_send()
, zmq_msg_recv()
, zmq_send()
, zmq_recv()
但与linux socket不一样的是, zmq socket没有listen这个逻辑概念.后端
须要注意的是, zmq socket是void指针, 而消息则是结构实例. 这就意味着, 在C语言的API中, 须要zmq socket的地方, 传递的必定是值, 而须要传递消息的时候, 好比使用zmq_msg_send()
和zmq_msg_recv()
这样的接口, 消息参数则传递其地址. 其设计哲学是: 在zmq中, socket不归程序员掌控, 因此你可能拿到一个句柄(地址), 但不能看到它长什么样(不能看到socket实例), 但消息是程序员建立的, 是受程序员掌控的.api
在两个结点上用ZMQ实现通信, 你须要分别为两个结点建立socket, 并在其中一个结点上调用zmq_bind()
, 在另外一个结点上建立对应的zmq_connect()
. 在ZMQ中, 请不要再以死板的"客户端", "服务端"来区分网络结点. 而要这样理解: zmq_bind()
调用应该发生在网络拓扑中那些不易变的结点上, 而zmq_connect()
应该发生在网络拓扑中那些易变的结点上.缓存
ZMQ创建起的数据链接和常见的TCP链接有一些不一样, 但也有一些共通之处, 以下:安全
在请求-回应套路中, 咱们把比较不易变的逻辑结点称为服务端, 把易变, 也就是会常常性的退出, 或从新加入网络拓扑的结点称为客户端. 服务端向外提供服务, 必须提供一个"地址"供客户端去上门, 换句话说, 在这个套路拓扑中, 那些常常来来去去的客户端应该知道去哪找服务端. 但反过来, 服务端彻底不关心去哪找客户端, 你爱来不来, 不来就滚, 不要打扰我飞升. 对于不易变的结点, 应该使用zmq_bind()
函数, 对于易变的结点, 应该采用zmq_connect
bash
在传统的linux socket编程中, 若是服务端尚未上线工做, 这个时候去启动客户端程序, 客户端程序的connect()
调用会返回错误. 但在ZMQ中, 它妥善处理了这种状况. 客户端调用zmq_connect()
, 不会报错, 仅会致使消息被阻塞而发不出去.服务器
不要小看这一点设计, 它反映出ZMQ的设计思想: 在请求-应答套路中, 它不光容许客户端能够随时退出, 再回来. 甚至容许服务端去上个厕所.
另外, 一个服务端能够屡次调用zmq_bind()
以将本身关联到多个endpoint上.(所谓的endpoint, 就是通信协议+通信地址的组合, 它通常状况下指代了在这种通信协议中的一个网络结点, 但这个结点能够是逻辑性的, 不必定只是一台机器).这就意味着, zmq socket能够同时接受来自多个不一样通信协议的多簇请求消息.
zmq_bind(socket, "tcp://*:5555"); zmq_bind(socket, "tcp://*:999"); zmq_bind(socket, "inproc://suprise_motherfucker");
可是, 对于同一种通信协议里的同一个endpoint, 你只能对其执行一次zmq_bind()
操做. 这里有个例外, 就是ipc进程间通讯. 逻辑上容许另一个进程去使用以前一个进程已经使用过的ipc endpoint, 但不要滥用这特性: 这只是ZMQ提供给程序崩溃后恢复现场的一种手段, 在正常的代码逻辑中, 不要作这样的事情.
因此看到这里你大概能理解zmq对bind和connect这两个概念的态度: ZMQ努力的将这两个概念之间的差别抹平, 但很遗憾, zmq并无将这两个操做抽象成一个相似于touch的操做. 但仍是请谨记, 在你的网络拓扑中, 让不易变结点去使用zmq_bind()
, 让易变结点去使用zmq_connect
zmq socket是分类型的, 不一样类型的socket提供了差别化的服务, socket的类型与结点在拓扑中的角色有关, 也影响着消息的出入, 以及缓存策略. 不一样类型的socket之间, 有些能够互相链接, 但有些并不能, 这些规则, 以及如何在套路中为各个结点安排合适类型的socket, 都是后续咱们将要讲到的内容.
若是从网络通信的角度来说, zmq是一个将传统传输层封装起来的网络库. 但从数据传输, 消息传输, 以及消息缓存这个角度来说, zmq彷佛又称得上是一个消息队列库. 总之, zmq是一个优秀的库, 优秀不是指它的实现, 它的性能, 而是它能解决的问题, 它的设计思路.
在第一章里, 咱们接触到了两个有关消息收发的函数, zmq_send()
和zmq_recv()
, 如今, 咱们须要把术语规范一下.
zmq_send()
与zmq_recv()
是用来传输"数据"的接口. 而"消息"这个术语, 在zmq中有指定含义, 传递消息的接口是zmq_msg_send()
与zmq_msg_recv()
当咱们提及"数据"的时候, 咱们指的是二进制串. 当咱们说"消息"的时候, 指提是zmq中的一种特定结构体.
须要额外注意的是, 不管是调用zmq_send()
仍是zmq_msg_send()
, 当调用返回时, 消息并无真正被发送出去, 更没有被对方收到. 调用返回只表明zmq将你要发送的"消息"或"数据"放进了一个叫"发送缓冲区"的地方. 这是zmq实现收发异步且带缓冲队列的一个设计.
ZMQ底层封装了三种单播通信协议, 分别是: 共享内存实现的线程间通信(inproc), 进程间通讯(ipc), 以及TCP/IP协议栈里的TCP协议(tcp). 另外ZMQ底层还封装了两种广播协议: PGM, EPGM. 多播咱们在很是后面的章节才会介绍到, 在你了解它以前, 请不要使用多播协议, 即使你是在作一些相似于发布-订阅套路的东西.
对于多数场景来讲, 底层协议选用tcp都是没什么问题的. 须要注意的是, zmq中的tcp, 被称为 "无链接的tcp协议", 而之因此起这么一个精神分裂的名字, 是由于zmq容许在对端不存在的状况下, 结点去zmq_connect()
. 你大体能够想象zmq作了多少额外工做, 但这些对于你来讲, 对于上层应用程序来讲, 是透明了, 你没必要去关心具体实现.
IPC通信相似于tcp, 也是"无链接"的, 目前, 这种方式不能在windows上使用, 很遗憾. 而且, 按照惯例, 在使用ipc做为通信方式时, 咱们通常给endpoint加上一个.ipc
的后缀. 另外, 在Unix操做系统上, 使用ipc链接还请格外的注意不一样进程的权限问题, 特别是从属于两个不一样用户的进程.
最后来讲一下inproc, 也就是线程间通讯, 它只能用于同一进程内的不一样线程通信. 比起tcp和ipc, 这种通信方式快的飞起. 它与tcp和ipc最大的区别是: 在有客户端调用connect以前, 必须确保已经有一个服务端在对应的endpoint上调用了bind, 这个缺陷可能会在将来的某个版本被修正, 但就目前来说, 请务必当心注意.
很遗憾的是, ZMQ对于其底层封装的网络协议是有侵入性的, 换句话说, 你无法使用ZMQ去实现一个HTTP服务器. HTTP做为一个五层协议, 使用TCP做为传输层协议, 对TCP里的报文格式是有规约限制的, 而ZMQ做为一个封装了TCP的4.5层协议, 其在数据交互时, 已经侵入了TCP的报文格式. 你没法让TCP里的报文既知足HTTP的格式要求, 还知足ZMQ的格式要求.
关心ZMQ究竟是如何侵入它封装的通信协议的, 这个在第三章, 当咱们接触到ZMQ_ROUTER_RAW
这种socket配置项的时候才会深刻讨论, 目前你只须要明白, ZMQ对其底层封装的通信协议有侵入.
这意味着, 你没法无损的将ZMQ引入到一些现成的项目中. 这很遗憾.
咱们先前提到过, ZMQ在后台使用独立的线程来实现异步I/O处理. 通常状况下吧, 一个I/O线程就应该足以处理当前进程的全部socket的I/O做业, 可是这个凡事总有个极限状况, 因此总会存在一些很荀的场景, 你须要多开几个I/O线程.
当你建立一个context的时候, ZMQ就在背后建立了一个I/O处理线程. 若是这么一个I/O线程不能知足你的需求, 那么就须要在建立context的时候加一些料, 让ZMQ多建立几个I/O处理线程. 通常有一个简单估算I/O线程数量的方法: 每秒你的程序有几个G字节的吞吐量, 你就开几个I/O线程.
下面是自定义I/O线程数量的方法:
int io_threads = 4; void * context = zmq_ctx_new(); zmq_ctx_set(context, ZMQ_IO_THREADS, io_threads); assert(zmq_ctx_get(context, ZMQ_IO_THREADS) == io_threads);
回想一下你用linux socket + epoll编写服务端应用程序的套路, 通常都是一个tcp链接专门开一个线程. ZMQ不同, ZMQ容许你在一个进程里持有上千个链接(不必定是TCP哦), 但处理这上千个链接的I/O做业, 可能只有一个, 或者几个线程而已, 而且事实也证实这样作是可行的. 可能你的进程里只有十几个线程, 但就是能处理超过上千个链接.
当你的程序只使用inproc做为通信手段的时候, 实际上是不须要线程来处理异步I/O的, 由于inproc是经过共享内存实现通信的. 这个时候你能够手动设置I/O线程的数量为0. 这是一个小小的优化手段, 嗯, 对性能的提高基本为0.
ZMQ的设计是亲套路的, ZMQ的核心其实在于路由与缓存, 这也是为何做为一个网络库, 它更多的被人从消息队列这个角度了解到的缘由. 要用ZMQ实现套路, 关键在于使用正确的socket类型, 而后把拓扑中的socket组装配对起来. 因此, 要懂套路, 就须要懂zmq里的socket类型.
zmq提供了你构建以下套路的手段:
咱们在第一章中已经大体接触了套路, 除了一夫一妻没有接触到, 这章稍后些部分咱们也将接触这种套路.要了解具体socket的各个类型都是干吗用的, 能够去阅读zmq_socket()
的manpage, 我建议你去阅读, 而且仔细阅读, 反复阅读.下面列出的是能够互相组合的socket类型. 双方能够替换bind
与connect
操做.
后续你还会看到有XPUB与XSUB两种类型的socket. 就目前来讲, 只有上面的socket配对链接是有效的, 其它没列出的组合的行为是未定义的, 但就目前的版原本说, 错误的组合socket类型并不会致使链接时出错, 甚至可能会碰巧按你的预期运行, 但强烈不建议你这个瞎jb搞. 在将来的版本中, 组合非法的socket类型可能会致使API调用出错.
libzmq有两套收发消息的API接口, 这个以前咱们已经讲过. 而且在第一章里建议你多使用zmq_send()
与zmq_recv()
, 建议你规避zmq_msg_send()
与zmq_msg_recv()
. 但zmq_recv
有一个缺陷, 就是当你提供给zmq_recv()
接口的接收buffer不够长时, zmq_recv()
会把数据截断. 若是你没法预测你要收到的二进制数据的长度, 那么你只能使用zmq_msg_xxx()
接口.
从接口名上的msg
三个字母就能看出, 这个系列的接口是操纵结构体, 也就是"消息"(实际上是帧, 后面会讲到), 而不是"数据", 而非缓冲区的接口, 实际上它们操纵的是zmq_msg_t
类型的结构. 这个系列的接口功能更为丰富, 但使用起来也请务必万分当心.
zmq_msg_init()
, zmq_msg_init_size()
, zmq_msg_init_data()
zmq_msg_send()
, zmq_msg_recv()
zmq_close()
zmq_msg_data()
, zmq_msg_size()
, zmq_msg_more()
zmq_msg_get()
, zmq_msg_set()
zmq_msg_copy()
, zmq_msg_move()
消息结构中封装的数据是二进制的, 依然由程序员本身解释. 关于zmq_msg_t
结构类型, 下面是你须要知道的基础知识:
zmq_msg_t *
. 也就是说这是一个内部实现不对外开放的类型, 建立, 传递, 都应当以指针类型进行操做.zmq_msg_init()
建立一个消息对象, 而后将这个消息对象传递给zmq_msg_recv()
接口zmq_msg_init_size()
建立一个数据容量指定的消息对象, 而后把你要写入的二进制数据经过内存拷贝函数, 好比memcpy()
写入消息中, 最后调用zmq_msg_send()
, 看到这里你应该明白, zmq_msg_init_size()
接口内部进行了内存分配.zmq_msg_t
实际上是引用计数方式实现的共享对象类型, "释放"是指当前上下文放弃了对该消息的引用, 内部致使了实例的引用计数-1, 而"销毁"则是完全把实例自己给free掉了. 当你"释放"一个消息的时候, 应当调用zmq_msg_close()
接口. 若是消息实例在释放后引用计数归0, 那么这个消息实例会被ZMQ自动销毁掉.zmq_msg_data()
接口, 要获取消息中数据的长度, 调用zmq_msg_size()
zmq_msg_move()
, zmq_msg_copy()
, zmq_msg_init_data()
这三个接口zmq_msg_send()
调用将消息发送给socket后, 这个消息内部包装的数据会被清零, 也就是zmq_msg_size() == 0
, 因此, 你不该该连续两次使用同一个zmq_msg_t *
值调用zmq_msg_send()
. 但须要注意的是, 这里的"清零", 并不表明消息被"释放", 也不表明消息被"销毁". 消息仍是消息, 只是其中的数据被扔掉了.若是你想把同一段二进制数据发送屡次, 正确的作法是下面这样:
zmq_msg_init_size()
, 建立第一个消息, 再经过memcpy
或相似函数将二进制数据写入消息中zmq_msg_init()
建立第二个消息, 再调用zmq_msg_copy()
从第一个消息将数据"复制"过来zmq_msg_send()
发送上面的多个消息ZMQ还支持所谓的"多帧消息", 这种消息容许你把多段二进制数据一次性发送给对端. 这个特性在第三章咱们再讲. (P.S.: 这是一个很重要的特性, 路由代理等高级套路就严重依赖这种多帧消息.). ZMQ中的消息有三层逻辑概念: 消息, 帧, 二进制数据. 用户自定义的二进制数据被包装成帧, 而后一个或多个帧组成一个消息. 消息是ZMQ拓扑网络中两个结点收发的单位, 但在ZMQ底层的传输协议中, 最小单位是帧.
换一个角度来说, ZMQ使用其底层的传输协议, 好比tcp, 好比inproc, 好比ipc来传输数据, 当ZMQ调用这些传输协议传递数据的时候, 最小单元是帧. 帧的完整性由传输协议来保证, 便是ZMQ自己不关心这个帧会不会破损, 帧的完整传输应当由这些传输协议去保证. 而在使用ZMQ构建应用程序的程序员眼中, 最小的传输单位是消息, 一个消息里可能会有多个帧, 程序员不去关心消息从一端到另外一端是否会出现丢帧, 消息的完整性与原子性应当由ZMQ库去保证.
前面咱们讲过, ZMQ对其底层的传输协议是有侵入性的. 若是要了解ZMQ究竟是如何在传输协议的基础上规定帧传输格式的, 能够去阅读这个规范.
在咱们到达第三章以前, 咱们所讨论的消息中都仅包含一个帧. 这就是为何在这一小节的描述中, 咱们几乎有引导性的让你以为, zmq_msg_t
类型, 就是"消息", 其实不是, 其实zmq_msg_t
消息只是"帧".
zmq_msg_t
对象zmq_msg_send()
, zmq_msg_recv()
, 你能够一帧一帧的发送数据. 能够用屡次调用这些接口的方式来发送一个完整的消息, 或者接收一个完整的消息: 在发送时传入ZMQ_SNDMORE
参数, 或在接收时, 经过zmq_getsockopt()
来获取ZMQ_RCVMORE
选项的值. 更多关于如何使用低级API收发多帧消息的信息, 请参见相关接口的manpage关于消息或帧, 还有下面的一些特性:
zmq_send()
是一致的.zmq_msg_close()
接口来释放这个zmq_msg_t
对象最后再强调一下, 在你不理解zmq_msg_t
的原理以前, 不要使用zmq_msg_init_data()
接口, 这是一个0拷贝接口, 若是不熟悉zmq_msg_t
结构的原理, 瞎jb用, 是会core dump的
在先前的全部例子程序中, 大多程序里干的都是这样的事情
若是你接触过linux中的select, pselect, epoll等多路IO复用接口, 你必定会好奇, 在使用zmq的时候, 如何实现相似的效果呢? 毕竟ZMQ不光把linux socket的细节给你封装了, 连文件描述符都给你屏蔽封装掉了, 显然你无法直接调用相似于select, pselect, epoll这种接口了.
答案是, ZMQ本身搞了一个相似的玩意, zmq_poll()
了解一下.
咱们先看一下, 若是没有多路IO接口, 若是咱们要从两个socket上接收数据, 咱们会怎样作. 下面是一个没什么卵用的示例程序, 它试图从两个socket上读取数据, 使用了异步I/O. (若是你有印象的话, 应该记得对应的两个endpoint其实是咱们在第一章写的两个示例程序的数据生产方: 天气预报程序与村口的大喇叭)
#include <zmq.h> #include <stdio.h> int main(void) { void * context = zmq_ctx_new(); void * receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:5557"); void * subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, "tcp://localhost:5556"); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6); while(1) { char msg[256]; while(1) { int size = zmq_recv(receiver, msg, 255, ZMQ_DONTWAIT); if(size != -1) { // 接收数据成功 } else { break; } } while(1) { int size = zmq_recv(subscriber, msg, 255, ZMQ_DONTWAIT); if(size == -1) { // 接收数据成功 } else { break; } } sleep(1); // 休息一下, 避免疯狂循环 } zmq_close(receiver); zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
在没有多路IO手段以前, 这基本上就是你能作到的最好情形了. 大循环里的sleep()
让人浑身难受. 不加sleep()
吧, 在没有数据的时候, 这个无限空循环能把一个核心的cpu占满. 加上sleep()
吧, 收包又会有最坏状况下1秒的延时.
但有了zmq_poll()
接口就不同了, 代码就会变成这样:
#include <zmq.h> #include <stdio.h> int main(void) { void * context = zmq_ctx_new(); void * receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:5557"); void * subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, "tcp://localhost:5556"); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6); while(1) { char msg[256]; zmq_pollitem_t items[] = { {receiver, 0, ZMQ_POLLIN, 0}, {subscriber,0, ZMQ_POLLIN, 0}, }; zmq_poll(items, 2, -1); if(items[0].revents & ZMQ_POLLIN) { int size = zmq_recv(receiver, msg, 255, 0); if(size != -1) { // 接收消息成功 } } if(items[1].revents & ZMQ_POLLIN) { int size = zmq_recv(subscriber, msg, 255, 0); if(size != -1) { // 接收消息成功 } } } zmq_close(receiver); zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
zmq_pollitem_t
类型定义以下, 这个定义能够从zmq_poll()
的manpage里查到
typedef struct{ void * socket; // ZMQ的socket int fd; // 是的, zmq_poll()还能够用来读写linux file descriptor short events; // 要被监听的事件, 基础事件有 ZMQ_POLLIN 和 ZMQ_POLLOUT, 分别是可读可写 short revents; // 从zmq_poll()调用返回后, 这里存储着触发返回的事件 } zmq_pollitem_t;
咱们以前提到过, 用户数据被包装成zmq_msg_t
对象, 也就是帧, 而在帧上, 还有一个逻辑概念叫"消息". 那么在具体编码中, 如何发送多帧消息呢? 而又如何接收多帧消息呢? 简单的讲, 两点:
zmq_msg_send()
传入ZMQ_SNDMORE
选项, 告诉发送接口, "我后面还有其它帧"zmq_msg_recv()
接收一个帧, 就调用一次zmq_msg_more()
或者zmq_getsockopt() + ZMQ_RCVMORE
来判断是否这是消息的最后一个帧发送示例:
zmq_msg_send(&msg, socket, ZMQ_SNDMORE); zmq_msg_send(&msg, socket, ZMQ_SNDMORE); zmq_msg_send(&msg, socket, 0); // 消息的最后一个帧
接收示例:
while(1) { zmq_msg_t msg; zmq_msg_init(&msg); zmq_msg_recv(&msg, socket, 0); // 作处理 zmq_msg_close(&msg); if(!zmq_msg_more(&msg)) // 注意, zmq_msg_more能够在zmq_msg_close后被安全的调用 { break; } }
这里有一个须要注意的有趣小细节: 要判断一个收来的帧是否是消息的最后一个帧, 有两种途径, 一种是zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size)
, 另一种是zmq_msg_more(&msg)
. 前一种途径的入参是socket, 后一种途径的入参是msg. 这真是很因缺思汀. 目前来讲, 两种方法均可以, 不过我建议你使用zmq_getsockopt()
, 至于缘由嘛, 由于在zmq_msg_recv()
的manpage中, 是这样建议的.
关于多帧消息, 你须要注意如下几点:
zmq_poll()
时, 当socket可读, 而且用zmq_msg_recv()
读出一个帧时, 表明着不用等待下一次循环, 你直接继续读取, 必定能读取能整个消息中剩余的其它全部帧zmq_msg_more()
或zmq_getsockopt() + ZMQ_RCVMORE
检查消息是否接收完整, 你一帧帧的收, 也会把整个消息里的全部帧收集齐. 因此从这个角度看, zmq_msg_more()
能够在把全部可读的帧从socket里统一接收到手以后, 再慢慢判断这些帧应该怎么拼装. 因此这样看, 它和zmq_getsockopt()
的功能也不算是彻底重复.ZMQ的目标是创建去中心化的消息通讯网络拓扑. 但不要误解"去中心"这三个字, 这并不意味着你的网络拓扑在中心圈内空无一物. 实际上, 用ZMQ搭建的网络拓扑中经常充满了各类非业务处理的网络结点, 咱们把这些感知消息, 传递消息, 分发消息, 但不实际处理消息的结点称为"中介", 在ZMQ构建的网络中, 它们按应用场景有多个细化的名字, 好比"代理", "中继", "装置", "掮客"等.
这套逻辑在现实世界里也很常见, 中间人, 中介公司, 它们不实际生产社会价值, 表面上看它们的存在是在吸两头的血, 这些皮条客在社会中的存在乎义在于: 它们减小了沟通的复杂度, 对通讯双方进行了封装, 提升了社会运行效率.
当构建一个稍有规模的颁式系统的时候, 一个避不开的问题就是, 网络中的结点是如何感知其它结点的存在的? 结点会当机, 会扩容, 在这些变化发生的时候, 网络中的其它正在工做的结点如何感知这些变化, 并保持系统总体正常运行呢? 这就是经典的"动态探索问题".
动态探索问题有一系列很经典的解决方案, 最简单的解决方案就是把问题自己解决掉: 把网络拓扑设计死, 代码都写死, 别让它瞎jb来回变, 问题消灭了, done!. 这种解决方案的缺点就是若是网络拓扑要有变动, 好比业务规模扩展了, 或者有个结点当机了, 网络配置管理员会骂娘.
拓扑规模小的时候, 消灭问题的思路没什么坏处, 但拓扑稍微复杂一点, 显然这就是一个很好笑的解决方案.好比说, 网络中有一个发布者, 有100多个订阅者, 发布者bind到endpoint上, 订阅者connect到endpoint上. 若是代码是写死的, 若是发布者自己出了点什么问题, 或者发布者一台机器搞不住了, 须要横向扩容, 你就得改代码, 而后手动部署到100多台订阅者上. 这样的运维成本太大了.
这种场景, 你就须要一个"中介", 对发布者而言, 它今后无需关心订阅者是谁, 在哪, 有多少人, 只须要把消息给中介就好了. 对于订阅者而言, 它今后无需关注发布者有几个, 是否使用了多个endpoint, 在哪, 有多少人. 只须要向中介索取消息就好了. 虽然这时发布者身上的问题转嫁到的中介身上: 即中介是网络中最易碎的结点, 若是中介挂了整个拓扑就挂了, 但因为中介不处理业务逻辑, 只是一个相似于交换机的存在, 因此一样的机器性能, 中介在单位时间能转发的消息数量, 比发布者和订阅者能处理的消息高一个甚至几个数量级. 是的, 使用中介引入了新的问题, 但解决了老的问题.
中介并无解决全部问题, 当你引入中介的时候, 中介又变成了网络中最易碎的点, 因此在实际应用中, 要控制中介的权重, 避免整个网络拓扑严重依赖于一个中介这种状况出现: ZMQ提倡去中心化, 不要把中介变成一个垄断市场的掮客.
对于发布者而言, 中介就是订阅者, 而对于订阅者而言, 中介就是发布者. 中介使用两种额外的socket类型: XPUB与XSUB. XSUB与真实的发布者链接, XPUB与真实的订阅者链接.
在咱们以前写的请求-回应套路程序中, 咱们有一个客户端, 一个服务端. 这是一个十分简化的例子, 实际应用场景中的请求-回应套路中, 通常会有多个客户端与多个服务端.
请求-应答模式有一个隐含的条件: 服务端是无状态的. 不然就不能称之为"请求-应答"套路, 而应该称之为"唠嗑套路".
要链接多个客户端与多个服务端, 有两种思路.
第一种暴力思路就是: 让N个客户端与M个服务端创建起N*M的全链接. 这确实是一个办法, 虽然不是很优雅. 在ZMQ中, 实现起来还轻松很多: 由于ZMQ的socket能够向多个endpoint发起链接, 这对于客户端来讲, 编码难度下降了. 客户端应用程序中能够建立一个zmq_socket, 而后connect到多个服务端的endpoint上就好了. 这种思路作的话, 客户端数量扩张很容易, 直接部署就能够, 代码不用改. 可是缺陷有两个:
总的来讲, 这是一种很暴力的解决办法, 不适合用于健壮的生产环境. 可是这确实是一个办法.
为了解决上面两个缺陷, 天然而然的咱们就会想到: 为何不能把服务端抽象出来呢? 让一个掮客来作那个惟一的endpoint, 以供全部客户端connect, 而后掮客在背后再把请求体分发给各个服务端, 服务端作出回应后掮客再代替服务端把回应返回给客户端, 这样就解决了上面的问题:
而且, 掮客还能够作到如下
因此, 在请求回应套路中加入掮客, 是一个很明智的选择, 这就是第二种思路, 这种思路不是没有缺陷, 有, 并且很明显: 掮客是整个系统中最脆弱的部分.
但这个缺陷能够在必定程度上克服掉:
ZMQ中, 有两个特殊的socket类型特别适合掮客使用:
关于这两种特殊的socket的特性, 后续咱们会仔细深刻, 目前来讲, 你只须要了解
多说无益, 来看代码. 下面是在客户端与服务端中插入掮客的代码实例:
客户端
#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_REQ); zmq_connect(socket, "tcp://localhost:5559"); for(int i = 0; i < 10; ++i) { s_send(socket, "Hello"); char * strRsp = s_recv(socket); printf("Received reply %d [%s]\n", i, strRsp); free(strRsp); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
服务端
#include <zmq.h> #include <unistd.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_REP); zmq_connect(socket, "tcp://localhost:5560"); while(1) { char * strReq = s_recv(socket); printf("Received request: [%s]\n", strReq); free(strReq); sleep(1); s_send(socket, "World"); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
掮客
#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_for_client = zmq_socket(context, ZMQ_ROUTER); void * socket_for_server = zmq_socket(context, ZMQ_DEALER); zmq_bind(socket_for_client, "tcp://*:5559"); zmq_bind(socket_for_server, "tcp://*:5560"); zmq_pollitem_t items[] = { { socket_for_client, 0, ZMQ_POLLIN, 0 }, { socket_for_server, 0, ZMQ_POLLIN, 0 }, }; while(1) { zmq_msg_t message; zmq_poll(items, 2, -1); if(items[0].revents & ZMQ_POLLIN) { while(1) { zmq_msg_init(&message); zmq_msg_recv(&message, socket_for_client, 0); int more = zmq_msg_more(&message); zmq_msg_send(&message, socket_for_server, more ? ZMQ_SNDMORE : 0); zmq_msg_close(&message); if(!more) { break; } } } if(items[1].revents & ZMQ_POLLIN) { while(1) { zmq_msg_init(&message); zmq_msg_recv(&message, socket_for_server, 0); int more = zmq_msg_more(&message); zmq_msg_send(&message, socket_for_client, more ? ZMQ_SNDMORE : 0); zmq_msg_close(&message); if(!more) { break; } } } } zmq_close(socket_for_client); zmq_close(socket_for_server); zmq_ctx_destroy(context); return 0; }
客户端和服务端因为掮客的存在, 代码都简单了很多, 对于掮客的代码, 有如下几点须要思考:
s_send
与s_recv
互相传递字符串, 但在掮客那里就须要用zmq_msg_t
进行转发呢?上面三点实际上是同一个问题: 掮客是如何实现带会话追踪的转发消息的?
另外, 若是你先启动掮客, 再启动客户端, 再启动服务端. 你会看到在服务端正确启动后, 客户端显示它收到了回包.那么:
这就是有关掮客的第二个问题: 如何配置缓冲区.
本章目前暂时不会对这三个问题作出解答, 你们先思考一下. 咱们将在下一章深刻掮客的细节进行进一步探索.
在上面的掮客代码示例中, 核心代码就是zmq_poll
对两个socket的监听, 以及while(1)
循环. ZMQ将这两坨操做统一封装到了一个函数中, 免得你们每次都要写boring code.
int zmq_proxy (const void *frontend, const void *backend, const void *capture);
参数frontend
与backend
分别是与客户端相连的socket
和与服务端相连的socket
. 在使用zmq_proxy
函数以前, 这两个socket必须被正确配置好, 该调用connect就调用connect, 该调用bind就调用bind. 简单来说, zmq_proxy
负责把frontend
与backend
之间的数据互相递送给对方. 而若是仅仅是单纯的递送的话, 第三个参数capture
就应当被置为NULL
, 而若是还想监听一下数据, 那么就再建立一个socket, 并将其值传递给capture
, 这样, frontend
与backend
之间的数据都会有一份拷贝被送到capture
上的socket.
当咱们用zmq_proxy
重写上面的掮客代码的话, 代码会很是简洁, 会变成这样:
#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_for_client = zmq_socket(context, ZMQ_ROUTER); void * socket_for_server = zmq_socket(context, ZMQ_DEALER); zmq_bind(socket_for_client, "tcp://*:5559"); zmq_bind(socket_for_server, "tcp://*:5560"); zmq_proxy(socket_for_client, socket_for_server, NULL); zmq_close(socket_for_client); zmq_close(socket_for_server); zmq_ctx_destroy(context); return 0; }
桥接是服务器后端的一种经常使用技巧. 所谓的桥接有点相似于掮客, 可是解决问题的侧重点不同. 掮客主要解决了三个问题:
而桥接解决的问题的侧重点主要在:
这种设计思路经常使用于后台服务的接口层. 接口层一方面链接着后端内部局域网, 另一方面对公提供服务. 这种服务能够是请求-回应式的服务, 也能够是发布-订阅式的服务(显然发布方在后端内部的局域网里). 这个时候接口层其实就完成了桥接的工做.
其实这种应用场景里, 把这种技巧称为桥接
并非很合适. 由于桥接
是一个计算机网络中硬件层的术语, 最初是用于线缆过长信号衰减时, 在线缆末端再加一个信号放大器之类的设备, 为通讯续命用的.
原版ZMQ文档在这里提出bridging
这个术语, 也只是为了说明一下, zmq_proxy
的适用场景不只局限于作掮客, 而是应该在理解上更宽泛一点, zmq_proxy
函数就是互相传递两个socket之间数据函数, 仅此而已, 而具体这个函数能应用在什么样的场景下, 掮客与桥接场景都可以使用, 但毫不局限于此. 写代码思惟要活.
ZMQ库对待错误, 或者叫异常, 的设计哲学是: 见光死. 前文中写的多数示例代码, 都没有认真的检查ZMQ库函数调用的返回值, 也没有关心它们执行失败后会发生什么. 通常状况下, 这些函数都能正常工做, 但凡事总有个万一, 万一建立socket失败了, 万一bind或connect调用失败了, 会发生什么?
按照见光死的字面意思: 按咱们上面写代码的风格, 一旦出错, 程序就挂掉退出了.
因此正确使用ZMQ库的姿式是: 生产环境运行的代码, 务必为每个ZMQ库函数的调用检查返回值, 考虑调用失败的状况. ZMQ库函数的设计也继续了POSIX接口风格里的一些设计, 这些设计包括:
errno
中, 或zmq_errno()
中zmq_strerror()
可能得到真正健壮的代码, 应该像下面这样写, 是的, 它很啰嗦, 但它很健壮:
// ... void * context = zmq_ctx_new(); assert(context); void * socket = zmq_socket(context, ZMQ_REP); assert(socket); int rc = zmq_bind(socket, "tcp://*:5555"); if(rc == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } // ...
有两个比较例外的状况须要你注意一下:
ZMQ_DONTWAIT
的函数返回-1时, 通常状况下不是一个致命错误, 不该当致使程序退出. 好比在收包函数里带上这个标志, 那么语义只是说"没数据可收", 是的, 收包函数会返回-1, 而且会置error
值为EAGAIN
, 但这并不表明程序发生了不可逆转的错误.zmq_ctx_destroy()
时, 若是此时有其它线程在忙, 好比在写数据或者收数据什么的, 那么这会直接致使这些在干活的线程, 调用的这些阻塞式接口函数返回-1, 而且errno
被置为ETERM
. 这种状况在实际编码过程当中不该当出现.下面咱们写一个健壮的分治套路, 和咱们在第一章中写过的相似, 不一样的是, 此次, 在监理收到"全部工做均完成"的消息以后, 会发消息给各个工程队, 让工程队中止运行. 这个例子主要有两个目的:
原先的分治套路代码, 使用PUSH/PULL这两种socket类型, 将任务分发给多个工程队. 但在工做作完以后, 工程队的程序还在运行, 工程队的程序没法得知任务什么进修终止. 这里咱们再掺入发布-订阅套路, 在工做作完以后, 监理向广大工程队, 经过PUB类型的socket发送"活干活了"的消息, 而工程队用SUB类型的socket一旦收到监理的消息, 就中止运行.
包工头ventilator的代码和上一章的一毛同样, 只是对全部的ZMQ库函数调用增长了错误处理. 照顾你们, 这里再帖一遍
#include <zmq.h> #include <stdio.h> #include <time.h> #include <assert.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); assert(socket_to_sink); void * socket_to_worker = zmq_socket(context, ZMQ_PUSH); assert(socket_to_worker); if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } if(zmq_bind(socket_to_worker, "tcp://*:5557") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } printf("Press Enter when all workers get ready:"); getchar(); printf("Sending tasks to workers...\n"); if(s_send(socket_to_sink, "Get ur ass up") == -1) { printf("E: s_send failed: %s\n", strerror(errno)); return -1; } srandom((unsigned)time(NULL)); int total_ms = 0; for(int i = 0; i < 100; ++i) { int workload = randof(100) + 1; total_ms += workload; char string[10]; snprintf(string, sizeof(string), "%d", workload); if(s_send(socket_to_worker, string) == -1) { printf("E: s_send failed: %s\n", strerror(errno)); return -1; } } printf("Total expected cost: %d ms\n", total_ms); zmq_close(socket_to_sink); zmq_close(socket_to_worker); zmq_ctx_destroy(context); return 0; }
接下来是工程队worker的代码, 这一版新增了一个socket_to_sink_of_control
来接收来自监理的中止消息:
#include <zmq.h> #include <assert.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL); assert(socket_to_ventilator); if(zmq_connect(socket_to_ventilator, "tcp://localhost:5557") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); assert(socket_to_sink); if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } void * socket_to_sink_of_control = zmq_socket(context, ZMQ_SUB); assert(socket_to_sink_of_control); if(zmq_connect(socket_to_sink_of_control, "tcp://localhost:5559") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } if(zmq_setsockopt(socket_to_sink_of_control, ZMQ_SUBSCRIBE, "", 0) == -1) { printf("E: setsockopt failed: %s\n", strerror(errno)); } zmq_pollitem_t items [] = { { socket_to_ventilator, 0, ZMQ_POLLIN, 0 }, { socket_to_sink_of_control, 0, ZMQ_POLLIN, 0 }, }; while(1) { if(zmq_poll(items, 2, -1) == -1) { printf("E: poll failed: %s\n", strerror(errno)); return -1; } if(items[0].revents & ZMQ_POLLIN) { char * strWork = s_recv(socket_to_ventilator); assert(strWork); printf("%s.", strWork); fflush(stdout); s_sleep(atoi(strWork)); free(strWork); if(s_send(socket_to_sink, "") == -1) { printf("E: s_send failed %s\n", strerror(errno)); return -1; } } if(items[1].revents & ZMQ_POLLIN) { break; } } zmq_close(socket_to_ventilator); zmq_close(socket_to_sink); zmq_close(socket_to_sink_of_control); zmq_ctx_destroy(context); return 0; }
接下来是监理的代码, 这一版新增了socket_to_worker_of_control
来在任务结束以后给工程队发布中止消息:
#include <zmq.h> #include <assert.h> #include <stdint.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_worker = zmq_socket(context, ZMQ_PULL); if(zmq_bind(socket_to_worker, "tcp://*:5558") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } void * socket_to_worker_of_control = zmq_socket(context, ZMQ_PUB); if(zmq_bind(socket_to_worker_of_control, "tcp://*:5559") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } char * strBeginMsg = s_recv(socket_to_worker); assert(strBeginMsg); free(strBeginMsg); int64_t i64StartTime = s_clock(); for(int i = 0; i < 100; ++i) { char * strRes = s_recv(socket_to_worker); assert(strRes); free(strRes); if(i % 10 == 0) { printf(":"); } else { printf("."); } fflush(stdout); } printf("Total elapsed time: %d msec\n", (int)(s_clock() - i64StartTime)); if(s_send(socket_to_worker_of_control, "STOP") == -1) { printf("E: s_send failed: %s\n", strerror(errno)); return -1; } zmq_close(socket_to_worker); zmq_close(socket_to_worker_of_control); zmq_ctx_destroy(context); return 0; }
这个例子也展现了如何将多种套路揉合在一个场景中. 因此说写代码, 思惟要灵活.
通常状况下, Linux上的程序在接收到诸如SIGINT
和SIGTERM
这样的信号时, 其默认动做是让进程退出. 这种退出信号的默认行为, 只是简单的把进程干掉, 不会管什么缓冲区有没有正确刷新, 也不会管文件以及其它资源句柄是否是正确被释放了.
这对于实际应用场景中的程序来讲是不可接受的, 因此在编写后台应用的时候必定要注意这一点: 要妥善的处理POSIX Signal. 限于篇幅, 这里不会对Signal进行进一步讨论, 若是对这部份内容不是很熟悉的话, 请参阅<Unix环境高级编程>(<Advanced Programming in the UNIX Environment>)第十章(chapter 10. Signals).
下面是妥善处理Signal的一个例子
#include <stdlib.h> #include <stdio.h> #include <signal.h> #include <unistd.h> #include <fcntl.h> #include <assert.h> #include <string.h> #include <zmq.h> #define S_NOTIFY_MSG " " #define S_ERROR_MSG "Error while writing to self-pipe.\n" static int s_fd; static void s_signal_handler(int signal_value) { int rc = write(s_fd, S_NOTIFY_MSG, sizeof(S_NOTIFY_MSG)); if(rc != sizeof(S_NOTIFY_MSG)) { write(STDOUT_FILENO, S_ERROR_MSG, sizeof(S_ERROR_MSG) - 1); exit(1); } } static void s_catch_signals(int fd) { s_fd = fd; struct sigaction action; action.sa_handler = s_signal_handler; action.sa_flags = 0; sigemptyset(&action.sa_mask); sigaction(SIGINT, &action, NULL); sigaction(SIGTERM, &action, NULL); } int main(void) { int rc; void * context = zmq_ctx_new(); assert(context); void * socket = zmq_socket(context, ZMQ_REP); assert(socket); if(zmq_bind(socket, "tcp://*:5555") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -__LINE__; } int pipefds[2]; rc = pipe(pipefds); if(rc != 0) { printf("E: creating self-pipe failed: %s\n", strerror(errno)); return -__LINE__; } for(int i = 0; i < 2; ++i) { int flags = fcntl(pipefds[0], F_GETFL, 0); if(flags < 0) { printf("E: fcntl(F_GETFL) failed: %s\n", strerror(errno)); return -__LINE__; } rc = fcntl(pipefds[0], F_SETFL, flags | O_NONBLOCK); if(rc != 0) { printf("E: fcntl(F_SETFL) failed: %s\n", strerror(errno)); return -__LINE__; } } s_catch_signals(pipefds[1]); zmq_pollitem_t items[] = { { 0, pipefds[0], ZMQ_POLLIN, 0 }, { socket, 0, ZMQ_POLLIN, 0 }, }; while(1) { rc = zmq_poll(items, 2, -1); if(rc == 0) { continue; } else if(rc < 0) { if(errno == EINTR) { continue; } else { printf("E: zmq_poll failed: %s\n", strerror(errno)); return -__LINE__; } } // Signal pipe FD if(items[0].revents & ZMQ_POLLIN) { char buffer[2]; read(pipefds[0], buffer, 2); // clear notifying bytes printf("W: interrupt received, killing server...\n"); break; } // Read socket if(items[1].revents & ZMQ_POLLIN) { char buffer[255]; rc = zmq_recv(socket, buffer, 255, ZMQ_NOBLOCK); if(rc < 0) { if(errno == EAGAIN) { continue; } if(errno == EINTR) { continue; } printf("E: zmq_recv failed: %s\n", strerror(errno)); return -__LINE__; } printf("W: recv\n"); // Now send message back; // ... } } printf("W: cleaning up\n"); zmq_close(socket); zmq_ctx_destroy(context); return 0; }
上面这个程序的逻辑流程是这样的:
ZMQ_REP
的zmq socket, 并将之bind在本地5555端口上而后程序为信号SIGINT
与SIGTERM
挂载了自定义的信号处理函数, 信号处理函数作的事以下:
" "
"Err while writing to self-pipe"
并调用exit()
退出程序而后将zmq socket与管道1读端均加入zmq_poll
SIGINT
或SIGTERM
信号, 则退出数据处理循环, 以后将依次调用zmq_close()
与zmq_ctx_destroy()
这种写法使用了管道, 逻辑上清晰了, 代码上繁琐了, 但这都不是重点, 重点是这个版本的服务端程序在接收到SIGINT
与SIGTERM
时, 虽然也会退出进程, 但在退出以前会妥善的关闭掉zmq socket与zmq context.
而还有一种更简洁的写法(这种简洁的写在实际上是有潜在的漏洞的, 详情请参见<Unix环境高级编程>(<Advanced Programming in the UNIX Environment>) 第十章(chapter 10. Signals) )
s_interrupted
SIGINT
之类的信号时, 置s_interrupted
为1s_interrupted
的值, 若该值为1, 则进入退出流程大体以下:
s_catch_signals(); // 注册事件回调 client = zmq_socket(...); while(!s_interrupted) // 时刻检查 s_interrupted 的值 { char * message = s_recv(client); if(!message) { break; // 接收消息异常时也退出 } // 处理业务逻辑 } zmq_close(close);
服务端应用程序最蛋疼的问题就是内存泄漏了, 这个问题已经困扰了C/C++程序员二三十年了, ZMQ在这里建议你使用工具去检测你的代码是否有内存泄漏的风险. 这里建议你使用的工具是: valgrind
默认状况下, ZMQ自己会致使valgrind报一大堆的警告, 首先先屏蔽掉这些警告. 在你的工程目录下新建一个文件名为 vg.supp
, 写入下面的内容
{ <socketcall_sendto> Memcheck:Param socketcall.sendto(msg) fun:send ... } { <socketcall_sendto> Memcheck:Param socketcall.send(msg) fun:send }
而后记得妥善处理掉诸如SIGINT
与SIGTERM
这样的Signal. 不然valgrind会认为不正确的退出程序会有内存泄漏风险. 最后, 在编译你的程序时, 加上 -DDEBUG
选项. 而后以下运行valgrind
valgrind --tool=memcheck --leak-check=full --suppression=vg.supp <你的程序>
若是你的代码写的没有什么问题, 会获得下面这样的赞扬
==30536== ERROR SUMMARY: 0 errors from 0 contexts...
啊, 多线程, 给你们讲一个笑话, 小明有一个问题, 而后小明决定使用多线程编程解决这个问题. 最后小明问题两个了有.
传统的多线程编程中, 或多或少都会掺入同步手段. 而这此同步手段通常都是程序员的噩梦, 信号量, 锁. ZMQ则告诫广大程序员: 不要使用信号量, 也不要使用锁, 不要使用除了 zmq inproc
以外的任何手段进行线程间的数据交互.
ZMQ在多线程上的哲学是这样的:
更细节的, 在进行多线程编程时, 你应当遵循如下的几个点:
若是你程序要用到多个掮客, 好比, 多个线程都拥有本身独立的掮客, 一个常见的错误就是: 在A线程里建立掮客的左右两端socket, 而后将socket传递给B线程里的掮客. 这违反了上面的规则: 不要在线程间传递socket. 这种错误很难发觉, 而且出错是随机的, 出现问题后很难排查.
ZMQ对线程库是没有侵入性的, ZMQ没有内置线程库, 也没有使用其它的线程实例. 使用ZMQ写多线程应用程序, 多线程接口就是操做系统操做的线程接口. 因此它对线程相关的检查工具是友好的: 好比Intel的Thread Checker. 这种设计的坏处就是你写的程序在线程接口这一层的可移植性须要你本身去保证. 或者你须要使用其它第三方的可移植线程库.
这里咱们写一个例子吧, 咱们把最初的请求-回应套路代码改形成多线程版的. 原始版的服务端是单进程单线程程序, 若是请求量比较低的话, 是没有什么问题的, 单线程的ZMQ应用程序吃满一个CPU核心是没有问题的, 但请求量再涨就有点捉襟见肘了, 这个时候就须要让程序吃满多个核心. 固然多进程服务也能完成任务, 但这里主要是为了介绍在多线程编程中使用ZMQ, 因此咱们把服务端改形成多线程模式.
另外, 显然你能够使用一个掮客, 再外加一堆服务端结点(不管结点是独立的进程, 仍是独立的机器)来让服务端的处理能力更上一层楼. 但这更跑偏了.
仍是看代码吧. 服务端代码以下:
#include <pthread.h> #include <unistd.h> #include <assert.h> #include "zmq_helper.h" static void * worker_routine(void * context) { void * socket_to_main_thread = zmq_socket(context, ZMQ_REP); assert(socket_to_main_thread); zmq_connect(socket_to_main_thread, "inproc://workers"); while(1) { char * strReq = s_recv(socket_to_main_thread); printf("Received request: [%s]\n", strReq); free(strReq); sleep(1); s_send(socket_to_main_thread, "World"); } zmq_close(socket_to_main_thread); return NULL; } int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_client = zmq_socket(context, ZMQ_ROUTER); assert(socket_to_client); zmq_bind(socket_to_client, "tcp://*:5555"); void * socket_to_worker_thread = zmq_socket(context, ZMQ_DEALER); assert(socket_to_worker_thread); zmq_bind(socket_to_worker_thread, "inproc://workers"); for(int i = 0; i < 5; ++i) { pthread_t worker; pthread_create(&worker, NULL, worker_routine, context); } zmq_proxy(socket_to_client, socket_to_worker_thread, NULL); zmq_close(socket_to_client); zmq_close(socket_to_worker_thread); zmq_ctx_destroy(context); return 0; }
这就是一个很正统的设计思路, 多个线程之间是互相独立的, worker线程自己很容易能改形成独立的进程, 主线程作掮客.
来, 下面就是一个例子, 使用PAIR socket完成线程同步, 内部通讯使用的是inproc
#include <zmq.h> #include <pthread.h> #include "zmq_helper.h" static void * thread1_routine(void * context) { printf("thread 1 start\n"); void * socket_to_thread2 = zmq_socket(context, ZMQ_PAIR); zmq_connect(socket_to_thread2, "inproc://thread_1_2"); printf("thread 1 ready, send signal to thread 2\n"); s_send(socket_to_thread2, "READY"); zmq_close(socket_to_thread2); printf("thread 1 end\n"); return NULL; } static void * thread2_routine(void * context) { printf("thread 2 start\n"); void * socket_to_thread1 = zmq_socket(context, ZMQ_PAIR); zmq_bind(socket_to_thread1, "inproc://thread_1_2"); pthread_t thread1; pthread_create(&thread1, NULL, thread1_routine, context); char * str = s_recv(socket_to_thread1); free(str); zmq_close(socket_to_thread1); void * socket_to_mainthread = zmq_socket(context, ZMQ_PAIR); zmq_connect(socket_to_mainthread, "inproc://thread_2_main"); printf("thread 2 ready, send signal to main thread\n"); s_send(socket_to_mainthread, "READY"); zmq_close(socket_to_mainthread); printf("thread 2 end\n"); return NULL; } int main(void) { printf("main thread start\n"); void * context = zmq_ctx_new(); void * socket_to_thread2 = zmq_socket(context, ZMQ_PAIR); zmq_bind(socket_to_thread2, "inproc://thread_2_main"); pthread_t thread2; pthread_create(&thread2, NULL, thread2_routine, context); char * str = s_recv(socket_to_thread2); free(str); zmq_close(socket_to_thread2); printf("Test over\n"); zmq_ctx_destroy(context); printf("main thread end\n"); return 0; }
这个简单的程序包含了几个编写多线程同步时的潜规则:
须要注意的是, 上面这种写法的多线程, 很难拆成多个进程, 上面这种写法通常用于压根就不许备拆分的服务端应用程序. inproc很快, 性能很好, 可是不能用于多进程或多结点通讯.
另一种常见的设计就是使用tcp来传递同步信息. 使用tcp使得多线程拆分红多进程成为一种可能. 另一种同步场景就是使用发布-订阅套路. 而不使用PAIR. 甚至能够使用掮客使用的ROUTER/DEALER进行同步. 但须要注意下面几点:
zmq_send
接口发送的消息将变成一个多帧消息被发出去. 若是你发的同步消息不带语义, 那么还好, 若是你发送的消息带语义, 那么请特别当心这一点, 多帧消息的细节将在第三章进行进一步讨论. 而DEALER则会把消息广播给全部对端, 这一点和PUSH同样, 请额外注意. 总之创建在阅读第三章以前, 不要用ROUTER或DEALER作线程同步.zmq_setsockopt
设置过滤器, 不然SUB端收不到任何消息, 这一点很烦.因此总的来讲, 用PAIR是最方便的选择.
当你须要同步, 或者协调的两个结点位于两个不一样的机器上时, PAIR就不那么好用了, 直接缘由就是: PAIR不支持断线重连. 在同一台机器上, 多个进程之间同步, 没问题, 多个线程之间同步, 也没问题. 由于单机内创建起的通信链接基本不可能发生意外中断, 而一旦发生中断, 必定是进程挂了, 这个时候麻烦事是进程为何挂了, 而不是通信链接为何挂了.
可是在跨机器的结点间进行同步, 就须要考虑到网络波动的缘由了. 结点自己上运行的服务可能没什么问题, 但就是网线被剪了, 这种状况下使用PAIR就再也不合适了, 你就必须使用其它socket类型了.
另外, 线程同步与跨机器结点同步之间的另一个重大区别是: 线程数量通常是固定的, 服务稳定运行期间, 线程数目通常不会增长, 也不会减小. 但跨机器结点可能会横向扩容. 因此要考虑的事情就又我了一坨.
咱们下面会给出一个示例程序, 向你展现跨机器结点之间的同步到底应该怎么作. 还记得上一章咱们讲发布-订阅套路的时候, 提到的, 在订阅方创建链接的那段短暂的时间内, 全部发布方发布的消息都会被丢弃吗? 这里咱们将改进那个程序, 在下面改进版的发布-订阅套路中, 发布方会等待全部订阅方都创建链接完成后, 才开始广播消息. 下面将要展现的代码主要作了如下的工做:
来看代码:
发布方代码:
#include <zmq.h> #include "zmq_helper.h" #define SUBSCRIBER_EXPECTED 10 int main(void) { void * context = zmq_ctx_new(); void * socket_for_pub = zmq_socket(context, ZMQ_PUB); int sndhwm = 1100000; zmq_setsockopt(socket_for_pub, ZMQ_SNDHWM, &sndhwm, sizeof(int)); zmq_bind(socket_for_pub, "tcp://*:5561"); void * socket_for_sync = zmq_socket(context, ZMQ_REP); zmq_bind(socket_for_sync, "tcp://*:5562"); printf("waiting for subscribers\n"); int subscribers_count = 0; while(subscribers_count < SUBSCRIBER_EXPECTED) { char * str = s_recv(socket_for_sync); free(str); s_send(socket_for_sync, ""); subscribers_count++; } printf("broadingcasting messages\n"); for(int i = 0; i < 1000000; ++i) { s_send(socket_for_pub, "Lalalal"); } s_send(socket_for_pub, "END"); zmq_close(socket_for_pub); zmq_close(socket_for_sync); zmq_ctx_destroy(context); return 0; }
订阅方代码
#include <zmq.h> #include <unistd.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_for_sub = zmq_socket(context, ZMQ_SUB); zmq_connect(socket_for_sub, "tcp://localhost:5561"); zmq_setsockopt(socket_for_sub, ZMQ_SUBSCRIBE, "", 0); sleep(1); void * socket_for_sync = zmq_socket(context, ZMQ_REQ); zmq_connect(socket_for_sync, "tcp://localhost:5562"); s_send(socket_for_sync, ""); char * str = s_recv(socket_for_sync); free(str); int i = 0; while(1) { char * str = s_recv(socket_for_sub); if(strcmp(str, "END") == 0) { free(str); break; } free(str); i++; } printf("Received %d broadcast message\n", i); zmq_close(socket_for_sub); zmq_close(socket_for_sync); zmq_ctx_destroy(context); return 0; }
最后带一个启动脚本:
#! /bin/bash echo "Starting subscribers..." for((a=0; a<10; a++)); do ./subscriber & done echo "Starting publisher..." ./publisher
运行启动脚本以后, 你大概会获得相似于下面的结果:
Starting subscribers... Starting publisher... waiting for subscribers broadingcasting messages Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message
你看, 此次有了同步手段, 每一个订阅者都确实收到了100万条消息, 一条很多
上面的代码还有一个细节须要你注意一下:
注意到在订阅者的代码中, 有一行sleep(1)
, 若是去掉这一行, 运行结果可能(很小的几率)不是咱们指望的那样. 之因此这样作是由于:
先建立用于接收消息的socket_for_sub
, 而后connect
之. 再去作同步操做. 有可能: 同步的REQ与REP对话已经完成, 可是socket_for_sub
的链接过程尚未结束. 这个时候仍是会丢掉消息. 也就是说, 这个sleep(1)
操做是为了确认: 在同步操做完成以后, 用于发布-订阅套路的通信链接必定创建好了.
接触过与性能有关的网络编程的*nix端后台开发的同步必定据说这这样的一个术语: 零拷贝(Zero-Copy). 你仔细回想咱们经过网络编程接收, 发送消息的过程. 若是咱们要发送一个消息, 咱们须要把这个消息传递给发送相关的接口, 若是咱们须要接收一个消息, 咱们须要把咱们的缓冲区提供给接收消息的函数.
这里就有一个性能痛点, 特别是在接收消息的时候: 在网络接口API底层, 必定有另一个缓冲区率先接收了数据, 以后, 你调用收包函数, 诸如recv
这样的函数, 将你的缓冲区提供给函数, 而后, 数据须要从事先收到数据的缓冲区, 拷贝至你本身提供给API的缓冲区.
若是咱们向更底层追究一点, 会发现网络编程中, 最简单的发收消息模型里, 至少存在着两到三次拷贝, 不光收包的过程当中有, 发包也有. 上面讲到的只是离应用开发者最近的一层发生的拷贝动做. 而实际上, 可能发生拷贝的地方有: 应用程序与API交互层, API与协议栈交互层, 协议栈/内核空间交互层, 等等.
对于更深层次来说, 不是咱们应用程序开发者应该关心的地方, 而且时至今日, 从协议栈到离咱们最近的那一层, 操做系统基本上都作了避免拷贝的优化. 那么, ZMQ做为一个网络库, 在使用的进修, 应用程序开发就应当避免离咱们最近的那一次拷贝.
这也是为何ZMQ库除了zmq_send
与zmq_recv
以外, 又配套zmq_msg_t
类型再提供了zmq_msg_send
与zmq_msg_recv
接口的缘由. zmq_msg_t
内置了一个缓冲区, 能够用来收发消息, 当你使用msg系的接口时, 收与发都发生在zmq_msg_t
实例的缓冲区中, 不存在拷贝问题.
总之, 要避免拷贝, 须要如下几步:
zmq_msg_init_data()
建立一个zmq_msg_t
实例. 接口返回的是zmq_msg_t
的句柄. 应用开发者看不到底层实现.memcpy
之类的接口写入zmq_msg_t
中, 再传递给zmq_msg_send
. 接收数据时, 直接将zmq_msg_t
句柄传递给zmq_msg_recv
zmq_msg_t
被发送以后, 其中的数据就自动被释放了. 也就是, 对于同一个zmq_msg_t
句柄, 你不能连续两次调用zmq_msg_send
zmq_msg_t
内部使用了引用计数的形式来指向真正存储数据的缓冲区, 也就是说, zmq_msg_send
会将这个计数减一. 当计数为0时, 数据就会被释放. ZMQ库对于zmq_msg_t
的具体实现并无作过多介绍, 也只点到这一层为止.zmq_msg_t
是有可能共享同一段二进制数据的. 这也是zmq_msg_copy
作的事情. 若是你须要将同一段二进制数据发送屡次, 那么请使用zmq_msg_copy
来生成额外的zmq_msg_t
句柄. 每次zmq_msg_copy
操做都将致使真正的数据的引用计数被+1. 每次zmq_msg_send
则减1, 引用计数为0, 数据自动释放.zmq_msg_close
接口. 注意: 在zmq_msg_send
被调用以后, ZMQ库自动调用了zmq_msg_close
, 你能够理解为, 在zmq_msg_send
内部, 完成数据发送后, 自动调用了zmq_msg_close
zmq_msg_t
的内部实现是一个黑盒, 因此若是要接收数据, 虽然调用zmq_msg_recv
的过程当中没有发生拷贝, 但应用程序开发者最终仍是须要把数据读出来. 这就必须有一次拷贝. 这是没法避免的. 或者换一个角度来描述这个蛋疼的点: ZMQ没有向咱们提供真正的零拷贝收包接口. 收包时的拷贝是无可避免的.最后给你们一个忠告: 拷贝确实是一个后端服务程序的性能问题. 但瓶颈通常不在调用网络库时发生的拷贝, 而在于其它地方的拷贝. zmq_msg_t
的使用重心不该该在"优化拷贝, 提高性能"这个点上, 而是第三章要提到和进一步深刻讲解的多帧消息.
以前咱们讲到的发布-订阅套路里, 发布者广播的消息全是字符串, 而订阅者筛选过滤消息也是按字符串匹配前几个字符, 这种策略有点土. 假如咱们能把发布者广播的消息分红两段: 消息头与消息体. 消息头里写明信息类型, 消息体里再写具体的信息内容. 这样过滤器直接匹配消息头就能决定这个消息要仍是不要, 这就看起来洋气多了.
ZMQ中使用多帧消息支持这一点. 发布者发布多帧消息时, 订阅者的过滤器在匹配时, 只匹配第一帧.
多说无益, 来看例子, 在具体展现发布者与订阅者代码以前, 须要为咱们的zmq_help.h
文件再加一个函数, 用于发送多帖消息的s_sendmore
/* * 把字符串做为字节数据, 发送至zmq socket, 但不发送字符串末尾的'\0'字节 * 而且通知socket, 后续还有帧要发送 * 发送成功时, 返回发送的字节数 */ static inline int s_sendmore(void * socket, const char * string) { return zmq_send(socket, string, strlen(string), ZMQ_SNDMORE); }
下面是发布者的代码:
#include <zmq.h> #include <unistd.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_PUB); zmq_bind(socket, "tcp://*:5563"); while(1) { s_sendmore(socket, "A"); s_send(socket, "We don't want to see this"); s_sendmore(socket, "B"); s_send(socket, "We would like to see this"); sleep(1); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
下面是订阅者的代码:
#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_SUB); zmq_connect(socket, "tcp://localhost:5563"); zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "B", 1); while(1) { char * strMsgType = s_recv(socket); char * strMsgContent = s_recv(socket); printf("[%s] %s\n", strMsgType, strMsgContent); free(strMsgType); free(strMsgContent); } zmq_close(socket); zmq_ctx_destroy(socket); return 0; }
这里有两点:
消息愈加越快, 愈加越多, 你慢慢的就会意识到一个问题: 内存资源很宝贵, 而且很容易被用尽. 若是你不注意到这一点, 服务器上某个进程阻塞个几秒钟, 就炸了.
想象一下这个场景: 在同一台机器上, 有一个进程A在疯狂的向进程B发送消息. 忽然, B以为很累, 休息了3秒(好比CPU过载, 或者B在跑GC吧, 无所谓什么缘由), 这3秒钟B处理不过来A发送的数据了. 那么在这3秒钟, A依然疯狂的试图向B发送消息, 会发生什么? 若是B有收包缓冲区的话, 这个缓冲区确定被塞满了, 若是A有发送缓冲区的话, 这个缓冲区也应该被塞满了. 剩余的没被发出去的消息就堆积到A进程的内存空间里, 这个时候若是A程序写的很差, 那么A进程因为内存被疯狂占用, 很快就会挂掉.
这是一个消息队列里的经典问题, 就是消息生产者和消费者的速度不匹配的时候, 消息中间件应当怎么设计的问题. 这个问题的根实际上是在B身上, 但B对于消息队列的设计者来讲是不可控的: 这是消息队列使用者写的B程序, 你怎么知道那波屌人写的啥屌代码? 因此虽然问题由B产生, 但最好仍是在A那里解决掉.
最简单的策略就是: A保留一些缓存能力, 应对突发性的情况. 超过必定限度的话, 就要扔消息了. 不能把这些生产出来的消息, 发不出去还存着. 这太蠢了.
另外还有一种策略, 若是A只是一个消息中转者, 能够在超过限度后, 告诉生产消息的上流, 你停一下, 我这边满了, 请不要再给我发消息了. 这种状况下的解决方案, 其实就是经典的"流控"问题. 这个方案其实也很差, A只能向上游发出一声呻吟, 但上游若是执意仍是要发消息给A, A也没办法去剪网线, 因此转一圈又回来了: 仍是得扔消息.
ZMQ里, 有一个概念叫"高水位阈值", (high-water mark. HWM), 这个值实际上是网络结点自身能缓存的消息的能力. 在ZMQ中, 每个活动的链接, 即socket, 都有本身的消息缓冲队列, HWM指的就是这个队列的容量. 对于某些socket类型, 诸如SUB/PULL/REQ/REP来讲, 只有收包队列. 对于某此socket类型来讲, 诸如DEALER/ROUTER/PAIR, 既能收还能发, 就有两个队列, 一个用于收包, 一个用于发包.
在ZMQ 2.X版本中, HWM的值默认是无限的. 这种状况下很容易出现咱们这一小节开头讲的问题: 发送消息的api接口永远不会报错, 对端假死以后内存就会炸. 在ZMQ 3.X版本中, 这个值默认是1000, 这就合理多了.
当socket的HWM被触及后, 再调用发送消息接口, ZMQ要么会阻塞接口, 要么就扔掉消息. 具体哪一种行为取决于sokcet的类型.
显然在这种状况下, 若是以非阻塞形式发包, 接口会返回失败.
另外, 很特殊的是, inproc类型两端的两个socket共享同一个队列: 真实的HWM值是双方设置的HWM值的总和. 你能够将inproc方式想象成一根管子, 双方设置HWM时只是在宣称我须要占用多长的管子, 但真实的管子长度是两者的总和.
最后, 很反直觉的是, HWM的单位是消息个数, 而不是字节数. 这就颇有意思了. 另外, HWM触顶时, 队列中的消息数量通常很差恰好就等于你设置的HWM值, 真实状况下, 可能会比你设置的HWM值小, 极端状况下可能只有你设置的HWM的一半.
当你写代码, 编译, 连接, 运行, 而后发现收不到消息, 这个时候你应当这样排查:
zmq_setsockopt
设置过滤器若是你使用的是SUB类型的socket, 上面两点你都作正确了, 仍是有可能收不到消息. 这是由于ZMQ内部的队列在链接创建以后可能尚未初始化完成. 这种状况没什么好的解决办法, 有两个土办法
sleep(1)
最后, 若是你确实找不到出错的缘由, 但就是看不到消息, 请考虑向ZeroMQ 社区提问.