本文包括以下内容:html
WebSocket
协议第四章 - 链接握手WebSocket
协议第五章 - 数据帧nodejs ws
库源码分析 - 链接握手过程nodejs ws
库源码分析 - 数据帧解析过程参考node
ws - githubgithub
本文对WebSocket
的概念、定义、解释和用途等基础知识不会涉及, 稍微偏干一点, 篇幅较长, markdown大约800行, 阅读须要耐心web
关于WebSocket
有一句很常见的话: Websocket复用了HTTP的握手通道, 它具体指的是:面试
客户端经过HTTP请求与WebSocket服务器协商升级协议, 协议升级完成后, 后续的数据交换则遵守WebSocket协议
首先由客户端换发起协议升级请求, 根据WebSocket
协议规范, 请求头必须包含以下的内容ajax
GET / HTTP/1.1 Host: localhost:8080 Origin: http://127.0.0.1:3000 Connection: Upgrade Upgrade: websocket Sec-WebSocket-Version: 13 Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==
服务器返回的响应头必须包含以下的内容api
HTTP/1.1 101 Switching Protocols Connection:Upgrade Upgrade: websocket Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=
HTTP/1.1 101 Switching Protocols
规范提到:数组
Sec-WebSocket-Key值由一个随机生成的16字节的随机数经过base64(见RFC4648的第四章)编码获得的
例如, 随机选择的16个字节为:浏览器
// 十六进制 数字1~16 0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08 0x09 0x0a 0x0b 0x0c 0x0d 0x0e 0x0f 0x10
经过base64编码后值为: AQIDBAUGBwgJCgsMDQ4PEA==
测试代码以下:
const list = Array.from({ length: 16 }, (v, index) => ++index) const key = Buffer.from(list) console.log(key.toString('base64')) // AQIDBAUGBwgJCgsMDQ4PEA==
而Sec-WebSocket-Accept
值的计算方式为:
Sec-Websocket-Key
的值和258EAFA5-E914-47DA-95CA-C5AB0DC85B11
拼接SHA1
计算出摘要, 并转成base64
字符串此处不须要纠结神奇字符串258EAFA5-E914-47DA-95CA-C5AB0DC85B11
, 它就是一个GUID
, 没准儿是写RFC的时候随机生成的
测试代码以下:
const crypto = require('crypto') function hashWebSocketKey (key) { const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' return crypto.createHash('sha1') .update(key + GUID) .digest('base64') } console.log(hashWebSocketKey('w4v7O6xFTi36lq3RNcgctw==')) // Oy4NRAQ13jhfONC7bP8dTKb4PTU=
前面简单提到他的做用为: 提供基础的防御, 减小恶意链接, 进一步阐述以下:
Key
能够避免服务器收到非法的WebSocket
链接, 好比http
请求链接到websocket
, 此时服务端能够直接拒绝Key
能够用来初步确保服务器认识ws
协议, 但也不能排除有的http服务器只处理Sec-WebSocket-Key
, 并不实现ws
协议Key
能够避免反向代理缓存Sec-Websocket-Key
以及相关header是被禁止的, 这样能够避免客户端发送ajax请求时, 意外请求协议升级最终须要强调的是: Sec-WebSocket-Key/Accept并非用来保证数据的安全性, 由于其计算/转换公式都是公开的, 并且很是简单, 最主要的做用是预防一些意外的状况
WebSocket
通讯的最小单位是帧, 由一个或多个帧组成一条完整的消息, 交换数据的过程当中, 发送端和接收端须要作的事情以下:
数据帧格式做为核心内容, 一眼看去彷佛难以理解, 但本文做者下死命令了, 必须理解, 冲冲冲
FIN
: 占1bit
0
表示不是消息的最后一个分片1
表示是消息的最后一个分片RSV1
, RSV2
, RSV3
: 各占1bit, 通常状况下全为0, 与Websocket拓展有关, 若是出现非零的值且没有采用WebSocket拓展, 链接出错Opcode
: 占4bit
%x0
: 表示本次数据传输采用了数据分片, 当前数据帧为其中一个数据分片%x1
: 表示这是一个文本帧%x2
: 表示这是一个二进制帧%x3-7
: 保留的操做代码, 用于后续定义的非控制帧%x8
: 表示链接断开%x9
: 表示这是一个心跳请求(ping)%xA
: 表示这是一个心跳响应(pong)%xB-F
: 保留的操做代码, 用于后续定义的非控制帧Mask
: 占1bit
0
表示不对数据载荷进行掩码异或操做1
表示对数据载荷进行掩码异或操做Payload length
: 占7或7+16或7+64bit
0~125
: 数据长度等于该值126
: 后续的2个字节表明一个16位的无符号整数, 值为数据的长度127
: 后续的8个字节表明一个64位的无符号整数, 值为数据的长度Masking-key
: 占0或4bytes
1
: 携带了4字节的Masking-key0
: 没有Masking-keypayload data
: 载荷数据我想若是知道byte和bit的区别, 这部分就没问题- -
WebSocket
的每条消息可能被切分红多个数据帧, 当接收到一个数据帧时,会根据FIN值来判断, 是否为最后一个数据帧
数据帧传递示例:
FIN=0, Opcode=0x1
: 发送文本类型, 消息尚未发送完成,还有后续帧FIN=0, Opcode=0x0
: 消息没有发送完成, 还有后续帧, 接在上一条后面FIN=1, Opcode=0x0
: 消息发送完成, 没有后续帧, 接在上一条后面组成完整消息虽然以前用的都是socket.io
, 偶然发现了ws
, 使用量居然还挺大, 周下载量是socket.io
的六倍
在NodeJS
中, 每当遇到协商升级请求时, 就会触发http
模块的upgrade
事件, 这即是实现WebSocketServer
的切入点, 原生示例代码以下:
// 建立 HTTP 服务器。 const srv = http.createServer( (req, res) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('响应内容'); }); srv.on('upgrade', (req, socket, head) => { // 特定的处理, 以实现Websocket服务 });
而且, 在通常的使用中, 都是在一个已有的httpServer
基础上进行拓展, 以实现WebSocket
, 而不是建立一个独立的WebSocketServer
在一个已有httpServer
的基础上, ws
使用的实例代码为
const http = require('http'); const WebSocket = require('ws'); const server = http.createServer(); const wss = new WebSocket.Server({ server }); server.listen(8080);
已有的httpServer
做为参数传给了WebSocket.Server
构造函数, 因此源码分析的核心切入点为:
new WebSocket.Server({ server });
经过这个切入点, 就能够完整复现链接握手的过程
由于httpServer
已做为参数传递进来, 所以其构造函数变得十分简单:
class WebSocketServer extends EventEmitter { constructor(options, callback) { super() // 在提供了http server的基础上, 代码能够简化为 if (options.server) { this._server = options.server } // 监听事件 if (this._server) { this._removeListeners = addListeners(this._server, { listening: this.emit.bind(this, 'listening'), error: this.emit.bind(this, 'error'), // 核心 upgrade: (req, socket, head) => { // 下一步切入点 this.handleUpgrade(req, socket, head, (ws) => { this.emit('connection', ws, req) }) } }) } } } // 这是一段很是带秀的代码, 在绑定多个事件监听器的同时返回一个移除多个事件监听器的函数 function addListeners(server, map) { for (const event of Object.keys(map)) server.on(event, map[event]); return function removeListeners() { for (const event of Object.keys(map)) { server.removeListener(event, map[event]); } }; }
能够看到, 在构造函数中, 为httpServer
注册了upgrade
事件的监听器, 触发时, 会执行this.handleUpgrade
函数, 这即是下一步的方向
这个函数主要用来过滤掉不合法的请求, 检查的内容包括:
Sec-WebSocket-Key
值Sec-WebSocket-Version
值WebSocket
请求的路径关键代码以下:
const keyRegex = /^[+/0-9A-Za-z]{22}==$/; handleUpgrade(req, socket, head, cb) { socket.on('error', socketOnError) // 获取sec-websocket-key const key = req.headers['sec-websocket-key'] !== undefined ? req.headers['sec-websocket-key'] : false // 获取sec-websocket-version const version = +req.headers['sec-websocket-version'] // 获取协议拓展, 本篇不涉及 const extensions = {}; // 对于不合法的请求, 中断握手 if ( req.method !== 'GET' || req.headers.upgrade.toLowerCase() !== 'websocket' || !key || !keyRegex.test(key) || (version !== 8 && version !== 13) || // 该函数是对Websocket请求路径的判断, 与option.path相关, 不展开 !this.shouldHandle(req) ) { return abortHandshake(socket, 400) } // 对于合法的请求, 给它升级! this.completeUpgrade(key, extensions, req, socket, head, cb) }
对于不合法的请求, 直接400 bad request
了, abortHandshake
以下:
const { STATUS_CODES } = require('http'); function abortHandshake(socket, code, message, headers) { // net.Socket 也是双工流,所以它既可读也可写 if (socket.writable) { message = message || STATUS_CODES[code]; headers = { Connection: 'close', 'Content-type': 'text/html', 'Content-Length': Buffer.byteLength(message), ...headers }; socket.write( `HTTP/1.1 ${code} ${STATUS_CODES[code]}\r\n` + Object.keys(headers) .map((h) => `${h}: ${headers[h]}`) .join('\r\n') + '\r\n\r\n' + message ); } // 移除handleUpgrade中添加的error监听器 socket.removeListener('error', socketOnError); // 确保在该 socket 上再也不有 I/O 活动 socket.destroy(); }
若是一切顺利, 咱们来到completeUpgrade
函数
这个函数主要用来, 返回正确的响应, 触发相关的事件, 记录值等, 代码比较简单
const { createHash } = require('crypto'); const { GUID } = require('./constants'); const WebSocket = require('./websocket'); function completeUpgrade(key, extensions, req, socket, head, cb) { // Destroy the socket if the client has already sent a FIN packet. if (!socket.readable || !socket.writable) return socket.destroy() // 生成sec-websocket-accept const digest = createHash('sha1') .update(key + GUID) .digest('base64'); // 组装Headers const headers = [ 'HTTP/1.1 101 Switching Protocols', 'Upgrade: websocket', 'Connection: Upgrade', `Sec-WebSocket-Accept: ${digest}` ]; // 建立一个Websocket实例 const ws = new Websocket(null) this.emit('headers', headers, req); // 返回响应 socket.write(headers.concat('\r\n').join('\r\n')); socket.removeListener('error', socketOnError); // 下一步切入点 ws.setSocket(socket, head, this.options.maxPayload); // 经过Set记录处于链接状态的客户端 if (this.clients) { this.clients.add(ws); ws.on('close', () => this.clients.delete(ws)); } // 触发connection事件 cb(ws); }
到这里, 就完成了整个握手阶段, 但还没涉及到对数据帧的处理
上一章末尾, 启示下文的代码为completeUpgrade
中的:
ws.setSocket(socket, head, this.options.maxPayload);
进入WebSocket
类中的setSocket
方法, 关于数据帧处理代码主要能够简化为:
Class WebSocket extends EventEmitter { ... setSocket(socket, head, maxPayload) { // 实例化一个可写流, 用于处理数据帧 const receiver = new Receiver( this._binaryType, this._extensions, maxPayload ); receiver[kWebSocket] = this; socket.on('data', socketOnData); } } function socketOnData(chunk) { if (!this[kWebSocket]._receiver.write(chunk)) { this.pause(); } }
此处忽略了不少事件处理, 例如error
, end
, close
等, 由于他们与本文目标无关, 对于一些API, 也不作介绍
因此核心切入点为Receiver
类, 它就是用于处理数据帧的核心
Receiver类继承自可写流, 还须要明确两点基本概念:
stream
全部的流都是EventEmitter
的实例writable._write
方法, 该方法供内部使用const { Writable } = require('stream') class Recevier extends Writable { constructor(binaryType, extensions, maxPayload) { super() this._binaryType = binaryType || BINARY_TYPES[0]; // nodebuffer this[kWebSocket] = undefined; // WebSocket实例的引用 this._extensions = extensions || {}; // WebSocket协议拓展 this._maxPayload = maxPayload | 0; // 100 * 1024 * 1024 this._bufferedBytes = 0; // 记录buffer长度 this._buffers = []; // 记录buffer数据 this._compressed = false; // 是否压缩 this._payloadLength = 0; // 数据帧 PayloadLength this._mask = undefined; // 数据帧Mask Key this._fragmented = 0; // 数据帧是否分片 this._masked = false; // 数据帧 Mask this._fin = false; // 数据帧 FIN this._opcode = 0; // 数据帧 Opcode this._totalPayloadLength = 0; // 载荷总长度 this._messageLength = 0; // 载荷总长度, 与this._compressed有关 this._fragments = []; // 载荷分片记录数组 this._state = GET_INFO; // 标志位, 用于startLoop函数 this._loop = false; // 标志位, 用于startLoop函数 } _write(chunk, encoding, cb) { if (this._opcode === 0x08 && this._state == GET_INFO) return cb(); this._bufferedBytes += chunk.length; this._buffers.push(chunk); this.startLoop(cb); } }
能够看到, 每当收到新的数据帧, 就会将其记录在_buffers
数组中, 并当即开始解析流程startLoop
startLoop(cb) { let err; this._loop = true; do { switch (this._state) { case GET_INFO: err = this.getInfo(); break; case GET_PAYLOAD_LENGTH_16: err = this.getPayloadLength16(); break; case GET_PAYLOAD_LENGTH_64: err = this.getPayloadLength64(); break; case GET_MASK: this.getMask(); break; case GET_DATA: err = this.getData(cb); break; default: // `INFLATING` this._loop = false; return; } } while (this._loop); cb(err); }
解析流程很简单:
getInfo
首先解析FIN
, RSV
, OPCODE
, MASK
, PAYLOAD LENGTH
等数据由于payload length
分为三种状况(具体后面叙述, 此处只列出分支):
haveLength
方法getPayloadLength16
方法, 再调用haveLength
方法getPayloadLength64
方法, 再调用haveLength
方法haveLength
方法中, 若是存在掩码(mask), 先调用getMask
方法, 再调用getData
方法总体流程和状态经过this._loop
和this._state
控制, 比较直观
按理说第一步应该分析getInfo
方法, 不过里面涉及到了consume
方法, 这个函数提供了一种简洁的方式消费已获取的Buffer, 这个函数接受一个参数n
, 表明须要消费的字节数, 最后返回消费的字节
假如须要得到数据帧的第一个字节的数据(包含了 FIN + RSV + OPCODE), 只须要经过this.consume(1)
便可
记录值this._buffers
是一个buffer数组, 最开始, 里面存放完整的数据帧, 随着消费的进行, 数据则会逐渐变小, 那么每次消费存在三种可能:
chunk
的字节数chunk
的字节数chunk
的字节数对于第一种状况, 只须要移出 + 返回便可
if (n === this._buffers[0].length) return this._buffers.shift()
对于第二种状况, 只须要裁剪 + 返回便可
if (n < this._buffers[0].length) { const buf = this._buffers[0] this._buffers[0] = buf.slice(n) return buf.slice(0, n) }
对于第三种状况, 会稍微复杂一点, 首先咱们要申请一个大小为须要消费字节数的buffer空间, 用于存储返回的buffer
// buffer空间是否初始化并不重要, 由于最终他都会被所有覆盖 const dst = Buffer.allocUnsafe(n)
在这种状况中, 能够保证他的长度大于第一个chunk, 但不能肯定在消费一个chunk以后, 是否还大于第一个chunk(消费以后索引前移), 所以须要循环
// do...while能够避免一次无心义判断, 首先执行一次循环体, 再判断条件 do { const buf = this._buffers[0] // 若是长度大于第一个chunk, 移除 + 复制便可 if (n >= buf.length) { this._buffers.shift().copy(dst, dst.length - n); } // 若是长度小于一个chunk, 裁剪 + 复制便可 else { // buf.copy这个api就本身复习一下嗷 buf.copy(dst, dst.length - n, 0, n); this._buffers[0] = buf.slice(n); } n -= buf.length; } while (n > 0)
一个最小的数据帧必须包含以下的数据:
FIN (1 bit) + RSV (3 bit) + OPCODE (4 bit) + MASK (1 bit) + PAYLOADLENGTH (7 bit)
最少2个字节, 所以少于两个字节的数据帧是错误的, 简化的getInfo
以下
getInfo() { if (this._bufferedBytes < 2) { this._loop = false return } const buf = this.consume(2) // 只保留了数据帧中的几个关键数据 this._fin = (buf[0] & 0x80) === 0x80 this._opcode = buf[0] & 0x0f this._payloadLength = buf[1] & 0x7f this._masked = (buf[1] & 0x80) === 0x80 // 对应Payload Length的三种状况 if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16 else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64 else return this.haveLength() }
此处的核心就是按位于运算符&
的含义, 先以FIN
为例, FIN
在数据帧中处于第一个bit
// FIN的值用[]指代, X表明第一个字节中的后续bit []xxxxxxx // 十六进制数0x80表明二进制 10000000 // 二者按位与, 结果与后面7个bit无关 []0000000 // 所以, 只须要比较[]0000000 和 10000000是否相等便可, 简化即获得 this._fin = (buf[0] & 0x80) === 0x80
OPCODE
和PAYLOAD LENGTH
同理
// OPCODE处于第一个字节的后四位, 与0000 1111按位与便可 xxxx[][][][] & 0000 1111 (也就是0x0f) // PAYLOAD LENGTH处于第二个字节的后七为, 与0111 1111按位于便可 x[][][][][][][][] & 0111 1111 (也就是0x7f)
三种状况以下:
0-125
: 载荷实际长度就是0-125之间的某个数126
: 载荷实际长度为随后2个字节表明的一个16位的无符号整数的数值 127
: 载荷实际长度为随后8个字节表明的一个64位的无符号整数的数值 可能听起来比较绕, 看代码, 以126
分支为例:
getPayloadLength16() { if (this._bufferedBytes < 2) { this._loop = false; return; } this._payloadLength = this.consume(2).readUInt16BE(0); return this.haveLength(); }
能够看到, 处理长度的核心为readUInt16BE(0)
, 这便涉及到大小端了:
那么, 规范中提到的随后2个字节表明的一个16位的无符号整数的数值, 天然指的是大端了
大端 vs 小端对比:
// 假设后面两个字节二进制值为 1111 1111 0000 0001 // 转为十六进制为 0xff 0x01 // 大端输出 65281 console.log(Buffer.from([0xff, 0x01]).readUInt16BE(0).toString(10)) // 小端输出 511 console.log(Buffer.from([0xff, 0x01]).readUInt16LE(0).toString(10))
除此以外, 7 + 64
的模式还有一点额外的处理, 代码以下:
getPayloadLength64() { if (this._bufferedBytes < 8) { this._loop = false; return; } const buf = this.consume(8); const num = buf.readUInt32BE(0); // // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned // if payload length is greater than this number. // if (num > Math.pow(2, 53 - 32) - 1) { this._loop = false; return error( RangeError, 'Unsupported WebSocket frame: payload length > 2^53 - 1', false, 1009 ); } this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4); return this.haveLength(); }
在得到载荷以前, 若是getInfo
中mask
为1, 须要进行getMask
操做, 获取Mask Key
(一共四个字节)
getMask() { if (this._bufferedBytes < 4) { this._loop = false; return; } this._mask = this.consume(4); this._state = GET_DATA; }
getData
源码简化为以下
getData(cb) { // data为 Buffer.alloc(0) let data = EMPTY_BUFFER; // 消费payload data = this.consume(this._payloadLength) // 若是有mask, 根据mask key进行解码, 此处不展开 if (this._masked) unmask(data, this._mask) // 将其记录进分片数组 this._fragments.push(data) // 若是该数据帧表示: 链接断开, 心跳请求, 心跳响应 if (this._opcode > 0x07) return this.controlMessage(data) // 若是该数据帧表示: 数据分片、文本帧、二进制帧 return this.dataMessage() }
接着分析dataMessage()
函数, 它用于将多个帧的数据合并, 简化以后也比较简单
dataMessage() { if (this._fin) { const messageLength = this._messageLength const fragments = this._fragments const buf = concat(fragments, messageLength) this.emit('message', buf.toString()) } } // 简明易懂哦, 不解释啦 function concat(list, totalLength) { if (list.length === 0) return EMPTY_BUFFER; if (list.length === 1) return list[0]; const target = Buffer.allocUnsafe(totalLength); let offset = 0; for (let i = 0; i < list.length; i++) { const buf = list[i]; buf.copy(target, offset); offset += buf.length; } return target; }
本文篇幅较长且并非面试题那种小块的知识点, 阅读急需耐心, 已尽可能避免贴大段代码, 能看到这里我都想给你打钱了
经过本篇分析, 完整的介绍以及复现了WebSocket
中的两个关键阶段:
我的认为最关键即是: 涉及到了对Node.js的buffer模块以及stream模块的使用, 这也是收获最大的一部分