欢迎来个人博客阅读:《Node.js源码解析-Writable实现》node
对于一个 stream 模块来讲,最基本的就是读和写。读由 Readable 负责,写则是由 Writable 负责。Readable 的实现上篇文章已经介绍过了,这篇介绍 Writable 的实现git
在开始以前,先看看 Writable 的构造函数github
// lib/_stream_readable.js function Writable(options) { // 只能在 Writable 或 Duplex 的上下文中执行 if (!(realHasInstance.call(Writable, this)) && !(this instanceof Stream.Duplex)) { return new Writable(options); } // Wrirable 的状态集 this._writableState = new WritableState(options, this); // legacy this.writable = true; if (options) { if (typeof options.write === 'function') this._write = options.write; if (typeof options.writev === 'function') this._writev = options.writev; if (typeof options.destroy === 'function') this._destroy = options.destroy; if (typeof options.final === 'function') this._final = options.final; } Stream.call(this); } function WritableState(options, stream) { options = options || {}; // object 模式标识 this.objectMode = !!options.objectMode; if (stream instanceof Stream.Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode; var hwm = options.highWaterMark; var defaultHwm = this.objectMode ? 16 : 16 * 1024; this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm; // highWaterMark 高水位标识 // 此时,write() 返回 false // 默认 16k this.highWaterMark = Math.floor(this.highWaterMark); this.finalCalled = false; // drain 事件标识 this.needDrain = false; // 刚调用 end() 时的状态标识 this.ending = false; // end() 调用完成后的状态标识 this.ended = false; this.finished = false; this.destroyed = false; var noDecode = options.decodeStrings === false; // 数据写入前,是否应该将 string 解析为 buffer this.decodeStrings = !noDecode; this.defaultEncoding = options.defaultEncoding || 'utf8'; // 缓存中的数据长度 // 不是真正的 buffer 长度,而是正在等待写入的数据长度 this.length = 0; // writing 标识 this.writing = false; // cork 标识 this.corked = 0; // 标识是否有异步回调 this.sync = true; // 标识正在写入缓存中的内容 this.bufferProcessing = false; // _write() 和 _writev() 函数的回调 this.onwrite = onwrite.bind(undefined, stream); // 调用 write(chunk, cb) 时的回调函数 this.writecb = null; // 需写入的单个 chunk 块长度 this.writelen = 0; // Writable 的缓冲池实现也是一个链表,其每一个节点的结构以下: // { // chunk, // encoding, // isBuf, // callback, // next // } // 缓冲池头节点 this.bufferedRequest = null; // 缓冲池尾节点 this.lastBufferedRequest = null; // 缓冲池的大小 this.bufferedRequestCount = 0; // 还须要执行的 callback 数量,必须在 finish 事件发生以前降为 0 this.pendingcb = 0; this.prefinished = false; this.errorEmitted = false; // cork 的回调函数,最多只能有两个函数对象 var corkReq = { next: null, entry: null, finish: undefined }; corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this); this.corkedRequestsFree = corkReq; }
Writable 与 Readable 相似,也使用一个对象 ( WritableState ) 对状态和属性进行集中管理数组
在 Writable 的构造函数参数中,options.write
函数是必须的,options.writev
则是用于批量写入数据,能够选择实现缓存
Writable 的缓冲池也是由链表实现,但与 Readable 不一样的是,Writable 的缓冲池实现更简单,其节点结构以下:异步
{ chunk, // 数据块,多是 object / string / buffer encoding, // 数据块编码 isBuf, // buffer 标识 callback, // write(chunk, cb) 的回调函数 next // 下一个写入任务 }
除此以外,WritableState 还使用 bufferedRequest、lastBufferedRequest、bufferedRequestCount 属性分别记录缓冲池的头、尾节点和节点数量函数
在 Duplex 的源码中有这么一段注释this
// a duplex stream is just a stream that is both readable and writable. // Since JS doesn't have multiple prototypal inheritance, this class // prototypally inherits from Readable, and then parasitically from // Writable.
意思是: Duplex 流既是 Readable 流又是 Writable 流,可是因为 JS 中的继承是基于原型的,没有多继承。因此 Duplex 是继承自 Readable,寄生自 Writable编码
寄生自 Writable 体如今两个方面:prototype
duplex instanceof Writable
为 true
duplex 具备 Writable 的属性和方法
// lib/_stream_writable.js var realHasInstance; if (typeof Symbol === 'function' && Symbol.hasInstance) { realHasInstance = Function.prototype[Symbol.hasInstance]; Object.defineProperty(Writable, Symbol.hasInstance, { value: function(object) { if (realHasInstance.call(this, object)) return true; return object && object._writableState instanceof WritableState; } }); } else { realHasInstance = function(object) { return object instanceof this; }; } function Writable(options) { if (!(realHasInstance.call(Writable, this)) && !(this instanceof Stream.Duplex)) { return new Writable(options); } // ... }
能够看出,经过修改 Writable 的 Symbol.hasInstance
使得 duplex/writable instanceof Writable
为 true
。Writable 的构造函数也只能在 writable 或 duplex 的上下文中调用,使 duplex 具备 Writable 的属性
// lib/_stream_duplex.js util.inherits(Duplex, Readable); var keys = Object.keys(Writable.prototype); // 获取 Writable 的全部方法 for (var v = 0; v < keys.length; v++) { var method = keys[v]; if (!Duplex.prototype[method]) Duplex.prototype[method] = Writable.prototype[method]; } function Duplex(options) { if (!(this instanceof Duplex)) return new Duplex(options); Readable.call(this, options); Writable.call(this, options); // ... }
遍历 Writable 原型上的方法,并添加到 Duplex 的原型上,使 duplex 具备 Writable 的方法
Writable 的写过程比 Readable 的读过程简单得多,不用考虑异步操做,直接写入便可
// lib/_stream_writable.js Writable.prototype.write = function(chunk, encoding, cb) { var state = this._writableState; var ret = false; // 判断是不是 buffer var isBuf = Stream._isUint8Array(chunk) && !state.objectMode; // ... if (state.ended) // Writable 结束后继续写入数据会报错 // 触发 error 事件 writeAfterEnd(this, cb); else if (isBuf || validChunk(this, state, chunk, cb)) { // 对 chunk 进行校验 // 不能为 null // undefined 和非字符串只在 objectMode 下接受 state.pendingcb++; ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb); } return ret; };
write()
函数对传入数据进行初步处理与校验后交由 writeOrBuffer()
函数继续处理
// lib/_stream_writable.js function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { if (!isBuf) { // 非 buffer 的状况有: string 或 object // 为 object,表明是 objectMode,直接返回便可 // 为 string,则需解码成 buffer var newChunk = decodeChunk(state, chunk, encoding); if (chunk !== newChunk) { isBuf = true; encoding = 'buffer'; chunk = newChunk; } } var len = state.objectMode ? 1 : chunk.length; state.length += len; var ret = state.length < state.highWaterMark; if (!ret) state.needDrain = true; if (state.writing || state.corked) { // 正在写或处于 cork 状态 // 将数据块添加到缓冲池链表为尾部 var last = state.lastBufferedRequest; state.lastBufferedRequest = { chunk, encoding, isBuf, callback: cb, next: null }; if (last) { last.next = state.lastBufferedRequest; } else { state.bufferedRequest = state.lastBufferedRequest; } state.bufferedRequestCount += 1; } else { // 写入数据 doWrite(stream, state, false, len, chunk, encoding, cb); } return ret; }
writeOrBuffer()
函数对 chunk 进行处理后,根据 Writable 自身状态决定应什么时候写入数据
若是正在写入或处于 cork 状态,就将数据存储到缓冲池链表尾部,等待之后处理。不然,直接调用 doWrite()
写入数据
当缓存达到 highWaterMark 时,writeOrBuffer()
返回 false,表示不该该再写入数据
// lib/_stream_writable.js function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writelen = len; // 写入的数据块长度 state.writecb = cb; // 写入操做的回调函数 state.writing = true; state.sync = true; // 同步状态标识 if (writev) // 一次写入多个数据块 stream._writev(chunk, state.onwrite); else // 一次写入单个数据块 stream._write(chunk, encoding, state.onwrite); state.sync = false; }
在 doWrite()
函数中,根据 writev 参数决定该执行 _write()
仍是 _writev()
_write()
函数用于写入单个数据块,_writev()
函数用于写入多个数据块
_write()
/ _writev()
中的回调函数不是传入的 cb 而是 state.onwrite
,其定义以下:
this.onwrite = onwrite.bind(undefined, stream);
可知,写入完成后,执行 onwrite(stream, err)
// lib/_stream_writable.js function onwrite(stream, er) { var state = stream._writableState; var sync = state.sync; var cb = state.writecb; // 更新 state onwriteStateUpdate(state); if (er) // 发生错误 onwriteError(stream, state, sync, er, cb); else { var finished = needFinish(state); if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) { // 清空缓冲池 // 有 _writev() 函数时,执行 _writev() 一次写入多个数据块 // 没有,则循环执行 _write() 写入单个数据块 clearBuffer(stream, state); } if (sync) { // 表明写入操做是同步的,须要在 process.nextTick() 中执行 callback process.nextTick(afterWrite, stream, state, finished, cb); } else { // 表明写入操做是异步的,直接执行 callback 便可 afterWrite(stream, state, finished, cb); } } }
当写入过程当中有错误发生时,会执行 onwriteError()
,继而调用 cb(err)
并触发 error 事件
若是写入过程正确执行,则先查看还有多少数据块正在等待写入,有多个,就执行 clearBuffer()
清空缓存,而后执行 afterWrite()
// lib/_stream_writable.js function afterWrite(stream, state, finished, cb) { if (!finished) onwriteDrain(stream, state); state.pendingcb--; // 执行回调函数 cb(); // 检查是否应该结束 Writable finishMaybe(stream, state); }
当有大量小数据块须要写入时,若是一个个写入,会致使效率低下。Writable 提供了 cork()
和 uncork()
两个方法用于大量小数据块写入的状况
先将写操做柱塞住,等到缓存达到必定量后,再解除柱塞,而后一次性将存储的数据块写入,这个操做须要 _writev()
支持
// lib/_stream_writable.js Writable.prototype.cork = function() { var state = this._writableState; // 增长柱塞的次数 state.corked++; };
因为 cork()
函数可能会被屡次调用,因此 state.corked
须要记录 cork()
调用的次数,是个 number
// lib/_stream_writable.js Writable.prototype.uncork = function() { var state = this._writableState; if (state.corked) { // 减小柱塞的次数 state.corked--; if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) // 清空缓冲池 clearBuffer(this, state); } };
当 state.corked === 0
时,才能表示柱塞已经所有解除完毕,能够执行 clearBuffer()
来处理缓存中的数据
// lib/_stream_writable.js function clearBuffer(stream, state) { // 正在清空 buffer 的标识 state.bufferProcessing = true; // 缓存的头节点 var entry = state.bufferedRequest; if (stream._writev && entry && entry.next) { // _writev() 函数存在,且有一个以上数据块,就使用 _writev() 写入数据,效率更高 var l = state.bufferedRequestCount; var buffer = new Array(l); var holder = state.corkedRequestsFree; holder.entry = entry; var count = 0; var allBuffers = true; // 取得全部数据块 while (entry) { buffer[count] = entry; if (!entry.isBuf) allBuffers = false; entry = entry.next; count += 1; } buffer.allBuffers = allBuffers; // 写入数据 doWrite(stream, state, true, state.length, buffer, '', holder.finish); state.pendingcb++; state.lastBufferedRequest = null; // 保证最多只有两个实例 if (holder.next) { state.corkedRequestsFree = holder.next; holder.next = null; } else { var corkReq = { next: null, entry: null, finish: undefined }; corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state); state.corkedRequestsFree = corkReq; } } else { // 一个个的写入 while (entry) { var chunk = entry.chunk; var encoding = entry.encoding; var cb = entry.callback; var len = state.objectMode ? 1 : chunk.length; doWrite(stream, state, false, len, chunk, encoding, cb); entry = entry.next; // 若是写操做不是同步执行的,就意味着须要等待这次写入完成,再继续写入 if (state.writing) { break; } } if (entry === null) state.lastBufferedRequest = null; } state.bufferedRequestCount = 0; // 修正缓存的头节点 state.bufferedRequest = entry; state.bufferProcessing = false; }
执行 clearBuffer()
时,根据是否有 _writev()
函数和待写入数据块数量,决定使用 _writev()
仍是 _write()
写入数据
_writev()
: 会先将全部数据块包装成数组,而后写入。写入完成后,回调 corkReq.finish
_write()
: 只须要将数据块一个个写入便可
在使用 _writev()
的状况下,写入完成后回调 corkReq.finish
也就是 onCorkedFinish()
函数
// lib/_stream_writable.js function onCorkedFinish(corkReq, state, err) { var entry = corkReq.entry; corkReq.entry = null; // 依次执行回调函数 while (entry) { var cb = entry.callback; state.pendingcb--; cb(err); entry = entry.next; } // 保证最多只有两个实例 if (state.corkedRequestsFree) { state.corkedRequestsFree.next = corkReq; } else { state.corkedRequestsFree = corkReq; } }
根据缓冲池链表的顺序,依次执行写操做的回调函数
每次调用 stream.write(chunk, cb)
,Writable 都会根据自身状态,决定是将 chunk 加到缓冲池,仍是直接写入
当须要写入大量小数据块时,推荐先使用 cork()
将写操做柱塞住,待调用完毕后,再调用 uncork()
解除柱塞,而后一次性写入全部缓存数据
参考: