目录html
示例代码托管在:http://www.github.com/dashnowords/blogs前端
博客园地址:《大史住在大前端》原创博文目录git
华为云社区地址:【你要的前端打怪升级指南】github
阅读本章须要先阅读本系列前两章内容预热一下。windows
前两篇博文中已经分别介绍了使用cluster
模块创建集群时主进程执行cluster.fork( )
方法时的执行逻辑,以及net
模块在不一样场景下创建通信的基本原理。本篇继续分析cluster
模块,从第一个子进程开始创建服务器讲起,cluster
基本用法示例代码再来一遍:数组
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { console.log(`主进程 ${process.pid} 正在运行`); // 衍生工做进程。 for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', (worker, code, signal) => { console.log(`工做进程 ${worker.process.pid} 已退出`); }); } else { // 工做进程能够共享任何 TCP 链接。 // 在本例子中,共享的是 HTTP 服务器。 http.createServer((req, res) => { res.writeHead(200); res.end('你好世界\n'); }).listen(8000); console.log(`工做进程 ${process.pid} 已启动`); }
代码是足够精简的,实现过程也确实是很庞大的工程。每个子进程中执行的逻辑都是http.createServer().listen()
,咱们来看看它是如何一步一步运做而最终创建通信机制的,你会发现它和上一节中的简易模型很是类似。缓存
在http
模块的源码中很容易找到http.createServer( )
方法的逻辑就是透传参数生成了一个net.Server
实例,这个实例在上一节中就已经介绍过,实际上就只是生成了一个server
的实例,因此这里跳转到net.Server.prototype.listen()
(net.js
文件1306-1404行),基本逻辑以下:服务器
Server.prototype.listen = function(...args){ const normalized = normalizeArgs(args); var options = normalized[0]; /*..获取监听参数中的句柄对象..*/ options = options._handle || options.handle || options; //若是options上有句柄,句柄是一个TCP实例 if(options instanceof TCP){ //...... listenInCluster(......); } //若是配置参数中有fd(file descriptor) if(typeof options.fd === 'number' && options.fd >=0){ //...... listenInCluster(......); } //若是参数中有port端口号 if(typeof options.port === 'number' || typeof options.port === 'string'){ //..... listenInCluster(......); } //若是参数中有port端口号 或 字符型的pipe名称 if(typeof options.port === 'number' || typeof options.port === 'string'){ //..... listenInCluster(......); } }
这里不难看出它的逻辑就和net
模块官方文档中描述的server.listen( )
的几种场景对应,能够监听带有非空handle
属性的句柄对象,数字型端口号,字符串型命名管道地址,或者直接传入配置参数合集options
,而后分别根据几种不一样的状况来调用listenInCluster
方法(集群功能的逻辑主线是数字型port,假设传入了12315
)。并发
listenInCluster
方法定义以下:socket
大体能够看出,若是是主进程,就直接调用server._listen2()
方法而后return
了,不然(也就是在工做进程中的逻辑,敲黑板!!!这里是重点了),构造一个serverQuery
的参数集,能够看到里面记录了以各类不一样姿式调用这个方法时传入的参数,因此有的参数为null
也很正常,而后调用了cluster._getServer( )
方法,这就是工做进程在引用cluster
模块时引入的child.js
中定义并挂载在cluster
上的方法,最后一个参数listenOnMasterHandle
是一个回调函数,也是一个错误前置风格的函数,能够看到,它接收了一个句柄对象,并把这个句柄对象挂载在了子进程这个server
实例的_handle
属性上,接着也调用了server._listen2( )
方法,能够看到两种状况下调用这个方法时传入的参数是同样的。接着来到server._listen2( )
方法,它绑定了setupListenHandle
方法(别抓狂,这是net
模块中相关逻辑的最后一步了),简化代码以下:
function setupListenHandle(......){ if (this._handle) { //工做进程在执行上一步逻辑时,在cluster._getServer()回调函数中把一个handle传递给了server._handle debug('setupListenHandle: have a handle already'); } else { //主进程会执行的逻辑 debug('setupListenHandle: create a handle'); //...... rval = createServerHandle(address, port, addressType, fd, flags); //...... this._handle = rval; } //...... this._handle.onconnection = onconnection; this._handle[owner_symbol] = this; //.... }
工做进程经过cluster._getServer( )
方法拿到了一个handle
,因此不会再生成,而主进程server.listen(port)
执行时会走到else分支,而后生成一个新的绑定了端口号的特殊的socket句柄而后挂载到主进程server._handle
上,这里对句柄的connection
事件回调逻辑进行了修改,相关代码以下:
这里须要注意的是,server._handle
的connection
事件和server
的connection
事件是两码事,server._handle
指向的是一个绑定了端口的特殊的socket
句柄,当客户端connect一个server
时实际上底层是客户端socket
与服务端这个socket
的对接,因此须要在server._handle
这个的connection
回调函数中,将客户端的socket
句柄clientHandle
从新包装,而后再经过触发server
的connection
事件将其转发给server
实例。因此在使用server
实例时能够直接添加connectionListener
:
let server = net.createServer(socket=>{ /*这个回调函数就是server的connection事件回调 * 这里接收到的socket就是server._handle的connection收到的客户端句柄clientHandle封装成的socket实例 */ })
不管是主进程仍是子进程都会触发这个逻辑,只须要当作是一种功能性质的封装便可,并不影响业务逻辑。
下面回到cluster
模块继续,_getServer( )
方法只存在于子进程代码中,源码位于lib/internal/cluster/child.js
,方法定义在54-106行,基本逻辑以下:
cluster._getServer = function(obj, options, cb){ /* 这里的obj就是子进程中运行上面listenInCluster方法中传入的server, * options就是serverQuery, * cb就是最后要把主进程handle传回去的回调函数listenOnMasterHandler */ //先构建index而后进行了一通记录,就是根据监听的参数来构建一个识别这个server的索引 //而后构建消息 const message = { act: 'queryServer', index, data: null, ...options }; //...... /* 发送act:queryServer消息,并传一个回调函数, * 从形参命名就能够看出,这个回调函数被调用时会被传入一个句柄, * 最后根据不一样的调度策略来执行不一样的函数,这里主要看Round-robin */ send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); //...... }
rr
方法将响应reply
和前一个调用者传入的回调函数cb
进行了透传,rr
的函数体就是实现listen
方法偷梁换柱的地方了:
// Round-robin. Master distributes handles across workers. function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); var key = message.key; function listen(backlog) { return 0; } function close() { if (key === undefined) return; send({ act: 'close', key }); handles.delete(key); indexes.delete(indexesKey); key = undefined; } function getsockname(out) { if (key) Object.assign(out, message.sockname); return 0; } const handle = { close, listen, ref: noop, unref: noop }; if (message.sockname) { handle.getsockname = getsockname; // TCP handles only. } assert(handles.has(key) === false); handles.set(key, handle); cb(0, handle); //这里的cb其实就是listenInCluster方法中定义的那个listenOnMasterHandler回调 }
能够看到rr
方法中构建了一个假的handle
句柄,并调用cb将它传了回去,而后执行逻辑回回到net
模块,前文已经提这个handle在回调函数中被挂载在了server._handle
上,因而setupListenHandle( )
的逻辑中也不会从新构建句柄。
从新梳理一下这部分的逻辑,就是子进程中调用listen
方法时,会经过cluster._getServer( )
拿到一个假句柄,而后执行一个空的listen
方法,这样就避免了端口的重复监听。因此咱们能够推测,cluster._getServer( )
必然会触发主进程启动一个监听端口的服务器,并创建对子进程的调度,进程之间的IPC通信能够直接经过process
对象来完成,不须要再从新构建跨进程通信管道。
继续进行后续内容前,先来看一个独立的跨进程通信工具,源码放在lib/internal/cluster/utils.js
。
它是cluster
模块发送跨进程消息的内部代理,这个模块对外暴露了消息发送方法sendHelper
和内部消息监听器的预处理方法internal
,源码很短就不贴了。当子进程调用sendHelper
发送消息时,utils
内部会把这条消息处理完成后须要执行的回调函数先缓存起来,而后给消息添加一些包装标记,而后再发出去;internal
会对传入的内部消息监听器进行代理,过滤掉非NODE_CLUSTER
类别的消息,若是消息携带的message
对象没有ack
属性则最终会执行绑定监听时传入的回调函数,不然会从缓存中找出以前暂存的回调函数来执行。
发个消息为何要搞这么复杂呢?这个ack
属性又是哪来的呢?其实这个utils
模块主要是在跨进程的双向消息通信时实现了方法复用,同一个message
从工做进程发往主进程时和主进程发回给工做进程时是由同一个事件名internalMessage
携带的,那如何来区分消息发送的方向呢,就是ack
属性,若是消息带有ack
属性,就表示它是由主进程发给子进程的,那么就要调用子进程中的后续处理方法,这个方法其实就是子进程发送消息给主进程以前暂存在utils
内部callbacks
里的方法,也就是child.js
中cluster._getServer()
中调用send
方法时传入的回调方法,也就是net
模块中listenInCluster( )
方法中的listenOnMasterHandle
方法,这个方法漂洋过海透传了N个函数,的确不容易看懂,“回调地狱”也的确不是闹着玩的。再看看没有ack
属性的状况,没有这个属性时消息是从子进程发给主进程的,天然要调用主进程的方法,从逻辑里不难看出,这种状况下方法引用的就是internal
方法执行时传入的第二个参数(master.js
源码213行执行的internal(worker, onmessage)
的onmessage
这个函数),源码中就是利用高阶函数这种分步执行的特色实现了引用。
故事再回到第三节工做进程中发出act:'queryServer
的消息后,来看主进程master.js
中的代码,主进程中在调用cluster.fork( )
时就绑定了对worker线程internalMessage
的监听,对于act:queryServer
类型的集群消息,主进程已经定义了queryServer
这个方法来处理。这段源代码的主要逻辑以下:
1.根据重要参数组拼接出一个惟一的key 2.1.根据key查询是否有已经存在的调度句柄round-robin-handle,若是有则直接进行后续逻辑 2.2.若是没有已经存在的调度句柄,则选择调度策略,实例化一个调度句柄,并把它添加进记录里 3.把消息数据message.data挂载在调度句柄的handle.data字段上 4.执行调度句柄的add方法,把子进程和一个回调方法传进实例,回调方法被执行时会从调度句柄中取得数据,并组装返回消息(带有ack属性和其余数据的消息)发给子进程,子进程收到这个消息后执行的方法,就是前文分析过的返回假句柄给net模块中的`listenInCluster()`逻辑。
从开篇的多进程代码能够看到,每一个子进程中执行的listen
方法监听的端口号都是同样的,因此每一个子进程发送queryServer
消息给主进程并执行这段逻辑时,其实对应的key
都是同样的,因此调度对象RoundRobinHandle
只会实例化一次,在以后的过程当中,每个子进程会根据key
获取到同一个调度实例,并调用add
方法将worker
对象和一个回调函数添加进调度实例,能够看到回调函数执行时,就会将原message
中的seq
属性的值添加给ack
属性再挂载上处理后的数据并发送给子进程。那么剩下的事情,就剩下调度对象RoundRobinHandle
的源码了。
咱们不妨来推测一下,它的主要逻辑就是在主进程中创建真正监听目标端口的服务器,并添加当客户端请求到达时对于工做进程的调度代码,下一节咱们就一块儿来验证一下。
调度方法的源码是internal/cluster/round_robin_handle.js
,另外一种shared_handle.js
是windows下使用的调度策略,先不作分析(主要是没研究过,不敢瞎说)。先从构造函数开始:
16行,bingo,终于看到主进程启动服务器了。接着就是根据参数而分流的监听方法,集群代码中对应的是20行的带有有效port
参数的状况,因此服务器就在主进程启动了,最后来看看server
开始触发listening
事件时执行的逻辑(此处调用的是once
方法,因此只会执行一次):
1.将主进程server的内部_handle句柄,挂载给round-robin-handle实例 2.当这个句柄被链接时(也就是客户端socket执行connect方法链接后),会触发它的`connection`事件,回调函数会调用`distribute`方法来分发这个客户端socket句柄,注意32行后面半句的箭头函数方法,这里的handle就是指客户端`socket`实例。 3.将server._handle指向null 4.将server属性指向null
若是你还记得net
模块中listen
方法的逻辑的话可能会有印象,_handle
的connection
事件回调其实本来已经被复写过一次了,也就是说单进程运行的程序在创建服务器时,server._handle
的connection
事件会触发server
实例的connection
事件,而在集群模式下,主进程中调度实例中服务器句柄server._handle
的connection
再次被复写,将逻辑改变为分发socket
,而子进程中的server._handle
仍是保持原来的逻辑。
最后一步指向null
的逻辑还涉及到add
方法,继续看主进程中调用的add
方法:
这个send
形参实际上就是主进程中传入的最终向子进程发送返回消息的那个回调函数,它被封装进了done
函数,这里须要着重看一下55行的逻辑,this.server === null
这个条件实际上对应的就是构造函数中服务器开始监听的事件,因此55-59行的代码以及构造函数中添加的listening
事件的回调函数须要联合在一块儿来理解,也就是每一个子进程的send
方法都被包裹在一个独立的done
函数中,这个函数会在主进程的server
处于listening
状态后触发执行,而且只触发一次。当它触发时,会从实例的handle
属性(也就是server
的_handle
句柄)上取得socket
名称而后调用send
方法,这个特殊socket
的名称在回调函数中对应reply
形参,最终挂载在message
中发回了子进程。
至此其实主进程和子进程创建服务器的消息已经完成了闭环。最后再看一下RoundRobinHandle
中最后两个方法:
当客户端socket
执行connect
方法链接到主进程server
的句柄后,主进程会调用round-robin-handle
实例的distribute
方法,这个方法的逻辑比较简单,把这个客户端句柄加入到待处理队列,而后从空闲进程队列头部取出一个worker
进程,把它做为参数传给handoff
方法。
handoff
方法中,从客户端请求句柄队列的头部取出下一个待处理的socket
,若是已经没有要处理的请求,就把传进来的worker
放回空闲子进程队列free
中。在add
方法内部封装的done
方法中也执行了这个handoff
方法,如今再回过头来看这个add
方法的做用,就是当主进程处于监听状态后,将每个子进程对象worker
依次添加到空闲进程队列free
中。最后够早了一个新的act:newconn
消息,并经过调度选出的worker.process
对象实现跨进程通信来将待处理句柄和【新链接】消息发送给子进程。
集群创建过程的逻辑大体的跳转路径以下,细节部分直接参考前文的讲解便可。