Node.js的进程管理

众所周知Node基于V8,而在V8中JavaScript是单线程运行的,这里的单线程不是指Node启动的时候就只有一个线程,而是说运行JavaScript代码是在单线程上,Node还有其余线程,好比进行异步IO操做的IO线程。这种单线程模型带来的好处就是系统调度过程当中不会频繁进行上下文切换,提高了单核CPU的利用率。javascript

可是这种作法有个缺陷,就是咱们没法利用服务器CPU多核的性能,一个Node进程只能利用一个CPU。并且单线程模式下一旦代码崩溃就是整个程序崩溃。一般解决方案就是使用Node的cluster模块,经过master-worker模式启用多个进程实例。下面咱们详细讲述下,Node如何使用多进程模型利用多核CPU,以及自带的cluster模块具体的工做原理。html

如何建立子进程

node提供了child_process模块用来进行子进程的建立,该模块一共有四个方法用来建立子进程。java

const { spawn, exec, execFile, fork } = require('child_process')

spawn(command[, args][, options])

exec(command[, options][, callback])

execFile(file[, args][, options][, callback])

fork(modulePath[, args][, options])

spawn

首先认识一下spawn方法,下面是Node文档的官方实例。node

const { spawn } = require('child_process');
const child = spawn('ls', ['-lh', '/home']);

child.on('close', (code) => {
  console.log(`子进程退出码:${code}`);
});

const { stdin, stdout, stderr } = child

stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

stderr.on('data', (data) => {
  console.log(`stderr: ${data}`);
});

经过spawn建立的子进程,继承自EventEmitter,因此能够在上面进行事件(discounterrorclosemessage)的监听。同时子进程具备三个输入输出流:stdin、stdout、stderr,经过这三个流,能够实时获取子进程的输入输出和错误信息。linux

这个方法的最终实现基于libuv,这里再也不展开讨论,感兴趣能够查看源码nginx

// 调用libuv的api,初始化一个进程
int err = uv_spawn(env->event_loop(), &wrap->process_, &options);

exec/execFile

之因此把这两个放到一块儿,是由于exec最后调用的就是execFile方法,源码在这里)。惟一的区别是,exec中调用的normalizeExecArgs方法会将opts的shell属性默认设置为true。git

exports.exec = function exec(/* command , options, callback */) {
  const opts = normalizeExecArgs.apply(null, arguments);
  return exports.execFile(opts.file, opts.options, opts.callback);
};

function normalizeExecArgs(command, options, callback) {
  options = { ...options };
  options.shell = typeof options.shell === 'string' ? options.shell : true;
  return { options };
}

在execFile中,最终调用的是spawn方法。github

exports.execFile = function execFile(file /* , args, options, callback */) {
  let args = [];
  let callback;
  let options;
  var child = spawn(file, args, {
    // ... some options
  });
  
  return child;
}

exec会将spawn的输入输出流转换成String,默认使用UTF-8的编码,而后传递给回调函数,使用回调方式在node中较为熟悉,比流更容易操做,因此咱们能使用exec方法执行一些shell命令,而后在回调中获取返回值。有点须要注意,这里的buffer是有最大缓存区的,若是超出会直接被kill掉,可用经过maxBuffer属性进行配置(默认: 200*1024)。shell

const { exec } = require('child_process');
exec('ls -lh /home', (error, stdout, stderr) => {
  console.log(`stdout: ${stdout}`);
  console.log(`stderr: ${stderr}`);
});

fork

fork最后也是调用spawn来建立子进程,可是fork是spawn的一种特殊状况,用于衍生新的 Node.js 进程,会产生一个新的V8实例,因此执行fork方法时须要指定一个js文件。数据库

exports.fork = function fork(modulePath /* , args, options */) {
  // ...
  
  options.shell = false;

  return spawn(options.execPath, args, options);
};

经过fork建立子进程以后,父子进程直接会建立一个IPC(进程间通讯)通道,方便父子进程直接通讯,在js层使用 process.send(message)process.on('message', msg => {}) 进行通讯。而在底层,实现进程间通讯的方式有不少,Node的进程间通讯基于libuv实现,不一样操做系统实现方式不一致。在*unix系统中采用Unix Domain Socket方式实现,Windows中使用命名管道的方式实现。

常见进程间通讯方式:消息队列、共享内存、pipe、信号量、套接字

下面是一个父子进程通讯的实例。

parent.js

const path = require('path')
const { fork } = require('child_process')

const child = fork(path.join(__dirname, 'child.js'))

child.on('message', msg => {
    console.log('message from child', msg)
});

child.send('hello child, I\'m master')

child.js

process.on('message', msg => {
  console.log('message from master:', msg)
});
let counter = 0
setInterval(() => {
  process.send({
    child: true,
    counter: counter++
  })
}, 1000);

图片描述

小结

其实能够看到,这些方法都是对spawn方法的复用,而后spawn方法底层调用了libuv进行进程的管理,具体能够看下图。

图片描述

利用fork实现master-worker模型

首先来看看,若是咱们在child.js中启动一个http服务会发生什么状况。

// master.js
const { fork } = require('child_process')

for (let i = 0; i < 2; i++) {
  const child = fork('./child.js')
}

// child.js
const http = require('http')
http.createServer((req, res) => {
  res.end('Hello World\n');
}).listen(8000)

图片描述

+--------------+
              |              |
              |    master    |
              |              |
     +--------+--------------+- -- -- -
     |                                 |
     |                          Error: listen EADDRINUSE
     |                                 |
     |
+----v----+                      +-----v---+
|         |                      |         |
| worker1 |                      | worker2 |
|         |                      |         |
+---------+                      +---------+
   :8000                            :8000

咱们fork了两个子进程,由于两个子进程同时对一个端口进行监听,Node会直接抛出一个异常(Error: listen EADDRINUSE),如上图所示。那么咱们能不能使用代理模式,同时监听多个端口,让master进程监听80端口收到请求时,再将请求分发给不一样服务,并且master进程还能作适当的负载均衡。

+--------------+
              |              |
              |    master    |
              |     :80     |
     +--------+--------------+---------+
     |                                 |
     |                                 |
     |                                 |
     |                                 |
+----v----+                      +-----v---+
|         |                      |         |
| worker1 |                      | worker2 |
|         |                      |         |
+---------+                      +---------+
   :8000                            :8001

可是这么作又会带来另外一个问题,代理模式中十分消耗文件描述符(linux系统默认的最大文件描述符限制是1024),文件描述符在windows系统中称为句柄(handle),习惯性的咱们也能够称linux中的文件描述符为句柄。当用户进行访问,首先链接到master进程,会消耗一个句柄,而后master进程再代理到worker进程又会消耗掉一个句柄,因此这种作法十分浪费系统资源。为了解决这个问题,Node的进程间通讯能够发送句柄,节省系统资源。

句柄是一种特殊的智能指针 。当一个应用程序要引用其余系统(如数据库、操做系统)所管理的内存块或对象时,就要使用句柄。

咱们能够在master进程启动一个tcp服务,而后经过IPC将服务的句柄发送给子进程,子进程再对服务的链接事件进行监听,具体代码以下:

// master.js
var { fork } = require('child_process')
var server = require('net').createServer()
server.on('connection', function(socket) {
  socket.end('handled by master') // 响应来自master
})
server.listen(3000, function() {
  console.log('master listening on: ', 3000)
})
for (var i = 0; i < 2; i++) {
  var child = fork('./child.js')
  child.send('server', server) // 发送句柄给worker
  console.log('worker create, pid is ', child.pid)
}

// child.js
process.on('message', function (msg, handler) {
  if (msg !== 'server') {
    return
  }
  // 获取到句柄后,进行请求的监听
  handler.on('connection', function(socket) {
    socket.end('handled by worker, pid is ' + process.pid)  
  })
})

启动服务

下面咱们经过curl连续请求 5 次服务。

for varible1 in {1..5}
do
  curl "localhost:3000"
done

请求服务

能够看到,响应请求的能够是父进程,也能够是不一样子进程,多个进程对同一个服务响应的链接事件监听,谁先抢占,就由谁进行响应。这里就会出现一个Linux网络编程中很常见的事件,当多个进程同时监听网络的链接事件,当这个有新的链接到达时,这些进程被同时唤醒,这被称为“惊群”。这样致使的状况就是,一旦事件到达,每一个进程同时去响应这一个事件,而最终只有一个进程能处理事件成功,其余的进程在处理该事件失败后从新休眠,形成了系统资源的浪费。

图片描述

ps:在windows系统上,永远都是最后定义的子进程抢占到句柄,这可能和libuv的实现机制有关,具体缘由往有大佬可以指点。

图片描述

出现这样的问题确定是你们都不肯意的嘛,这个时候咱们就想起了nginx的好了,这里有篇文章讲解了nginx是如何解决“惊群”的,利用nginx的反向代理能够有效地解决这个问题,毕竟nginx原本就很擅长这种问题。

http { 
  upstream node { 
      server 127.0.0.1:8000; 
      server 127.0.0.1:8001; 
      server 127.0.0.1:8002; 
      server 127.0.0.1:8003;
      keepalive 64;
  } 
  server { 
       listen 80; 
       server_name shenfq.com; 
       location / { 
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header Host $http_host;
            proxy_set_header X-Nginx-Proxy true;
            proxy_set_header Connection "";
            proxy_pass http://node; # 这里要和最上面upstream后的应用名一致,能够自定义
       } 
  }
}

小结

若是咱们本身用Node原生来实现一个多进程模型,存在这样或者那样的问题,虽然最终咱们借助了nginx达到了这个目的,可是使用nginx的话,咱们须要另外维护一套nginx的配置,并且若是有一个Node服务挂了,nginx并不知道,仍是会将请求转发到那个端口。

cluster模块

除了用nginx作反向代理,node自己也提供了一个cluster模块,用于多核CPU环境下多进程的负载均衡。cluster模块建立子进程本质上是经过child_procee.fork,利用该模块能够很容易的建立共享同一端口的子进程服务器。

上手指南

有了这个模块,你会感受实现Node的单机集群是多么容易的一件事情。下面看看官方实例,短短的十几行代码就实现了一个多进程的Node服务,且自带负载均衡。

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) { // 判断是否为主进程
  console.log(`主进程 ${process.pid} 正在运行`);

  // 衍生工做进程。
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工做进程 ${worker.process.pid} 已退出`);
  });
} else { // 子进程进行服务器建立
  // 工做进程能够共享任何 TCP 链接。
  // 在本例子中,共享的是一个 HTTP 服务器。
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`工做进程 ${process.pid} 已启动`);
}

图片描述

cluster模块源码分析

首先看代码,经过isMaster来判断是否为主进程,若是是主进程进行fork操做,子进程建立服务器。这里cluster进行fork操做时,执行的是当前文件。cluster.fork最终调用的child_process.fork,且第一个参数为process.argv.slice(2),在fork子进程以后,会对其internalMessage事件进行监听,这个后面会提到,具体代码以下:

const { fork } = require('child_process');

cluster.fork = function(env) {
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  
  // 监听子进程的消息
  worker.process.on('internalMessage', internal(worker, onmessage));
  // ...
};
// 配置master进程
cluster.setupMaster = function(options) {
  cluster.settings = {
    args: process.argv.slice(2),
    exec: process.argv[1],
    execArgv: process.execArgv,
    silent: false,
    ...cluster.settings,
    ...options
  };
};

// 建立子进程
function createWorkerProcess(id, env) {
  return fork(cluster.settings.exec, cluster.settings.args, {
    // some options
  });
}

子进程端口监听问题

这里会有一个问题,子进程所有都在监听同一个端口,咱们以前已经试验过,服务监听同一个端口会出现端口占用的问题,那么cluster模块如何保证端口不冲突的呢? 查阅源码发现,http模块的createServer继承自net模块。

util.inherits(Server, net.Server);

而在net模块中,listen方法会调用listenInCluster方法,listenInCluster判断当前是否为master进程。

lib/net.js

Server.prototype.listen = function(...args) {

  // ...
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    // 若是listen方法只传入了端口号,最后会走到这里
    listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive);
    return this;
  }
  // ...
};

function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster) {
    // 若是是主进程则启动一个服务
    // 可是主进程没有调用过listen方法,因此没有走这里一步
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }
  
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };
 
  // 子进程获取主进程服务的句柄
  cluster._getServer(server, serverQuery, listenOnMasterHandle);
  
  function listenOnMasterHandle(err, handle) {
    server._handle = handle; // 重写handle,对listen方法进行了hack
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

看上面代码能够知道,真正启动服务的方法为server._listen2。在_listen2方法中,最终调用的是_handle下的listen方法。

function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  // ...
  this._handle.onconnection = onconnection;
  var err = this._handle.listen(backlog || 511);
  // ...
}

Server.prototype._listen2 = setupListenHandle;  // legacy alias

那么cluster._getServer方法到底作了什么呢?

搜寻它的源码,首先向master进程发送了一个消息,消息类型为queryServer

// child.js
cluster._getServer = function(obj, options, cb) {
  // ...
  
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };
  
  // 发送消息到master进程,消息类型为 queryServer
  send(message, (reply, handle) => {
    rr(reply, indexesKey, cb);              // Round-robin.
  });
  // ...
};

这里的rr方法,对前面提到的_handle.listen进行了hack,全部子进程的listen实际上是不起做用的。

function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  var key = message.key;

  function listen(backlog) { // listen方法直接返回0,再也不进行端口监听
    return 0;
  }

  function close() {
    send({ act: 'close', key });
  }

  function getsockname(out) {
    return 0;
  }
  
  const handle = { close, listen, ref: noop, unref: noop };
  
  handles.set(key, handle); // 根据key将工做进程的 handle 进行缓存
  cb(0, handle);
}

// 这里的cb回调就是前面_getServer方法传入的。 参考以前net模块的listen方法
function listenOnMasterHandle(err, handle) {
  server._handle = handle; // 重写handle,对listen方法进行了hack
  // 该方法调用后,会对handle绑定一个 onconnection 方法,最后会进行调用
  server._listen2(address, port, addressType, backlog, fd, flags);
}

主进程与子进程通讯

那么到底在哪里对端口进行了监听呢?

前面提到过,fork子进程的时候,对子进程进行了internalMessage事件的监听。

worker.process.on('internalMessage', internal(worker, onmessage));

子进程向master进程发送消息,通常使用process.send方法,会被监听的message事件所接收。这里是由于发送的message指定了cmd: 'NODE_CLUSTER',只要cmd字段以NODE_开头,这样消息就会认为是内部通讯,被internalMessage事件所接收。

// child.js
function send(message, cb) {
  return sendHelper(process, message, null, cb);
}

// utils.js
function sendHelper(proc, message, handle, cb) {
  if (!proc.connected)
    return false;

  // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
  message = { cmd: 'NODE_CLUSTER', ...message, seq };

  if (typeof cb === 'function')
    callbacks.set(seq, cb);

  seq += 1;
  return proc.send(message, handle);
}

master进程接收到消息后,根据act的类型开始执行不一样的方法,这里act为queryServer。queryServer方法会构造一个key,若是这个key(规则主要为地址+端口+文件描述符)以前不存在,则对RoundRobinHandle构造函数进行了实例化,RoundRobinHandle构造函数中启动了一个TCP服务,并对以前指定的端口进行了监听。

// master.js
const handles = new Map();

function onmessage(message, handle) {
  const worker = this;
  if (message.act === 'online')
    online(worker);
  else if (message.act === 'queryServer')
    queryServer(worker, message);
  // other act logic
}
function queryServer(worker, message) {
  // ...
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  var handle = handles.get(key);
  // 若是以前没有对该key进行实例化,则进行实例化
  if (handle === undefined) {
    let address = message.address;
    // const RoundRobinHandle = require('internal/cluster/round_robin_handle');
    var constructor = RoundRobinHandle;

    handle = new constructor(key,
                             address,
                             message.port,
                             message.addressType,
                             message.fd,
                             message.flags);
    handles.set(key, handle);
  }
  // ...
}

// internal/cluster/round_robin_handle
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
  this.server = net.createServer(assert.fail);
  // 这里启动一个TCP服务器
  this.server.listen({ port, host });
  
  // TCP服务器启动时的事件
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
  });
  // ...
}

能够看到TCP服务启动后,立马对connection事件进行了监听,会调用RoundRobinHandle的distribute方法。

// RoundRobinHandle
this.handle.onconnection = (err, handle) => this.distribute(err, handle);

// distribute 对工做进程进行分发
RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle); // 存入TCP服务的句柄
  const worker = this.free.shift(); // 取出第一个工做进程

  if (worker)
    this.handoff(worker); // 切换到工做进程
};

RoundRobinHandle.prototype.handoff = function(worker) {
  const handle = this.handles.shift(); // 获取TCP服务句柄
  
  if (handle === undefined) {
    this.free.push(worker);  // 将该工做进程从新放入队列中
    return;
  }
  
  const message = { act: 'newconn', key: this.key };

  // 向工做进程发送一个类型为 newconn 的消息以及TCP服务的句柄
  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // 工做进程不能正常运行,启动下一个

    this.handoff(worker);
  });
};

在子进程中也有对内部消息进行监听,在cluster/child.js中,有个cluster._setupWorker方法,该方法会对内部消息监听,该方法的在lib/internal/bootstrap/node.js中调用,这个文件是每次启动node命令后,由C++模块调用的。

连接

function startup() {
  // ...
  startExecution();
}
function startExecution() {
  // ...
  prepareUserCodeExecution();
}
function prepareUserCodeExecution() {
  if (process.argv[1] && process.env.NODE_UNIQUE_ID) {
    const cluster = NativeModule.require('cluster');
    cluster._setupWorker();
    delete process.env.NODE_UNIQUE_ID;
  }
}

startup()

下面看看_setupWorker方法作了什么。

cluster._setupWorker = function() {
  // ...
  process.on('internalMessage', internal(worker, onmessage));

  function onmessage(message, handle) {
    // 若是act为 newconn 调用onconnection方法
    if (message.act === 'newconn')
      onconnection(message, handle);
    else if (message.act === 'disconnect')
      _disconnect.call(worker, true);
  }
};

function onconnection(message, handle) {
  const key = message.key;
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  if (accepted)
    server.onconnection(0, handle); // 调用net中的onconnection方法
}

最后子进程获取到客户端句柄后,调用net模块的onconnection,对Socket进行实例化,后面就与其余http请求的逻辑一致了,再也不细讲。

至此,cluster模块的逻辑就走通了。

参考连接

相关文章
相关标签/搜索