废话很少说,文本将带你实现一个简单的 memcached 客户端。node
memcached 自己并不支持集群,为了使用集群,咱们能够本身在客户端实现路由分发,将相同的 key 路由到同一台 memcached 上去便可。
路由算法有不少,这里咱们使用一致性哈希算法。git
一致性哈希算法的原理:github
一致性哈希算法已经有开源库 hashring
实现,基本用法:算法
const HashRing = require('hashring'); // 输入集群地址构造 hash ring const ring = new HashRing(['127.0.0.1:11211', '127.0.0.2:11211']); // 输入 key 获取指定节点 const host = ring.get(key);
包括 memcached 在内的许多系统对外都是经过 TCP 通讯。在 Node.js 中创建一个 TCP 链接并进行数据的收发很简单:npm
const net = require('net'); const socket = new net.Socket(); socket.connect({ host: host, // 目标主机 port: port, // 目标端口 // localAddress: localAddress, // 本地地址 // localPort: localPort, // 本地端口 }); socket.setKeepAlive(true); // 保活 // 链接相关 socket.on('connect', () => { console.log(`socket connected`); }); socket.on('error', error => { console.log(`socket error: ${error}`); }); socket.on('close', hadError => { console.log(`socket closed, transmission error: ${hadError}`); }); socket.on('data', data => { // 接受数据 }); socket.write(data); // 发送数据
一条链接由惟一的五元组肯定,所谓的五元组就是:协议(好比 TCP 或者 UDP)、本地地址、本地端口、远程地址、远程端口。
系统正是经过五元组去区分不一样的链接,其中本地地址和本地端口因为在缺省状况下会自动生成,经常会被咱们忽视。编程
一次完整的 TCP 通讯过程为:三次握手,创建链接 --> 数据传递 --> 挥手,关闭链接。socket
咱们都知道握手创建链接的过程是很是消耗资源的,而链接池就是为了解决这个问题,链接池是一个通用的模型,它包括:async
能够看到所谓的链接池其实就是在链接使用完成后并非当即关闭链接,而是让链接保活,等待下一次使用,从而避免反复创建链接的过程。memcached
正如上文所述,链接池是一个通用的模型,咱们这里直接使用开源库 generic-pool
。函数
池化 TCP 链接示例:
const net = require('net'); const genericPool = require('generic-pool'); // 自定义建立链接池的函数 function _buildPool(remote_server) { const factory = { create: function () { return new Promise((resolve, reject) => { const host = remote_server.split(':')[0]; const port = remote_server.split(':')[1]; const socket = new net.Socket(); socket.connect({ host: host, // 目标主机 port: port, // 目标端口 }); socket.setKeepAlive(true); socket.on('connect', () => { console.log(`socket connected: ${remote_server} , local: ${socket.localAddress}:${socket.localPort}`); resolve(socket); }); socket.on('error', error => { console.log(`socket error: ${remote_server} , ${error}`); reject(error); }); socket.on('close', hadError => { console.log(`socket closed: ${remote_server} , transmission error: ${hadError}`); }); }); }, destroy: function (socket) { return new Promise((resolve) => { socket.destroy(); resolve(); }); }, validate: function (socket) { // validate socket return new Promise((resolve) => { if (socket.connecting || socket.destroyed || !socket.readable || !socket.writable) { return resolve(false); } else { return resolve(true); } }); } }; const pool = genericPool.createPool(factory, { max: 10, // 最大链接数 min: 0, // 最小链接数 testOnBorrow: true, // 从池中取链接时进行 validate 函数验证 }); return pool; } // 链接池基本使用 const pool = _buildPool('127.0.0.1:11211'); // 构建链接池 const s = await pool.acquire(); // 从链接池中取链接 await pool.release(s); // 使用完成后释放链接
包括 memcached 在内的许多系统都定义了一套本身的协议用于对外通讯,为了实现 memcached 客户端固然就要遵照它的协议内容。
memcached 客户端协议,咱们实现最简单的 get 方法:
发送的数据格式:
get <key>\r\n
接受的数据格式:
VALUE <key> <flags> <bytes>\r\n <data block>\r\n
实现示例:
// 定义一个请求方法并返回响应数据 function _request(command) { return new Promise(async (resolve, reject) => { try { // ...这里省略了链接池构建相关部分 const s = await pool.acquire(); // 取链接 const bufs = []; s.on('data', async buf => { // 监听 data 事件接受响应数据 bufs.push(buf); const END_BUF = Buffer.from('\r\n'); // 数据接受完成的结束位 if (END_BUF.equals(buf.slice(-2))) { s.removeAllListeners('data'); // 移除监听 try { await pool.release(s); // 释放链接 } catch (error) { } const data = Buffer.concat(bufs).toString(); return resolve(data); } }); s.write(command); } catch (error) { return reject(error); } }); } // get function get(key) { return new Promise(async (resolve, reject) => { try { const command = `get ${key}\r\n`; const data = await _request(key, command); // ...响应数据的处理,注意有省略 // key not exist if (data === 'END\r\n') { return resolve(undefined); } /* VALUE <key> <flags> <bytesLength>\r\n <data block>\r\n */ const data_arr = data.split('\r\n'); const response_line = data_arr[0].split(' '); const value_flag = response_line[2]; const value_length = Number(response_line[3]); let value = data_arr.slice(1, -2).join(''); value = unescapeValue(value); // unescape \r\n // ...有省略 return resolve(value); } catch (error) { return reject(error); } }); }
以上示例都单独拿出来了,实际上是在整合在一个 class 中的:
class Memcached { constructor(serverLocations, options) { this._configs = { ...{ pool: { max: 1, min: 0, idle: 30000, // 30000 ms. }, timeout: 5000, // timeout for every command, 5000 ms. retries: 5, // max retry times for failed request. maxWaitingClients: 10000, // maximum number of queued requests allowed }, ...options }; this._hashring = new HashRing(serverLocations); this._pools = {}; // 经过 k-v 的形式存储具体的地址及它的链接池 } _buildPool(remote_server) { // ... } _request(key, command) { // ... } // get async get(key) { // ... } // ... 其余方法 }
// 使用实例 const memcached = new Memcached(['127.0.0.1:11211'], { pool: { max: 10, min: 0 } }); const key = 'testkey'; const result = await memcached.get(key);
完整的示例能够看 io-memcached 。