最近用Egg做为底层框架开发项目,好奇其多进程模型的管理实现,因而学习了解了一些东西,顺便记录下来。文章若有错误, 请轻喷html
伴随科技的发展, 如今的服务器基本上都是多核cpu
的了。然而,Node是一个单进程单线程
语言(对于开发者来讲是单线程,实际上不是)。咱们都知道,cpu的调度单位是线程
,而基于Node的特性,那么咱们每次只能利用一个cpu。这样不只仅利用率极低,并且容错更是不能接受(出错时会崩溃整个程序)。因此,Node有了cluster来协助咱们充分利用服务器的资源。 前端
cluster工做原理
关于cluster的工做原理推荐你们看这篇文章,这里简单总结一下:node
hack
掉,而是统一由master的内部TCP监听
,因此不会出现多个子进程监听同一端口而报错的现象。请求统一通过master的内部TCP
,TCP的请求处理逻辑中,会挑选一个worker进程向其发送一个newconn内部消息,随消息发送客户端句柄
。(这里的挑选有两种方式,第一种是除Windows外全部平台的默认方法循环法,即由主进程负责监听端口,接收新链接后再将链接循环分发给工做进程。在分发中使用了一些内置技巧防止工做进程任务过载。第二种是主进程建立监听socket后发送给感兴趣的工做进程,由工做进程负责直接接收链接。)建立客户端实例(net.socket)执行具体的业务逻辑
,而后返回。如图:
图引用出处git
先看一下Egg官方文档的进程模型github
+--------+ +-------+ | Master |<-------->| Agent | +--------+ +-------+ ^ ^ ^ / | \ / | \ / | \ v v v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+
类型 | 进程数量 | 做用 | 稳定性 | 是否运行业务代码 |
---|---|---|---|---|
Master | 1 | 进程管理,进程间消息转发 | 很是高 | 否 |
Agent | 1 | 后台运行工做(长链接客户端) | 高 | 少许 |
Worker | 通常为cpu核数 | 执行业务代码 | 通常 | 是 |
大体上就是利用Master
做为主线程,启动Agent
做为秘书进程协助Worker
处理一些公共事务(日志之类),启动Worker
进程执行真正的业务代码。npm
首先从Master
入手,这里暂时认为Master是最顶级的进程(事实上还有一个parent
进程,待会再说)。后端
/** * start egg app * @method Egg#startCluster * @param {Object} options {@link Master} * @param {Function} callback start success callback */ exports.startCluster = function(options, callback) { new Master(options).ready(callback); };
先从Master的构造函数
看起api
constructor(options) { super(); // 初始化参数 this.options = parseOptions(options); // worker进程的管理类 详情见 Manager及Messenger篇 this.workerManager = new Manager(); // messenger类, 详情见 Manager及Messenger篇 this.messenger = new Messenger(this); // 设置一个ready事件 详情见get-ready npm包 ready.mixin(this); // 是否为生产环境 this.isProduction = isProduction(); this.agentWorkerIndex = 0; // 是否关闭 this.closed = false; ... 接下来看的是ready的回调函数及注册的各种事件: this.ready(() => { // 将开始状态设置为true this.isStarted = true; const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : ''; this.logger.info('[master] %s started on %s (%sms)%s', frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg); // 发送egg-ready至各个进程并触发相关事件 const action = 'egg-ready'; this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } }); this.messenger.send({ action, to: 'app', data: this.options }); this.messenger.send({ action, to: 'agent', data: this.options }); // start check agent and worker status this.workerManager.startCheck(); }); // 注册各种事件 this.on('agent-exit', this.onAgentExit.bind(this)); this.on('agent-start', this.onAgentStart.bind(this)); ... // 检查端口并 Fork一个Agent detectPort((err, port) => { ... this.forkAgentWorker(); } }); }
综上, 能够看到Master的构造函数主要是初始化和注册各种相应的事件
, 最后运行的是forkAgentWorker
函数, 该函数的关键代码能够看到:服务器
const agentWorkerFile = path.join(__dirname, 'agent_worker.js'); // 经过child_process执行一个Agent const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
继续到agent_worker.js
上面看,agent_worker
实例化一个agent
对象,agent_worker.js
有一句关键代码:app
agent.ready(() => { agent.removeListener('error', startErrorHandler); // 清除错误监听的事件 process.send({ action: 'agent-start', to: 'master' }); // 向master发送一个agent-start的动做 });
能够看到, agent_worker.js
中的代码向master
发出了一个信息, 动做为agent-start
, 再回到Master
中, 能够看到其注册了两个事件, 分别为once的forkAppWorkers和 on的onAgentStart
this.on('agent-start', this.onAgentStart.bind(this)); this.once('agent-start', this.forkAppWorkers.bind(this));
先看onAgentStart
函数, 这个函数相对简单, 就是一些信息的传递:
onAgentStart() { this.agentWorker.status = 'started'; // Send egg-ready when agent is started after launched if (this.isAllAppWorkerStarted) { this.messenger.send({ action: 'egg-ready', to: 'agent', data: this.options }); } this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] }); // should send current worker pids when agent restart if (this.isStarted) { this.messenger.send({ action: 'egg-pids', to: 'agent', data: this.workerManager.getListeningWorkerIds() }); } this.messenger.send({ action: 'agent-start', to: 'app' }); this.logger.info('[master] agent_worker#%s:%s started (%sms)', this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime); }
而后会执行forkAppWorkers
函数,该函数主要是借助cfork包fork
对应的工做进程, 并注册一系列相关的监听事件,
... cfork({ exec: this.getAppWorkerFile(), args, silent: false, count: this.options.workers, // don't refork in local env refork: this.isProduction, }); ... // 触发app-start事件 cluster.on('listening', (worker, address) => { this.messenger.send({ action: 'app-start', data: { workerPid: worker.process.pid, address }, to: 'master', from: 'app', }); });
能够看到forkAppWorkers
函数在监听Listening
事件时,会触发master
上的app-start
事件。
this.on('app-start', this.onAppStart.bind(this)); ... // master ready回调触发 if (this.options.sticky) { this.startMasterSocketServer(err => { if (err) return this.ready(err); this.ready(true); }); } else { this.ready(true); } // ready回调 发送egg-ready状态到各个进程 const action = 'egg-ready'; this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } }); this.messenger.send({ action, to: 'app', data: this.options }); this.messenger.send({ action, to: 'agent', data: this.options }); // start check agent and worker status if (this.isProduction) { this.workerManager.startCheck(); }
总结下:
+---------+ +---------+ +---------+ | Master | | Agent | | Worker | +---------+ +----+----+ +----+----+ | fork agent | | +-------------------->| | | agent ready | | |<--------------------+ | | | fork worker | +----------------------------------------->| | worker ready | | |<-----------------------------------------+ | Egg ready | | +-------------------->| | | Egg ready | | +----------------------------------------->|
根据官方文档,进程守护主要是依赖于graceful和egg-cluster这两个库。
未捕获异常
+---------+ +---------+ | Worker | | Master | +---------+ +----+----+ | uncaughtException | +------------+ | | | | +---------+ | <----------+ | | Worker | | | +----+----+ | disconnect | fork a new worker | +-------------------------> + ---------------------> | | wait... | | | exit | | +-------------------------> | | | | | die | | | | | |
由执行的app文件可知, app
其实是继承于Application类, 该类下面调用了graceful()
。
onServer(server) { ...... graceful({ server: [ server ], error: (err, throwErrorCount) => { ...... }, }); ...... }
继续看graceful
, 能够看到它捕获了process.on('uncaughtException')
事件, 并在回调函数里面关闭TCP
链接, 关闭自己进程, 断开与master
的IPC
通道。
process.on('uncaughtException', function (err) { ...... // 对http链接设置 Connection: close响应头 servers.forEach(function (server) { if (server instanceof http.Server) { server.on('request', function (req, res) { // Let http server set `Connection: close` header, and close the current request socket. req.shouldKeepAlive = false; res.shouldKeepAlive = false; if (!res._header) { res.setHeader('Connection', 'close'); } }); } }); // 设置一个定时函数关闭子进程, 并退出自己进程 // make sure we close down within `killTimeout` seconds var killtimer = setTimeout(function () { console.error('[%s] [graceful:worker:%s] kill timeout, exit now.', Date(), process.pid); if (process.env.NODE_ENV !== 'test') { // kill children by SIGKILL before exit killChildren(function() { // 退出自己进程 process.exit(1); }); } }, killTimeout); // But don't keep the process open just for that! // If there is no more io waitting, just let process exit normally. if (typeof killtimer.unref === 'function') { // only worked on node 0.10+ killtimer.unref(); } var worker = options.worker || cluster.worker; // cluster mode if (worker) { try { // 关闭TCP链接 for (var i = 0; i < servers.length; i++) { var server = servers[i]; server.close(); } } catch (er1) { ...... } try { // 关闭ICP通道 worker.disconnect(); } catch (er2) { ...... } } });
ok, 关闭了IPC
通道后, 咱们继续看cfork
文件, 即上面提到的fork worker
的包, 里面监听了子进程的disconnect
事件, 他会根据条件判断是否从新fork
一个新的子进程
cluster.on('disconnect', function (worker) { ...... // 存起该pid disconnects[worker.process.pid] = utility.logDate(); if (allow()) { // fork一个新的子进程 newWorker = forkWorker(worker._clusterSettings); newWorker._clusterSettings = worker._clusterSettings; } else { ...... } });
通常来讲, 这个时候会继续等待一会而后就执行了上面说到的定时函数了, 即退出进程
。
OOM、系统异常
关于这种系统异常
, 有时候在子进程中是不能捕获到
的, 咱们只能在master中进行处理, 也就是cfork
包。
cluster.on('exit', function (worker, code, signal) { // 是程序异常的话, 会经过上面提到的uncatughException从新fork一个子进程, 因此这里就不须要了 var isExpected = !!disconnects[worker.process.pid]; if (isExpected) { delete disconnects[worker.process.pid]; // worker disconnect first, exit expected return; } // 是master杀死的子进程, 无需fork if (worker.disableRefork) { // worker is killed by master return; } if (allow()) { newWorker = forkWorker(worker._clusterSettings); newWorker._clusterSettings = worker._clusterSettings; } else { ...... } cluster.emit('unexpectedExit', worker, code, signal); });
上面一直提到各类进程间通讯,细心的你可能已经发现 cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之间,Worker 与 Agent 进程互相间是没有的。那么 Worker 之间想通信该怎么办呢?是的,经过 Master 来转发。
广播消息: agent => all workers +--------+ +-------+ | Master |<---------| Agent | +--------+ +-------+ / | \ / | \ / | \ / | \ v v v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+ 指定接收方: one worker => another worker +--------+ +-------+ | Master |----------| Agent | +--------+ +-------+ ^ | send to / | worker 2 / | / | / v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+
在master
中, 能够看到当agent和app被fork时
, 会监听他们的信息, 同时将信息转化成一个对象:
agentWorker.on('message', msg => { if (typeof msg === 'string') msg = { action: msg, data: msg }; msg.from = 'agent'; this.messenger.send(msg); }); worker.on('message', msg => { if (typeof msg === 'string') msg = { action: msg, data: msg }; msg.from = 'app'; this.messenger.send(msg); });
能够看到最后调用的是messenger.send
, 而messengeer.send就是根据from和to来决定将信息发送到哪里
send(data) { if (!data.from) { data.from = 'master'; } ...... // app -> master // agent -> master if (data.to === 'master') { debug('%s -> master, data: %j', data.from, data); // app/agent to master this.sendToMaster(data); return; } // master -> parent // app -> parent // agent -> parent if (data.to === 'parent') { debug('%s -> parent, data: %j', data.from, data); this.sendToParent(data); return; } // parent -> master -> app // agent -> master -> app if (data.to === 'app') { debug('%s -> %s, data: %j', data.from, data.to, data); this.sendToAppWorker(data); return; } // parent -> master -> agent // app -> master -> agent,可能不指定 to if (data.to === 'agent') { debug('%s -> %s, data: %j', data.from, data.to, data); this.sendToAgentWorker(data); return; } }
master
则是直接根据action
信息emit
对应的注册事件
sendToMaster(data) { this.master.emit(data.action, data.data); }
而agent和worker则是经过一个sendmessage
包, 实际上就是调用下面相似的方法
// 将信息传给子进程 agent.send(data) worker.send(data)
最后, 在agent和app都继承的基础类EggApplication
上, 调用了Messenger
类, 该类内部的构造函数以下:
constructor() { super(); ...... this._onMessage = this._onMessage.bind(this); process.on('message', this._onMessage); } _onMessage(message) { if (message && is.string(message.action)) { // 和master同样根据action信息emit对应的注册事件 this.emit(message.action, message.data); } }
总结一下:
思路就是利用事件机制和IPC通道来达到各个进程之间的通讯。
学习过程当中有遇到一个timeout.unref()的函数, 关于该函数推荐你们参考这个问题的6楼回答
从前端思惟转到后端思惟其实仍是很吃力的,加上Egg的进程管理实现确实很是厉害, 因此花了不少时间在各类api和思路思考上。