关于IPC-Message通讯

阿里eggjs中有个核心就是egg-cluster来作基本的启动流程,里面的通讯仍是比较有意思的。仔细研究了下nodejs官方的cluster再加上eggjs的Agent理念,若是要保持通讯,仍是踩了很多的坑。这个坑其实来自cluster自身。若是是刚研究cluster,那么不少人都是被其迷惑,究竟是如何监听,如何发送呢?javascript

概念

先理解一些概念:html

  • master 主进程 cluster.isMaster 肯定的进程
  • worker 子进程 cluster.isWorker 肯定的进程
  • agent child_process 建立的进程

想要在这些进程上互相通讯,咱们须要理清楚发送的方式。java

  • worker与agent通讯,须要master做为桥梁中转
  • worker与master通讯
  • master与worker通讯
  • master与agent通讯
  • agent与master通讯
  • agent与worker通讯,须要master做为桥梁中转

如何让其完美通讯,我这里推荐一个库 github.com/cevio/ipc-m…。它能让你在无感知的状况下帮你绑定完毕全部的事件,同时打通消息通道。node

建立

它是一个类,须要被继承后使用。git

const IPCMessage = require('ipc-message');
module.exports = class NodeBase extends IPCMessage {
  constructor() {
    // If it is a `agent` type process, you need to set the parameter to `true`. 
    // super(true);
    super();

    // receive message from other processes.
    this.on('message', msg => {
      console.log(`[${this.type}] Receive Message:`, msg);
    });

    if (this.type === 'master') {
      // do master ...
    } else {
      // do worker
    }
  }
}
复制代码

咱们经过绑定message事件来得到消息,经过registAgent来注册agent,经过cluster.fork()来建立子进程,固然这个建立是被自动监听的,你无需关心。github

消息

很简单,他们在任意的代码中经过send方法来发送消息。好比如上的例子(假设已经设置来一个名字为staticAgent的agent和创建了4个worker,如今是在worker运行的代码上):json

const base = new NodeBase();
base.send('staticAgent', 'worker-ready', {
    a: 1,
    b: 2
});
复制代码

agent经过master的转发就收到了该信息api

[staticAgent] agent receive message:app

{
    to: [19678],
    from: 19679,
    transfer: true,
    action: 'worker-ready',
    body: {
        a: 1,
        b: 2
    }
}
复制代码

你能够经过这个数据来解析,具体如何解决全靠我的想法了。koa

使用

咱们来看2段实际代码

test/index.js

const IPCMessage = require('ipc-message');
const ChildProcess = require('child_process');
const path = require('path');
const cluster = require('cluster');
const Koa = require('koa');
const os = require('os');

class Nodebase extends IPCMessage {
  constructor() {
    super();
    if (this.type === 'master') {
      const agentWorkerRuntimeFile = path.resolve(__dirname, 'agent.js');
      // 建立一个agent
      const agent = ChildProcess.fork(agentWorkerRuntimeFile, null, {
        cwd: process.cwd(),
        stdout: process.stdout,
        stderr: process.stderr,
        stdin: process.stdin,
        stdio: process.stdio
      });
      // 注册该agent
      this.registAgent('agent', agent);

      let cpus = os.cpus().length;
      while (cpus--) {
        // 根据内核数来建立子进程
        cluster.fork();
      }
    } else {
      // 如下为实际开发的代码
      const app = new Koa();
      app.use(async (ctx, next) => {
        if (ctx.req.url === '/favicon.ico') return;
        this.send('agent', '/test/agent', { a:1, c:3 });
        ctx.body = 'hello world';
      });
      app.listen(3000, () => {
        console.log('server start at 3000');
      });
    }

    this.on('message', msg => {
      console.log(`[${this.type}] onMessageReceive:`, msg);
    });
  }
}

const nodebase = new Nodebase();

if (nodebase.type === 'master') {
  setTimeout(() => {
    // 发送消息给agent
    nodebase.send('agent', '/a/b', {
      a:1
    });
  }, 5000)
}
复制代码

test/agent.js

const IPCMessage = require('ipc-message');

class Agent extends IPCMessage {
  constructor() {
    // 若是是个agent启动的文件,这里必须为true
    super(true);
    this.timer = setInterval(() => {
      console.log('agent alive');
    }, 1000);

    process.on('SIGINT', () => {
      clearInterval(this.timer);
      process.exit(0);
    });

    this.on('message', msg => {
      console.log('[Agent] onMessageReceive:', msg);
      this.send([msg.from, 'master'], '/reply/agent', 'done');
    })
  }
}

const agent = new Agent();
// 发送消息
agent.send('master', '/agent/ready', { a: 1, b: 2 });
复制代码

最后

有了通讯处理的简化模块,你也能够尝试使用其来建立相似egg的启动流程,egg的核心启动也就再也不神秘了。

相关文章
相关标签/搜索