Node 官方提供了 cluster 模块来提供多进程的解决方案,以尽量提高服务器资源使用效率。javascript
总体而言,在这个问题域里,要解决的子问题有三块html
Egg 做为企业级框架,也针对这些问题,提供了 egg-cluster 模块来作了些加强java
为何先讲通讯?启动流程要用嘛。node
能够看到,主要的实体包括了 Master、Agent、Worker三个, Master、Agent、Worker,其实三者更多完成的是通讯的执行工做,真正提供通讯管理能力的是 Manager 和 Messenger。git
在看这俩模块以前,能够先看些基础知识github
Messenger 是一个消息发送器,负责:接收消息 -> 定向转发。npm
那为何要单独搞这个模块?api
它包含两个部分:bash
以一个 worker 启动的例子为例。服务器
首先,信息收集,使用的是订阅/通知模式,是以 master 显式调用 messenger 来处理的。
// 在 cluster 启动完毕后,会告知父进程启动成功
const action = 'egg-ready';
this.messenger.send({
action,
to: 'parent',
data: {
port: this[REAL_PORT],
address: this[APP_ADDRESS],
protocol: this[PROTOCOL],
},
});
复制代码
其次是经过 send 作定向转发,包括两部分
// 路由识别
if (data.to === 'parent') {
this.sendToParent(data);
return;
}
// 调用指定方法
sendToParent(data) {
if (!this.hasParent) {
return;
}
process.send(data);
}
复制代码
更多信息能够,查看这篇文档: Messenger 模块
Manager 模块比较简单,主要是针对Agent、Worker 提供管理操做。值得一提的是,它的存活检查代码
// agent.status的修改操做在master的onAgentStart中完成
count() {
return {
agent: (this.agent && this.agent.status === 'started') ? 1 : 0,
worker: this.listWorkerIds().length,
};
}
startCheck() {
this.exception = 0;
// 每10秒检查一次
this.timer = setInterval(() => {
const count = this.count();
if (count.agent && count.worker) {
this.exception = 0;
return;
}
// 若是agent和worker都不符合要求,超过3次就触发exception,master那边收到消息后会退出
this.exception++;
if (this.exception >= 3) {
this.emit('exception', count);
clearInterval(this.timer);
}
}, 10000);
}
复制代码
详见文档:Manager
先从启动流程入手,来看看 npm run dev
这个命令到底发生了什么。
它实际上是执行了 egg-bin 的 lib/cmd/dev.js 文件的 run 方法
// lib/cmd/dev.js 文件
constructor(rawArgv) {
// 省略其余初始化代码
this.serverBin = path.join(__dirname, '../start-cluster');
}
* run(context) {
// 省略参数格式化过程
yield this.helper.forkNode(this.serverBin, devArgs, options);
}
// start-cluster.js文件,执行框架的 startCluster
require(options.framework).startCluster(options);
复制代码
若是框架是egg,那最后就会执行 egg 的这段代码
exports.startCluster = require('egg-cluster').startCluster;
复制代码
所以最终是执行了,egg-cluster 模块的 index.js
exports.startCluster = function(options, callback) {
new Master(options).ready(callback);
};
复制代码
以后的流程不难,可是内容很是细碎,能够去看 启动和退出分析,主要是介绍如何实现下面的流程的
+---------+ +---------+ +---------+
| Master | | Agent | | Worker |
+---------+ +----+----+ +----+----+
| fork agent | |
+-------------------->| |
| agent ready | |
|<--------------------+ |
| | fork worker |
+----------------------------------------->|
| worker ready | |
|<-----------------------------------------+
| Egg ready | |
+-------------------->| |
| Egg ready | |
+----------------------------------------->|
复制代码
首先,在启动Agent的时候就会去注册回调
forkAgentWorker(){
// 得到agent
const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
// 监听退出事件,转发给master
agentWorker.once('exit', (code, signal) => {
this.messenger.send({
action: 'agent-exit',
data: {
code,
signal,
},
to: 'master',
from: 'agent',
});
});
}
constructor(){
this.on('agent-exit', this.onAgentExit.bind(this));
}
复制代码
接着在 onAgentExit 中去处理重启逻辑
onAgentExit(data) {
if (this.closed) return;
// 清理工做
const agentWorker = this.agentWorker;
this.workerManager.deleteAgent(this.agentWorker);
agentWorker.removeAllListeners();
// 若是已经启动过,就自动重启
if (this.isStarted) {
setTimeout(() => {
this.forkAgentWorker();
}, 1000);
// 省略一段转发消息给parent的代码
} else {
process.exit(1);
}
}
复制代码
isStarted 标志,是用来记录总体是否启动成功,它在 ready 回调用中被赋值
// 这个ready是由 get-ready模块提供的,主要是解决异步任务注册问题的,便于自由添加启动前的异步任务
this.ready(() => {
this.isStarted = true;
});
复制代码
Worker的平滑重启主要是交给 cfork 模块完成的,egg-cluster 中对于exit 事件的监听只是作个转发。
大体思路是经过 cluster 模块去监听 exit 事件 和 disconnect 事件,而后来根据 disableRefork 配置,判断是否要重启,这其中会处理一些重启逻辑
cluster.on('disconnect', function (worker) {
// API参考:https://nodejs.org/api/cluster.html#cluster_worker_isdead
var isDead = worker.isDead && worker.isDead();
if (isDead) {
// worker has terminated before disconnect
return;
}
// 配置不重启就不会继续进行
if (worker.disableRefork) {
return;
}
// disconnect 用来保存失联的进程,下文会用到
disconnects[worker.process.pid] = utility.logDate();
// 重启逻辑
if (allow()) {
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
} else {
// 省略
}
});
cluster.on('exit', function (worker, code, signal) {
var isExpected = !!disconnects[worker.process.pid];
// 若是已经先响应了disconnect事件,就不用再走后续退出流程了
if (isExpected) {
delete disconnects[worker.process.pid];
// worker disconnect first, exit expected
return;
}
// 相似的判断 disableRefork 的逻辑,省略
unexpectedCount++;
// 相似的重启逻辑,省略
cluster.emit('unexpectedExit', worker, code, signal);
});
复制代码
背景:最先 Session 等状态信息是保存在 Worker 内存里的,因此一旦用户的屡次请求打到不一样的Worker上的时候,必然会出现登陆态失效的问题。
解决方案:经过必定的方式保证同一个用户的请求打到同一个 Worker 上,Sticky Mode 就是为了解决这个问题
:::info 其实查看 egg-bin 的README.md 文件就能够发现其实默认是不启动的,但出于有趣,仍是想介绍下。 :::
首先,若是启用了 sticky 模式,在 master 当中会分配一个 stickyWorkerPort
// master.js
detectPorts() {
return GetFreePort()
// 省略中间一段设置主端口的代码
.then(port => {
if (this.options.sticky) {
this.options.stickyWorkerPort = port;
}
})
}
复制代码
同时,会启动一个内部的 net.Server,用来作消息转发给Worker
if (this.options.sticky) {
this.startMasterSocketServer(err => {
// 省略
});
}
startMasterSocketServer(cb) {
// 内部 net server
require('net').createServer({
pauseOnConnect: true,
}, connection => {
// 这段涉及到 TCP_reset_attack,有兴趣能够自查,不介绍
if (!connection.remoteAddress) {
connection.destroy();
} else {
// 选出一个worker
const worker = this.stickyWorker(connection.remoteAddress);
worker.send('sticky-session:connection', connection);
}
}).listen(this[REAL_PORT], cb);
}
复制代码
:::info 题外话:为何这里 listen 不会报端口重复监听?个人理解是,按照这篇文章的介绍,Master 在初次给 Worker 传递 Socket 的时候,才会去启动内部 TCP 服务,比 startMasterSocketServer要晚。 :::
在 worker 当中,若是是有 配置 sticky,就会使用该 stickyWorkerPort 端口进行监听,同时只监听 父进程(也就是master)转发过来的 sticky-session:connection消息
if (options.sticky) {
server.listen(options.stickyWorkerPort, '127.0.0.1');
process.on('message', (message, connection) => {
if (message !== 'sticky-session:connection') {
return;
}
server.emit('connection', connection);
connection.resume();
});
}
// 省略正常监听的代码
复制代码
这其中有个细节,那如何保证转发的过程当中,数据不丢失呢?
:::info 为何会丢失?由于 net.Socket 是个 Duplex Stream 对象,在 Flowing Mode 下面会自动读取数据,若是 不响应 data 事件,数据就丢了) :::
首先在建立 socket 的时候,开启 pauseOnConnect 选项;
If pauseOnConnect is set to true, then the socket associated with each incoming connection will be paused, and no data will be read from its handle.
其次在接受到 socket 的时候,恢复执行 resume
转发是经过 stickyWorker 函数实现的,本质上就是把 remoteAddress 对 Worker 数量取余数,做为索引去 Worker 列表里随机取一个 Worker
stickyWorker(ip) {
const workerNumbers = this.options.workers;
// ws是一组pid列表
const ws = this.workerManager.listWorkerIds();
let s = '';
// IP处理:127.0.0.1 -> 127001
for (let i = 0; i < ip.length; i++) {
// 这个判断能够过滤掉字母和符号,这样就能够兼容IPv4和IPv6
if (!isNaN(ip[i])) {
s += ip[i];
}
}
s = Number(s);
// 取余数
const pid = ws[s % workerNumbers];
return this.workerManager.getWorker(pid);
}
复制代码
背景详见issue:Sticky Mode 引起的问题
回去翻代码,能够发现 agent 是 child_process.fork 方法启动的,worker 是经过 cluster 启动的。 猜想多是若是须要提供一些相似管理页面的本地服务,通常是 agent作,所以agent 要有独立监听端口的能力
对进程强制执行process.exit而且用try-catch包裹住,若是有报错,说明是真的退出了。
使用场景:一开始使用 SIGTERM 要求退出(对比 kill -1),后来退出超过必定时间了,使用 SIGKILL 强制退出(相似 kill -9)
function getUnterminatedProcesses(pids) {
return pids.filter(pid => {
try {
// success means it's still alive
process.kill(pid, 0);
return true;
} catch (err) {
// error means it's dead
return false;
}
});
}
复制代码
EggCluster涉及知识体系大图