[译] Node.js 流: 你须要知道的一切

Node.js 流: 你须要知道的一切

图片来源javascript

Node.js 中的流有着难以使用,更难以理解的名声。如今我有一个好消息告诉你:事情已经再也不是这样了。html

很长时间以来,开发人员创造了许许多多的软件包为的就是能够更简单的使用流。可是在本文中,我会把重点放在原生的 Node.js 流 API上。前端

“流是 Node 中最棒的,同时也是最被人误解的想法。”java

— Dominic Tarrnode

流究竟是什么呢?

流是数据的集合 —— 就像数组或字符串同样。区别在于流中的数据可能不会马上就所有可用,而且你无需一次性地把这些数据所有放入内存。这使得流在操做大量数据或是数据从外部来源逐发送过来的时候变得很是有用。react

然而,流的做用并不只限于操做大量数据。它还带给咱们组合代码的能力。就像咱们能够经过管道链接几个简单的 Linux 命令以组合出强大的功能同样,咱们能够利用流在 Node 中作一样的事。android

Linux 命令的组合性ios

const grep = ... // 一个 grep 命令输出的 stream
const wc = ... // 一个 wc 命令输入的 stream

grep.pipe(wc)复制代码

Node 中许多内建的模块都实现了流接口:git

截屏来自于个人 Pluralsight 课程 —— 高级 Node.jsgithub

上边的列表中有一些 Node.js 原生的对象,这些对象也是能够读写的流。这些对象中的一部分是既可读、又可写的流,例如 TCP sockets,zlib 以及 crypto。

须要注意的是这些对象是紧密关联的。虽然一个 HTTP 响应在客户端是一个可读流,但在服务器端它倒是一个可写流。这是由于在 HTTP 的状况中,咱们基本上是从一个对象(http.IncomingMessage)读取数据,向另外一个对象(http.ServerResponse)写入数据。

还须要注意的是 stdio 流(stdinstdoutstderr)在子进程中有着与父进程中相反的类型。这使得在子进程中从父进程的 stdio 流中读取或写入数据变得很是简单。

一个流的真实例子

理论是伟大的,当每每没有 100% 的说服力。下面让咱们经过一个例子来看看流在节省内存消耗方面能够起到的做用。

首先让咱们建立一个大文件:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();复制代码

看看在建立这个大文件时我用到了什么。一个可写流!

经过 fs 模块你可使用一个流接口读取或写入文件。在上面的例子中,咱们经过一个可写流向 big.file 写入了 100 万行数据。

执行这段脚本会生成一个约 400MB 大小的文件。

如下是一个用来发送 big.file 文件的 Node web 服务器:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;

    res.end(data);
  });
});

server.listen(8000);复制代码

当服务器收到请求时,它会经过异步方法 fs.readFile 读取文件内容发送给客户端。看起来咱们并无阻塞事件循环。一切看起来还不错,是吧?是吗?

让咱们来看看真实的状况吧。咱们启动服务器,发起链接,并监控内存的使用状况。

当我启动服务器的时候,它占用了一个正常大小的内存空间,8.7MB:

当我链接到服务器的时候。请注意内存消耗的变化:

哇 —— 内存消耗暴增到 434.8MB。

在咱们将其写入响应对象以前,咱们基本上把 big.file 的所有内容都载入到内存中了。这是很是低效的。

HTTP 响应对象也是一个可写流。这意味着若是咱们有一个表明了 big.file 内容的可读流,咱们就能够经过将两个流链接起来以实现相同的功能而没必要消耗约 400MB 的内存。

Node fs 模块中的 createReadStream 方法能够针对任何文件给咱们返回一个可读流。咱们能够把它和响应对象链接起来:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);复制代码

如今,当你再次链接到服务器时,神奇的事情发生了(请注意内存消耗):

发生了什么?

当客户端请求这个大文件时,咱们经过流逐块的发送数据。这意味着咱们不须要把文件的所有内容缓存到内存中。内存消耗只增加了大约 25MB。

你能够把这个例子推向极端。从新生成一个 500 万行而不是 100 万行的 big.file 文件。它大概有 2GB 那么大。这已经超过了 Node 中默认的缓冲区大小的上限。

若是你尝试经过 fs.readFile 读取那个文件,默认状况下会失败(固然你能够修改缓冲区大小上限)。可是经过使用 fs.createReadStream,向客户端发送一个 2GB 的文件就没有任何问题。更棒的是,进程的内存消耗并不会因文件增大而增加。

准备好学习流了吗?

这篇文章是个人 Pluralsight 课堂上 Node.js 课程中的一部分。你能够经过这个连接找到这部份内容的视频版。

流快速入门

在 Node.js 中有四种基本类型的流:可读流,可写流,双向流以及变换流。

  • 可读流是对一个能够读取数据的源的抽象。fs.createReadStream 方法是一个可读流的例子。
  • 可写流是对一个能够写入数据的目标的抽象。fs.createWriteStream 方法是一个可写流的例子。
  • 双向流既是可读的,又是可写的。TCP socket 就属于这种。
  • 变换流是一种特殊的双向流,它会基于写入的数据生成可供读取的数据。例如使用 zlib.createGzip 来压缩数据。你能够把一个变换流想象成一个函数,这个函数的输入部分对应可写流,输出部分对应可读流。你也可能据说过变换流有时被称为 “thought streams”。

全部的流都是 EventEmitter 的实例。它们发出可用于读取或写入数据的事件。然而,咱们能够利用 pipe 方法以一种更简单的方式使用流中的数据。

pipe 方法

如下这行代码就是你要记住的魔法:

readableSrc.pipe(writableDest)复制代码

在这行简单的代码中,咱们以管道的方式把一个可读流的输出链接到了一个可写流的输入。管道的上游(source)必须是一个可读流,下游(destination)必须是一个可写流。固然,它们也能够是双向流/变换流。事实上,若是咱们使用管道链接的是双向流,咱们就能够像 Linux 系统里那样链接多个流:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)复制代码

pipe 方法会返回最后一个流,这使得咱们能够串联多个流。对于流 a (可读),bc (双向),以及 d(可写)。咱们能够这样:

a.pipe(b).pipe(c).pipe(d)

# 等价于:
a.pipe(b)
b.pipe(c)
c.pipe(d)

# 在 Linux 中,等价于:
$ a | b | c | d复制代码

pipe 方法是使用流最简单的方式。一般的建议是要么使用 pipe 方法、要么使用事件来读取流,要避免混合使用二者。通常状况下使用 pipe 方法时你就没必要再使用事件了。但若是你想以一种更加自定义的方式使用流,就要用到事件了。

流事件

除了从可读流中读取数据写入可写流之外,pipe 方法还自动帮你处理了一些其余状况。例如,错误处理,文件结尾,以及两个流读取/写入速度不一致的状况。

然而,流也能够直接经过事件读取。如下是一段简化的使用事件来模拟 pipe 读取、写入数据的代码:

# readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});

readable.on('end', () => {
  writable.end();
});复制代码

如下是一些使用可读流或可写流时用到的事件和方法:

截屏来自于个人 Pluralsight 课程 - 高级 Node.js

这些事件和函数是相关的,由于咱们老是把它们组合在一块儿使用。

一个可读流上最重要的两个事件是:

  • data 事件,任什么时候候当可读流发送数据给它的消费者时,会触发此事件
  • end 事件,当可读流没有更多的数据要发送给消费者时,会触发此事件

一个可写流上最重要的两个事件是:

  • drain 事件,这是一个表示可写流能够接受更多数据的信号.
  • finish 事件,当全部数据都被写入底层系统后会触发此事件。

事件和函数能够组合起来使用,以更加定制,优化的方式使用流。对于可读流,咱们可使用 pipe/unpipe 方法,或是 readunshiftresume方法。对于可写流,咱们能够把它设置为 pipe/unpipe 方法的下游,亦或是使用 write 方法写入数据并在写入完成后调用 end 方法。

可读流的暂停和流动模式

可读流有两种主要的模式,影响咱们使用它的方式:

  • 它要么处于暂停模式
  • 要么就是处于流动模式

这些模式有时也被成为拉取和推送模式。

全部的可读流默认都处于暂停模式。但它们能够按需在流动模式和暂停模式间切换。这种切换有时会自动发生。

当一个可读流处于暂停模式时,咱们可使用 read() 方法按需的读取数据。而对于一个处于流动模式的可读流,数据会源源不断的流动,咱们须要经过事件监听来处理数据。

在流动模式中,若是没有消费者监听事件那么数据就会丢失。这就是为什么在处理流动模式的可读流时咱们须要一个 data 事件回调函数。事实上,经过增长一个 data 事件回调就能够把处于暂停模式的流切换到流动模式;一样的,移除 data 事件回调会把流切回到暂停模式。这么作的一部分缘由是为了和旧的 Node 流接口兼容。

要手动在这两个模式间切换,你可使用 resume()pause() 方法。

截屏来自于个人 Pluralsight 课程 - 高级 Node.js

当使用 pipe 方法时,它会自动帮你处理好这些模式之间的切换,所以你无须关心这些细节。

实现流接口

当咱们讨论 Node.js 中的流时,主要是讨论两项任务:

  • 一个是实现流。
  • 一个是使用流。

到目前为止,咱们只讨论了如何使用流。接下来让咱们看看如何实现它!

流的实现者一般都会 require stream 模块。

实现一个可写流

要实现一个可写流,咱们须要使用来自 stream 模块的 Writable 类。

const { Writable } = require('streams');复制代码

实现一个可写流有不少种方法。例如,咱们能够继承 Writable 类:

class myWritableStream extends Writable {
}复制代码

然而,我倾向于更简单的构造方法。咱们能够直接给 Writable 构造函数传入配置项来建立一个对象。惟一必须的配置项是一个 write 函数,它用于暴露一个写入数据的接口。

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);复制代码

write 方法接受三个参数。

  • chunk 一般是一个 buffer,除非咱们对流进行了特殊配置。
  • encoding 一般能够忽略。除非 chunk 被配置为不是 buffer。
  • callback 方法是一个在咱们完成数据处理后要执行的回调函数。它用来表示数据是否成功写入。如果写入失败,在执行该回调函数时须要传入一个错误对象。

outStream 中,咱们只是单纯的把收到的数据当作字符串 console.log 出来,并经过执行 callback 时不传入错误对象以表示写入成功。这是一个很是简单且没什么用处的回传流。它会回传任何收到的数据。

要使用这个流,咱们能够把它和可读流 process.stdin 配合使用。只需把 process.stdin 经过管道链接到 outStream

当咱们运行上面的代码时,任何输入到 process.stdin 中的字符都会被 outStream 中的 console.log 输出回来。

这不是一个很是实用的流实现,由于 Node 已经内置了它的实现。它几乎等同于 process.stdout。经过把 stdinstdout 链接起来,咱们就能够经过一行代码获得彻底相同的回传效果:

process.stdin.pipe(process.stdout);复制代码

实现一个可读流

要实现可读流,咱们须要引入 Readable 接口并经过它建立对象:

const { Readable } = require('stream');

const inStream = new Readable({});复制代码

这是一个很是简单的可读流实现。咱们能够经过 push 方法向下游推送数据。

const { Readable } = require('stream');  

const inStream = new Readable();

inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // 没有更多数据了

inStream.pipe(process.stdout);复制代码

当咱们 push 一个 null 值,这表示该流后续不会再有任何数据了。

要使用这个可读流,咱们能够把它链接到可写流 process.stdout

当咱们执行以上代码时,全部读取自 inStream 的数据都会被显示到标准输出上。很是简单,但并不高效。

在把该流链接到 process.stdout 以前,咱们就已经推送了全部数据。更好的方式是只在使用者要求时按需推送数据。咱们能够经过在可读流配置中实现 read() 方法来达成这一目的:

const inStream = new Readable({
  read(size) {
    // 某人想要读取数据
  }
});复制代码

当可读流上的 read 方法被调用时,流实现能够向队列中推送部分数据。例如,咱们能够从字符编码 65(表示字母 A) 开始,一次推送一个字母,每次都把字符编码加 1:

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inStream.currentCharCode = 65

inStream.pipe(process.stdout);复制代码

当使用者读取该可读流时,read 方法会持续被触发,咱们不断推送字母。咱们须要在某处中止该循环,这就是为什么咱们放置了一个 if 语句以便在 currentCharCode 大于 90(表明 Z) 时推送一个 null 值。

这段代码等价于以前的咱们开始时编写的那段简单代码,但咱们已改成在使用者须要时推送数据。你始终应该这样作。

实现双向/变换流

对于双向流,咱们要在同一个对象上同时现实可读流和可写流。就好像是咱们继承了两个接口。

如下的例子实现了一个综合了前面提到的可读流与可写流功能的双向流:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;

process.stdin.pipe(inoutStream).pipe(process.stdout);复制代码

经过组合这些方法,咱们能够经过该双向流读取从 A 到 Z 的字母还能够利用它的回传特性。咱们把可读的 stdin 流接入这个双向流以利用它的回传特性同时又把它接入可写的 stdout 流以查看字母 A 到 Z。

理解双向流的读取和写入部分是彻底独立的这一点很是重要。它只不过是把两种特性在同一个对象上实现罢了。

变换流是一种更有趣的双向流,由于它的输出是基于输入运算获得的。

对于一个变换流,咱们不须要实现 readwrite 方法,而是只须要实现一个 transform 方法便可,它结合了两者的功能。它的函数签名和 write 方法一致,咱们也能够经过它 push 数据。

如下是一个把你输入的任何内容转换为大写字母的变换流:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);复制代码

在这个变换流中,咱们只实现了 transform() 方法,却达到了前面双向流例子的效果。在该方法中,咱们把 chunk 转换为大写而后经过 push 方法传递给下游。

流对象模式

默认状况下,流接收的参数类型为 Buffer/String。咱们能够经过设置 objectMode 参数使得流能够接受任何 JavaScript 对象。

如下是一个简单的演示。如下变换流的组合用于把一个逗号分割的字符串转变成为一个 JavaScript 对象。传入 "a,b,c,d" 就变成了 {a: b, c: d}

const { Transform } = require('stream');
const commaSplitter = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});
const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    const obj = {};
    for(let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }
});
const objectToString = new Transform({
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});
process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)复制代码

咱们给 commaSplitter 传入一个字符串(假设是 "a,b,c,d"),它会输出一个数组做为可读数据([“a”, “b”, “c”, “d”])。在该流上增长 readableObjectMode 标记是必须的,由于咱们在给下游推送一个对象,而不是字符串。

咱们接着把 commaSplitter 输出的数组传递给了 arrayToObject 流。咱们须要设置 writableObjectModel 以便让该流能够接收一个对象。它还会往下游推送一个对象(输入的数据被转换成对象),这就是为何咱们还须要配置 readableObjectMode 标志位。最后的 objectToString 流接收一个对象但却输出一个字符串,所以咱们只需配置 writableObjectMode 便可。传递给下游的只是一个普通字符串。

以上实例代码的使用方法

Node 内置的变换流

Node 内置了一些很是有用的变换流。这就是 zlib 和 crypto 流。

下面是一个组合了 zlib.createGzip()fs 可读/可写流来压缩文件的脚本:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));复制代码

你能够经过该脚本给任何参数中传入的文件进行 gzip 压缩。咱们经过可读流读取文件内容传递给 zlib 内置的变换流,而后经过一个可写流来写入新文件。很简单吧。

使用管道很棒的一点在于,若是有必要,咱们能够把它和事件组合使用。例如,我但愿在脚本执行过程当中给用户一些进度提示,在脚本执行完成后显示一条完成消息。既然 pipe 方法会返回下游流,咱们就能够把注册事件回调的操做级联在一块儿:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));复制代码

因此使用 pipe 方法,咱们能够很简单的使用流。当须要时,咱们还能够经过事件来进一步定制和流的交互。

pipe 方法的好处在于,咱们能够用一种更加可读的方式经过若干片断组合咱们的程序。例如,咱们能够经过建立一个变换流来显示进度,而不是直接监听 data 事件。把 .on() 调用换成另外一个 .pipe() 调用:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));复制代码

这个 reportProgress 流是一个简单的直通流,但同时报告了进度信息。请注意我是如何在 transform() 方法中利用 callback() 的第二个参数传递数据的。它等价于使用 push 方法推送数据。

组合流的应用是无止境的。例如,假设咱们须要在压缩文件以前或以后加密它,咱们要作的只不过是在正确的位置引入一个新的变换流。咱们可使用 Node 内置的 crypto 模块:

**const crypto = require('crypto');
**// ...复制代码
const crypto = require('crypto');
// ...
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_secret'))
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));复制代码

以上的脚本对给定的文件先压缩再加密,只有知道秘钥的人才能利用生成的文件。咱们不能利用普通的解压工具解压该文件,由于它被加密了。

要能真正的解压任何使用以上脚本压缩过的文件,咱们须要以相反的顺序利用 crypto 和 zlib:

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Done'));复制代码

假设传入的文件是压缩后的版本,以上的脚本会建立一个针对该文件的读取流,链接到一个 crypto 模块的 createDecipher() 流(使用相同的密钥),以后将输出传递给一个 zlib 模块的 createGunzip() 流,最后将获得的数据写入一个没有压缩文件扩展名的文件。

以上就是我关于本主题要讨论的所有内容了。感谢阅读!下次再见!

若是你认为这篇文件对你有帮助,请点击下方的💚。关注我以获取更多关于 Node.js 和 JavaScript 的文章。

我为 PluralsightLynda 制做在线课程。我最近的课程是 React.js 入门, 高级 Node.js, 和学习全栈 JavaScript

我还进行线上与现场培训,内容涵盖 JavaScript,Node.js,React.js 和 GraphQL 从初级到高级的所有范围。若是你在寻找一名讲师,请联系我。我将在今年七月份的 Foward.js 上进行 6 场现场讲习班,其中一场是 Node.js 进阶

若是关于本文或任何个人其余文章有疑问,你能够经过这个 slack 帐号找到我并在 #questions 房间里提问。


掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为 掘金 上的英文分享文章。内容覆盖 AndroidiOSReact前端后端产品设计 等领域,想要查看更多优质译文请持续关注 掘金翻译计划