The Node.js cluster module for multi-core processing

cluster is a built-in Node.js module for making use of multiple CPU cores. It greatly simplifies writing multi-process, parallel programs and makes it easy to build a load-balanced cluster of workers.

Environment:

MacOS 10.14
Node v8.11.3
npm 6.4.0

Practice

The master is the controlling process and the workers are the processes that handle the actual work; we fork one worker per CPU thread.
First, let's check how many cores and hardware threads this machine's CPU has:

sysctl machdep.cpu
machdep.cpu.core_count: 6
machdep.cpu.thread_count: 12

6 cores, 12 hardware threads.
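The same number can be read from Node itself on any platform; this is exactly the value the numCPUs variable in the code below is based on (on this machine it prints 12):

node -e "console.log(require('os').cpus().length)"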

Create app.js:

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

// are we the master process?
if (cluster.isMaster) {
  console.log("master start...");

  // Fork workers.
  for (var i = 0; i < numCPUs; i++) {
    // spawn a new worker process
    cluster.fork();
  }

  // fires when a worker starts listening
  cluster.on('listening', function (worker, address) {
    // the address object describes the connection (address, port, address type)
    console.log('listening: worker ' + worker.process.pid + ', Address: ' + address.address + ":" + address.port + "," + address.addressType);
    // kill every worker after 3 seconds
    setTimeout(() => {
      worker.kill()
    }, 3000)
  });

  // fires when a worker exits; code is the exit code (for abnormal exits), signal is the name of the signal that killed the process
  cluster.on('exit', function (worker, code, signal) {
    // console.log('worker ' + worker.process.pid + ' died');
    console.log('worker %d exited (%s). Restarting...', worker.process.pid, signal || code);
    // re-fork here to restart the worker after it exits
    // cluster.fork();
  });

} else {
  console.log('createServer...')
  http.createServer(function (req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(0); // port 0: the OS picks a free port, which cluster then shares across all workers
}



We set all workers to be killed three seconds after start-up.
Start the load-balanced server:

➜  node app.js
master start...
createServer...
createServer...
createServer...
listening: worker 29374, Address: null:61186,4
createServer...
listening: worker 29375, Address: null:61186,4
listening: worker 29376, Address: null:61186,4
createServer...
listening: worker 29377, Address: null:61186,4
createServer...
createServer...
listening: worker 29378, Address: null:61186,4
createServer...
listening: worker 29379, Address: null:61186,4
listening: worker 29380, Address: null:61186,4
createServer...
createServer...
listening: worker 29381, Address: null:61186,4
createServer...
listening: worker 29382, Address: null:61186,4
listening: worker 29383, Address: null:61186,4
createServer...
listening: worker 29384, Address: null:61186,4
listening: worker 29385, Address: null:61186,4
worker 29374 exited (SIGTERM). Restarting...
worker 29375 exited (SIGTERM). Restarting...
worker 29376 exited (SIGTERM). Restarting...
worker 29377 exited (SIGTERM). Restarting...
worker 29378 exited (SIGTERM). Restarting...
worker 29379 exited (SIGTERM). Restarting...
worker 29380 exited (SIGTERM). Restarting...
worker 29381 exited (SIGTERM). Restarting...
worker 29382 exited (SIGTERM). Restarting...
worker 29383 exited (SIGTERM). Restarting...
worker 29384 exited (SIGTERM). Restarting...
worker 29385 exited (SIGTERM). Restarting...

The machine has 12 hardware threads, so after the master starts it forks 12 workers.
When a worker is killed we can observe it through the 'exit' event, and we could restart it there with fork().

Each worker process is created with child_process.fork() and communicates with the master over IPC (Inter-Process Communication).
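To see what that IPC channel looks like without cluster, here is a minimal sketch using child_process.fork() directly (the file names master.js and child.js are made up for the example):

// master.js - fork a child process; fork() opens an IPC channel automatically
var fork = require('child_process').fork;

var child = fork('./child.js');
child.on('message', function (msg) {
  console.log('[master] got:', msg);
});
child.send('hi from master');

// child.js - the forked process talks back over the same channel
process.on('message', function (msg) {
  console.log('[child] got:', msg);
  process.send('hi from child');
});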

When a worker calls server.listen(...), the arguments are serialized and sent to the master process. If the master already has a listening server matching the worker's request, it passes that handle to the worker; if not, it creates one and then passes the handle to the worker.
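Handle passing itself is ordinary child_process functionality: the optional second argument of send() can carry a TCP server or socket handle. A rough sketch of the idea (not line-for-line what cluster does internally; master.js and child.js are again just example names):

// master.js - create one listening server and hand its handle to the child
var fork = require('child_process').fork;
var net = require('net');

var child = fork('./child.js');
var server = net.createServer();
server.listen(3000, function () {
  // the second argument sends the actual server handle over IPC
  child.send('server', server);
});

// child.js - receive the handle and accept connections on it
process.on('message', function (msg, handle) {
  if (msg === 'server') {
    handle.on('connection', function (socket) {
      socket.end('handled by child ' + process.pid + '\n');
    });
  }
});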

Because workers run independently, they can be killed or restarted as the program requires without affecting each other. As long as at least one worker is alive, the master keeps accepting connections. Node does not maintain the number of workers for us; keeping the worker pool at a fixed size is the application's job.
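A minimal "keep the pool full" sketch, built on the 'exit' event (in practice you would add some back-off so a crashing worker does not cause a fork loop):

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  // whenever a worker dies, fork a replacement so the pool size stays constant
  cluster.on('exit', function (worker, code, signal) {
    console.log('worker %d died (%s), forking a replacement', worker.process.pid, signal || code);
    cluster.fork();
  });
} else {
  http.createServer(function (req, res) {
    res.end('ok');
  }).listen(3000);
}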

The cluster object

Properties, methods and events of cluster:

  • cluster.settings: the settings object used to configure the cluster
  • cluster.isMaster: true in the master process
  • cluster.isWorker: true in a worker process
  • Event: 'fork': emitted when a new worker is forked
  • Event: 'online': emitted when a worker has started successfully
  • Event: 'listening': emitted when a worker starts listening
  • Event: 'disconnect': emitted when a worker's IPC channel disconnects
  • Event: 'exit': emitted when a worker exits
  • Event: 'setup': emitted when setupMaster() is called
  • cluster.setupMaster([settings]): set the cluster settings (see the sketch after this list)
  • cluster.fork([env]): fork a worker process
  • cluster.disconnect([callback]): disconnect all worker processes
  • cluster.worker: the current worker object (in a worker)
  • cluster.workers: all live worker objects in the cluster (in the master)
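A small sketch of cluster.setupMaster(); it assumes the worker code lives in a separate worker.js file (a hypothetical name here):

var cluster = require('cluster');

cluster.setupMaster({
  exec: 'worker.js',          // entry file for workers, instead of the current script
  args: ['--use', 'https'],   // arguments passed to each worker
  silent: true                // pipe worker stdout/stderr to the master instead of inheriting
});
cluster.fork();               // this worker runs worker.js with the settings above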

The worker object

Properties, methods and events of worker, available through cluster.workers (in the master) or cluster.worker (in a worker):

  • worker.id: the worker's unique ID (not the OS process ID)
  • worker.process: the underlying ChildProcess object
  • worker.suicide: set after disconnect()/kill(); tells whether the worker exited voluntarily (renamed worker.exitedAfterDisconnect in newer Node versions)
  • worker.send(message, [sendHandle]): send a message from the master to this worker. Note: a worker sends messages to the master with process.send(message)
  • worker.kill([signal='SIGTERM']): kill the worker (alias: destroy())
  • worker.disconnect(): disconnect the worker so that it exits on its own (see the graceful-shutdown sketch after this list)
  • Event: 'message': emitted on message exchange between the master and this worker
  • Event: 'online': emitted when this worker has started successfully
  • Event: 'listening': emitted when this worker starts listening
  • Event: 'disconnect': emitted when this worker's IPC channel disconnects
  • Event: 'exit': emitted when this worker exits
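A common pattern built on worker.disconnect(), worker.kill() and the 'exit' event is a graceful shutdown: ask the worker to stop accepting connections, and force-kill it only if it does not exit in time. A sketch (the 5-second timeout is an arbitrary choice):

// in the master: shut one worker down gracefully
function shutdown(worker) {
  worker.disconnect();              // stop accepting new connections and close the IPC channel
  var timer = setTimeout(function () {
    worker.kill('SIGKILL');         // force-kill if the worker hangs
  }, 5000);
  worker.on('exit', function () {
    clearTimeout(timer);            // worker exited on its own, cancel the force-kill
  });
}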

Communication between master and workers

Create cluster.js:

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log('[master] ' + "start master...");
  
  // 12 workers would be more than we need here, so start 3
  for (var i = 0; i < numCPUs / 4; i++) {
    var wk = cluster.fork();
    wk.send('[master] ' + 'hi worker' + wk.id);
  }

  // emitted when a worker is forked
  cluster.on('fork', function (worker) {
    console.log('[master] fork: worker' + worker.id);
  });

  // after a worker is forked it replies with an online message; this event fires when the master receives it.
  // 'fork' fires when the master forks the worker, 'online' fires once the worker is actually running.
  cluster.on('online', function (worker) {
    console.log('[master] online: worker' + worker.id);
  });

  // when a worker calls listen(), the worker's server emits 'listening' and cluster in the master emits 'listening' as well
  cluster.on('listening', function (worker, address) {
    console.log('[master] listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
  });

  // fired after the worker's IPC channel has been disconnected, e.g. because the worker exited gracefully, was killed, or was disconnected manually (worker.disconnect())
  cluster.on('disconnect', function (worker) {
    console.log('[master] disconnect: worker' + worker.id);
  });

  // fired whenever any worker exits
  cluster.on('exit', function (worker, code, signal) {
    console.log('[master] exit worker' + worker.id + ' died');
  });


  function eachWorker(callback) {
    for (var id in cluster.workers) {
      callback(cluster.workers[id]);
    }
  }
  // push a message to every worker after 3 seconds
  setTimeout(function () {
    eachWorker(function (worker) {
      worker.send('[master] ' + 'send message to worker' + worker.id);
    });
  }, 3000);

  Object.keys(cluster.workers).forEach(function (id) {
    // fired when the cluster master receives a message from any worker
    cluster.workers[id].on('message', function (msg) {
      console.log('[master] ' + 'message ' + msg);
    });
  });

} else if (cluster.isWorker) {
  console.log('[worker] ' + "start worker ..." + cluster.worker.id);

  // similar to cluster's 'message' event, but specific to this worker
  process.on('message', function (msg) {
    // this is the init message the master sent to this worker
    console.log('[worker] ' + msg);
    // acknowledge it, which triggers cluster.workers[id].on('message', fn) in the master
    process.send('[worker] worker' + cluster.worker.id + ' received!');
  });

  http.createServer(function (req, res) {
    res.writeHead(200, { "content-type": "text/html" });
    res.end('worker' + cluster.worker.id + ',PID:' + process.pid);
  }).listen(3000);
  console.log(`worker ${process.pid} started`);

}
➜ node cluster.js
[master] start master...
[master] fork: worker1
[master] fork: worker2
[master] fork: worker3
[master] online: worker1
[master] online: worker2
[master] online: worker3
[worker] start worker ...1
[worker] [master] hi worker1
[master] message [worker] worker1 received!
[worker] start worker ...2
[master] listening: worker1,pid:30288, Address:null:3000
[worker] start worker ...3
[worker] [master] hi worker2
[master] message [worker] worker2 received!
[master] listening: worker2,pid:30289, Address:null:3000
[worker] [master] hi worker3
[master] message [worker] worker3 received!
[master] listening: worker3,pid:30290, Address:null:3000
[worker] [master] send message to worker1
[worker] [master] send message to worker2
[worker] [master] send message to worker3
[master] message [worker] worker1 received!
[master] message [worker] worker2 received!
[master] message [worker] worker3 received!


Load balancing

Let's first send some requests and see whether they are distributed across workers automatically.

load-balance.js

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log('[master] ' + "start master...");

  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('listening', function (worker, address) {
    console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
  });

} else if (cluster.isWorker) {
  console.log('[worker] ' + "start worker ..." + cluster.worker.id);
  http.createServer(function (req, res) {
    console.log('worker' + cluster.worker.id);
    res.end('worker' + cluster.worker.id + ',PID:' + process.pid);
  }).listen(3000);
}
node load-balance.js
[master] start master...
[worker] start worker ...1
[worker] start worker ...3
[worker] start worker ...2
[worker] start worker ...4
[worker] start worker ...6
[worker] start worker ...7
[master] listening: worker3,pid:96592, Address:null:3000
[master] listening: worker2,pid:96591, Address:null:3000
[master] listening: worker1,pid:96590, Address:null:3000
[master] listening: worker4,pid:96593, Address:null:3000
[master] listening: worker6,pid:96595, Address:null:3000
[worker] start worker ...5
[master] listening: worker7,pid:96596, Address:null:3000
[worker] start worker ...8
[worker] start worker ...9
[worker] start worker ...10
[master] listening: worker5,pid:96594, Address:null:3000
[master] listening: worker8,pid:96597, Address:null:3000
[master] listening: worker9,pid:96598, Address:null:3000
[worker] start worker ...11
[worker] start worker ...12
[master] listening: worker10,pid:96599, Address:null:3000
[master] listening: worker11,pid:96600, Address:null:3000
[master] listening: worker12,pid:96601, Address:null:3000

Let's hit it with curl a few times:

➜  cluster curl http://172.16.78.185:3000/
worker1,PID:96590%                                                                                     
➜  cluster curl http://172.16.78.185:3000/
worker3,PID:96592%                                                                                     
➜  cluster curl http://172.16.78.185:3000/
worker2,PID:96591%                                                                                     
➜  cluster curl http://172.16.78.185:3000/
worker4,PID:96593%                                                                                     
➜  cluster curl http://172.16.78.185:3000/
worker6,PID:96595% 

Requests are being assigned to different workers, but with so few requests we can't tell whether the load is actually balanced, so let's simulate concurrent traffic:

➜  node load-balance.js > server.log

Then run a load test with siege, using 50 concurrent users:

siege -c 50 http://localhost:3000
HTTP/1.1 200     0.00 secs:      16 bytes ==> GET  /
^C
Lifting the server siege...
Transactions:		       16276 hits
Availability:		      100.00 %
Elapsed time:		       31.65 secs
Data transferred:	        0.25 MB
Response time:		        0.03 secs
Transaction rate:	      514.25 trans/sec
Throughput:		        0.01 MB/sec
Concurrency:		       14.72
Successful transactions:       16276
Failed transactions:	           0
Longest transaction:	        0.18
Shortest transaction:	        0.00

Elapsed time: 31.65 seconds
Requests sent: 16,276
Requests handled per second: 514.25

Now let's look at the server.log file.

This uses R, so you'll need to install it first (see the linked tutorial).

~ R 

> df<-read.table(file="server.log",skip=9,header=FALSE)
> summary(df)
       V1
 worker9 :1361  
 worker2 :1359  
 worker5 :1358  
 worker1 :1357  
 worker12:1356  
 worker11:1354  
 (Other) :8122 

We can see that the requests are spread across the workers in almost equal numbers. This even distribution matches cluster's default scheduling policy, which is round-robin on every platform except Windows, rather than purely random assignment.
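If you would rather not install R, a few lines of Node can produce the same tally (a sketch; it assumes every request wrote exactly one plain "workerN" line to server.log, which is what load-balance.js logs):

// count-workers.js - count how many requests each worker handled
var fs = require('fs');

var counts = {};
fs.readFileSync('server.log', 'utf8').split('\n').forEach(function (line) {
  var key = line.trim();
  if (/^worker\d+$/.test(key)) {
    counts[key] = (counts[key] || 0) + 1;
  }
});
console.log(counts);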


Reference: 粉丝日志
