EggCluster 是如何解决多进程模式下相关问题的

背景

Node 官方提供了 cluster 模块来提供多进程的解决方案,以尽量提高服务器资源使用效率。javascript

总体而言,在这个问题域里,要解决的子问题有三块html

  • 重启机制
  • 负载均衡
  • 状态共享,即通讯机制

image.png
(上图总结自 《深刻浅出Node》第9章)

Egg 做为企业级框架,也针对这些问题,提供了 egg-cluster 模块来作了些加强java

通讯机制

为何先讲通讯?启动流程要用嘛。node

image.png

能够看到,主要的实体包括了 Master、Agent、Worker三个, Master、Agent、Worker,其实三者更多完成的是通讯的执行工做,真正提供通讯管理能力的是 Manager 和 Messenger。git

在看这俩模块以前,能够先看些基础知识github

Messenger

Messenger 是一个消息发送器,负责:接收消息 -> 定向转发。npm

那为何要单独搞这个模块?api

  • 协议格式统一:Agent 和 Worker 都有 exit 和 message 事件
  • 通讯方法统一:与 Parent 通讯要走 process.send,与 Worker / Agent 通讯要走 sendmessage 模块,和Master 走 EventEmitter

它包含两个部分:bash

  • 信息收集
  • 路由转发

以一个 worker 启动的例子为例。服务器

首先,信息收集,使用的是订阅/通知模式,是以 master 显式调用 messenger 来处理的。

// 在 cluster 启动完毕后,会告知父进程启动成功
const action = 'egg-ready';
this.messenger.send({
  action,
  to: 'parent',
  data: {
    port: this[REAL_PORT],
    address: this[APP_ADDRESS],
    protocol: this[PROTOCOL],
  },
});
复制代码

其次是经过 send 作定向转发,包括两部分

// 路由识别
if (data.to === 'parent') {
  this.sendToParent(data);
  return;
}

// 调用指定方法
sendToParent(data) {
  if (!this.hasParent) {
    return;
  }
  process.send(data);
}
复制代码

更多信息能够,查看这篇文档: Messenger 模块

Manager

Manager 模块比较简单,主要是针对Agent、Worker 提供管理操做。值得一提的是,它的存活检查代码

// agent.status的修改操做在master的onAgentStart中完成
count() {
  return {
    agent: (this.agent && this.agent.status === 'started') ? 1 : 0,
    worker: this.listWorkerIds().length,
  };
}

startCheck() {
  this.exception = 0;
  // 每10秒检查一次
  this.timer = setInterval(() => {
    const count = this.count();
    if (count.agent && count.worker) {
      this.exception = 0;
      return;
    }
    // 若是agent和worker都不符合要求,超过3次就触发exception,master那边收到消息后会退出
    this.exception++;
    if (this.exception >= 3) {
      this.emit('exception', count);
      clearInterval(this.timer);
    }
  }, 10000);
}
复制代码

详见文档:Manager

启动流程

从 npm run dev 开始

先从启动流程入手,来看看 npm run dev 这个命令到底发生了什么。

它实际上是执行了 egg-bin 的 lib/cmd/dev.js 文件的 run 方法

// lib/cmd/dev.js 文件
constructor(rawArgv) {
  // 省略其余初始化代码
  this.serverBin = path.join(__dirname, '../start-cluster');
}

* run(context) {
  // 省略参数格式化过程
  yield this.helper.forkNode(this.serverBin, devArgs, options);
}

// start-cluster.js文件,执行框架的 startCluster
require(options.framework).startCluster(options);
复制代码

若是框架是egg,那最后就会执行 egg 的这段代码

exports.startCluster = require('egg-cluster').startCluster;
复制代码

所以最终是执行了,egg-cluster 模块的 index.js

exports.startCluster = function(options, callback) {
  new Master(options).ready(callback);
};
复制代码

以后的流程不难,可是内容很是细碎,能够去看 启动和退出分析,主要是介绍如何实现下面的流程的

+---------+           +---------+          +---------+
    |  Master |           |  Agent  |          |  Worker |
    +---------+           +----+----+          +----+----+
         |      fork agent     |                    |
         +-------------------->|                    |
         |      agent ready    |                    |
         |<--------------------+                    |
         |                     |     fork worker    |
         +----------------------------------------->|
         |     worker ready    |                    |
         |<-----------------------------------------+
         |      Egg ready      |                    |
         +-------------------->|                    |
         |      Egg ready      |                    |
         +----------------------------------------->|
复制代码

Agent的平滑重启

首先,在启动Agent的时候就会去注册回调

forkAgentWorker(){
  // 得到agent
  const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
  // 监听退出事件,转发给master
  agentWorker.once('exit', (code, signal) => {
    this.messenger.send({
      action: 'agent-exit',
      data: {
        code,
        signal,
      },
      to: 'master',
      from: 'agent',
    });
  });
}

constructor(){
	this.on('agent-exit', this.onAgentExit.bind(this));
}
复制代码

接着在 onAgentExit 中去处理重启逻辑

onAgentExit(data) {
  if (this.closed) return;
  // 清理工做
  const agentWorker = this.agentWorker;
  this.workerManager.deleteAgent(this.agentWorker);
  agentWorker.removeAllListeners();

  // 若是已经启动过,就自动重启
  if (this.isStarted) {
    setTimeout(() => {
      this.forkAgentWorker();
    }, 1000);
 
		// 省略一段转发消息给parent的代码
  } else {
    process.exit(1);
  }
}
复制代码

isStarted 标志,是用来记录总体是否启动成功,它在 ready 回调用中被赋值

// 这个ready是由 get-ready模块提供的,主要是解决异步任务注册问题的,便于自由添加启动前的异步任务
this.ready(() => {
	this.isStarted = true;
});
复制代码

Worker的平滑重启

Worker的平滑重启主要是交给 cfork 模块完成的,egg-cluster 中对于exit 事件的监听只是作个转发。

大体思路是经过 cluster 模块去监听 exit 事件 和 disconnect 事件,而后来根据 disableRefork 配置,判断是否要重启,这其中会处理一些重启逻辑

cluster.on('disconnect', function (worker) {
  // API参考:https://nodejs.org/api/cluster.html#cluster_worker_isdead
  var isDead = worker.isDead && worker.isDead();
  if (isDead) {
    // worker has terminated before disconnect
    return;
  }
  // 配置不重启就不会继续进行
  if (worker.disableRefork) {
    return;
  }
	
  // disconnect 用来保存失联的进程,下文会用到
  disconnects[worker.process.pid] = utility.logDate();
 
  // 重启逻辑
  if (allow()) {
    newWorker = forkWorker(worker._clusterSettings);
    newWorker._clusterSettings = worker._clusterSettings;
  } else {
  	// 省略
  }
});

cluster.on('exit', function (worker, code, signal) {
  var isExpected = !!disconnects[worker.process.pid];

  // 若是已经先响应了disconnect事件,就不用再走后续退出流程了
  if (isExpected) {
    delete disconnects[worker.process.pid];
    // worker disconnect first, exit expected
    return;
  }

  // 相似的判断 disableRefork 的逻辑,省略

  unexpectedCount++;
	
  // 相似的重启逻辑,省略
  
  cluster.emit('unexpectedExit', worker, code, signal);
});
复制代码

负载均衡 Sticky Mode

背景:最先 Session 等状态信息是保存在 Worker 内存里的,因此一旦用户的屡次请求打到不一样的Worker上的时候,必然会出现登陆态失效的问题。

解决方案:经过必定的方式保证同一个用户的请求打到同一个 Worker 上,Sticky Mode 就是为了解决这个问题

:::info 其实查看 egg-bin 的README.md 文件就能够发现其实默认是不启动的,但出于有趣,仍是想介绍下。 :::

转发实现

首先,若是启用了 sticky 模式,在 master 当中会分配一个 stickyWorkerPort

// master.js 
detectPorts() {
  return GetFreePort()
		// 省略中间一段设置主端口的代码
    .then(port => {
    if (this.options.sticky) {
      this.options.stickyWorkerPort = port;
    }
  })
}
复制代码

同时,会启动一个内部的 net.Server,用来作消息转发给Worker

if (this.options.sticky) {
  this.startMasterSocketServer(err => {
 		// 省略
  });
}

startMasterSocketServer(cb) {
  
  // 内部 net server
  require('net').createServer({
    pauseOnConnect: true,
  }, connection => {
     // 这段涉及到 TCP_reset_attack,有兴趣能够自查,不介绍
    if (!connection.remoteAddress) {
      connection.destroy();
    } else {
      // 选出一个worker
      const worker = this.stickyWorker(connection.remoteAddress);
      worker.send('sticky-session:connection', connection);
    }
  }).listen(this[REAL_PORT], cb);
}
复制代码

:::info 题外话:为何这里 listen 不会报端口重复监听?个人理解是,按照这篇文章的介绍,Master 在初次给 Worker 传递 Socket 的时候,才会去启动内部 TCP 服务,比 startMasterSocketServer要晚。 :::

在 worker 当中,若是是有 配置 sticky,就会使用该 stickyWorkerPort 端口进行监听,同时只监听 父进程(也就是master)转发过来的 sticky-session:connection消息

if (options.sticky) {
  server.listen(options.stickyWorkerPort, '127.0.0.1');
  
  process.on('message', (message, connection) => {
    if (message !== 'sticky-session:connection') {
      return;
    }

    server.emit('connection', connection);
    connection.resume();
  });
}
// 省略正常监听的代码
复制代码

这其中有个细节,那如何保证转发的过程当中,数据不丢失呢?

:::info 为何会丢失?由于 net.Socket 是个 Duplex Stream 对象,在 Flowing Mode 下面会自动读取数据,若是 不响应 data 事件,数据就丢了) :::

首先在建立 socket 的时候,开启 pauseOnConnect 选项;

If pauseOnConnect is set to true, then the socket associated with each incoming connection will be paused, and no data will be read from its handle.

其次在接受到 socket 的时候,恢复执行 resume

转发策略

转发是经过 stickyWorker 函数实现的,本质上就是把 remoteAddress 对 Worker 数量取余数,做为索引去 Worker 列表里随机取一个 Worker

stickyWorker(ip) {
    const workerNumbers = this.options.workers;
    // ws是一组pid列表
    const ws = this.workerManager.listWorkerIds();

    let s = '';
    // IP处理:127.0.0.1 -> 127001
    for (let i = 0; i < ip.length; i++) {
      
      // 这个判断能够过滤掉字母和符号,这样就能够兼容IPv4和IPv6
      if (!isNaN(ip[i])) {
        s += ip[i];
      }
    }
    s = Number(s);
    // 取余数
    const pid = ws[s % workerNumbers];
    return this.workerManager.getWorker(pid);
  }

复制代码

背景详见issue:Sticky Mode 引起的问题

其余有趣的小发现

agent/worker 启动方式差别

回去翻代码,能够发现 agent 是 child_process.fork 方法启动的,worker 是经过 cluster 启动的。 猜想多是若是须要提供一些相似管理页面的本地服务,通常是 agent作,所以agent 要有独立监听端口的能力

如何检测进程真的退出

对进程强制执行process.exit而且用try-catch包裹住,若是有报错,说明是真的退出了。

使用场景:一开始使用 SIGTERM 要求退出(对比 kill -1),后来退出超过必定时间了,使用 SIGKILL 强制退出(相似 kill -9)

function getUnterminatedProcesses(pids) {
  return pids.filter(pid => {
    try {
      // success means it's still alive
      process.kill(pid, 0);
      return true;
    } catch (err) {
      // error means it's dead
      return false;
    }
  });
}
复制代码

小结

EggCluster涉及知识体系大图

image.png
相关文章
相关标签/搜索