咱们知道,js是单线程的,意味着一个nodejs进程只能运行在单个cpu上面。nodejs在io处理方面是很是优秀的,可是在密集运算型应用中仍然有不足之处,而解决的办法,就是利用多核cpu的并发优点,将nodejs进程运行在各个cpu上面。而egg为咱们提供了egg-cluster模块,用于多进程管理以及进程间通讯。html
借用官网的文档,cluster是什么呢? 简单的说:node
其中:git
ps: 由于官网讲得很详细,因此这一部分都是借鉴官网的。github
下面,咱们经过文档中的图,来看看多进程模型windows
有些工做其实不须要每一个 Worker 都去作,若是都作,一来是浪费资源,更重要的是可能会致使多进程间资源访问冲突api
Agent
在大部分状况下,咱们在写业务代码的时候彻底不用考虑 Agent 进程的存在,可是当咱们遇到一些场景,只想让代码运行在一个进程上的时候,Agent 进程就到了发挥做用的时候了。 因为 Agent 只有一个,并且会负责许多维持链接的脏活累活,所以它不能轻易挂掉和重启,因此 Agent 进程在监听到未捕获异常时不会退出,可是会打印出错误日志,咱们须要对日志中的未捕获异常提升警戒。bash
Q: 当fork进程时,明明代码中已经监听了一个端口,为何fork时没有报端口占用?
A: cluster的工做原理推荐这一篇文章《经过源码解析 Node.js 中 cluster 模块的主要功能实现》, 结合朴灵老师的《深刻浅出nodejs》中的多进程架构,这里作一下总结:服务器
// parent.js
var n = require('child_process').fork(__dirname + 'child.js')
n.on('message', function(m){
console.log('parent get msg:'+ m)
})
n.send({hello: 'world'})
// child.js
process.on('message', function(m){
console.log('child get msg' + m)
})
process.send({hello: 'world'})
复制代码
child.send(params, [sendHandle])
复制代码
详细用法:架构
// parent.js
var n = require('child_process').fork(__dirname + 'child.js')
var server = require('net').createServer();
server.listen(8080, function(){
n.send('server', server)
})
// child.js
process.on('message', function(m, server){
server.on('connection', function(socket){
console.log('child get msg' + m)
socket.end('handle by child process')
})
})
复制代码
经过传递 TCP server,咱们能够发现,没有异常了,多个子进程能够监听同个端口 。 在node句柄发送的过程当中,多个进程能够监听到相同的端口,而不引发EADDRINUSE异常,这是由于,咱们独立启动的进程中,tcp服务端套接字socket的文件描述符并不相同,致使监听相同的端口时会抛出异常,因为独立启动的进程互相之间并不知道文件描述符,因此监听相同端口就会失败,但对于send()发送的句柄还原出来的服务而言,他们的文件描述符是相同的,因此监听相同端口不会引发异常。
并发
总结下来,一句话:经过进程间ipc通讯传递句柄从而共享文件描述符
egg-cluster/master.js中承担了初始化,启动agent和app进程,检测状态等工做,只详细讲解一下master.js中作了什么?咱们看一下构造函数中的代码,整个流程在constructor中已经很好的提现出来了。
constructor(options) {
super();
this.options = parseOptions(options);
// new 一个 Messenger 实例
this.messenger = new Messenger(this);
// 借用 ready 模块的方法
ready.mixin(this);
this.isProduction = isProduction();
this.isDebug = isDebug();
...
...
// 根据不一样运行环境(local、test、prod)设置日志输出级别
this.logger = new ConsoleLogger({ level: process.env.EGG_MASTER_LOGGER_LEVEL || 'INFO' });
...
}
// master 启动成功后通知 parent、app worker、agent
this.ready(() => {
this.isStarted = true;
const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
this.logger.info('[master] %s started on %s://127.0.0.1:%s (%sms)%s',
frameworkPkg.name, this.options.https ? 'https' : 'http',
this.options.port, Date.now() - startTime, stickyMsg);
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent' });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
});
// 监听 agent 退出
this.on('agent-exit', this.onAgentExit.bind(this));
// 监听 agent 启动
this.on('agent-start', this.onAgentStart.bind(this));
// 监听 app worker 退出
this.on('app-exit', this.onAppExit.bind(this));
// 监听 app worker 启动
this.on('app-start', this.onAppStart.bind(this));
// 开发环境下监听 app worker 重启
this.on('reload-worker', this.onReload.bind(this));
// 监听 agent 启动,注意这里只执行一次
this.once('agent-start', this.forkAppWorkers.bind(this));
// master监听自身的退出及退出后的处理
// kill(2) Ctrl-C 监听 SIGINT 信号
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\ 监听 SIGQUIT 信号
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default 监听 SIGTERM 信号
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
// 监听 exit 事件
process.once('exit', this.onExit.bind(this));
// 监听端口冲突
detectPort((err, port) => {
/* istanbul ignore if */
if (err) {
err.name = 'ClusterPortConflictError';
err.message = '[master] try get free port error, ' + err.message;
this.logger.error(err);
process.exit(1);
return;
}
this.options.clusterPort = port;
this.forkAgentWorker(); // 若是端口没有冲突则执行该方法
});
复制代码
master继承eventEmitter,使用发布订阅模式监听消息。 构造函数中的流程以下:
detect-port
来获取空闲的端口forkAgentWorker
使用child_process.fork()来启动agent进程,启动后经过 process.send
通知master agent已经启动agent.ready(() => {
agent.removeListener('error', startErrorHandler);
process.send({ action: 'agent-start', to: 'master' });
});
复制代码
forkAppWorkers
: agent启动后,经过 cluster.fork()
启动app_worker进程。// fork app workers after agent started
this.once('agent-start', this.forkAppWorkers.bind(this));
复制代码
这里是使用了cfork
模块,其本质也是cluster.fork()
,默认启动进程数为os.cpu.length
,也能够经过启动参数来指定worker进程的数量。
cfork({
exec: this.getAppWorkerFile(),
args,
silent: false,
count: this.options.workers,
// don't refork in local env refork: this.isProduction, windowsHide: process.platform === 'win32', }); 复制代码
启动成功后,经过messenger告知master,worker进程已经ready
this.messenger.send({
action: 'app-start',
data: {
workerPid: worker.process.pid,
address,
},
to: 'master',
from: 'app',
});
复制代码
onAppStart
: app worker 启动成功后通知 agent。并告知parent,egg-ready了,并带上port,address,protocol
等参数this.ready(() => {
this.isStarted = true;
const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
this.logger.info('[master] %s started on %s (%sms)%s',
frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);
const action = 'egg-ready';
this.messenger.send({
action,
to: 'parent',
data: {
port: this[REAL_PORT],
address: this[APP_ADDRESS],
protocol: this[PROTOCOL],
},
});
this.messenger.send({
action,
to: 'app',
data: this.options,
});
this.messenger.send({
action,
to: 'agent',
data: this.options,
});
// start check agent and worker status
if (this.isProduction) {
this.workerManager.startCheck();
}
});
复制代码
startCheck
: 若是在生产环境,,每隔10s检测agent和worker,若有异常则上报。// check agent and worker must both alive
// if exception appear 3 times, emit an exception event
startCheck() {
this.exception = 0;
this.timer = setInterval(() => {
const count = this.count();
if (count.agent && count.worker) {
this.exception = 0;
return;
}
this.exception++;
if (this.exception >= 3) {
this.emit('exception', count);
clearInterval(this.timer);
}
}, 10000);
}
复制代码
egg文档上的流程图很好的总结了以上过程:
IPC的全称是Inter-Process Communication,即进程间通讯。进程间通讯的目的是为了让不一样的进程可以互相访问资源,并进程协调工做。实现进程间通讯的技术有不少,如命名管道、匿名管道、socket、信号量、共享内存、消息队列、Domain Socket等,node中实现IPC通道的是管道技术(pipe)。在node中管道是个抽象层面的称呼,具体细节实现由libuv提供,在win下是命名管道(named pipe)实现,在*nix下,采用unix Domain Socket来实现。
Q:那么,进程间是如何经过ipc通道去连接的呢?
父进程在实际建立子进程前,会建立IPC通道并监听它,而后才真正建立出子进程,并经过环境变量(NODE_CHANNEL_FD)告诉子进程这个IPC通讯的文件描述符。子进程在启动的过程当中,根据文件描述符去链接这个已存在的IPC通道,从而完成父子进程之间的链接。
cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之间,Worker 与 Agent 进程互相间是没有的。那么 Worker 之间想通信该怎么办呢?是的,经过 Master 来转发
show the code
/**
* master messenger,provide communication between parent, master, agent and app.
* ┌────────┐
* │ parent │
* /└────────┘\
* / | \
* / ┌────────┐ \
* / │ master │ \
* / └────────┘ \
* / / \ \
* ┌───────┐ ┌───────┐
* │ agent │ ------- │ app │
* └───────┘ └───────┘
*/
send(data) {
if (!data.from) {
data.from = 'master';
}
// recognise receiverPid is to who
if (data.receiverPid) {
if (data.receiverPid === String(process.pid)) {
data.to = 'master';
} else if (data.receiverPid === String(this.master.agentWorker.pid)) {
data.to = 'agent';
} else {
data.to = 'app';
}
}
// default from -> to rules
if (!data.to) {
if (data.from === 'agent') data.to = 'app';
if (data.from === 'app') data.to = 'agent';
if (data.from === 'parent') data.to = 'master';
}
// app -> master
// agent -> master
if (data.to === 'master') {
debug('%s -> master, data: %j', data.from, data);
// app/agent to master
this.sendToMaster(data);
return;
}
// master -> parent
// app -> parent
// agent -> parent
if (data.to === 'parent') {
debug('%s -> parent, data: %j', data.from, data);
this.sendToParent(data);
return;
}
// parent -> master -> app
// agent -> master -> app
if (data.to === 'app') {
debug('%s -> %s, data: %j', data.from, data.to, data);
this.sendToAppWorker(data);
return;
}
// parent -> master -> agent
// app -> master -> agent,可能不指定 to
if (data.to === 'agent') {
debug('%s -> %s, data: %j', data.from, data.to, data);
this.sendToAgentWorker(data);
return;
}
}
复制代码
master.emit(data.action, data.data)
(master 继承自 EventEmitter)process.send(data)
sendmessage(worker, data)
sendmessage(this.master.agentWorker, data)
注: [sendmessage]是一个苏千大大写的一个用于处理进程间通讯的module(Send a cross process message if message channel is connected.),感兴趣的同窗能够看看源码https://github.com/node-modules/sendmessage/blob/master/index.js
可能有同窗会想,为何多了一个parent?
原来,parent就是master进程的parent进程,通常是CLI,好比egg-script start和egg-bin,egg-script中经过('child_process').spawn
建立的master进程。child_process.spawn()
方法使用给定的 command 衍生一个新进程,并带上 args 中的命令行参数。同时,经过传递detached
参数,可使得在父进程退出后子进程继续执行。
spawn文档传送门
(感谢@天猪 的解答,源码连接)