Node.js - 阿里Egg的多进程模型和进程间通信

前言

最近用Egg做为底层框架开发项目,好奇其多进程模型的管理实现,因而学习了解了一些东西,顺便记录下来。文章若有错误, 请轻喷html

为何须要多进程

伴随科技的发展, 如今的服务器基本上都是多核cpu的了。然而,Node是一个单进程单线程语言(对于开发者来讲是单线程,实际上不是)。咱们都知道,cpu的调度单位是线程,而基于Node的特性,那么咱们每次只能利用一个cpu。这样不只仅利用率极低,并且容错更是不能接受(出错时会崩溃整个程序)。因此,Node有了cluster来协助咱们充分利用服务器的资源。前端

cluster工做原理
关于cluster的工做原理推荐你们看这篇文章,这里简单总结一下:node

  1. 子进程的端口监听会被hack掉,而是统一由master的内部TCP监听,因此不会出现多个子进程监听同一端口而报错的现象。
  2. 请求统一通过master的内部TCP,TCP的请求处理逻辑中,会挑选一个worker进程向其发送一个newconn内部消息,随消息发送客户端句柄。(这里的挑选有两种方式,第一种是除Windows外全部平台的默认方法循环法,即由主进程负责监听端口,接收新链接后再将链接循环分发给工做进程。在分发中使用了一些内置技巧防止工做进程任务过载。第二种是主进程建立监听socket后发送给感兴趣的工做进程,由工做进程负责直接接收链接。)
  3. worker进程收到句柄后,建立客户端实例(net.socket)执行具体的业务逻辑,而后返回。

如图:
git

图引用 出处

多进程模型

先看一下Egg官方文档的进程模型github

+--------+          +-------+
                | Master |<-------->| Agent |
                +--------+          +-------+
                ^   ^    ^
               /    |     \
             /      |       \
           /        |         \
         v          v          v
+----------+   +----------+   +----------+
| Worker 1 |   | Worker 2 |   | Worker 3 |
+----------+   +----------+   +----------+
复制代码
类型 进程数量 做用 稳定性 是否运行业务代码
Master 1 进程管理,进程间消息转发 很是高
Agent 1 后台运行工做(长链接客户端) 少许
Worker 通常为cpu核数 执行业务代码 通常

大体上就是利用Master做为主线程,启动Agent做为秘书进程协助Worker处理一些公共事务(日志之类),启动Worker进程执行真正的业务代码。npm

多进程的实现

流程相关代码

首先从Master入手,这里暂时认为Master是最顶级的进程(事实上还有一个parent进程,待会再说)。后端

/**
 * start egg app
 * @method Egg#startCluster
 * @param {Object} options {@link Master}
 * @param {Function} callback start success callback
 */
exports.startCluster = function(options, callback) {
  new Master(options).ready(callback);
};
复制代码

先从Master的构造函数看起api

constructor(options) {
  super();
  // 初始化参数
  this.options = parseOptions(options);
  // worker进程的管理类 详情见 Manager及Messenger篇
  this.workerManager = new Manager();
  // messenger类, 详情见 Manager及Messenger篇
  this.messenger = new Messenger(this);
  // 设置一个ready事件 详情见get-ready npm包
  ready.mixin(this);
  // 是否为生产环境
  this.isProduction = isProduction();
  this.agentWorkerIndex = 0;
  // 是否关闭
  this.closed = false;
  ...

  接下来看的是ready的回调函数及注册的各种事件:
  this.ready(() => {
    // 将开始状态设置为true
    this.isStarted = true;
    const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
    this.logger.info('[master] %s started on %s (%sms)%s',
    frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);

    // 发送egg-ready至各个进程并触发相关事件
    const action = 'egg-ready';
    this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
    this.messenger.send({ action, to: 'app', data: this.options });
    this.messenger.send({ action, to: 'agent', data: this.options });
    // start check agent and worker status
    this.workerManager.startCheck();
    });
    // 注册各种事件
    this.on('agent-exit', this.onAgentExit.bind(this));
    this.on('agent-start', this.onAgentStart.bind(this));
    ...
    // 检查端口并 Fork一个Agent
    detectPort((err, port) => {
      ... 
      this.forkAgentWorker();
    }
  });
}
复制代码

综上, 能够看到Master的构造函数主要是初始化和注册各种相应的事件, 最后运行的是forkAgentWorker函数, 该函数的关键代码能够看到:bash

const agentWorkerFile = path.join(__dirname, 'agent_worker.js');
// 经过child_process执行一个Agent
const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
复制代码

继续到agent_worker.js上面看,agent_worker实例化一个agent对象,agent_worker.js有一句关键代码:服务器

agent.ready(() => {
  agent.removeListener('error', startErrorHandler); // 清除错误监听的事件
  process.send({ action: 'agent-start', to: 'master' }); // 向master发送一个agent-start的动做
});
复制代码

能够看到, agent_worker.js中的代码向master发出了一个信息, 动做为agent-start, 再回到Master中, 能够看到其注册了两个事件, 分别为once的forkAppWorkers和 on的onAgentStart

this.on('agent-start', this.onAgentStart.bind(this));
this.once('agent-start', this.forkAppWorkers.bind(this));
复制代码

先看onAgentStart函数, 这个函数相对简单, 就是一些信息的传递:

onAgentStart() {
    this.agentWorker.status = 'started';

    // Send egg-ready when agent is started after launched
    if (this.isAllAppWorkerStarted) {
      this.messenger.send({ action: 'egg-ready', to: 'agent', data: this.options });
    }

    this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] });
    // should send current worker pids when agent restart
    if (this.isStarted) {
      this.messenger.send({ action: 'egg-pids', to: 'agent', data: this.workerManager.getListeningWorkerIds() });
    }

    this.messenger.send({ action: 'agent-start', to: 'app' });
    this.logger.info('[master] agent_worker#%s:%s started (%sms)',
      this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
  }
复制代码

而后会执行forkAppWorkers函数,该函数主要是借助cforkfork对应的工做进程, 并注册一系列相关的监听事件,

...
cfork({
  exec: this.getAppWorkerFile(),
  args,
  silent: false,
  count: this.options.workers,
  // don't refork in local env refork: this.isProduction, }); ... // 触发app-start事件 cluster.on('listening', (worker, address) => { this.messenger.send({ action: 'app-start', data: { workerPid: worker.process.pid, address }, to: 'master', from: 'app', }); }); 复制代码

能够看到forkAppWorkers函数在监听Listening事件时,会触发master上的app-start事件。

this.on('app-start', this.onAppStart.bind(this));

...
// master ready回调触发
if (this.options.sticky) {
  this.startMasterSocketServer(err => {
    if (err) return this.ready(err);
      this.ready(true);
  });
} else {
  this.ready(true);
}

// ready回调 发送egg-ready状态到各个进程
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });

// start check agent and worker status
if (this.isProduction) {
  this.workerManager.startCheck();
}
复制代码

总结下:

  1. Master.constructor: 先执行Master的构造函数, 里面有个detect函数被执行
  2. Detect: Detect => forkAgentWorker()
  3. forkAgentWorker: 获取Agent进程, 向master触发agent-start事件
  4. 执行onAgentStart函数, 执行forkAppWorker函数(once)
  5. onAgentStart => 发送各种信息, forkAppWorker => 向master触发 app-start事件
  6. App-start事件 触发 onAppStart()方法
  7. onAppStart => 设置ready(true) => 执行ready的回调函数
  8. Ready() = > 发送egg-ready到各个进程并触发相关事件, 执行startCheck()函数
+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|
复制代码

进程守护

根据官方文档,进程守护主要是依赖于gracefulegg-cluster这两个库。

未捕获异常

  1. 关闭异常 Worker 进程全部的 TCP Server(将已有的链接快速断开,且再也不接收新的链接),断开和 Master 的 IPC 通道,再也不接受新的用户请求。
  2. Master 马上 fork 一个新的 Worker 进程,保证在线的『工人』总数不变。
  3. 异常 Worker 等待一段时间,处理完已经接受的请求后退出。
+---------+                 +---------+
|  Worker |                 |  Master |
+---------+                 +----+----+
     | uncaughtException         |
     +------------+              |
     |            |              |                   +---------+
     | <----------+              |                   |  Worker |
     |                           |                   +----+----+
     |        disconnect         |   fork a new worker    |
     +-------------------------> + ---------------------> |
     |         wait...           |                        |
     |          exit             |                        |
     +-------------------------> |                        |
     |                           |                        |
    die                          |                        |
                                 |                        |
                                 |                        |
复制代码

由执行的app文件可知, app其实是继承于Application类, 该类下面调用了graceful()

onServer(server) {
    ......
    graceful({
      server: [ server ],
      error: (err, throwErrorCount) => {
        ......
      },
    });
    ......
  }
复制代码

继续看graceful, 能够看到它捕获了process.on('uncaughtException')事件, 并在回调函数里面关闭TCP链接, 关闭自己进程, 断开与masterIPC通道。

process.on('uncaughtException', function (err) {
    ......
    // 对http链接设置 Connection: close响应头
    servers.forEach(function (server) {
      if (server instanceof http.Server) {
        server.on('request', function (req, res) {
          // Let http server set `Connection: close` header, and close the current request socket.
          req.shouldKeepAlive = false;
          res.shouldKeepAlive = false;
          if (!res._header) {
            res.setHeader('Connection', 'close');
          }
        });
      }
    });

    // 设置一个定时函数关闭子进程, 并退出自己进程
    // make sure we close down within `killTimeout` seconds
    var killtimer = setTimeout(function () {
      console.error('[%s] [graceful:worker:%s] kill timeout, exit now.', Date(), process.pid);
      if (process.env.NODE_ENV !== 'test') {
        // kill children by SIGKILL before exit
        killChildren(function() {
          // 退出自己进程
          process.exit(1);
        });
      }
    }, killTimeout);

    // But don't keep the process open just for that! // If there is no more io waitting, just let process exit normally. if (typeof killtimer.unref === 'function') { // only worked on node 0.10+ killtimer.unref(); } var worker = options.worker || cluster.worker; // cluster mode if (worker) { try { // 关闭TCP链接 for (var i = 0; i < servers.length; i++) { var server = servers[i]; server.close(); } } catch (er1) { ...... } try { // 关闭ICP通道 worker.disconnect(); } catch (er2) { ...... } } }); 复制代码

ok, 关闭了IPC通道后, 咱们继续看cfork文件, 即上面提到的fork worker的包, 里面监听了子进程的disconnect事件, 他会根据条件判断是否从新fork一个新的子进程

cluster.on('disconnect', function (worker) {
    ......
    // 存起该pid
    disconnects[worker.process.pid] = utility.logDate();
    if (allow()) {
      // fork一个新的子进程
      newWorker = forkWorker(worker._clusterSettings);
      newWorker._clusterSettings = worker._clusterSettings;
    } else {
      ......
    }
  });
复制代码

通常来讲, 这个时候会继续等待一会而后就执行了上面说到的定时函数了, 即退出进程

OOM、系统异常 关于这种系统异常, 有时候在子进程中是不能捕获到的, 咱们只能在master中进行处理, 也就是cfork包。

cluster.on('exit', function (worker, code, signal) {
    // 是程序异常的话, 会经过上面提到的uncatughException从新fork一个子进程, 因此这里就不须要了
    var isExpected = !!disconnects[worker.process.pid];
    if (isExpected) {
      delete disconnects[worker.process.pid];
      // worker disconnect first, exit expected
      return;
    }
    // 是master杀死的子进程, 无需fork
    if (worker.disableRefork) {
      // worker is killed by master
      return;
    }

    if (allow()) {
      newWorker = forkWorker(worker._clusterSettings);
      newWorker._clusterSettings = worker._clusterSettings;
    } else {
      ......
    }
    cluster.emit('unexpectedExit', worker, code, signal);
  });
复制代码

进程间通讯(IPC)

上面一直提到各类进程间通讯,细心的你可能已经发现 cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之间,Worker 与 Agent 进程互相间是没有的。那么 Worker 之间想通信该怎么办呢?是的,经过 Master 来转发。

广播消息: agent => all workers
                  +--------+          +-------+
                  | Master |<---------| Agent |
                  +--------+          +-------+
                 /    |     \
                /     |      \
               /      |       \
              /       |        \
             v        v         v
  +----------+   +----------+   +----------+
  | Worker 1 |   | Worker 2 |   | Worker 3 |
  +----------+   +----------+   +----------+

指定接收方: one worker => another worker
                  +--------+          +-------+
                  | Master |----------| Agent |
                  +--------+          +-------+
                 ^    |
     send to    /     |
    worker 2   /      |
              /       |
             /        v
  +----------+   +----------+   +----------+
  | Worker 1 |   | Worker 2 |   | Worker 3 |
  +----------+   +----------+   +----------+
复制代码

master中, 能够看到当agent和app被fork时, 会监听他们的信息, 同时将信息转化成一个对象:

agentWorker.on('message', msg => {
  if (typeof msg === 'string') msg = { action: msg, data: msg };
  msg.from = 'agent';
  this.messenger.send(msg);
});

worker.on('message', msg => {
  if (typeof msg === 'string') msg = { action: msg, data: msg };
  msg.from = 'app';
  this.messenger.send(msg);
});
复制代码

能够看到最后调用的是messenger.send, 而messengeer.send就是根据from和to来决定将信息发送到哪里

send(data) {
    if (!data.from) {
      data.from = 'master';
    }
    ......

    // app -> master
    // agent -> master
    if (data.to === 'master') {
      debug('%s -> master, data: %j', data.from, data);
      // app/agent to master
      this.sendToMaster(data);
      return;
    }

    // master -> parent
    // app -> parent
    // agent -> parent
    if (data.to === 'parent') {
      debug('%s -> parent, data: %j', data.from, data);
      this.sendToParent(data);
      return;
    }

    // parent -> master -> app
    // agent -> master -> app
    if (data.to === 'app') {
      debug('%s -> %s, data: %j', data.from, data.to, data);
      this.sendToAppWorker(data);
      return;
    }

    // parent -> master -> agent
    // app -> master -> agent,可能不指定 to
    if (data.to === 'agent') {
      debug('%s -> %s, data: %j', data.from, data.to, data);
      this.sendToAgentWorker(data);
      return;
    }
  }
复制代码

master则是直接根据action信息emit对应的注册事件

sendToMaster(data) {
  this.master.emit(data.action, data.data);
}
复制代码

而agent和worker则是经过一个sendmessage包, 实际上就是调用下面相似的方法

// 将信息传给子进程
 agent.send(data)
 worker.send(data)
复制代码

最后, 在agent和app都继承的基础类EggApplication上, 调用了Messenger类, 该类内部的构造函数以下:

constructor() {
    super();
    ......
    this._onMessage = this._onMessage.bind(this);
    process.on('message', this._onMessage);
  }

_onMessage(message) {
    if (message && is.string(message.action)) {
      // 和master同样根据action信息emit对应的注册事件  
      this.emit(message.action, message.data);
    }
  }
复制代码

总结一下:
思路就是利用事件机制和IPC通道来达到各个进程之间的通讯。

其余

学习过程当中有遇到一个timeout.unref()的函数, 关于该函数推荐你们参考这个问题的6楼回答

总结

从前端思惟转到后端思惟其实仍是很吃力的,加上Egg的进程管理实现确实很是厉害, 因此花了不少时间在各类api和思路思考上。

参考与引用

多进程模型和进程间通信
Egg 源码解析之 egg-cluster

相关文章
相关标签/搜索