本次试图浅析探索Nodejs的Stream模块中对于Readable类的一部分实现(可写流也差很少)。其中会以可读流两种模式中的paused mode即暂停模式的表现形式来解读源码上的实现,为何不分析flowing mode天然是由于这个模式是咱们经常使用的其原理相比暂停模式下相对简单(实际上是由于笔者老是喜欢关注一些边边角角的东西,不按套路出牌=。=),同时核心方法都是同样的,一通百通嘛,有兴趣的童鞋能够本身看下完整源码。html
欢迎关注个人博客,不按期更新中——前端
首先先明确为何Nodejs要实现一个stream,这就要清楚关于生产者消费者问题的概念。node
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要做用是生成必定量的数据放到缓冲区中,而后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
简单来讲就是内存问题。与前端不一样,后端对于内存仍是至关敏感的,好比读取文件这种操做,若是文件很小就算了,但若是这个文件一个g呢?难道全读出来?这确定是不可取的。经过流的形式读一部分写一部分慢慢处理才是一个可取的方式。PS:有关为何使用stream欢迎你们百(谷)度(歌)一下。git
如今咱们将本身实现一个可读流,以此来方便观察以后数据的流动过程:github
const Readable = require('stream').Readable; // 实现一个可读流 class SubReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } // 文档提出必须经过_read方法调用push来实现对底层数据的读取 _read() { console.log('阈值规定大小:', arguments['0'] + ' bytes') const data = this.dataSource.makeData() let result = this.push(data) if(data) console.log('添加数据大小:', data.toString().length + ' bytes') console.log('已缓存数据大小: ', subReadable._readableState.length + ' bytes') console.log('超过阈值限制或数据推送完毕:', !result) console.log('====================================') } } // 模拟资源池 const dataSource = { data: new Array(1000000).fill('1'), // 每次读取时推送必定量数据 makeData() { if (!dataSource.data.length) return null; return dataSource.data.splice(dataSource.data.length - 5000).reduce((a,b) => a + '' + b) } //每次向缓存推5000字节数据 }; const subReadable = new SubReadable(dataSource);
至此subReadable即是咱们实现的自定义可读流。后端
先来看下总体的流程:
可读流会经过_read()
方式从资源读取数据到缓存池,同时设置了一个阈值highWaterMark
,标记数据到缓存池大小的一个上限,这个阈值是会浮动的,最小值也是默认值为16384。当消费者监听了readable
事件以后,就能够显式调用read()
方法来读取数据。api
经过注册readable事件以此来触发暂停模式:缓存
subReadable.on('readable', () => { console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte') console.log('------------------------------------') })
能够发现当注册readable
事件后可对流会从底层资源推送数据到缓存直到达到超过阈值或者底层数据所有加载完。多线程
调用read(n); n = 1000;函数
首先修改资源池大小data: new Array(10000).fill('1')
(方便打印数据),执行read(1000)每次读取1000字节资源读取资源:
subReadable.on('readable', () => { let chunk = subReadable.read(1000) if(chunk) console.log(`读取 ${chunk.length} bytes数据`); console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte') console.log('------------------------------------') })
结果执行了两次读取数据,同时若是每次读取的字节少于缓存中的数据,则可读流不会再从资源加载新的数据。
无参调用read()
subReadable.on('readable', () => { let chunk = subReadable.read() if(chunk) console.log(`读取 ${chunk.length} bytes数据`); console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte') console.log('------------------------------------') })
直接调用read()
后,会逐步读取彻底部资源,至于每次读取多少下文会统一探讨。
以上咱们依次尝试了在实现可读流后触发暂停模式会发生的事情,接下来做者将会对如下几个可能有疑问的点进行探究:
_read()
方法并在其中调用push()
read()
与传入固定数据的区别_read()
方法并在其中调用push()
Readable.prototype._read = function(n) { this.emit('error', new errors.Error('ERR_STREAM_READ_NOT_IMPLEMENTED')); }; //只是定义接口 Readable.prototype.read = function(n) { ... var doRead = state.needReadable; if (doRead) { this._read(state.highWaterMark); } }
当咱们调用subReadable.read()便会执行到上面的代码,能够发现,源码中
对于_read()
只是定义了一个接口,里面并无具体实现,若是咱们不本身定义那么就会报错。同时read()
中会执行它经过它调用push()
来从资源中读取数据,而且传入highWaterMark
,这个值你能够用也能够不用由于_read()
是咱们本身实现的。
Readable.prototype.push = function(chunk, encoding) { ... return readableAddChunk(this, chunk, encoding, false, skipChunkCheck); };
从代码中能够看出,将底层资源推送到缓存中的核心操做是经过push,经过语义化也能够看出push方法中最后会进行添加新数据的操做。因为以后方法中嵌套不少,不一一展现,直接来看最后调用的方法:
// readableAddChunk最后会调用addChunk function addChunk(stream, state, chunk, addToFront) { ... state.buffer.push(chunk); //数据推送到buffer中 if (state.needReadable)//判断此属性值来看是否触发readable事件 emitReadable(stream); maybeReadMore(stream, state);//可能会推送更多数据到缓存 }
咱们能够看出,方法调用的最后确实执行了资源数据推送到缓存的操做。与此同时在会判断needReadable属性值来看是否触发readable回调事件。而这也为以后咱们来分析为何注册了readable事件以后会执行一次回调埋下了伏笔。最后调用maybeReadMore()则是蓄满缓存池的方法。
先来看下源码里是如何绑定的事件:
Readable.prototype.on = function(ev, fn) { if (ev === 'data') { ... } else if (ev === 'readable') { const state = this._readableState; state.needReadable = true;//设定属性为true,触发readable回调 ... process.nextTick(nReadingNextTick, this); } }; function nReadingNextTick(self) { self.read(0); } //以后执行read(0) => _read() => push() => addChunk() // => maybeReadMore()
maybeReadMore()中当缓存池存储大小小于阈值时则会一直调用read(0)不读取数据,可是会一直push底层资源到缓存:
function maybeReadMore_(stream, state) { ... if (state.length < state.highWaterMark) { stream.read(0); } }
上文提到过,绑定事件后会开始推送数据至缓存池,最后会执行到addChunk()方法,内部经过needReadable属性来判断是否触发readable事件。当你第一次绑定事件时会执行state.needReadable = true;,从而在最后推送数据后会执行触发readable的操做。
read()
与传入特定数值的区别区别在执行read()方法的时候,会将参数n传入到下面这个函数中由它来计算如今应该应该读取多少数据:
function howMuchToRead(n, state) { if (n <= 0 || (state.length === 0 && state.ended)) return 0; if (state.objectMode) return 1; if (n !== n) { // Only flow one buffer at a time if (state.flowing && state.length) return state.buffer.head.data.length; else return state.length; } // If we're asking for more than the current hwm, then raise the hwm. if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); if (n <= state.length) return n; // Don't have enough if (!state.ended) { //传输没有结束都是false state.needReadable = true; return 0; } return state.length; }
当直接调用read(),n参数则为NaN,当处于流动模式的时候n则为buffer头数据的长度,不然是整个缓存的数据长度。若为read(n)传入数字,大于当前的hwm时能够发现会从新计算一个hwm,与此同时若是已缓存的数据小于请求的数据量,那么将设置state.needReadable = true;
并返回0;
第一次试图梳理源码的思路,一路写下来发现有不少想说可是又不知道怎么连贯的理清楚=。= 既然代码细节也有些说不清,不过最后仍是进行一个核心思路的提炼:
源码的边界状况比较多。做者若是哪里说错了请指正=。=
PS:源码地址
惯例po做者的博客,不定时更新中——有问题欢迎在issues下交流。