在 OpenResty 里实现进程间通信

在 Nginx 里面,每一个 worker 进程都是平等的。可是有些时候,咱们须要给它们分配不一样的角色,这时候就须要实现进程间通信的功能。nginx

轮询

一种简单粗暴但却被广泛使用的方案,就是每一个进程划分属于本身的 list 类型的 shdict key,每隔一段时间查看是否有新消息。这种方式优势在于实现简单,缺点在于难以保证明时性。固然对于绝大多数须要进程间通信的场景,每 0.1 起一个 timer 来处理新增消息已经足够了。毕竟 0.1 秒的延迟不算长,每秒起 10 个 timer 开销也不大,应付通常的通讯量绰绰有余。git

redis外援

要是你以为轮询很搓,或者在你的环境下,轮询确实很搓,也能够考虑下引入外部依赖来改善实时性。好比在本地起一个 redis,监听 unix socket,而后每一个进程经过 Pub/Sub 或者 stream 类型发布/获取最新的消息。这种方案实现起来也简单,实时性和性能也足够好,只是须要引入个 redis 服务。github

ngx_lua_ipc

若是你是个极简主义者,对引入外部依赖深恶痛绝,但愿什么东西都能在 Nginx 里面实现的话,ngx_lua_ipc 是一个被普遍使用的选择。redis

ngx_lua_ipc 是一个第三方 Nginx C 模块,提供了一些 Lua API,可供在 OpenResty 代码里完成进程间通信(IPC)的操做。segmentfault

它会在 Nginx 的 init 阶段建立 worker process + helper process 对 pipe fd。每对 fd 有一个做为 read fd,负责接收数据,另外一个做为 write fd,用于发送数据。当 Nginx 建立 worker 进程时,每一个 worker 进程都会继承这些 pipe fd,因而就能经过它们来实现进程间通信。感兴趣的读者能够 man 7 pipe 一下,了解基于 pipe 的进程间通信是怎么实现的。网络

固然 ngx_lua_ipc 还须要把 pipe 的 read fd 经过 ngx_connection_t 接入到 Nginx 的事件循环机制中,具体实现位于 ipc_channel_setup_connsocket

c = ngx_get_connection(chan->pipe[conn_type == IPC_CONN_READ ? 0 : 1], cycle->log);
  c->data = data;

  if(conn_type == IPC_CONN_READ) {
    c->read->handler = event_handler;
    c->read->log = cycle->log;
    c->write->handler = NULL;
    ngx_add_event(c->read, NGX_READ_EVENT, 0);
    chan->read_conn=c;
  }
  else if(conn_type == IPC_CONN_WRITE) {
    c->read->handler = NULL;
    c->write->log = cycle->log;
    c->write->handler = ipc_write_handler;
    chan->write_conn=c;
  }
  else {
    return NGX_ERROR;
  }
  return NGX_OK;

write fd 是由 Lua 代码操做的,因此不须要加入到 Nginx 的事件循环机制中。tcp

有一点有趣的细节,pipe fd 只有在写入数据小于 PIPE_BUF 时才会保证写操做的原子性。若是一条消息超过 PIPE_BUF(在 Linux 上大于 4K),那么它的写入就不是原子的,可能写入前面 PIPE_BUF 以后,有另外一个 worker 也正巧给同一个进程写入消息。函数

为了不不一样 worker 进程的消息串在一块儿,ngx_lua_ipc 定义了一个 packet 概念。每一个 packet 都不会大于 PIPE_BUF,同时有一个 header 来保证单个消息分割成多个 packet 以后可以被从新打包回来。性能

在接收端,为了能在收到消息以后执行对应的 Lua handler,ngx_lua_ipc 使用了 ngx.timer.at 来执行一个函数,这个函数会根据消息类型分发到对应的 handler 上。这样有个问题,就是消息是否能完成投递,取决于 ngx.timer.at 可否被执行。而 ngx.timer.at 是否被执行受限于两个因素:

  1. 若是 lua_max_pending_timer 不够大,ngx.timer.at 可能没法建立 timer
  2. 若是 lua_max_running_timer 不够大,或者没有足够的资源运行 timer,ngx.timer.at 建立的 timer 可能没法运行。

事实上,若是 timer 没法运行(消息没法投递),现阶段的 OpenResty 可能不会记录错误日志。我以前提过一个记录错误日志的 PR:https://github.com/openresty/...,不过一直没有合并。

因此严格意义上, ngx_lua_ipc 并不能保证消息可以被投递,也不能在消息投递失败时报错。不过这个锅得让 ngx.timer.at 来背。

ngx_lua_ipc 能不能不用 ngx.timer.at 那一套呢?这个就须要从 lua-nginx-module 里复制一大段代码,并偶尔同步一下。复制粘贴乃 Nginx C 模块开发的奥义。

动态监听 unix socket

上面的方法中,除了 Redis 外援法,若是不在应用代码里加日志,要想在外部查看消息投递的过程,只能依靠 gdb/systemtap/bcc 这些大招。若是走网络链接,就能使用平民技术,如 tcpdump,来追踪消息的流动。固然若是是 unix socket,还须要临时搞个 TCP proxy 整一下,不过操做难度较前面的大招们已经大大下降了。

那有没有办法让 IPC 走网络,但又不须要借助外部依赖呢?

回想起 Redis 外援法,之因此咱们不能直接走 Nginx 的网络请求,是由于 Nginx 里面每一个 worker 进程是平等的,你不知道你的请求会落到哪一个进程上,而请求 Redis 就没这个问题。那咱们能不能让不一样的 worker 进程动态监听不一样的 unix socket?

答案是确定的。咱们能够实现相似于这样的接口:

ln = ngx.socket.listen(...)
sock = ln.accept()
sock:read(...)

曾经有人提过相似的 PR:https://github.com/openresty/...,我本身也在公司项目里实现过差很少的东西。声明下,不要用这个方法作 IPC。上面的实现有个致命的问题,就是 ln 和后面建立的全部的 sock,都是在同一个 Nginx 请求里面的。

我曾经写过,在一个 Nginx 请求里作太多的事情,会有资源分配上的问题:https://segmentfault.com/a/11...
后面随着 IPC 的次数的增长,这种问题会愈加明显。

要想解决这个问题,咱们能够把每一个 sock 放到独立的 fake request 里面跑,就像这样:

ln = ngx.socket.listen(...)
-- 相似于 ngx.timer.at 的处理风格
ln.register_handler(function(sock)
    sock:read(...)
end)

可是还有个问题。若是用 worker id 做为被监听的 unix socket 的 ID, 因为这个 unix socket 是在 worker 进程里动态监听的,而在 Nginx reload 或 binary upgrade 的状况下,多个 worker 进程会有一样的 worker id,尝试监听一样的 unix socket,致使地址被占用的错误。解决方法就是改用 PID 做为被监听的 unix socket 的 ID,而后在首次发送时初始化 PID 到 worker id 的映射。若是有支持在 reload 时正常发送消息的需求,还要记录新旧两组 worker,好比:

1111 => old worker ID 1
1123 => new worker ID 2

每一个 worker 分配不一样的 unix socket

还有一种更为巧妙的,借助不一样 worker 不一样 unix socket 来实现进程间通信的方法。这种方法是如此地巧妙,我只恨不是我想出来的。该方法能够淘汰掉上面动态监听 unix socket 的方案。

咱们能够在 Nginx 配置文件里面声明,listen unix:xxx.sock use_as_ipc_blah_blah。而后修改 Nginx,让它在看到 use_as_ipc_blah_blah 差很少这样的一个标记时,让特定的进程监听特定的 unix sock,好比 xxx_1.sockxxx_2.sock 等。

它跟动态监听 unix socket 方法比起来,实现更为简单,因此也更为可靠。固然要想保证在 reload 或者 binary upgrade 时投递消息到正确的 worker,记得用 PID 而不是 worker id 来做为区分后缀,并维护好二者间的映射。

这个方法是由 datavisor 的同行提出来的,估计最近会开源出来。

相关文章
相关标签/搜索