Node.js Stream中Readable类的内部实现

写在最前

本次试图浅析探索Nodejs的Stream模块中对于Readable类的一部分实现(可写流也差很少)。其中会以可读流两种模式中的paused mode即暂停模式的表现形式来解读源码上的实现,为何不分析flowing mode天然是由于这个模式是咱们经常使用的其原理相比暂停模式下相对简单(实际上是由于笔者老是喜欢关注一些边边角角的东西,不按套路出牌=。=),同时核心方法都是同样的,一通百通嘛,有兴趣的童鞋能够本身看下完整源码html

欢迎关注个人博客,不按期更新中——前端

生产者消费者问题

首先先明确为何Nodejs要实现一个stream,这就要清楚关于生产者消费者问题的概念。node

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要做用是生成必定量的数据放到缓冲区中,而后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

简单来讲就是内存问题。与前端不一样,后端对于内存仍是至关敏感的,好比读取文件这种操做,若是文件很小就算了,但若是这个文件一个g呢?难道全读出来?这确定是不可取的。经过流的形式读一部分写一部分慢慢处理才是一个可取的方式。PS:有关为何使用stream欢迎你们百(谷)度(歌)一下。git

实现一个可读流

如今咱们将本身实现一个可读流,以此来方便观察以后数据的流动过程:github

const Readable = require('stream').Readable;
// 实现一个可读流
class SubReadable extends Readable {
  constructor(dataSource, options) {
    super(options);
    this.dataSource = dataSource;
  }
  // 文档提出必须经过_read方法调用push来实现对底层数据的读取
  _read() {
    console.log('阈值规定大小:', arguments['0'] + ' bytes')
    const data = this.dataSource.makeData()
    let result = this.push(data)
    if(data) console.log('添加数据大小:', data.toString().length + ' bytes')
    console.log('已缓存数据大小: ', subReadable._readableState.length + ' bytes')
    console.log('超过阈值限制或数据推送完毕:', !result)
    console.log('====================================')
  }
}

// 模拟资源池
const dataSource = {
  data: new Array(1000000).fill('1'),
  // 每次读取时推送必定量数据
  makeData() {
    if (!dataSource.data.length) return null;
    return dataSource.data.splice(dataSource.data.length - 5000).reduce((a,b) => a + '' + b)
  }
  //每次向缓存推5000字节数据
};

const subReadable = new SubReadable(dataSource);

至此subReadable即是咱们实现的自定义可读流。后端

Paused Mode 暂停模式都作了什么?

先来看下总体的流程:
image.png
可读流会经过_read()方式从资源读取数据到缓存池,同时设置了一个阈值highWaterMark,标记数据到缓存池大小的一个上限,这个阈值是会浮动的,最小值也是默认值为16384。当消费者监听了readable事件以后,就能够显式调用read()方法来读取数据。api

触发暂停模式

经过注册readable事件以此来触发暂停模式:缓存

subReadable.on('readable', () => {
    console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte')
    console.log('------------------------------------')
})

image.png
能够发现当注册readable事件后可对流会从底层资源推送数据到缓存直到达到超过阈值或者底层数据所有加载完。多线程

开始消费数据

调用read(n); n = 1000;函数

首先修改资源池大小data: new Array(10000).fill('1')(方便打印数据),执行read(1000)每次读取1000字节资源读取资源:

subReadable.on('readable', () => {
    let chunk = subReadable.read(1000)
    if(chunk) 
      console.log(`读取 ${chunk.length} bytes数据`);
    console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte')
    console.log('------------------------------------')
})

image.png
结果执行了两次读取数据,同时若是每次读取的字节少于缓存中的数据,则可读流不会再从资源加载新的数据。

无参调用read()

subReadable.on('readable', () => {
    let chunk = subReadable.read()
    if(chunk) 
      console.log(`读取 ${chunk.length} bytes数据`);
    console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte')
    console.log('------------------------------------')
})

image

直接调用read()后,会逐步读取彻底部资源,至于每次读取多少下文会统一探讨。

小结

以上咱们依次尝试了在实现可读流后触发暂停模式会发生的事情,接下来做者将会对如下几个可能有疑问的点进行探究:

  • 为何本身实现的可读流要实现_read()方法并在其中调用push()
  • 触发暂停模式后缓存池如何被蓄满,以及为什么会直接执行一次回调
  • 无参调用read()与传入固定数据的区别

为何本身实现的可读流要实现_read()方法并在其中调用push()

Readable.prototype._read = function(n) {
  this.emit('error', new errors.Error('ERR_STREAM_READ_NOT_IMPLEMENTED'));
}; //只是定义接口
Readable.prototype.read = function(n) {
    ...
    var doRead = state.needReadable;
    if (doRead) {
        this._read(state.highWaterMark);
    }
}

当咱们调用subReadable.read()便会执行到上面的代码,能够发现,源码中
对于_read()只是定义了一个接口,里面并无具体实现,若是咱们不本身定义那么就会报错。同时read()中会执行它经过它调用push()来从资源中读取数据,而且传入highWaterMark,这个值你能够用也能够不用由于_read()是咱们本身实现的。

Readable.prototype.push = function(chunk, encoding) {
  ...
  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

从代码中能够看出,将底层资源推送到缓存中的核心操做是经过push,经过语义化也能够看出push方法中最后会进行添加新数据的操做。因为以后方法中嵌套不少,不一一展现,直接来看最后调用的方法:

// readableAddChunk最后会调用addChunk
function addChunk(stream, state, chunk, addToFront) {
  ...
    state.buffer.push(chunk); //数据推送到buffer中
    if (state.needReadable)//判断此属性值来看是否触发readable事件
      emitReadable(stream);
    maybeReadMore(stream, state);//可能会推送更多数据到缓存
}

咱们能够看出,方法调用的最后确实执行了资源数据推送到缓存的操做。与此同时在会判断needReadable属性值来看是否触发readable回调事件。而这也为以后咱们来分析为何注册了readable事件以后会执行一次回调埋下了伏笔。最后调用maybeReadMore()则是蓄满缓存池的方法。

触发暂停模式后缓存池如何被蓄满

先来看下源码里是如何绑定的事件:

Readable.prototype.on = function(ev, fn) {
  if (ev === 'data') {
    ...
  } else if (ev === 'readable') {
    const state = this._readableState;
    state.needReadable = true;//设定属性为true,触发readable回调
    ...
        process.nextTick(nReadingNextTick, this);
  }
};
function nReadingNextTick(self) {
  self.read(0);
}
//以后执行read(0) => _read() => push() => addChunk()
//        => maybeReadMore()

maybeReadMore()中当缓存池存储大小小于阈值时则会一直调用read(0)不读取数据,可是会一直push底层资源到缓存:

function maybeReadMore_(stream, state) {
...
  if (state.length < state.highWaterMark) {
    stream.read(0);
  }
}

绑定监听事件后为什么会直接执行一次回调

上文提到过,绑定事件后会开始推送数据至缓存池,最后会执行到addChunk()方法,内部经过needReadable属性来判断是否触发readable事件。当你第一次绑定事件时会执行state.needReadable = true;,从而在最后推送数据后会执行触发readable的操做。

read()与传入特定数值的区别

区别在执行read()方法的时候,会将参数n传入到下面这个函数中由它来计算如今应该应该读取多少数据:

function howMuchToRead(n, state) {
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
  if (state.objectMode)
    return 1;
  if (n !== n) {
    // Only flow one buffer at a time
    if (state.flowing && state.length)
      return state.buffer.head.data.length;
    else
      return state.length;
  }
  // If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark)
    state.highWaterMark = computeNewHighWaterMark(n);
  if (n <= state.length)
    return n;
  // Don't have enough
  if (!state.ended) { //传输没有结束都是false
    state.needReadable = true;
    return 0;
  }
  return state.length;
}

当直接调用read(),n参数则为NaN,当处于流动模式的时候n则为buffer头数据的长度,不然是整个缓存的数据长度。若为read(n)传入数字,大于当前的hwm时能够发现会从新计算一个hwm,与此同时若是已缓存的数据小于请求的数据量,那么将设置state.needReadable = true;并返回0;

总结

第一次试图梳理源码的思路,一路写下来发现有不少想说可是又不知道怎么连贯的理清楚=。= 既然代码细节也有些说不清,不过最后仍是进行一个核心思路的提炼:

核心方法:

  • Readable.prototype.read()
  • Readable.prototype.push(); push中可能会执行emitReadable();

核心属性:

  • this.needReadable:经过此属性来决定是否触发回调

核心思路:

  1. 推送数据至缓存与读取缓存数据的操做均由read()控制(由于read内部既实现了push也实现了howMuchToread(),不一样的是前者为read(0)即只推送不读取;后者为read()或read(n);
  2. 注册readable事件后,执行read(0)资源就被push到缓存中,直到达到highWaterMark
  3. 期间会触发回调函数,若是执行read()则至关于每次都会把缓存中的数据所有取出来,缓存中时刻没有数据只能继续从资源获取直到数据所有取出并读取完毕。若执行read(n)(n < highWaterMark),则只会取出2n的数据,同时缓存资源大于n时将会中止。由于此时会认为你每次只取n个数据,缓存中彻底够用,因此就不会再更新数据也就不会再触发回调。

参考资料

最后

源码的边界状况比较多。做者若是哪里说错了请指正=。=
PS:源码地址

惯例po做者的博客,不定时更新中——有问题欢迎在issues下交流。

相关文章
相关标签/搜索