最近作了点nodejs项目,对nodejs的cluster怎么利用多进程处理请求产生了疑问,因而着手进行了研究,以后发现这其中竟大有文章!一切仍是先从遥远的TCP提及吧。。。javascript
说到TCP,相信不少人都至关了解了,大学已经教过,可是又相信有不少人也不是很了解,要不是当时没听,要不也多是自身的编程能力不足以去实践相关内容,写到这我还特地去翻了一下大学的计算机网络教材,内容是很丰富的,但教人实践的内容仍是太少了,里面的内容都把学生当成了有至关的Linux编程能力的人了,因此结果就是大部分只上了一年编程课刚学会几个Hello world程序的大二学生,听了这门课后一脸懵逼,即便记住了也由于没什么实践很快忘了,当年我就是这么懵逼过来的。
因此,扯了这些,结果是什么呢,结果就是咱们要多动手!而要动手创建一条TCP链接能够用socket来实现,不过这里不是要说socket用法,只是来简单聊一聊他们之间的一点小联系,以便于理解后面的内容。java
应用层经过传输层进行TCP通讯时,有时TCP须要为多个应用程序进程提供并发服务。多个TCP链接或多个应用程序进程可能须要经过同一个TCP协议端口传输数据。为了区别不一样的应用程序进程和链接,许多计算机操做系统为应用程序与TCP协议交互提供了称为套接字 (Socket)的接口,区分不一样应用程序进程间的网络通讯和链接。node
咱们能够用一个四元组来肯定一条TCP链接(源ip,源端口,目标ip,目标端口),而链接是经过socket来创建的(服务端进行bind和listen->客户端发起connect->服务端accept),计算机系统就是经过socket来区分不一样的TCP链接的。因此咱们能够看出来,只要目标ip/端口不一样,服务端能够用同一个端口生成多个socket,创建多条链接。
可是,一个进程只能监听一个端口,一个端口怎么生成多个socket呢?其实服务器端程序通常会把socket和服务器某个端口(ip+端口)bind起来, 这样构成了一个特殊的socket, 这个socket没有目标ip和端口。socket进行listen以后当有新的链接进来时, 系统将请求存进队列(此时TCP握三次手完成), 后续能够再调用accept拿到队列的请求,返回一个新的socket, 这个socket是由四元组创建的, 也就对应了一个惟一的链接。程序员
说完这些,能够来聊一聊nodejs是怎样创建一个TCP服务的了。编程
通常咱们用nodejs启动一个TCP服务多是这样的:服务器
require('net').createServer(function(sock) { sock.on('data', function(data) { sock.write('Hello world'); }); }).listen(8080, '127.0.0.1');
进到createServer
一看(代码都在net模块中),里面return了一个Server
对象,Server
继承EventEmitter
,将createServer
的参数作为connection
事件的回调函数,这块比较简单就不贴代码了。咱们须要关注的是Server
的listen
方法,其不一样的参数最终都会调用到listenInCluster
方法。cluster!是的这和cluster有关,但先无论它,咱们先管在主进程中它的执行:网络
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive) { // ... if (cluster.isMaster || exclusive) { // ... server._listen2(address, port, addressType, backlog, fd); return; } // ... }
从代码咱们能够看到listenInCluster
最终是调用了_listen2
方法,它就是服务启动的关键,其定义以下:并发
function setupListenHandle(address, port, addressType, backlog, fd) { // ... var rval = null; // ... if (rval === null) rval = createServerHandle(address, port, addressType, fd); // ... this._handle = rval; // ... this._handle.onconnection = onconnection; this._handle.owner = this; var err = this._handle.listen(backlog || 511); // ... }
其中createServerHandle
方法就不展开了,它就如以前所说的:把socket和服务器某个端口(ip+端口)bind起来, 这样构成了一个特殊的socket, 这个socket没有目标ip和端口。它绑定了address+port
并返回了一个特殊socket(句柄)rval
,能够看到最后它调用了listen对端口进行监听,而且指定了一个回调函数onconnection
,函数会在C++层当accept请求时触发,其回调参数之一就是前面提到的accept后与客户端链接的新socket句柄。到这里再看一下onconnection
的代码:负载均衡
function onconnection(err, clientHandle) { // ... var self = handle.owner; var socket = new Socket({ handle: clientHandle, allowHalfOpen: self.allowHalfOpen, pauseOnCreate: self.pauseOnConnect }); socket.readable = socket.writable = true; // ... self.emit('connection', socket); }
能够看到nodejs在对socket句柄进一步封装后(封装成nodejs的Socket对象),再触发server(由createServer建立)的connection
事件。这时咱们再回到前面createServer
的介绍,其监听了connection
事件,因此最终流程走下来createServer
的的方法参数将被触发,而且能够拿到一个nodejs的Socket对象进行write与read操做,与客户端进行通讯。socket
至此咱们已经对nodejs启动一个TCP服务的流程有了了解,接下来就到主题cluster了。
开始说代码以前,先来聊一聊喂鸽子吧。假设你坐在布拉格广场前静静地坐着,而后往前面撒了一把狗粮,喔不对是鸽粮,而后周围的一群鸽子都震惊了并往你这边飞抢东西吃。这个现象能够用一个词来形容就是“惊群“。然而这只是个人瞎掰,咱们程序员理解的惊群应该是:多个进程/线程同时阻塞等待某个事件,当事件发生时唤醒了全部等待的进程/线程,但最终只有一个能对事件进行处理。很明显这对cpu形成了浪费,而cluster的多进程模型对此作了处理:只用一个master进程等待请求,而后有请求到来时使用round-robin轮询分配请求给各个子进程进行处理,这块后面提到的源码会涉及到,这里就不深刻了。除了round-robin,还有其余的一些cluster为咱们作的,就用代码来talk吧:
const cluster = require('cluster'); const http = require('http'); if (cluster.isMaster) { const numCPUs = require('os').cpus().length; for (let i = 0; i < numCPUs; i++) { cluster.fork(); } } else { // Worker processes have a http server. http.Server((req, res) => { res.writeHead(200); res.end('hello world\n'); }).listen(8000); }
以上代码就是cluster的典型用法,在nodejs启动文件判断当前进程,若是当前进程是master进程,那么就根据cpu的核数fork出相同数量的进程,不然(worker进程)就启动一个http服务,因此通常这样会给一个核心分配一个worker进程来启动一个服务,搭起一个小服务集群。可是问题来了,为何这里能够有多个进程同时监听一个端口呢,是由于listen作的一些文章,下面再一步步深刻解析。因为http.Server实际上是继承了net.Server,因此跟前面建立TCP服务同样,listen
最终也是调用到listenInCluster
,咱们从这里从新开始。
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive) { // ... const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags: 0 }; // Get the master's server handle, and listen on it cluster._getServer(server, serverQuery, listenOnMasterHandle); // ... }
listenInCluster
在worker进程中调用cluster._getServer
,而且传入了一个函数listenOnMasterHandle
。这里还不知道它作了什么,因此再进入cluster._getServer
看看(因为当前是在worker进程,cluster模块文件是lib/internal/cluster/child.js
):
cluster._getServer = function(obj, options, cb) { // ... const message = util._extend({ act: 'queryServer', index: indexes[indexesKey], data: null }, options); send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); // ... };
关注send
方法,它调用了sendHelper
方法,该方法是在internal/cluster/utils
定义的,至关一个消息转发器处理进程间通讯,它发送一个“进程内部消息“(internalMessage
),而worker进程在master进程被fork出来的时候监听了internalMessage
:
// lib/internal/cluster/master.js worker.process.on('internalMessage', internal(worker, onmessage));
因此最终在worker进程发送的消息,触发了master进程执行了onmessage
方法,onmessage
判断message.act === 'queryServer'
执行queryServer
,而就是在这个方法中,新建了一个RoundRobinHandle
调度器,就是这个东西分配请求作了负载均衡。这里用地址和端口号做为key将调度器存储起来,调度器不会被worker建立两次,最后将worker进程add到队列。相关代码以下:
// lib/internal/cluster/master.js function queryServer(worker, message) { // ... var handle = handles[key]; if (handle === undefined) { var constructor = RoundRobinHandle; // ... handles[key] = handle = new constructor(key, message.address, message.port, message.addressType, message.fd, message.flags); } // ... // Set custom server data handle.add(worker, (errno, reply, handle) => { // ... }); }
而后咱们再来看看RoundRobinHandle
,它里面调用net.createServer
方法新建了一个server,而且开始监听,这块能够看前面内容。不过与前面不一样的是,server在listening
事件完成时拿到监听端口的那个特殊socket句柄,重置了onconnection
方法,当新的链接创建时方法被调用,将accept链接的socket句柄分发到队列里的worker进行处理(distribute)。对于listening
事件,它在Server.listen
执行后就会触发,代码就在setupListenHandle
方法里面。RoundRobinHandle
代码以下:
// lib/internal/cluster/round_robin_handle.js function RoundRobinHandle(key, address, port, addressType, fd) { // ... this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) this.server.listen(port, address); else this.server.listen(address); // UNIX socket path. this.server.once('listening', () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); // ... }); } RoundRobinHandle.prototype.distribute = function(err, handle) { this.handles.push(handle); const worker = this.free.shift(); if (worker) this.handoff(worker); }; RoundRobinHandle.prototype.handoff = function(worker) { // ... const message = { act: 'newconn', key: this.key }; sendHelper(worker.process, message, handle, (reply) => { // ... }); };
从代码上看到最终调度器调用handoff
方法,经过sendHelper
向worker进程发送一个新链接到达的消息newconn
,执行worker进程的server的onconnection
方法,worker进程相关代码以下:
// lib/internal/cluster/child.js cluster._setupWorker = function() { // ... process.on('internalMessage', internal(worker, onmessage)); send({ act: 'online' }); function onmessage(message, handle) { if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') _disconnect.call(worker, true); } }; // Round-robin connection. function onconnection(message, handle) { const key = message.key; const server = handles[key]; const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle); }
走到这里worker进程的server就拿到了链接的socket句柄能够进行处理,可是好像有点问题,worker进程的server好像还没起起来啊,前面讲的只是在master进程的调度器启动了一个server,worker进程并无server。咱们又得翻回前面的内容看一看了,看看以前提到的workder进程的cluster._getServer
,里面send
方法发送了一个函数,函数里面的rr(reply, indexesKey, cb);
就是建立了workder进程server的代码。
先来看看cluster._getServer
中发送的函数怎么被调用的。这里须要来了解一下以前出现了几回的sendHelper
,它是cluster模块用来作进程间通讯的,另外还有一个internal
方法用来处理通讯的回调。cluster._getServer
的send
会调用sendHelper
,它会用message.seq
当key把send的函数存储起来。而后在internal
方法处理通讯的回调时判断message是否有这个key,是否能找到这个函数,能够的话就执行。而在master进程执行queryServer
把worker进程添加到调度器中时添加了一个回调函数,回调函数send了一个带seq的消息,而且handle为null,就是这个消息触发了cluster._getServer
发送的函数的执行。相关代码以下:
// `internal/cluster/utils.js` const callbacks = {}; var seq = 0; function sendHelper(proc, message, handle, cb) { // ... if (typeof cb === 'function') callbacks[seq] = cb; message.seq = seq; // ... } function internal(worker, cb) { return function onInternalMessage(message, handle) { // ... var fn = cb; if (message.ack !== undefined && callbacks[message.ack] !== undefined) { fn = callbacks[message.ack]; delete callbacks[message.ack]; } // ... }; } // lib/internal/cluster/master.js function queryServer(worker, message) { // ... // Set custom server data handle.add(worker, (errno, reply, handle) => { reply = util._extend({ // ... ack: message.seq, // ... }, reply); // ... send(worker, reply, handle); });
最终,rr(reply, indexesKey, cb);
执行,它构造了一个假的socket句柄,句柄设置了一个不作操做的listen方法。而后执行cb,这个cb也就是前面提到过的listenOnMasterHandle
,它会把假socket句柄赋值给worker进程的server._handle
,随后因为server._handle
的存在,server._listen2(address, port, addressType, backlog, fd);
也不会作任何操做,也就是说worker进程建立的server是不会对端口进行监听的。相关代码以下:
// lib/internal/cluster/child.js function rr(message, indexesKey, cb) { function listen(backlog) { // ... return 0; } // ... cb(0, handle); } // lib/net.js function listenOnMasterHandle(err, handle) { // ... server._handle = handle; server._listen2(address, port, addressType, backlog, fd); } // setupListenHandle就是_listen2 function setupListenHandle(address, port, addressType, backlog, fd) { // ... if (this._handle) { debug('setupListenHandle: have a handle already'); } // ...
至此,cluster模块如何创建多进程服务的就算讲完了。画个草图总结下吧: