本文主要译自 zguide - chapter one. 但并非照本翻译.linux
介绍性的话我这里就不翻译了, 总结起来就是zmq很cool, 你应该尝试一下.git
在Linux和Mac OS上, 请经过随机附带的包管理软件, 或者home brew安装zmq. 包名通常就叫zmq, 安装上就好.程序员
安装后, 以Mac OS为例, 会出现一个新的头文件 /usr/local/include/zmq.h
, 和一个连接库 /usr/local/lib/libzmq.a
.github
因此, 若是你使用C语言, 那么很简单, 写代码的时候加上头文件 #include <zmq.h>
就行了, 连接的时候加上库 -lzmq
就行了.面试
若是你使用的不是C语言, 那么也很简单, 去复习一下C语言, 而后再回来看这个教程. 须要注意的是, 这个教程里的全部示例代码在编译的时候须要指定 -std=c99.apache
先放一个一问一答的例子来让你感觉一下编程
这是服务端代码设计模式
#include <zmq.h> #include <stdio.h> #include <unistd.h> #include <string.h> #include <assert.h> int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_REP); zmq_bind(socket, "tcp://*:5555"); while(1) { char buffer[10]; int bytes = zmq_recv(socket, buffer, 10, 0); buffer[bytes] = '\0'; printf("[Server] Recevied Request Message: %d bytes, content == \"%s\"\n", bytes, buffer); sleep(1); const char * replyMsg = "World"; bytes = zmq_send(socket, replyMsg, strlen(replyMsg), 0); printf("[Server] Sended Reply Message: %d bytes, content == \"%s\"\n", bytes, replyMsg); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
这是客户端代码api
#include <zmq.h> #include <string.h> #include <stdio.h> #include <unistd.h> int main(void) { printf("Connecting to server...\n"); void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_REQ); zmq_connect(socket, "tcp://localhost:5555"); for(int i = 0; i < 10; ++i) { char buffer[10]; const char * requestMsg = "Hello"; int bytes = zmq_send(socket, requestMsg, strlen(requestMsg), 0); printf("[Client][%d] Sended Request Message: %d bytes, content == \"%s\"\n", i, bytes, requestMsg); bytes = zmq_recv(socket, buffer, 10, 0); buffer[bytes] = '\0'; printf("[Client][%d] Received Reply Message: %d bytes, content == \"%s\"\n", i, bytes, buffer); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
这是makefile缓存
all: client server %: %.c gcc -std=c99 $^ -o $@ -lzmq
这个例子就很简单, 要点有如下
服务端上:
客户端上
看起来套路和你从<Unix 网络编程>里学到的差很少嘛. 不过, 你能够试试, 先启动客户端, 而后再启动服务端, 你会发现, 程序没有崩溃. 这就是zmq高明的地方, 把操做系统原生脆弱的网络编程接口进行了封装. 而且实际上不止于此, 后面咱们会学到更多. 这只是开胃小菜.
你可能注意到了咱们上面的例子里, 其实客户端与服务端互相传输的数据里, 并无包含C风格字符串最后一位的'\0'. 请时刻谨记这一点, 网络通讯中, 流动在网络编程API上的数据, 对于API自己来讲, 都是字节序列而已, 如何解释这些字节序列, 是网络编程API的使用者的责任. 好比上面, 咱们须要在每次接收数据的时候记录接收的数据的大小, 而且在buffer中为接收到的数据以后的一个字节赋值为0, 即人为的把接收到的数据解释为字符串. 而对于zmq_send
与zmq_recv
来讲, 它并不关心客户端与服务端传输的数据具体是什么.
这在全部网络编程API中都是这个套路, 不光是zmq, linux socket, winsock, 都是这样. 字符串? 不存在的. 我能看见的, 只是字节序列而已.
当你要把zmq应用到实际项目中的时候, 版本号注是一个你必须关注的事情了. 固然, 项目初期你能够不关心它, 或者项目规模较小的时候你能够不关心它. 但随着项目的进展, 项目中使用到的库的版本号就成了全部人必须关心的事情. 实际上全部第三方库的版本都是一个须要项目owner关心的事情, 由于总有一些sb会作出如下的事情:
因此, 在这里衷心的建议你, 时刻关注你项目中使用的全部第三方库, 搞清楚你的项目构造工具链的运行过程. 而对于zmq来讲, 要得到zmq的版本, 须要以下调用一些函数
#include <zmq.h> #include <stdio.h> int main(void) { int major = 0; int minor = 0; int patch = 0; zmq_version(&major, &minor, &patch); printf("ZMQ_VERSION == %d.%d.%d\n", major, minor, patch); return 0; }
在我写(抄)这个教程的时候, 我使用的版本号是4.2.5
有三件事我建议你养成习惯
如今我要写三个工具函数, 这三个函数都不完美, 但它们都会出现大后续的示例程序里, 用于缩减示例程序的篇幅:
第一个工具函数: 向zmq socket发送字符串数据, 但不带结尾的'\0'
/* * 把字符串做为字节数据, 发送至zmq socket, 但不发送字符串末尾的'\0'字节 * 发送成功时, 返回发送的字节数 */ static inline int s_send(void * socket, const char * string) { return zmq_send(socket, string, strlen(string), 0); }
第二个工具函数: 从zmq socket中接收数据, 并把其解释为一个字符串
/* * 从zmq socket中接收数据, 并将其解释为C风格字符串 * 注意: 该函数返回的字符串是为在堆区建立的字符串 * 请在使用结束后手动调用free将其释放 */ static inline char * s_recv(void * socket) { char buffer[256]; int length = zmq_recv(socket, buffer, 255, 0); if(length == -1) { return NULL; } buffer[length] = '\0'; return strndup(buffer, sizeof(buffer) - 1); }
第三个函数: 在取值范围 [0, x) 中随机生成一个整数
/* * 生成一个位于 [0, num)区间的随机数 */ #define randof(num) (int)((float)(num) * random() / (RAND_MAX + 1.0))
这些工具函数都会以静态内联函数的形式写在一个名为 "zmq_helper.h" 的头文件中, 在后续用得着这些工具函数的时候, 示例程序将直接使用, 而不作额外的说明. 对应的, 当新增一个工具函数的时候, 工具函数自己的源代码会在合适的时候贴出
相信以Java为主要工做语言的同窗, 在毕业面试的时候基本上都被面试官问过各类设计模式, design patterns. 不知道大家有没有思考过一个哲学问题: 什么是模式? 什么是pattern? 为何咱们须要设计模式?
我在这里给出个人理解: 模式并不高大上, 模式其实就是"套路". 所谓的设计模式就是在面向对象程序设计架构中, 前人总结出来的一些惯用套路.
网络编程中也有这样的套路, 也被称之为模式, pattern. ZMQ做为一个像消息库的网络库, 致力于向你提供套路, 或者说, 向你提供一些便于实现套路的工具集. 下面, 咱们来看咱们接触的第二个套路: 发布-订阅套路. (第一个套路是 请求-应答 套路)
发布-订阅套路中有两个角色: 发布者, 订阅者. 或者通俗一点: 村口的大喇叭, 与村民.
发布者, 与村口的大喇叭的共性是: 只生产消息, 不接收消息. 而订阅者与村民的共性是: 只接收消息, 而不生产消息(好吗, 村民会生产八卦消息, 抬杠就没意思了). ZMQ提供了两种特殊的socket用于实现这个模式, 这个套路, 下面是一个例子:
村口的大喇叭循环播放天气预报, 播放的内容很简单: 邮编+温度+相对温度. 各个村民只关心本身村的天气状况, 他们村的邮编是10001, 对于其它地区的天气, 村民不关心.
发布者/村口的大喇叭:
#include <zmq.h> #include <stdio.h> #include <stdlib.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_PUB); zmq_bind(socket, "tcp://*:5556"); srandom((unsigned)time(NULL)); while(1) { int zipcode = randof(100000); // 邮编: 0 ~ 99999 int temp = randof(84) - 42; // 温度: -42 ~ 41 int relhumidity = randof(50) + 10; // 相对湿度: 10 ~ 59 char msg[20]; snprintf(msg, sizeof(msg), "%5d %d %d", zipcode, temp, relhumidity); s_send(socket, msg); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
订阅者/村民:
#include <zmq.h> #include <stdio.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_SUB); zmq_connect(socket, "tcp://localhost:5556"); char * zipcode = "10001"; zmq_setsockopt(socket, ZMQ_SUBSCRIBE, zipcode, strlen(zipcode)); for(int i = 0; i < 50; ++i) { char * string = s_recv(socket); printf("[Subscriber] Received weather report msg: %s\n", string); free(string); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
makefile
all: publisher subscriber %: %.c gcc -std=c99 $^ -o $@ -lzmq
这个例子中须要特别注意的点有:
zmq_setsockopt
函数设置一个过滤器, 以说明关心哪些消息. 若是不设置过滤器, 那么什么消息都不会收到另外, 关于这个例子中的两种socket类型, 有如下特色
ZMQ_PUB
类型的socket, 若是没有任何村民与其相连, 其全部消息都将被简单就地抛弃ZMQ_SUB
类型的socket, 便是村民, 能够与多个ZMQ_PUB
类型的socket相连, 即村民能够同时收听多个喇叭, 但必须为每一个喇叭都设置过滤器. 不然默认状况下, zmq认为村民不关心喇叭里的全部内容.tcp
或ipc
这种面向链接的协议, 则堆积的消息缓存在喇叭里, 当使用epgm
这种协议时, 堆积的消息缓存了村民里. 在ZMQ 大版本号为2的版本中, 全部状况下, 消息都将堆积在村民里. 后续章节咱们会学习到, 如何以"高水位阈值"来保护喇叭.ZMQ里的ZMQ_PUB
型的发布者, 也就是喇叭, 其发送消息的能力是很炸的, zmq的做者在官方的guide里讲到, 发布者与订阅者位于同台机器上, 经过tcp://locahost链接, 发布者发布一千万条消息, 大概用时4秒多. 这仍是一台2011年的i5处理器的笔记本电脑. 还不是IDC机房里的服务器...你大体感觉一下..这个时候有人就跳出来讲了, 这同台机器走了loopback, 确定效率高啊.
若是你也冒出这样的想法, pong友, 看来你没理解zmq的做者想表达的意思. 显然, 若是采用以太网做链路层, 这个数据不可能这么炸裂, 但做者只是想向你表达: ZMQ自己绝对不会成为性能的瓶颈, 瓶颈确定在网络IO上, 而不是ZMQ库, 甚至于说操做系统协议栈上. 应用程序的性能瓶颈, 99.9999%都不在协议栈与网络库上, 而是受限于物理规格的网络IO.
性能低? 你不买个几百张82599武装你的机房, 性能低你怪谁? 内心没一点i3数吗?
分治套路里有三个角色:
在介绍这一节的示例代码以前, 咱们先引入了两个工具函数:
/* * 获取当时时间戳, 单位ms */ static inline int64_t s_clock(void) { struct timeval tv; gettimeofday(&tv, NULL); return (int64_t)(tv.tv_sec * 1000 + tv.tv_usec / 1000); } /* * 使当前进程睡眠指定毫秒 */ static inline void s_sleep(int ms) { struct timespec t; t.tv_sec = ms/1000; t.tv_nsec = (ms % 1000) * 1000000; nanosleep(&t, NULL); }
分治套路也被称为流水线套路. 下面是示例代码:
包工头代码:
#include <zmq.h> #include <stdio.h> #include <time.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); void * socket_to_worker = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket_to_sink, "tcp://localhost:5558"); zmq_bind(socket_to_worker, "tcp://*:5557"); printf("Press Enter when all workers get ready:"); getchar(); printf("Sending tasks to workers...\n"); s_send(socket_to_sink, "Get ur ass up"); // 通知监理, 干活了 srandom((unsigned)time(NULL)); int total_ms = 0; for(int i = 0; i < 100; ++i) { int workload = randof(100) + 1; // 工做须要的耗时, 单位ms total_ms += workload; char string[10]; snprintf(string, sizeof(string), "%d", workload); s_send(socket_to_worker, string); // 将工做分派给工程队 } printf("Total expected cost: %d ms\n", total_ms); zmq_close(socket_to_sink); zmq_close(socket_to_worker); zmq_ctx_destroy(context); return 0; }
工程队代码:
#include <zmq.h> #include <stdio.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL); void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket_to_ventilator, "tcp://localhost:5557"); zmq_connect(socket_to_sink, "tcp://localhost:5558"); while(1) { char * msg = s_recv(socket_to_ventilator); printf("Received msg: %s\n", msg); fflush(stdout); s_sleep(atoi(msg)); // 干活, 即睡眠指定毫秒 free(msg); s_send(socket_to_sink, "DONE"); // 活干完了通知监理 } zmq_close(socket_to_ventilator); zmq_close(socket_to_sink); zmq_ctx_destroy(context); return 0; }
监理代码:
#include <zmq.h> #include <stdio.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_to_worker_and_ventilator = zmq_socket(context, ZMQ_PULL); zmq_bind(socket_to_worker_and_ventilator, "tcp://*:5558"); char * msg = s_recv(socket_to_worker_and_ventilator); printf("Received msg: %s", msg); // 接收来自包工头的开始干活的消息 free(msg); int64_t start_time = s_clock(); for(int i = 0; i < 100; ++i) { // 接收100个worker干完活的消息 char * msg = s_recv(socket_to_worker_and_ventilator); free(msg); if(i / 10 * 10 == i) printf(":"); else printf("."); fflush(stdout); } printf("Total elapsed time: %d ms]\n", (int)(s_clock() - start_time)); zmq_close(socket_to_worker_and_ventilator); zmq_ctx_destroy(context); return 0; }
这个示例程序的逻辑流程是这样的:
包工头里输出的预计耗时是100个任务的共计耗时, 在监理那里统计的实际耗时则是由多个工程队并行处理100个任务实际的耗时.
这里个例子中须要注意的点有:
ZMQ_PULL
与ZMQ_PUSH
两种socket. 分别供消息分发方与消息接收方使用. 看起来略微有点相似于发布-订阅套路, 具体之间的区别后续章节会讲到.PUSH/PULL
模式虽然和PUB/SUB
不同, 不会丢失消息. 但若是不手动同步的话, 最早创建链接的工程队将几乎把全部任务都接收到手, 致使后续完成链接的工程队拿不到任务, 任务分配不平衡.因此, 你大体能看出来, 分治套路里有一个核心问题, 就是任务分发者与任务执行者之间的同步. 若是在全部执行者均与分发者创建链接后, 进行分发, 那么任务分发是比较公平的. 这就须要应用程序开发者本身负责同步事宜. 关于这个话题进一步的技巧将在第三章进一步讨论.
如今咱们写了三个例子, 分别是请求-回应套路, 发布-订阅套路, 流水线套路. 在继续进一步学习以前, 有必要对一些点进行强调
你大体注意到了, 在上面的全部示例代码中, 每次都以zmq_ctx_new()
函数建立出一个名为context
的变量, 目前你不须要了解它的细节, 这只是ZMQ库的标准套路. 甚至于你未来都不须要了解这个context里面究竟是什么. 但你必需要遵循zmq中关于这个context的一些编程规定:
zmq_ctx_new()
建立contextzmq_ctx_destroy()
销毁掉它每一个进程, 应该持有, 且应该只持有, 一个context. 固然, 目前来讲, 你这样理解就好了, 后续章节或许咱们会深刻探索一下context, 但目前, 请谨记, one context per process.
若是你在代码中调用了fork
系统调用, 那么请在子进程代码区的开始处调用zmq_ctx_new()
, 为子进程建立本身的context
网络编程和内存泄漏简直就是一对狗男女, 要避免这些狗血的场景, 写代码的时候, 时刻要谨记: 把屁股擦干净.在使用ZMQ编程的过程当中, 我建议你:
zmq_ctx_destroy()
以前, 先调用zmq_close()
关闭掉全部的zmq socket. 不然zmq_ctx_destroy
可能会被一直阻塞着zmq_send()
与zmq_recv()
来收发消息, 尽可能避免使用与zmq_msg_t
相关的API接口. 是的, 那些接口有额外的特性, 有额外的性能提高, 但在性能瓶颈不在这些细枝末节的时候, 不要过分造做.zmq_msg_t
相关的接口收发消息, 那么请在调用zmq_msg_recv()
以后, 尽快的调用zmq_msg_close()
释放掉消息对象固然, 上面主要是对C语言做者的一些建议, 对于其它语言, 特别是有GC的语言, 使用ZMQ相关接口以前建议确认相关的binding接口是否正确处理了资源句柄.
网络编程, 特别是*nix平台的网络编程, 99%程序员的启蒙始于<Unix网络编程>这本书, 90%里的项目充斥着linux socket, epoll与fd. 是的, 2018年了, 他们仍是这么干的. 咱们就从这个视角来列举一下, 使用*nix平台原生的网络API与多路IO接口, 你在写服务端程序时须要头疼的事情:
我问你, 你头大不大? 想不想死?
读过开源项目吗? 好比Hadoop Zookeeper, 你去观摩一下zookeeper.c, 真是看的人头大想死. 你再翻翻其它开源项目, 特别是用C/C++写的Linux端程序, 每一个都要把网络库事件库从新写一遍.
因此矛盾很突出, 为何不能造一个你们都用的轮子呢? 缘由很简单, 有两个方面:
那么ZMQ解决了什么问题呢? ZMQ给上面提出的问题都给了完美答案吗? 理性的说, 确定没有, 可是ZMQ是这样回答这些问题的:
总之, 就是很好, 固然了没有一个框架库的做者会说本身的产品很差, 而具体好很差, 学了用了以后才会知道, 上面的点看一看得了, 别当真.
在发布-订阅套路由, 当你开启多个村民的时候, 你会发现, 全部村民都能收到消息, 而村口的喇叭也工做正常. 这就是zmq socket的可扩展性. 对于发布端来说, 开发人员始终面对的是一个socket, 而不用去管链接我到底下面会有多少订阅用户. 这样极大简化了开发人员的工做, 实际发布端程序跑起来的时候, 会自主进行适应, 并执行最合理的行为. 更深层次一点, 你可能会说, 这样的功能, 我用epoll在linux socket上也能实现, 可是, 当多个订阅者开始接收数据的时候, 你仔细观察你cpu的负载, 你会发现发布端进程不光正确接纳了全部订阅者, 更重要的是把工做负载经过多线程均衡到了你电脑的多个核心上. 日最大程度的榨干了你的cpu性能. 若是你单纯的用epoll和linux socket来实现这个功能, 发布端只会占用一个核心, 除非你再写一坨代码以实现多线程或多进程版的村口大喇叭.