对于有继承关系的进程,nodejs自己为咱们提供了进程间通讯的方式,可是对于没有继承关系的进程,好比兄弟进程,想要通讯最简单的方式就是经过主进程中转,相似前端框架中子组件经过更新父组件的数据,而后父通知其余子组件。由于nodejs内置的进程间通讯须要通过序列化和反序列化,因此这种方式可能会带来必定的性能损耗,并且在实现上也比较麻烦。今天介绍的是实现兄弟进程通讯的另一种方式,在windows上使用命名管道,在非windows上使用unix域,另外本文还会介绍基于tcp的远程进程通讯的实现。下面具体介绍一下设计和实现。前端
1 IPC的实现
ipc的实现比较简单,主要是对nodejs提供的功能进行封装。首先咱们须要处理一下path,由于在命名管道和unix域中他的格式是不同的。node
const os = require('os');
module.exports = { path: os.platform() === 'win32' ? '\\\\?\\pipe\\ipc' : '/tmp/unix.sock',};
接着咱们看看客户端和服务器的实现。git
1.1 IPCClient的实现
const net = require('net');const { EventEmitter } = require('events');const { path } = require('../config');
class Client extends EventEmitter { constructor(options) { super(); this.options = { path, ...options }; const socket = net.connect(this.options); socket.on('error', (error) => { console.error(error); }); return socket; }}module.exports = { Client,};
1.2 IPCServer的实现
const fs = require('fs');const net = require('net');const { EventEmitter } = require('events');const { path } = require('../config');
class Server extends EventEmitter { constructor(options, connectionListener) { super(); if (typeof options === 'function') { options = { connectionListener: options, }; } else { options = { ...options, connectionListener }; } try { fs.existsSync(options.path) && fs.unlinkSync(options.path); } catch(e) {
} this.options = { path, ...options }; return net.createServer({allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect}, (client) => { client.on('error', (error) => { console.error(error); }); typeof this.options.connectionListener === 'function' && this.options.connectionListener(client); }).listen(this.options); }}
module.exports = { Server,};
2 RPC的实现
咱们知道tcp是面向流的服务,他自己只负责传输数据,不负责数据的解析和解释。经过tcp传输数据时,须要本身解析数据,咱们须要从一串字节流中解析出一个个数据包。这就涉及到协议的设计。因此首先咱们要定义一个应用层协议。github
2.1 应用层协议的设计和实现
应用层协议的设计很是简单 npm
1 总长度是除了开头标记以外的其余数据长度。由于数据部分是变长的,因此咱们须要一个总长度来判断后续的数据长度是多少。编程
2 序列号是用于关联请求和响应,由于咱们在一个链接上可能会串行发送多个数据包,当咱们收到一个回包的时候,咱们不知道是来自哪一个请求的响应,经过响应体中的seq,咱们就知道是来自哪一个请求的响应。设计了通讯协议后,咱们就须要对协议进行封包解包。首先咱们看一下封包逻辑。windows
function seq() { return ~~(Math.random() * Math.pow(2, 31))}
function packet(data, sequnce) { // 转成buffer const bufferData = Buffer.from(data, 'utf-8'); // 开始标记长度 const startFlagLength = Buffer.from([PACKET_START]).byteLength; // 序列号 const _seq = sequnce || seq(); // 分配一个buffer存储数据 let buffer = Buffer.allocUnsafe(startFlagLength + TOTAL_LENGTH + SEQ_LEN); // 设计开始标记 buffer[0] = 0x3; // 写入总长度字段的值 buffer.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + bufferData.byteLength, 1, TOTAL_LENGTH); // 写入序列号的值 buffer.writeUIntBE(_seq, startFlagLength + TOTAL_LENGTH, SEQ_LEN); // 把协议元数据和数据组装到一块儿 buffer = Buffer.concat([buffer, bufferData], buffer.byteLength + bufferData.byteLength); return buffer;}
接着咱们看一下解包的逻辑,由于数据的传输是字节流,因此有可能多个数据包的数据会粘在一块儿,因此咱们第一步首先要根据协议解析出一个个数据包,而后再解析每个数据包。咱们经过有限状态机实现数据的解析。下面是状态机的状态集。前端框架
const PARSE_STATE = { PARSE_INIT: 0, PARSE_HEADER: 1, PARSE_DATA: 2, PARSE_END: 3,};
接着咱们定义状态集的转换规则。服务器
class StateSwitcher { constructor(options) { this.options = options; }
[PARSE_STATE.PARSE_INIT](data) { // 数据不符合预期 if (data[0] !== PACKET_START) { // 跳过部分数据,找到开始标记 const position = data.indexOf(PACKET_START); // 没有开始标记,说明这部分数据无效,丢弃 if (position === -1) { return [NEED_MORE_DATA, null]; } // 不然返回有效数据部分,继续解析 return [PARSE_STATE.PACKET_START, data.slice(position)]; } // 保存当前正在解析的数据包 this.packet = new Packet(); // 跳过开始标记的字节数,进入解析协议头阶段 return [PARSE_STATE.PARSE_HEADER, data.slice(Buffer.from([PACKET_START]).byteLength)]; }
[PARSE_STATE.PARSE_HEADER](data) { // 数据不够头部的大小则等待数据到来 if (data.length < TOTAL_LENGTH + SEQ_LEN) { return [NEED_MORE_DATA, data]; } // 有效数据包的长度 = 整个数据包长度 - 头部长度 this.packet.set('length', data.readUInt32BE() - (TOTAL_LENGTH + SEQ_LEN)); // 序列号 this.packet.set('seq', data.readUInt32BE(TOTAL_LENGTH)); // 解析完头部了,跳过去 data = data.slice(TOTAL_LENGTH + SEQ_LEN); // 进入解析数据阶段 return [PARSE_STATE.PARSE_DATA, data]; }
[PARSE_STATE.PARSE_DATA](data) { const len = this.packet.get('length'); // 数据部分的长度小于协议头中定义的长度,则继续等待 if (data.length < len) { return [NEED_MORE_DATA, data]; } // 截取数据部分 this.packet.set('data', data.slice(0, len)); // 解析完数据了,完成一个包的解析,跳过数据部分 data = data.slice(len); typeof this.options.cb === 'function' && this.options.cb(this.packet); this.packet = null; // 解析完一个数据包,进入结束标记阶段 return [PARSE_STATE.PARSE_INIT, data]; }}
咱们再看一下状态机的实现微信
class FSM { constructor(options) { this.options = options; // 状态处理机,定义了状态转移集合 this.stateSwitcher = new StateSwitcher({cb: options.cb}); // 当前状态 this.state = PARSE_STATE.PARSE_INIT; // 结束状态 this.endState = PARSE_STATE.PARSE_END; // 当前待解析的数据 this.buffer = null; }
run(data) { // 没有数据或者解析结束了直接返回 if (this.state === this.endState || !data || !data.length) { return; } // 保存待解析的数据 this.buffer = this.buffer ? Buffer.concat([this.buffer, data]) : data; // 还没结束,而且还有数据能够处理则继续执行 while(this.state !== this.endState && this.buffer && this.buffer.length) { // 执行状态处理函数,返回[下一个状态, 剩下的数据] const result = this.stateSwitcher[this.state](this.buffer); // 若是下一个状态是NEED_MORE_DATA则说明须要更多的数据才能继续解析,并保持当前状态 if (result[0] === NEED_MORE_DATA) { return; } // 记录下一个状态和数据 [this.state, this.buffer] = result; }
}}
状态机就是对开始状态、结束状态、状态转换集的封装。实现了协议的封包和解析后咱们看一下如何使用。
2.2 RPC客户端实现
const net = require('net');const { EventEmitter } = require('events');const { FSM } = require('tiny-application-layer-protocol');class Client extends EventEmitter { constructor(options) { super(); this.options = { ...options }; const socket = net.connect(this.options); socket.on('error', (error) => { console.error(error); }); const fsm = new FSM({ cb: (packet) => { socket.emit('message', packet); } }); socket.on('data', fsm.run.bind(fsm)); return socket; }}module.exports = { Client,};
咱们作的事情主要是负责数据的解析。
2.3 RPC服务器实现
const fs = require('fs');const net = require('net');const { EventEmitter } = require('events')const { FSM } = require('tiny-application-layer-protocol');
class Server extends EventEmitter { constructor(options, connectionListener) { super(); if (typeof options === 'function') { options = { connectionListener: options, }; } else { options = { ...options, connectionListener }; } this.options = { ...options }; return net.createServer({allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect}, (client) => { const fsm = new FSM({ cb: function(packet) { client.emit('message', packet); } }) client.on('data', fsm.run.bind(fsm)); client.on('error', (error) => { console.error(error); }); typeof this.options.connectionListener === 'function' && this.options.connectionListener(client); }).listen(this.options); }}
module.exports = { Server,};
一样,服务器也是负责数据的解析
3 使用
接下来咱们看一下如何使用。
3.1 ipc的使用
server.js
const { IPCServer } = require('../../src');const { packet } = require('tiny-application-layer-protocol');new IPCServer(function(client) { console.log(1) client.on('data', (data) => { console.log('receive', data); client.write(packet('world', data.seq)); });});
client.js
const { IPCClient } = require('../../src');const { packet, seq } = require('tiny-application-layer-protocol');const client = new IPCClient();client.write(packet('hello', seq()));client.on('data', function(res) { console.log('receive', res);})
服务器输出
客户端输出
3.2 RPC的使用
server.js
const { RPCServer } = require('../../src');const { packet } = require('tiny-application-layer-protocol');new RPCServer({host: '127.0.0.1', port: 80}, function(client) { client.on('message', (data) => { console.log('receive', data); client.write(packet('world', data.seq)); });});
client.js
const { RPCClient } = require('../../src');const { packet, seq } = require('tiny-application-layer-protocol');const client = new RPCClient({host: '127.0.0.1', port: 80});client.write(packet('hello', seq()));client.on('message', function(res) { console.log('receive', res);})
服务器输出客户端输出
4 RPC拓展
咱们实现了数据的传输和解析,可是如何咱们但愿数据的请求和响应是一一对应的怎么办呢?好比像http在tcp上能够并发发起多个请求同样,响应是否能够乱序返回,咱们又如何知道某个响应对应的是哪一个请求?接下来介绍如何解决这个问题。首先咱们实现一个请求管理的类。
class RequestManager { constructor(options) { this.options = { timeout: 10000, ...options }; this.map = {}; this.timerId = null; this.startPollTimeout(); } set(key, context) { if (typeof context.cb !== 'function') { throw new Error('cb is required'); } this.map[key] = { startTime: Date.now(), ...context, }; } get(key) { return this.map[key]; } del(key) { return delete this.map[key]; } // 执行上下文 exec(key, data) { const context = this.get(key); if (context) { this.del(key); context.cb(data); } } execAll(data) { for (const [key] of Object.entries(this.map)) { this.exec(key, data); } } // 定时轮询是否超时 startPollTimeout() { this.timerId = setTimeout(() => { if (!this.timerId) { return; } const nextMap = {}; for (const [key, context] of Object.entries(this.map)) { if (Date.now() - context.startTime < (context.timeout || this.options.timeout)) { nextMap[key] = context; } else { context.cb(new Error('timeout')); } } this.map = nextMap; this.startPollTimeout(); }, 1000); }}
该类的逻辑主要是请求的seq保存对应的上下文,而后收到响应的时候,咱们根据响应的seq拿到对应的上下文,从而执行对应的回调。咱们看看如何使用该类。server.js
const { RPCServer } = require('../../src');const { packet } = require('tiny-application-layer-protocol');new RPCServer({host: '127.0.0.1', port: 80}, function(client) { client.on('message', (data) => { console.log('receive', data); client.end(packet('world', data.seq)); }); client.on('end', (data) => { client.end(); });});
client.js
const { RPCClient, RequestManager } = require('../../src');const { packet, seq } = require('tiny-application-layer-protocol');const requestManager = new RequestManager({timeout: 3000});const client = new RPCClient({host: '127.0.0.1', port: 80});const _seq = seq(); requestManager.set(_seq, { cb: function() { console.log(...arguments); }})client.write(packet('hello', _seq));client.on('message', function(packet) { requestManager.exec(packet.seq, packet);})
输出 服务器输出客户端输出
github仓库:https://github.com/theanarkh/nodejs-ipc
github仓库:https://github.com/theanarkh/tiny-application-layer-protocol
npm install nodejs-i-p-c(ipc和rpc库,依赖tiny-application-layer-protocol)
npm install tiny-application-layer-protocol(基于tcp的小型应用层协议,包含协议的定义、封包、解包功能)
本文分享自微信公众号 - 编程杂技(theanarkh)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。