cluster
是一个nodejs
内置的模块,用于nodejs
多核处理。cluster
模块,能够帮助咱们简化多进程并行化程序的开发难度,轻松构建一个用于负载均衡的集群。html
环境:node
MacOS 10.14
Node v8.11.3
npm 6.4.0npm
master
是总控节点,worker
是运行节点。而后根据CPU
的数量,启动worker
。
咱们能够先查看一下本身电脑CPU的核数和线程:bash
sysctl machdep.cpu
复制代码
machdep.cpu.core_count: 6
machdep.cpu.thread_count: 12
复制代码
6核12线程并发
新建app.js
app
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
// 是不是主节点
if (cluster.isMaster) {
console.log("master start...");
// Fork workers.
for (var i = 0; i < numCPUs; i++) {
// 监听建立worker进程事件
cluster.fork();
}
// 监听worker
cluster.on('listening', function (worker, address) {
// address对象包含链接属性信息
console.log('listening: worker ' + worker.process.pid + ', Address: ' + address.address + ":" + address.port + "," + address.addressType);
// 3秒后杀掉全部worker
setTimeout(() => {
worker.kill()
}, 3000)
});
// 监听worker退出事件,code进程非正常退出的错误code,signal致使进程被杀死的信号名称
cluster.on('exit', function (worker, code, signal) {
// console.log('worker ' + worker.process.pid + ' died');
console.log('工做进程 %d 关闭 (%s)(%s). 重启中...', worker.process.pid, signal || code);
// 退出以后重启
// cluster.fork();
});
} else {
console.log('createServer...')
http.createServer(function (req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(0);
}
复制代码
咱们设定启动以后三秒再杀掉全部进程
启动负载均衡
➜ node app.js
master start...
createServer...
createServer...
createServer...
listening: worker 29374, Address: null:61186,4
createServer...
listening: worker 29375, Address: null:61186,4
listening: worker 29376, Address: null:61186,4
createServer...
listening: worker 29377, Address: null:61186,4
createServer...
createServer...
listening: worker 29378, Address: null:61186,4
createServer...
listening: worker 29379, Address: null:61186,4
listening: worker 29380, Address: null:61186,4
createServer...
createServer...
listening: worker 29381, Address: null:61186,4
createServer...
listening: worker 29382, Address: null:61186,4
listening: worker 29383, Address: null:61186,4
createServer...
listening: worker 29384, Address: null:61186,4
listening: worker 29385, Address: null:61186,4
工做进程 29374 关闭 (SIGTERM)(%s). 重启中...
工做进程 29375 关闭 (SIGTERM)(%s). 重启中...
工做进程 29376 关闭 (SIGTERM)(%s). 重启中...
工做进程 29377 关闭 (SIGTERM)(%s). 重启中...
工做进程 29378 关闭 (SIGTERM)(%s). 重启中...
工做进程 29379 关闭 (SIGTERM)(%s). 重启中...
工做进程 29380 关闭 (SIGTERM)(%s). 重启中...
工做进程 29381 关闭 (SIGTERM)(%s). 重启中...
工做进程 29382 关闭 (SIGTERM)(%s). 重启中...
工做进程 29383 关闭 (SIGTERM)(%s). 重启中...
工做进程 29384 关闭 (SIGTERM)(%s). 重启中...
工做进程 29385 关闭 (SIGTERM)(%s). 重启中...
复制代码
电脑有12个线程,因此在总控结点master
启动后,生成了12个运行节点worker
杀掉进程后,能够经过exit
事件监听到,还能够经过fork()
重启curl
每一个worker
进程经过使用child_process.fork()
函数,基于IPC
(Inter-Process Communication
,进程间通讯),实现与master
进程间通讯。ide
当worker
使用server.listen(...)
函数时 ,将参数序列传递给master
进程。若是master
进程已经匹配workers
,会将传递句柄给工人。若是master
没有匹配好worker
,那么会建立一个worker
,再传递并句柄传递给worker
。函数
由于worker
都是独立运行的,根据程序的须要,它们能够被独立删除或者重启,worker
并不相互影响。只要还有worker
存活,则master
将继续接收链接。Node
不会自动维护workers
的数目。咱们能够创建本身的链接池。
cluster
对象cluster
的各类属性和函数
cluster.setttings
:配置集群参数对象cluster.isMaster
:判断是否是master
节点cluster.isWorker
:判断是否是worker
节点Event: 'fork'
: 监听建立worker
进程事件Event: 'online'
: 监听worker
建立成功事件Event: 'listening'
: 监听worker
向master
状态事件Event: 'disconnect'
: 监听worker
断线事件Event: 'exit'
: 监听worker
退出事件Event: 'setup'
: 监听setupMaster
事件cluster.setupMaster([settings])
: 设置集群参数cluster.fork([env])
: 建立worker
进程cluster.disconnect([callback])
: 关闭worket
进程cluster.worker
: 得到当前的worker
对象cluster.workers
: 得到集群中全部存活的worker
对象worker
对象worker
的各类属性和函数:能够经过cluster.workers
, cluster.worker
得到。
worker.id
: 进程ID
号worker.process
: ChildProcess
对象worker.suicide
: 在disconnect()
后,判断worker
是否自杀worker.send(message, [sendHandle])
: master
给worker
发送消息。注:worker
给发master
发送消息要用process.send(message)
worker.kill([signal='SIGTERM'])
: 杀死指定的worker
,别名destory()
worker.disconnect()
: 断开worker
链接,让worker
自杀Event: 'message'
: 监听master
和worker
的message
事件Event: 'online'
: 监听指定的worker
建立成功事件Event: 'listening'
: 监听master
向worker
状态事件Event: 'disconnect'
: 监听worker
断线事件Event: 'exit'
: 监听worker
退出事件新建cluster.js
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log('[master] ' + "start master...");
// 线程太多,起3个
for (var i = 0; i < numCPUs / 4; i++) {
var wk = cluster.fork();
wk.send('[master] ' + 'hi worker' + wk.id);
}
// 监听worker生成
cluster.on('fork', function (worker) {
console.log('[master] fork: worker' + worker.id);
});
// 当衍生一个新的工做进程后,工做进程应当响应一个上线消息。 当主进程收到上线消息后将会触发此事件。 'fork' 事件和 'online' 事件的区别在于,当主进程衍生工做进程时触发 'fork',当工做进程运行时触发 'online'。
cluster.on('online', function (worker) {
console.log('[master] online: worker' + worker.id);
});
// 当一个工做进程调用 listen() 后,工做进程上的 server 会触发 'listening' 事件,同时主进程上的 cluster 也会触发 'listening' 事件。
cluster.on('listening', function (worker, address) {
console.log('[master] listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
});
// 在工做进程的 IPC 管道被断开后触发。 可能致使事件触发的缘由包括:工做进程优雅地退出、被杀死、或手动断开链接(如调用 worker.disconnect())。
cluster.on('disconnect', function (worker) {
console.log('[master] disconnect: worker' + worker.id);
});
// 当任何一个工做进程关闭的时候,cluster 模块都将会触发 'exit' 事件。
cluster.on('exit', function (worker, code, signal) {
console.log('[master] exit worker' + worker.id + ' died');
});
function eachWorker(callback) {
for (var id in cluster.workers) {
callback(cluster.workers[id]);
}
}
// 3秒后向全部worker推送信息
setTimeout(function () {
eachWorker(function (worker) {
worker.send('[master] ' + 'send message to worker' + worker.id);
});
}, 3000);
Object.keys(cluster.workers).forEach(function (id) {
// 当 cluster 主进程接收任意工做进程发送的消息时触发。
cluster.workers[id].on('message', function (msg) {
console.log('[master] ' + 'message ' + msg);
});
});
} else if (cluster.isWorker) {
console.log('[worker] ' + "start worker ..." + cluster.worker.id);
// 相似于 cluster.on('message') 事件,但特定于此工做进程。
process.on('message', function (msg) {
// 接收的是master发出来的初始化worker信息
console.log('[worker] ' + msg);
// 而后告诉master接收到了,触发了cluster.workers[id].on('message',fn)
process.send('[worker] worker' + cluster.worker.id + ' received!');
});
http.createServer(function (req, res) {
res.writeHead(200, { "content-type": "text/html" });
res.end('worker' + cluster.worker.id + ',PID:' + process.pid);
}).listen(3000);
console.log(`工做进程 ${process.pid} 已启动`);
}
复制代码
➜ node cluster.js
[master] start master...
[master] fork: worker1
[master] fork: worker2
[master] fork: worker3
[master] online: worker1
[master] online: worker2
[master] online: worker3
[worker] start worker ...1
[worker] [master] hi worker1
[master] message [worker] worker1 received!
[worker] start worker ...2
[master] listening: worker1,pid:30288, Address:null:3000
[worker] start worker ...3
[worker] [master] hi worker2
[master] message [worker] worker2 received!
[master] listening: worker2,pid:30289, Address:null:3000
[worker] [master] hi worker3
[master] message [worker] worker3 received!
[master] listening: worker3,pid:30290, Address:null:3000
[worker] [master] send message to worker1
[worker] [master] send message to worker2
[worker] [master] send message to worker3
[master] message [worker] worker1 received!
[master] message [worker] worker2 received!
[master] message [worker] worker3 received!
复制代码
咱们先模拟一下访问,看下是否会自动分配
load-balance.js
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log('[master] ' + "start master...");
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('listening', function (worker, address) {
console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
});
} else if (cluster.isWorker) {
console.log('[worker] ' + "start worker ..." + cluster.worker.id);
http.createServer(function (req, res) {
console.log('worker' + cluster.worker.id);
res.end('worker' + cluster.worker.id + ',PID:' + process.pid);
}).listen(3000);
}
复制代码
node load-balance.js
[master] start master...
[worker] start worker ...1
[worker] start worker ...3
[worker] start worker ...2
[worker] start worker ...4
[worker] start worker ...6
[worker] start worker ...7
[master] listening: worker3,pid:96592, Address:null:3000
[master] listening: worker2,pid:96591, Address:null:3000
[master] listening: worker1,pid:96590, Address:null:3000
[master] listening: worker4,pid:96593, Address:null:3000
[master] listening: worker6,pid:96595, Address:null:3000
[worker] start worker ...5
[master] listening: worker7,pid:96596, Address:null:3000
[worker] start worker ...8
[worker] start worker ...9
[worker] start worker ...10
[master] listening: worker5,pid:96594, Address:null:3000
[master] listening: worker8,pid:96597, Address:null:3000
[master] listening: worker9,pid:96598, Address:null:3000
[worker] start worker ...11
[worker] start worker ...12
[master] listening: worker10,pid:96599, Address:null:3000
[master] listening: worker11,pid:96600, Address:null:3000
[master] listening: worker12,pid:96601, Address:null:3000
复制代码
咱们使用curl
访问一下
➜ cluster curl http://172.16.78.185:3000/
worker1,PID:96590%
➜ cluster curl http://172.16.78.185:3000/
worker3,PID:96592%
➜ cluster curl http://172.16.78.185:3000/
worker2,PID:96591%
➜ cluster curl http://172.16.78.185:3000/
worker4,PID:96593%
➜ cluster curl http://172.16.78.185:3000/
worker6,PID:96595%
复制代码
随机分配是没问题的,可是这样的请求量没法看出是否均衡分配,咱们要模拟下并发请求
➜ node load-balance.js > server.log
复制代码
而后用siege
模拟压测,并发量每秒50
siege -c 50 http://localhost:3000
复制代码
HTTP/1.1 200 0.00 secs: 16 bytes ==> GET /
^C
Lifting the server siege...
Transactions: 16276 hits
Availability: 100.00 %
Elapsed time: 31.65 secs
Data transferred: 0.25 MB
Response time: 0.03 secs
Transaction rate: 514.25 trans/sec
Throughput: 0.01 MB/sec
Concurrency: 14.72
Successful transactions: 16276
Failed transactions: 0
Longest transaction: 0.18
Shortest transaction: 0.00
复制代码
耗时31.65
秒
发送16276
次请求
每秒处理514.25
个请求
咱们能够查看下server.log
文件
须要下载一下R语言包 教程
~ R
> df<-read.table(file="server.log",skip=9,header=FALSE)
> summary(df)
V1
worker9 :1361
worker2 :1359
worker5 :1358
worker1 :1357
worker12:1356
worker11:1354
(Other) :8122
复制代码
咱们能够看到,请求被分配到worker数据量至关。 因此,cluster的负载均衡的策略,应该是随机分配的。
学习连接 粉丝日志