流“忙”

为何要使用流?

这里就要提到一个网上的例子了。首先,咱们须要先生成一个大文件:node

const fs = require('fs');
const file = fs.createWriteStream('./test.txt');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();
复制代码

这就生成了一个大概 450MB 的文件。第二步,咱们须要启动一个简单的 Node 服务器。缓存

const fs = require('fs');
const http = require('http');
const server = http.createServer();

server.on('request', (req, res) => {
  fs.readFile('./test.txt', (err, data) => {
    if (err) throw err;
  
    res.end(data);
  });
});
server.listen(3000);
复制代码

把代码写好以后,我在本地全局安装了一个 pm2,固然了,你也可使用系统自带任务管理器、活动监视器等等来监控内存消耗的变化,pm2 比较直观好用,我就直接使用了 pm2。服务器

这是初始状态:curl

咱们打开命令行,使用 curl 来访问服务器:curl http://localhost:3000ide

这时候,咱们能够经过 pm2 monit index.js 来查看。函数

能够看到如今的内存消耗为:学习

能够看到,用这种方法是将整个文件都放在了内存当中,若是文件小还好,可是当数据量很是大的时候,就会形成内存不够用的问题,而且当你将内存占个七七八八也会影响到用户访问的速度,针对这种状况,咱们使用流来解决这个问题。ui

首先,HTTP 响应对象也是一种可写流,咱们能够建立一个文件的可读流,经过 pipe 链接这两个流,咱们能够看下这样的内存消耗。this

const fs = require('fs');
const http = require('http');

const server = http.createServer();

server.on('request', (req, res) => {
  fs.createReadStream('./test.txt').pipe(res);
});

server.listen(3000);
复制代码

能够看到在使用流的状况下,咱们内存的消耗是十分少的。感觉到流的魅力了么?一块儿来学习吧。url

从小学数学题开始

你们应该都记得,小学的时候常常碰到这样的题:有一个 x 立方米的水池,一个进水管每秒进水 a 立方米,一个出水管每秒出水 b 立方米,问啥时候水池积满?不知道你们小时候有没有骂过这道题,一边进一边出这不是有病么?哈哈,不扯远了,继续回到咱们的流,这个题的模型其实和咱们的流很类似,固然了,上面这个题还涉及到了流控的问题,下面再说。

在 Node.js 中,有四种基本的流类型:

  • Readable: 可读流,你能够把它想象成进水管,它将数据源以流的方式一点一点的提供给下游。
  • Writable: 可写流,这是做为下游来消耗上游提供的数据,你能够把它想象成出水管。
  • Duplex: 双工流,便可读也可写。
  • Transform: 继承自 Duplex 的双工流,它能够在写入或者读取数据的时候修改数据。

可读流

说可读流以前,先划重点:

  1. read() 和 _read()
  2. flowing 和 pause 模式
  3. 事件 readable 和 data

重点画出来了咱们开始一点一点扒可读流,首先,咱们要说的就是 read_read 这两个函数,正常状况下,咱们在使用可读流的时候,须要提供一个 _read 方法,负责向缓冲区补充数据,它要从数据源拉取数据,而后在这个方法中调用 push 方法把数据推到缓冲池中,当流结束时,咱们须要 push(null),而 read(size) 函数是咱们要消耗缓冲区的数据的时候使用的方法,不须要咱们本身实现。

下面要说的是可读流分两种模式:flowing 和 pause,这两种模式决定了数据流动的方式。

flowing 模式

在 flowing 模式下,数据会源源不断的产生,每次都会触发 data 事件 ,经过监听这个事件来获取数据。

那么在什么状况下会进入 flowing 模式呢?OK,扒源码,经过看可读流的源码,知道在流的属性里有一个 flowing 的属性,这个属性初始化的时候为 null。这时候是处于 pause 模式,咱们在当前文件全局搜一下 flowing = 发现当咱们调用 resume() 的时候会将这个标志位设为 true,这时候就处于 flowing 模式了,那么还有没有其余的方法呢?答案是确定的,是有的,看源码:

Readable.prototype.on = function(ev, fn) {
  const res = Stream.prototype.on.call(this, ev, fn);
  const state = this._readableState;
  if(ev === 'data') {
    state.readableListening = this.listenerCount('readable') > 0;
    if (state.flowing !== false)
      this.resume();
  } else if ('readable'){
      //...
  }
  return res;
}
复制代码

咱们能够关注到,当咱们监听 data 事件的时候,由于当前初始化标志位为 null,因此会去调用 resume(),这时候就会进入 flowing 模式,同时,当可读流调用 pipe 的时候会去监听 data 事件,也会进入 flowing 模式。

那么当你监听 data 事件进入 flowing 模式时,整个代码流程是什么呢?

从这张图,咱们能看出 flowing 模式的一个大概流程,从初始化开始,flowing = null,而后当咱们监听 data 事件,会去调用 this.resume(),这时候就将 flowing 变为 true,而后调用了 resume,在这个函数里,调用了 read(0) 去触发 _read() 向缓冲区补充数据,这里要提一点的是当咱们调用 read(0) 的时候,不会破坏缓冲区的数据和状态,并触发 _read 去读取数据到缓冲区。接下来就是不断的循环往复,直到 push(null) 则流结束。

pause 模式

如今知道了 flowing 模式,那么 pause 模式又是怎样的呢?首先咱们来看如何进入 pause 模式:

  1. 刚刚说过,可读流的初始状态就是 pause 模式。
  2. 调用 pause 方法
  3. 调用 unpipe 方法

通常来讲咱们不多会去使用到 pause 模式,在 pause 模式下,咱们须要手动的调用 read() 函数去获取数据。

readable 和 data

这两个都是关于数据的事件,至于 end 事件,很简单,就很少说了。

那么 readable 事件表明了什么呢?readable 只负责通知消费者流里有新数据或者流读完了,至于如何使用则是消费者本身的事情了,这时候 read() 就会返回新数据或者是 null。

至于 data 事件,咱们看一下上面那张图,这个事件是在流把数据传递给消费者的时候触发的。

那么咱们同时监听 datareadable 事件会怎么样呢?从上面的图咱们能够得知,当监听 data 事件的时候,流直接将数据传递给了消费者,并无进入缓冲区,只会触发 data 事件,而只有当数据消耗完成时 push(null) 会触发 readable 事件。

可写流

可写流是做为下流来消耗上游的数据,那么开始划重点:

  1. _write 和 write
  2. finish 和 prefinishi 事件

和可读流同样,咱们须要在初始化流的时候提供一个 _write() 方法,用来向底层写数据,而 write() 方法是用来向可写流提供数据的,注意在 _write 方法中的第三个参数在源码中是一个叫 onwrite 的方法,这是为了代表当前写入数据已经完成了,能够开始写入下面的数据了。可写流的终止信号是调用 end() 方法。

那么可写流是如何监听流结束事件呢?答案是有两个事件能够监听,一个是 prefinish,另外一个是 finish

这两个事件的区别是,finish 是在流的全部数据都写入底层而且回调函数都执行了才会触发,而 prefinish 的触发条件是全部的数据都写入底层,这二者之间仍是有必定差别的。

Duplex 和 Tranform

Duplex 的代码量很是少,由于它同时继承了可读流和可写流,它同时包含了这两种流原型上的方法,同时包含了两种流的属性。因此咱们既能够实现 _read 将它当成可读流也能够实现 _write 将其当成可写流来使用。

而 Transform 继承了 Duplex,而且关联了两个缓存池,咱们向流中写入数据,就可以进行转换,而后再读取,那为何能够这样操做呢?

咱们去看看源码,Transform 本身实现了 _write_read 方法,注意的是这里使用的是同一个缓存,咱们来看这么一段代码。

const { Transform } = require('stream')

var transform = Transform({
  transform: function (buf, _, next) {
    next(null, buf.toString().replace('a', 'b'))
  }
})

// 'b'
transform.pipe(process.stdout)

transform.write('a')
transform.end()
复制代码

上面的代码主要流程是这样的,Transform 调用了继承自可写流的 write 方法,而后这个方法调用本身实现的 _write 将写入的数据存到了 Transform 的缓存中,而后将其转换成 buffer,在其后 _read 函数被调用,在这个函数中调用了在初始化的时候传入转换函数 _transform 对数据进行转换,在转换事后就是 readable.pipe(writable) 的模式了。

还有一点是,Transform 还有一个 _flush 函数,在 prefinish 触发时就会调用它,说明写流结束了。

神器 pipe

在咱们进行可写流和可读流的对接的时候咱们要处理各类事件,以及流控的问题,就像咱们在上面提到的那道题,若是读流速度太快,而写流速度慢,就会致使速度不匹配的问题,而 pipe 实现了一套背压平衡机制来控制两边的速度。

那关于 pipe 的源码解析等等能够去看看这篇文章

总结

在 Node 里,流是很是重要的一个模块,它可以很好的处理大文件,以及对数据的处理能力。此次对流的学习也是收获了很多东西,与君共勉!

相关文章
相关标签/搜索