理解Egg.js中的多进程模型(egg-cluster)

背景

咱们知道,js是单线程的,意味着一个nodejs进程只能运行在单个cpu上面。nodejs在io处理方面是很是优秀的,可是在密集运算型应用中仍然有不足之处,而解决的办法,就是利用多核cpu的并发优点,将nodejs进程运行在各个cpu上面。而egg为咱们提供了egg-cluster模块,用于多进程管理以及进程间通讯。html

介绍(egg的多进程模型)

egg-cluster介绍

借用官网的文档,cluster是什么呢? 简单的说:node

  • 在服务器上同时启动多个进程。
  • 每一个进程里都跑的是同一份源代码(比如把之前一个进程的工做分给多个进程去作)。
  • 更神奇的是,这些进程能够同时监听一个端口。

其中:git

  • 负责启动其余进程的叫作 Master 进程,他比如是个『包工头』,不作具体的工做,只负责启动其余进程。
  • 其余被启动的叫 Worker 进程,顾名思义就是干活的『工人』。它们接收请求,对外提供服务。
  • Worker 进程的数量通常根据服务器的 CPU 核数来定,这样就能够完美利用多核资源。

ps: 由于官网讲得很详细,因此这一部分都是借鉴官网的。github

多进程模型

下面,咱们经过文档中的图,来看看多进程模型windows

与咱们使用pm2开启进程守护相似。master进程充当了进程管理的工做,不会运行任何业务代码,它负责agent,worker进程的start,reload工做以及进程间消息转发等工做。
而看到这里,相信有部分读者会有疑问,为何须要agent进程呢?
那么文档上其实也作出了详细的说明。答案就是

有些工做其实不须要每一个 Worker 都去作,若是都作,一来是浪费资源,更重要的是可能会致使多进程间资源访问冲突api

Agent
在大部分状况下,咱们在写业务代码的时候彻底不用考虑 Agent 进程的存在,可是当咱们遇到一些场景,只想让代码运行在一个进程上的时候,Agent 进程就到了发挥做用的时候了。 因为 Agent 只有一个,并且会负责许多维持链接的脏活累活,所以它不能轻易挂掉和重启,因此 Agent 进程在监听到未捕获异常时不会退出,可是会打印出错误日志,咱们须要对日志中的未捕获异常提升警戒。bash

为何不会端口冲突?

Q: 当fork进程时,明明代码中已经监听了一个端口,为何fork时没有报端口占用?
A: cluster的工做原理推荐这一篇文章《经过源码解析 Node.js 中 cluster 模块的主要功能实现》, 结合朴灵老师的《深刻浅出nodejs》中的多进程架构,这里作一下总结:服务器

  1. 在master-worker模式中,建立子进程后,父子进程将会建立ipc通道,进程间经过ipc通道,使用message和send进行消息传递。用法以下:
// parent.js
var n = require('child_process').fork(__dirname + 'child.js')
n.on('message', function(m){
    console.log('parent get msg:'+  m)  
})
n.send({hello: 'world'})
// child.js
process.on('message', function(m){
    console.log('child get msg' + m)
})
process.send({hello: 'world'})
复制代码
  1. 为了解决端口不能重复监听的问题,在nodev0.5.9中引入了进程间发送句柄的功能(句柄是一种能够用来标识资源的引用,它的内部 包含了指向对象的文件描述符,好比句柄能够用来标识一个socket对象 ,一个UDP套接字,一个管道等)。send方法出了能够发送数据,还能够发送句柄。
child.send(params, [sendHandle])
复制代码

详细用法:架构

// parent.js
var n = require('child_process').fork(__dirname + 'child.js')
var server = require('net').createServer();
server.listen(8080, function(){
    n.send('server', server)
})
// child.js
process.on('message', function(m, server){
    server.on('connection', function(socket){
        console.log('child get msg' + m)
        socket.end('handle by child process')
    })
})
复制代码

经过传递 TCP server,咱们能够发现,没有异常了,多个子进程能够监听同个端口 。 在node句柄发送的过程当中,多个进程能够监听到相同的端口,而不引发EADDRINUSE异常,这是由于,咱们独立启动的进程中,tcp服务端套接字socket的文件描述符并不相同,致使监听相同的端口时会抛出异常,因为独立启动的进程互相之间并不知道文件描述符,因此监听相同端口就会失败,但对于send()发送的句柄还原出来的服务而言,他们的文件描述符是相同的,因此监听相同端口不会引发异常。并发

总结下来,一句话:经过进程间ipc通讯传递句柄从而共享文件描述符

进程的启动顺序

egg-cluster/master.js中承担了初始化,启动agent和app进程,检测状态等工做,只详细讲解一下master.js中作了什么?咱们看一下构造函数中的代码,整个流程在constructor中已经很好的提现出来了。

constructor(options) {
  super();
  this.options = parseOptions(options);
  // new 一个 Messenger 实例
  this.messenger = new Messenger(this);
  // 借用 ready 模块的方法
  ready.mixin(this);
  this.isProduction = isProduction();
  this.isDebug = isDebug();
  ...
  ...
  // 根据不一样运行环境(localtest、prod)设置日志输出级别
  this.logger = new ConsoleLogger({ level: process.env.EGG_MASTER_LOGGER_LEVEL || 'INFO' });
  ...
}
// master 启动成功后通知 parent、app worker、agent
this.ready(() => {
  this.isStarted = true;
  const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
  this.logger.info('[master] %s started on %s://127.0.0.1:%s (%sms)%s',
  frameworkPkg.name, this.options.https ? 'https' : 'http',
  this.options.port, Date.now() - startTime, stickyMsg);

  const action = 'egg-ready';
  this.messenger.send({ action, to: 'parent' });
  this.messenger.send({ action, to: 'app', data: this.options });
  this.messenger.send({ action, to: 'agent', data: this.options });
});
// 监听 agent 退出
this.on('agent-exit', this.onAgentExit.bind(this));
// 监听 agent 启动
this.on('agent-start', this.onAgentStart.bind(this));
// 监听 app worker 退出
this.on('app-exit', this.onAppExit.bind(this));
// 监听 app worker 启动
this.on('app-start', this.onAppStart.bind(this));
// 开发环境下监听 app worker 重启
this.on('reload-worker', this.onReload.bind(this));

// 监听 agent 启动,注意这里只执行一次
this.once('agent-start', this.forkAppWorkers.bind(this));
// master监听自身的退出及退出后的处理

// kill(2) Ctrl-C     监听 SIGINT 信号
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\     监听 SIGQUIT 信号
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default   监听 SIGTERM 信号
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));

// 监听 exit 事件
process.once('exit', this.onExit.bind(this));
// 监听端口冲突
detectPort((err, port) => {
  /* istanbul ignore if */
  if (err) {
    err.name = 'ClusterPortConflictError';
    err.message = '[master] try get free port error, ' + err.message;
    this.logger.error(err);
    process.exit(1);
    return;
  }
  this.options.clusterPort = port;
  this.forkAgentWorker(); // 若是端口没有冲突则执行该方法
});
复制代码

master继承eventEmitter,使用发布订阅模式监听消息。 构造函数中的流程以下:

  1. 使用detect-port来获取空闲的端口
  2. forkAgentWorker使用child_process.fork()来启动agent进程,启动后经过 process.send通知master agent已经启动
agent.ready(() => {
 agent.removeListener('error', startErrorHandler);
 process.send({ action: 'agent-start', to: 'master' });
});
复制代码
  1. forkAppWorkers: agent启动后,经过 cluster.fork()启动app_worker进程。
// fork app workers after agent started
this.once('agent-start', this.forkAppWorkers.bind(this));
复制代码

这里是使用了cfork模块,其本质也是cluster.fork(),默认启动进程数为os.cpu.length,也能够经过启动参数来指定worker进程的数量。

cfork({
  exec: this.getAppWorkerFile(),
  args,
  silent: false,
  count: this.options.workers,
  // don't refork in local env refork: this.isProduction, windowsHide: process.platform === 'win32', }); 复制代码

启动成功后,经过messenger告知master,worker进程已经ready

this.messenger.send({
    action: 'app-start',
    data: {
      workerPid: worker.process.pid,
      address,
    },
    to: 'master',
    from: 'app',
  });
复制代码
  1. onAppStart: app worker 启动成功后通知 agent。并告知parent,egg-ready了,并带上port,address,protocol等参数
this.ready(() => {
      this.isStarted = true;
      const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
      this.logger.info('[master] %s started on %s (%sms)%s',
        frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);

      const action = 'egg-ready';
      this.messenger.send({
        action,
        to: 'parent',
        data: {
          port: this[REAL_PORT],
          address: this[APP_ADDRESS],
          protocol: this[PROTOCOL],
        },
      });
      this.messenger.send({
        action,
        to: 'app',
        data: this.options,
      });
      this.messenger.send({
        action,
        to: 'agent',
        data: this.options,
      });

      // start check agent and worker status
      if (this.isProduction) {
        this.workerManager.startCheck();
      }
    });
复制代码
  1. startCheck: 若是在生产环境,,每隔10s检测agent和worker,若有异常则上报。
// check agent and worker must both alive
  // if exception appear 3 times, emit an exception event
  startCheck() {
    this.exception = 0;
    this.timer = setInterval(() => {
      const count = this.count();
      if (count.agent && count.worker) {
        this.exception = 0;
        return;
      }
      this.exception++;
      if (this.exception >= 3) {
        this.emit('exception', count);
        clearInterval(this.timer);
      }
    }, 10000);
  }
复制代码

egg文档上的流程图很好的总结了以上过程:

进程间消息通信

进程间通讯原理(ipc)

IPC的全称是Inter-Process Communication,即进程间通讯。进程间通讯的目的是为了让不一样的进程可以互相访问资源,并进程协调工做。实现进程间通讯的技术有不少,如命名管道、匿名管道、socket、信号量、共享内存、消息队列、Domain Socket等,node中实现IPC通道的是管道技术(pipe)。在node中管道是个抽象层面的称呼,具体细节实现由libuv提供,在win下是命名管道(named pipe)实现,在*nix下,采用unix Domain Socket来实现。

Q:那么,进程间是如何经过ipc通道去连接的呢?

父进程在实际建立子进程前,会建立IPC通道并监听它,而后才真正建立出子进程,并经过环境变量(NODE_CHANNEL_FD)告诉子进程这个IPC通讯的文件描述符。子进程在启动的过程当中,根据文件描述符去链接这个已存在的IPC通道,从而完成父子进程之间的链接。

egg-cluster中的进程间通讯

cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之间,Worker 与 Agent 进程互相间是没有的。那么 Worker 之间想通信该怎么办呢?是的,经过 Master 来转发

在egg-cluster的源码中,封装了一个messageer类来处理进程间通讯, 代码传送门

show the code

/**
 * master messenger,provide communication between parent, master, agent and app.
 *             ┌────────┐
 *             │ parent │
 *            /└────────┘\
 *           /     |      \
 *          /  ┌────────┐  \
 *         /   │ master │   \
 *        /    └────────┘    \
 *       /     /         \    \
 *     ┌───────┐         ┌───────┐
 *     │ agent │ ------- │  app  │
 *     └───────┘         └───────┘
 */
  send(data) {
    if (!data.from) {
      data.from = 'master';
    }

    // recognise receiverPid is to who
    if (data.receiverPid) {
      if (data.receiverPid === String(process.pid)) {
        data.to = 'master';
      } else if (data.receiverPid === String(this.master.agentWorker.pid)) {
        data.to = 'agent';
      } else {
        data.to = 'app';
      }
    }

    // default from -> to rules
    if (!data.to) {
      if (data.from === 'agent') data.to = 'app';
      if (data.from === 'app') data.to = 'agent';
      if (data.from === 'parent') data.to = 'master';
    }

    // app -> master
    // agent -> master
    if (data.to === 'master') {
      debug('%s -> master, data: %j', data.from, data);
      // app/agent to master
      this.sendToMaster(data);
      return;
    }

    // master -> parent
    // app -> parent
    // agent -> parent
    if (data.to === 'parent') {
      debug('%s -> parent, data: %j', data.from, data);
      this.sendToParent(data);
      return;
    }

    // parent -> master -> app
    // agent -> master -> app
    if (data.to === 'app') {
      debug('%s -> %s, data: %j', data.from, data.to, data);
      this.sendToAppWorker(data);
      return;
    }

    // parent -> master -> agent
    // app -> master -> agent,可能不指定 to
    if (data.to === 'agent') {
      debug('%s -> %s, data: %j', data.from, data.to, data);
      this.sendToAgentWorker(data);
      return;
    }
  }
复制代码
  1. app/agent -> master: 经过master.emit(data.action, data.data)(master 继承自 EventEmitter)
  2. app/master/agent -> parent: process.send(data)
  3. parent/agent -> master -> app: sendmessage(worker, data)
  4. parent/agent -> master -> agent: sendmessage(this.master.agentWorker, data)

注: [sendmessage]是一个苏千大大写的一个用于处理进程间通讯的module(Send a cross process message if message channel is connected.),感兴趣的同窗能够看看源码https://github.com/node-modules/sendmessage/blob/master/index.js

可能有同窗会想,为何多了一个parent?

原来,parent就是master进程的parent进程,通常是CLI,好比egg-script start和egg-bin,egg-script中经过('child_process').spawn 建立的master进程。child_process.spawn() 方法使用给定的 command 衍生一个新进程,并带上 args 中的命令行参数。同时,经过传递detached参数,可使得在父进程退出后子进程继续执行。
spawn文档传送门

(感谢@天猪 的解答,源码连接

Ref

相关文章
相关标签/搜索