一段常见的示例代码html
const cluster = require('cluster');
const http = require('http');
if (cluster.isMaster) {
// 根据cpu核心数出fork相同数量的子进程
} else {
// 用http模块建立server监听某一个端口
}
复制代码
引出以下问题:node
cluster
模块如何区分子进程和主进程?git
代码中没有在主进程中建立服务器,那么如何主进程如何承担代理服务器的职责?github
多个子进程共同侦听同一个端口为何不会形成端口reuse error
?windows
cluster
模块如何区分主进程/子进程cluster.js - 源码bash
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);
复制代码
结论: 判断环境变量中是否含有NODE_UNIQUE_ID
, 有则为子进程,没有则为主进程服务器
isMaster
& isWorker
这样的话, 在对应的文件中isMaster
和isWorker
的值就明确啦负载均衡
// child.js
module.exports = cluster;
cluster.isWorker = true;
cluster.isMaster = false;
// master.js
module.exports = cluster;
cluster.isWorker = false;
cluster.isMaster = true;
复制代码
那么接下来的问题是: NODE_UNIQUE_ID从哪里来?
socket
NODE_UNIQUE_ID
从哪里来的?在internal/cluster/master.js
文件中搜索NODE_UNIQUE_ID
----> 上层为createWorkerProcess
函数 ----> 上层为cluster.fork
函数函数
master.js
源码中相关部分
const { fork } = require('child_process');
cluster.workers = {}
var ids = 0;
cluster.fork = function(env) {
const id = ++ ids;
const workerProcess = createWorkerProcess(id, env);
const worker = new Worker({
id: id,
process: workerProcess
});
cluster.workers[worker.id] = worker;
return worker
}
function createWorkerProcess(id, env) {
const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
return fork(args, {
env: workerEnv
})
}
复制代码
结论: 变量NODE_UNIQUE_ID
是在主进程fork
子进程时传递进去的参数,所以采用cluster.fork
建立的子进程是必定包含NODE_UNIQUE_ID
的,而直接使用child_process.fork
的子进程是没有NODE_UNIQUE_ID
的
而且, NODE_UNIQUE_ID
将做为主进程中存储活跃的工做进程对象的键值
继续描述一下这个问题的由来:
const cluster = require('cluster');
const http = require('http');
if (cluster.isMaster) {
// 根据cpu核心数出fork相同数量的子进程
} else {
// 用http模块建立server监听某一个端口
}
复制代码
并无在cluster.isMaster
条件语句中建立服务器, 也没有提供服务器相关的路径,接口。而主进程又须要承担代理服务器的 职责,那么主进程中是否存在TCP
服务器?
咱们来猜猜看可能的步骤
子进程会执行http.createServer
http
模块会调用net
模块, 由于http.Server
继承net.Server
同时侦听端口, 建立net.Server
实例, 建立的实例调用listen(port)
, 等待连接
这时若是主进程要建立服务器就须要把建立服务器相关信息给主进程, 继续猜想
假设主进程已经拿到了服务器相关的信息, 主进程本身来建立
后面的fork
子进程就不用本身建立了,而是从主进程中get
到相关数据
既然要在主进程须要获得完整的建立服务器相关信息, 那么极可能在net
模块listen
相关方法中进行处理
Server.prototype.listen
找找看,何时把服务器相关信息传递给主进程了?
Server.prototype.listen = function(...args) {
// 无视其余的判断逻辑, 直达它的心里!
if (成功) {
listenInCluster()
return this
} else {
// 无视
}
}
复制代码
总的来讲就是: 在Server.prototype.listen
函数中,在成功进入条件语句后全部的状况都执行了listenInCluster
函数后返回
接下来看listenInCluster
函数
function listenInCluster(server, 建立服务器须要的数据) {
if (cluster === undefined) cluster = require('cluster')
// 判断是不是主进程
if (cluster.isMaster) {
server._listen2(建立服务器须要的数据)
return
}
// 建立服务器须要的数据
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags,
};
// 只剩下子进程
cluster._getServer(server, 建立服务器须要的数据, listenOnMasterHandle);
function listenOnMasterHandle(err, handle) {
server._handle = handle
server._listen2(建立服务器须要的数据)
}
}
复制代码
按照前面的推断: 子进程会给主进程发送建立server须要的数据, 主进程去建立
因此接下来去看cluster
模块的child._getServer
函数
cluster._getServer = function(obj, options, cb) {
// 组装发送的数据
const message = {
act: 'queryServer',
...options,
}
// 发送数据
send(message, (reply, handle) => {
})
}
复制代码
那么接下来主进程就应该对queryServer
做出想要的处理
具体能够看cluster/master.js
const RoundRobinHandle = require('internal/cluster/round_robin_handle');
const handles = new Map()
function onmessage(message, handle) {
if (message.act === 'queryServer') {
queryServer(worker, message)
}
}
queryServer(worker, message) {
const key = `${message.address}:${message.port}:${message.addressType}:` +
`${message.fd}:${message.index}`;
const constructor = RoundRobinHandle
let handle = new constructor(建立服务器相关信息)
handles.set(key, handle);
}
复制代码
终于要到终点了:
在internal/cluster/round_robin_handle.js
中
function RoundRobinHandle(建立服务器相关信息) {
this.server = net.createServer()
this.server.listen(.....)
}
复制代码
cluster
模式下如何建立服务器的结论主进程fork
子进程, 子进程中有显式建立服务器的操做,但实际上在cluster
模式下, 子进程是把建立服务器所须要的数据发送给主进程, 主进程来隐式建立TCP
服务器
流程图
这个问题能够转换为: 子进程中有没有也建立一个服务器,同时侦听某个端口呢?
其实,上面的源码分析中能够得出结论:子进程中确实建立了net.Server
对象,但是它没有像主进程那样在libuv
层构建socket
句柄,子进程的net.Server
对象使用的是一个假句柄来'欺骗'使用者端口已侦听
这部分能够参考文章Node.js V0.12 新特性之 Cluster 轮转法负载均衡
主要就是说:Node.js v0.12
引入了round-robin方式
, 用轮转法来分配请求, 每一个子进程的获取的时间的机会都是均等的(windows除外)
源码在internal/cluster/master.js
中
var schedulingPolicy = {
'none': SCHED_NONE,
'rr': SCHED_RR
}[process.env.NODE_CLUSTER_SCHED_POLICY];
if (schedulingPolicy === undefined) {
// FIXME Round-robin doesn't perform well on Windows right now due to the // way IOCP is wired up. schedulingPolicy = (process.platform === 'win32') ? SCHED_NONE : SCHED_RR; } cluster.schedulingPolicy = schedulingPolicy; 复制代码
上面说明了:默认的调度策略是round-robin
, 那么子进程将建立服务器的数据发送给主进程, 当主进程发送建立服务器成功的消息后,子进程会执行回调函数
源码在internal/cluster/child.js _getServer中
cluster._getServer = function(obj, options, cb) {
const indexesKey = [address,
options.port,
options.addressType,
options.fd ].join(':');
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
// 这里能够反推出主进程返回的handle为null
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
}
复制代码
rr
函数, 注意这里的回调函数其实就是net
模块中的listenOnMasterHandle
方法
function rr(message, indexesKey, cb) {
const key = message.key
const handle = { close, listen, ref: noop, unref: noop };
handles.set(key, handle)
// 将假句柄传递给上层的net.Server
cb(0, handle)
}
复制代码
因此结论是这样:子进程压根没有建立底层的服务端socket
作侦听,因此在子进程建立的HTTP
服务器侦听的端口根本不会出现端口复用的状况
显而易见:主进程的服务器中会建立RoundRobinHandle
决定分发请求给哪个子进程,筛选出子进程后发送newconn
消息给对应的子进程
源码见internal/cluster/round_robin_handle
module.exports = RoundRobinHandle
function RoundRobinHandle(建立服务器须要的参数) {
// 存储空闲的子进程
this.free = []
// 存放待处理的用户请求
this.handles = []
}
// 负责筛选出处理请求的子进程
RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle)
const worker = this.free.shift()
if (worker) {
this.handoff(worker)
}
}
// 获取请求,并经过IPC发送句柄handle和newconn消息,等待子进程返回
RoundRobinHandle.prototype.handoff = function(worker) {
const handle = this.handles.shift()
if (handle === undefined) {
this.free.push(worker)
return
}
const message = { act: 'newconn', key: this.key }
sendHelper(worker.process, message, handle, reply => {
if (reply.accepted)
handle.close();
// 某个子进程办事不力,给下一个子进程再试试
else
this.distribute(0, handle)
this.handoff(worker)
})
}
复制代码