在 Nginx 里面,每一个 worker 进程都是平等的。可是有些时候,咱们须要给它们分配不一样的角色,这时候就须要实现进程间通信的功能。nginx
一种简单粗暴但却被广泛使用的方案,就是每一个进程划分属于本身的 list 类型的 shdict key,每隔一段时间查看是否有新消息。这种方式优势在于实现简单,缺点在于难以保证明时性。固然对于绝大多数须要进程间通信的场景,每 0.1 起一个 timer 来处理新增消息已经足够了。毕竟 0.1 秒的延迟不算长,每秒起 10 个 timer 开销也不大,应付通常的通讯量绰绰有余。git
要是你以为轮询很搓,或者在你的环境下,轮询确实很搓,也能够考虑下引入外部依赖来改善实时性。好比在本地起一个 redis,监听 unix socket,而后每一个进程经过 Pub/Sub 或者 stream 类型发布/获取最新的消息。这种方案实现起来也简单,实时性和性能也足够好,只是须要引入个 redis 服务。github
若是你是个极简主义者,对引入外部依赖深恶痛绝,但愿什么东西都能在 Nginx 里面实现的话,ngx_lua_ipc 是一个被普遍使用的选择。redis
ngx_lua_ipc
是一个第三方 Nginx C 模块,提供了一些 Lua API,可供在 OpenResty 代码里完成进程间通信(IPC)的操做。segmentfault
它会在 Nginx 的 init 阶段建立 worker process + helper process 对 pipe fd。每对 fd 有一个做为 read fd,负责接收数据,另外一个做为 write fd,用于发送数据。当 Nginx 建立 worker 进程时,每一个 worker 进程都会继承这些 pipe fd,因而就能经过它们来实现进程间通信。感兴趣的读者能够 man 7 pipe
一下,了解基于 pipe 的进程间通信是怎么实现的。网络
固然 ngx_lua_ipc
还须要把 pipe 的 read fd 经过 ngx_connection_t
接入到 Nginx 的事件循环机制中,具体实现位于 ipc_channel_setup_conn
:socket
c = ngx_get_connection(chan->pipe[conn_type == IPC_CONN_READ ? 0 : 1], cycle->log); c->data = data; if(conn_type == IPC_CONN_READ) { c->read->handler = event_handler; c->read->log = cycle->log; c->write->handler = NULL; ngx_add_event(c->read, NGX_READ_EVENT, 0); chan->read_conn=c; } else if(conn_type == IPC_CONN_WRITE) { c->read->handler = NULL; c->write->log = cycle->log; c->write->handler = ipc_write_handler; chan->write_conn=c; } else { return NGX_ERROR; } return NGX_OK;
write fd 是由 Lua 代码操做的,因此不须要加入到 Nginx 的事件循环机制中。tcp
有一点有趣的细节,pipe fd 只有在写入数据小于 PIPE_BUF
时才会保证写操做的原子性。若是一条消息超过 PIPE_BUF
(在 Linux 上大于 4K),那么它的写入就不是原子的,可能写入前面 PIPE_BUF
以后,有另外一个 worker 也正巧给同一个进程写入消息。函数
为了不不一样 worker 进程的消息串在一块儿,ngx_lua_ipc
定义了一个 packet 概念。每一个 packet 都不会大于 PIPE_BUF
,同时有一个 header 来保证单个消息分割成多个 packet 以后可以被从新打包回来。性能
在接收端,为了能在收到消息以后执行对应的 Lua handler,ngx_lua_ipc
使用了 ngx.timer.at
来执行一个函数,这个函数会根据消息类型分发到对应的 handler 上。这样有个问题,就是消息是否能完成投递,取决于 ngx.timer.at
可否被执行。而 ngx.timer.at
是否被执行受限于两个因素:
lua_max_pending_timer
不够大,ngx.timer.at
可能没法建立 timerlua_max_running_timer
不够大,或者没有足够的资源运行 timer,ngx.timer.at
建立的 timer 可能没法运行。事实上,若是 timer 没法运行(消息没法投递),现阶段的 OpenResty 可能不会记录错误日志。我以前提过一个记录错误日志的 PR:https://github.com/openresty/...,不过一直没有合并。
因此严格意义上, ngx_lua_ipc
并不能保证消息可以被投递,也不能在消息投递失败时报错。不过这个锅得让 ngx.timer.at
来背。
ngx_lua_ipc
能不能不用 ngx.timer.at
那一套呢?这个就须要从 lua-nginx-module 里复制一大段代码,并偶尔同步一下。复制粘贴乃 Nginx C 模块开发的奥义。
上面的方法中,除了 Redis 外援法,若是不在应用代码里加日志,要想在外部查看消息投递的过程,只能依靠 gdb/systemtap/bcc 这些大招。若是走网络链接,就能使用平民技术,如 tcpdump,来追踪消息的流动。固然若是是 unix socket,还须要临时搞个 TCP proxy 整一下,不过操做难度较前面的大招们已经大大下降了。
那有没有办法让 IPC 走网络,但又不须要借助外部依赖呢?
回想起 Redis 外援法,之因此咱们不能直接走 Nginx 的网络请求,是由于 Nginx 里面每一个 worker 进程是平等的,你不知道你的请求会落到哪一个进程上,而请求 Redis 就没这个问题。那咱们能不能让不一样的 worker 进程动态监听不一样的 unix socket?
答案是确定的。咱们能够实现相似于这样的接口:
ln = ngx.socket.listen(...) sock = ln.accept() sock:read(...)
曾经有人提过相似的 PR:https://github.com/openresty/...,我本身也在公司项目里实现过差很少的东西。声明下,不要用这个方法作 IPC。上面的实现有个致命的问题,就是 ln 和后面建立的全部的 sock,都是在同一个 Nginx 请求里面的。
我曾经写过,在一个 Nginx 请求里作太多的事情,会有资源分配上的问题:https://segmentfault.com/a/11...
后面随着 IPC 的次数的增长,这种问题会愈加明显。
要想解决这个问题,咱们能够把每一个 sock 放到独立的 fake request 里面跑,就像这样:
ln = ngx.socket.listen(...) -- 相似于 ngx.timer.at 的处理风格 ln.register_handler(function(sock) sock:read(...) end)
可是还有个问题。若是用 worker id 做为被监听的 unix socket 的 ID, 因为这个 unix socket 是在 worker 进程里动态监听的,而在 Nginx reload 或 binary upgrade 的状况下,多个 worker 进程会有一样的 worker id,尝试监听一样的 unix socket,致使地址被占用的错误。解决方法就是改用 PID 做为被监听的 unix socket 的 ID,而后在首次发送时初始化 PID 到 worker id 的映射。若是有支持在 reload 时正常发送消息的需求,还要记录新旧两组 worker,好比:
1111 => old worker ID 1 1123 => new worker ID 2
还有一种更为巧妙的,借助不一样 worker 不一样 unix socket 来实现进程间通信的方法。这种方法是如此地巧妙,我只恨不是我想出来的。该方法能够淘汰掉上面动态监听 unix socket 的方案。
咱们能够在 Nginx 配置文件里面声明,listen unix:xxx.sock use_as_ipc_blah_blah
。而后修改 Nginx,让它在看到 use_as_ipc_blah_blah
差很少这样的一个标记时,让特定的进程监听特定的 unix sock,好比 xxx_1.sock
、xxx_2.sock
等。
它跟动态监听 unix socket 方法比起来,实现更为简单,因此也更为可靠。固然要想保证在 reload 或者 binary upgrade 时投递消息到正确的 worker,记得用 PID 而不是 worker id 来做为区分后缀,并维护好二者间的映射。
这个方法是由 datavisor 的同行提出来的,估计最近会开源出来。