今天小编就为你们分享一篇关于Node.js Stream ondata触发时机与顺序的探索,写的十分的全面细致,具备必定的参考价值,对此有须要的朋友能够参考学习下。若有不足之处,欢迎批评指正。前端
无用逻辑异步
当时研究pipe细节是基于Node.js v8.11.1的源码,其中针对上游的ondata事件处理有以下一段代码:函数
// If the user pushes more data while we're writing to dest then we'll end up // in ondata again. However, we only want to increase awaitDrain once because // dest will only emit one 'drain' event for the multiple writes. // => Introduce a guard on increasing awaitDrain. var increasedAwaitDrain = false; src.on('data', ondata); function ondata(chunk) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); if (false === ret && !increasedAwaitDrain) { if (((state.pipesCount === 1 && state.pipes === dest) || (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; increasedAwaitDrain = true; } //在此我向你们推荐一个前端全栈开发交流圈:619586920 突破技术瓶颈,提高思惟能力 src.pause(); } }
重点关注increasedAwaitDrain变量,理解这个变量指望达到什么目的,而后仔细阅读代码,会发现if (false === ret && !increasedAwaitDrain)语句中increasedAwaitDrain变量确定是false,由于前一行才将该变量赋值为false,这样一来这个变量就变得毫无心义。学习
increasedAwaitDrain = false; var ret = dest.write(chunk); if (false === ret && !increasedAwaitDrain) {}
以上就是关键的三行代码,由于Node.js是单线程且dest.write(chunk)内部没有修改变量increasedAwaitDrain
的值,那么if语句中increasedAwaitDrain
的值确定仍是false,即increasedAwaitDrain
相关逻辑没有达到所指望的目标。优化
无用代码出现的缘由ui
前段虽已经分析出increasedAwaitDrain
没起到做用,但做者为何写了这样一段逻辑呢?其实在定义increasedAwaitDrain语句的上方,做者说可能存在这样一种状况:“当咱们接收到一次上游的ondata事件并尝试将数据写到下游时,上游可能同时又有一个data事件触发,而这两个ondata的数据在写入下游时可能都返回false,从而致使src._readableState.awaitDrain++
执行两次”。 awaitDrain++执行两次是做者不但愿看到的状况,由于下游触发drain事件时awaitDrain
相应减1,直到其值为0时才让上游从新流动,若是awaitDrain++
执行两次,下游却只触发一次drain
事件,awaitDrain
就不会为0,上游不从新流动也就没法继续读取数据。this
真相的探索过程prototype
虽然从理性上认为increasedAwaitDrain
没起到做用,但也没法确定加绝对,本身尝试去求助,没有出现高手指点出问题所在,但一个同事听我描述后,说可能这就是个BUG,虽心中以为可能性不大,但仍是抱着试试看的心态切换到master分支上去瞅瞅,随即发现最新的代码里并无与increasedAwaitDrain
相似的逻辑,间接说明v8.11.1分支上increasedAwaitDrain
相关逻辑的确无用。 虽然比较确定这里存在一段无用代码,但应该如何理解做者在increasedAwaitDrain
上方的注释呢?为了进一步揭露真相,本身继续花时间去看了看stream.Readable
相关代码,想知道data事件的触发时机与顺序是如何决定的。线程
readable流的简单原理debug
在进一步解释data事件的触发顺序前,简单讲一下readable流的实现原理,若是须要本身实现一个readable
流,可使用new stream.Readable(options)
方法,其中options可包含四个属性:highWaterMark、encoding、objectMode、read。最主要的是read属性,当流的使用者须要数据时,read方法被用来从数据源获取数据,而后经过this.push(chunk)将数据传递给使用者,若是没有更多数据可供读取时使用this.push(null)表示读取结束。
const Readable = require('stream').Readable; let letter = 'ABCDEFG'.split(''); let index = 0; const rs = new Readable({ read(size) { this.push(letter[index++] || null); } }); rs.on('data', chunk => { console.log(chunk.toString()); }); //在此我向你们推荐一个前端全栈开发交流圈:619586920 突破技术瓶颈,提高思惟能力 // 输出 // A // B // C // ...
这里ondata虽然没有明显调用read方法,但内部依旧是经过调用read方法结合this.push输出数据,而且在源代码内部能够发现经过参数传递的read方法实际上被赋值给this._read,而后在Readable.prototype.read中调用this._read获取数据。
灵魂代码
为了进一步说明stream.Readable的data事件触发顺序与场景,将有关官方源码通过修改和删减成以下:
function Readable(options) { this._read = options.read; // 将参数传递的read函数赋值到this._read } // 使用者经过调用read方法获取数据 Readable.prototype.read = function (size) { var state = this._readableState; // 模拟锁,一次_read若是没有返回(this.push),后续read不会继续调用_read读取数据 if (!state.reading) { state.reading = true; state.sync = true; // sync用于在push方法中指示_read内部是否同步调用了push this._read(size); state.sync = false; } // _read内部若是是同步调用push,数据会放入缓冲区 // _read内部若是是异步调用push且缓冲区没有内容,数据可能emit data返回 // 尝试从缓冲区(state.buffer)中获取大小为size的数据,若是获取成功则触发data事件 if (ret) this.emit('data', ret); return ret; }; // 在this._read执行过程当中经过this.push输出数据 Readable.prototype.push = function (chunk, encoding) { var state = this._readableState; // 本次_read获取到数据,打开锁 state.reading = false; // 流动模式 & 缓冲区没有数据 & 非同步返回,则直接触发data事件 if (state.flowing && state.length === 0 && !state.sync) { stream.emit('data', chunk); stream.read(0); // 触发下一次读取,_read异步push的话仍是会到这里,相似flow中的保持流出于流动 } else { // 将数据放入缓冲区 state.length += chunk.length; state.buffer.push(chunk); } }; // 暂停流动 Readable.prototype.pause = function() { if (this._readableState.flowing !== false) { this._readableState.flowing = false; this.emit('pause'); } return this; }; function flow(stream) { const state = stream._readableState; while (state.flowing && stream.read() !== null); } //在此我向你们推荐一个前端全栈开发交流圈:619586920 突破技术瓶颈,提高思惟能力
data事件的触发时机与顺序
时机
data的触发只有两处:
顺序
关于data的触发顺序,实际是由emit顺序决定,为讨论原始问题:“increasedAwaitDrain相关逻辑为何能够被删除?”,将代码简化:
let count = 0; src.on('data', chunk => { let ret = dest.write(chunk); if (!ret) { count++; src.pause(); } });
当监听流的data事件时,流最终会经过resume并调用flow函数进入流动模式模式,即不断的调用read方法读取数据。接下来分析如下几种场景,当dest.write(chunk)返回false时++count会执行几回,注意结合前文的灵魂代码。
//在此我向你们推荐一个前端全栈开发交流圈:619586920 突破技术瓶颈,提高思惟能力
场景一:每次_read同步push一次数据
当发生第一次读取,数据同步push到缓冲区,紧接着从缓冲区中读取数据并经过emit data的方式传递到ondata中,若是此时dest.write(chunk)返回false,count++将执行一次,接着因为调用了stream.pause(),while条件state.flowing为false致使stream.read再也不被调用,在流从新流动前,count的值不会继续增长。
场景二:每次_read异步push一次数据
当发生第一次读取,异步push的数据将直接经过emit data传递到ondata中,而read函数中的emit因为没法从缓冲区读取数据从而不会触发,同时read返回null致使while循环也相应中止,此种状况下异步push触发data事件后,紧接着的stream.read(0)会继续保持流的流动,当dest.write(chunk)返回false,count++执行一次并将流暂停,紧接着会继续调用一次read,但此次数据将被放入缓冲区且不触发data事件,count++依旧只执行一次。 场景二流暂停一次后再次流动时,数据消耗模式与以前会有所差别,会优先消耗缓冲区数据直至为空时回到以前的模式,但这一样不会致使count++执行屡次。
场景三:每次_read屡次同步push数据
与场景一相似,只是每次_read会屡次往缓冲区写入数据,最终data事件仍是依靠从缓冲区读数据后触发。
场景四:每次_read屡次异步push数据
同场景二相似,假设在一次_read中有两次异步push,当第一个异步push执行时,data事件触发且其中的dest.write(chunk)返回false,致使count++同时流被暂停,等第二个异步push执行时,因为流已经暂停,数据将写入缓冲区而不是触发data事件,因此count++只执行一次。
场景五:_read操做可能同步或异步push
不论是同步或者异步push,当一次ondata内部将流设置为暂停模式后,flow函数中while条件state.flowing为false将致使stream.read再也不调用,异步的push的emit data判断条件一样再也不知足,即目前阶段内部不会再有data事件触发直到外部再次间接或直接调用read方法。 以上五个场景是为了分析该问题而模拟的,实际只要能理解第五个场景就能明白全部。
小结
文章最终写出来的内容与我最开始的初衷所偏离,并且本身不知道如何评价这篇文章的好坏,但为了写这文章花了两天业余时间去深刻理解stream.Readable倒是很是有收获的一件事情,更坚决本身在写文章的路途上能够走的更远。 PS:猜想为何有烂电影的存在,多是由于导演长时间投入的创做会让他迷失在内部而没法发现问题,写文章也是,难以经过阅读去优化费心思写的文章。
PS:下图是美团博客的,也许我写了这么多却抵不上这张图,说明方式很重要。
结语
感谢您的观看,若有不足之处,欢迎批评指正。