从源码分析Node的Cluster模块

前段时间,公司的老哥遇到一个问题,大概就是本机有个node的http服务器,可是每次请求这个服务器的端口返回的数据都报错,一看返回的数据根本不是http的报文格式,而后通过一番排查发现是另一个服务器同时监听了http服务器的这个端口。这个时候老哥就很奇怪,为啥我这个端口明明使用了,却仍是能够启动呢?这个时候我根据之前看libuv源码的经验解释了这个问题,由于uv__tcp_bind中,对socket会设置SO_REUSEADDR选项,使得端口能够复用,可是tcp中地址不能复用,由于那两个监听虽然是同一个端口,可是地址不一样,因此能够同时存在。这个问题让我不由想到了以前看一篇文章里有人留言说这个选项是cluster内部复用端口的缘由,当时没有细细研究觉得说的是SO_REUSEPORT也就没有细想,可是此次由于这个问题仔细看了下结果是设置的SO_REUSEADDR选项,这个选项虽然能复用端口,可是前提是每一个ip地址不一样,好比能够同时监听'0.0.0.0'和'192.168.0.12'的端口,但不能两个都是'0.0.0.0'的同一个 端口,若是cluster是用这个来实现的,那要是多起几个子进程很明显ip地址不够用啊,因而就用node文档中的例子试了下:node

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}
复制代码

在使用cluster的在几个子进程同时监听了8000端口后,查看了一下只有主进程监听了这个端口,其余都没有。这个时候,我猜想node仍是使用在父进程中建立sever的io可是这个父进程应该就是经过Unix域套接字的cmsg_data将父进程中收到客户端套接字描述符传递给子进程而后让子进程来处理具体的数据与逻辑,可是node究竟是如何经过在子进程中createServer而且listen可是只在父进程中真的监听了该端口来实现这个逻辑的呢?这个问题引发了个人好奇,让我不得不到源码中一探究竟。编程

从net模块出发

按理说,这个问题咱们应该直接经过cluster模块来分析,可是很明显,在加载http模块的时候并不会像cluster模块启动时同样经过去判断NODE_ENV来加载不一样的模块,可是从上面的分析,我能够得出子进程中的createServer执行了跟父进程不一样的操做,因此只能说明http模块中经过isMaster这样的判断来进行了不一样的操做,不过http.js_http_server.js中都没有这个判断,可是经过对createServer向上的查找我在net.jslistenInCluster中找到了isMaster的判断,listenInCluster会在createServer后的server.listen(8000)中调用,因此咱们能够看下他的关键逻辑。服务器

if (cluster === null) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    server._listen2(address, port, addressType, backlog, fd);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  };

  // 获取父进程的server句柄,并监听它
  cluster._getServer(server, serverQuery, listenOnMasterHandle);
复制代码

从这段代码中咱们能够看出,若是是在父进程中,直接经过_listen2的逻辑就能开始正常的监听了,可是在子进程中,会经过cluster._getServer的方式获取父进程的句柄,并经过回调函数listenOnMasterHandle监听它。看到这里我其实比较疑惑,由于在我对于网络编程的学习中,只据说过传递描述符的,这个传递server的句柄实在是太新鲜了,因而赶忙继续深刻研究了起来。网络

深刻cluster的代码

首先,来看一下_gerServer的方法的代码。socket

const message = util._extend({
    act: 'queryServer',
    index: indexes[indexesKey],
    data: null
  }, options);
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });
复制代码

这个方法经过send像主进程发送一个包,由于在send函数中有这样一句代码:tcp

message = util._extend({ cmd: 'NODE_CLUSTER' }, message);
复制代码

经过Node的文档,咱们能够知道这种cmd带了Node字符串的包,父进程会经过internalMessage事件来响应,因此咱们能够从internal/cluster/master.js中看到找到,对应于act: 'queryServer'的处理函数queryServer的代码。函数

...
  var constructor = RoundRobinHandle;
  ...
  handle = new constructor(key, message.address,message.port,message.addressType,message.fd,message.flags);
  ...
  handle.add(worker, (errno, reply, handle) => {
    reply = util._extend({
      errno: errno,
      key: key,
      ack: message.seq,
      data: handles[key].data
    }, reply);

    if (errno)
      delete handles[key];  // Gives other workers a chance to retry.

    send(worker, reply, handle);
  });
复制代码

这里建立了一个RoundRobinHandle实例,在该实例的构造函数中经过代码:oop

this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0)
    this.server.listen(port, address);
  else
    this.server.listen(address);  // UNIX socket path.

  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
复制代码

在父进程中生成了一个server,而且经过注册listen的方法将有心的客户端链接到达时执行的onconnection改为了使用自身的this.distribute函数,这个函数咱们先记下由于他是后来父进程给子进程派发任务的重要函数。说回getServer的代码,这里经过RoundRobinHandle实例的add方法:学习

const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  // Still busy binding.
  this.server.once('listening', done);
复制代码

会给子进程的getServer以回复。从这里咱们能够看到在给子进程的回复中handle一直都是null。那这个所谓的去取得父进程的server是怎么取得的呢?这个地方让我困惑了一下,不事后来看子进程的代码我就明白了,实际上根本不存在什么取得父进程server的句柄,这个地方的注释迷惑了阅读者,从以前子进程的回调中咱们能够看到,返回的handle只是决定子进程是用shared方式仍是Round-robin的方式来处理父进程派下来的任务。从这个回调函数咱们就能够看出,子进程是没有任何获取句柄的操做的,那它是如何处理的呢?咱们经过该例子中的rr方法能够看到:ui

const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  handles[key] = handle;
  cb(0, handle);
复制代码

这个函数中生成了一个自带listen和close方法的对象,并传递给了函数listenOnMasterHandle,虽然这个名字写的是在父进程的server句柄上监听,实际上咱们这个例子中是子进程自建了一个handle,可是若是是udp的状况下这个函数名字还确实就是这么回事,缘由在于SO_REUSEADDR选项,里面有这样一个解释:

SO_REUSEADDR容许彻底相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP。
复制代码

因此,在udp状况同一个地址和端口是能够重复监听的(以前网上看到那个哥们儿说的也没问题,只是一叶障目了),因此能够共享父进程的handle,跟TCP的状况不一样。咱们继续来看当前这个TCP的状况,在这个状况下listenOnMasterHandle会将咱们在子进程中本身生成的handle对象传入子进程中经过createServer建立的server的_handle属性中并经过

server._listen2(address, port, addressType, backlog, fd);
复制代码

作了一个假的监听操做,实际上由于_handle的存在这里只会为以前_handle赋值一个onconnection函数,这个函数的触发则跟父进程中经过真实的客户端链接触发的时机不一样,而是经过

process.on('internalMessage', (message, handle) {
  if (message.act === 'newconn')
    onconnection(message, handle);
  else if (message.act === 'disconnect')
    _disconnect.call(worker, true);
}
复制代码

中注册的internalMessage事件中的对父进程传入的act为newconn的包触发。而父进程中就经过咱们刚刚说到的改写了server对象的onconnection函数的distribute函数,这个函数中会调用一个叫handoff的函数,经过代码:

const message = { act: 'newconn', key: this.key };
  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
复制代码

其中send到子进程的handle就是新链接客户端的句柄,Node中父子进程之间的通讯最后是经过src/stream_base.cc中的StreamBase::WriteString函数实现的,从这段代码咱们能够看出:

...
//当进程间通讯时
uv_handle_t* send_handle = nullptr;

if (!send_handle_obj.IsEmpty()) {
  HandleWrap* wrap;
  ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
  send_handle = wrap->GetHandle();
  // Reference LibuvStreamWrap instance to prevent it from being garbage
  // collected before `AfterWrite` is called.
  CHECK_EQ(false, req_wrap->persistent().IsEmpty());
  req_wrap_obj->Set(env->handle_string(), send_handle_obj);
}

err = DoWrite(
    req_wrap,
    &buf,
    1,
    reinterpret_cast<uv_stream_t*>(send_handle));
复制代码

能够看到,在调用此方式时,若是传入了一个客户端的句柄则经过Dowrite方法最后经过辅助数据cmsg_data将客户端句柄的套接字fd传送到子进程中进行处理。看到这里我不由恍然大悟,原来仍是走的是我熟悉的那套网络编程的逻辑啊。

总结

经过上面的一轮分析,咱们能够总结出如下两个结论:

  1. 建立TCP服务器时,会在父进程中建立一个server并监听目标端口,新链接到达后,会经过ipc的方式将新链接的句柄分配到子进程中而后处理新链接的数据和请求,因此只有主进程会监听目标ip和端口。
  2. 建立UDP服务器,会共享在父进程中建立的server的句柄对象,而且在子进程中都会监听到跟对象相同的ip地址和端口上,因此建立n个子进程则会有n+1个进程同时监听到目标ip和端口上。
相关文章
相关标签/搜索