- 原文地址:Node.js Streams: Everything you need to know
- 原文做者:Samer Buna
- 译文出自:掘金翻译计划
- 译者:loveky
- 校对者:zaraguo Aladdin-ADD
图片来源javascript
Node.js 中的流有着难以使用,更难以理解的名声。如今我有一个好消息告诉你:事情已经再也不是这样了。html
很长时间以来,开发人员创造了许许多多的软件包为的就是能够更简单的使用流。可是在本文中,我会把重点放在原生的 Node.js 流 API上。前端
“流是 Node 中最棒的,同时也是最被人误解的想法。”java
— Dominic Tarrnode
流是数据的集合 —— 就像数组或字符串同样。区别在于流中的数据可能不会马上就所有可用,而且你无需一次性地把这些数据所有放入内存。这使得流在操做大量数据或是数据从外部来源逐段发送过来的时候变得很是有用。react
然而,流的做用并不只限于操做大量数据。它还带给咱们组合代码的能力。就像咱们能够经过管道链接几个简单的 Linux 命令以组合出强大的功能同样,咱们能够利用流在 Node 中作一样的事。android
Linux 命令的组合性ios
const grep = ... // 一个 grep 命令输出的 stream
const wc = ... // 一个 wc 命令输入的 stream
grep.pipe(wc)复制代码
Node 中许多内建的模块都实现了流接口:git
截屏来自于个人 Pluralsight 课程 —— 高级 Node.jsgithub
上边的列表中有一些 Node.js 原生的对象,这些对象也是能够读写的流。这些对象中的一部分是既可读、又可写的流,例如 TCP sockets,zlib 以及 crypto。
须要注意的是这些对象是紧密关联的。虽然一个 HTTP 响应在客户端是一个可读流,但在服务器端它倒是一个可写流。这是由于在 HTTP 的状况中,咱们基本上是从一个对象(http.IncomingMessage
)读取数据,向另外一个对象(http.ServerResponse
)写入数据。
还须要注意的是 stdio
流(stdin
,stdout
,stderr
)在子进程中有着与父进程中相反的类型。这使得在子进程中从父进程的 stdio
流中读取或写入数据变得很是简单。
理论是伟大的,当每每没有 100% 的说服力。下面让咱们经过一个例子来看看流在节省内存消耗方面能够起到的做用。
首先让咱们建立一个大文件:
const fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();复制代码
看看在建立这个大文件时我用到了什么。一个可写流!
经过 fs
模块你可使用一个流接口读取或写入文件。在上面的例子中,咱们经过一个可写流向 big.file
写入了 100 万行数据。
执行这段脚本会生成一个约 400MB 大小的文件。
如下是一个用来发送 big.file
文件的 Node web 服务器:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if (err) throw err;
res.end(data);
});
});
server.listen(8000);复制代码
当服务器收到请求时,它会经过异步方法 fs.readFile
读取文件内容发送给客户端。看起来咱们并无阻塞事件循环。一切看起来还不错,是吧?是吗?
让咱们来看看真实的状况吧。咱们启动服务器,发起链接,并监控内存的使用状况。
当我启动服务器的时候,它占用了一个正常大小的内存空间,8.7MB:
当我链接到服务器的时候。请注意内存消耗的变化:
哇 —— 内存消耗暴增到 434.8MB。
在咱们将其写入响应对象以前,咱们基本上把 big.file
的所有内容都载入到内存中了。这是很是低效的。
HTTP 响应对象也是一个可写流。这意味着若是咱们有一个表明了 big.file
内容的可读流,咱们就能够经过将两个流链接起来以实现相同的功能而没必要消耗约 400MB 的内存。
Node fs
模块中的 createReadStream
方法能够针对任何文件给咱们返回一个可读流。咱们能够把它和响应对象链接起来:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);复制代码
如今,当你再次链接到服务器时,神奇的事情发生了(请注意内存消耗):
发生了什么?
当客户端请求这个大文件时,咱们经过流逐块的发送数据。这意味着咱们不须要把文件的所有内容缓存到内存中。内存消耗只增加了大约 25MB。
你能够把这个例子推向极端。从新生成一个 500 万行而不是 100 万行的 big.file
文件。它大概有 2GB 那么大。这已经超过了 Node 中默认的缓冲区大小的上限。
若是你尝试经过 fs.readFile
读取那个文件,默认状况下会失败(固然你能够修改缓冲区大小上限)。可是经过使用 fs.createReadStream
,向客户端发送一个 2GB 的文件就没有任何问题。更棒的是,进程的内存消耗并不会因文件增大而增加。
准备好学习流了吗?
这篇文章是个人 Pluralsight 课堂上 Node.js 课程中的一部分。你能够经过这个连接找到这部份内容的视频版。
在 Node.js 中有四种基本类型的流:可读流,可写流,双向流以及变换流。
fs.createReadStream
方法是一个可读流的例子。fs.createWriteStream
方法是一个可写流的例子。zlib.createGzip
来压缩数据。你能够把一个变换流想象成一个函数,这个函数的输入部分对应可写流,输出部分对应可读流。你也可能据说过变换流有时被称为 “thought streams”。全部的流都是 EventEmitter
的实例。它们发出可用于读取或写入数据的事件。然而,咱们能够利用 pipe
方法以一种更简单的方式使用流中的数据。
如下这行代码就是你要记住的魔法:
readableSrc.pipe(writableDest)复制代码
在这行简单的代码中,咱们以管道的方式把一个可读流的输出链接到了一个可写流的输入。管道的上游(source)必须是一个可读流,下游(destination)必须是一个可写流。固然,它们也能够是双向流/变换流。事实上,若是咱们使用管道链接的是双向流,咱们就能够像 Linux 系统里那样链接多个流:
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)复制代码
pipe
方法会返回最后一个流,这使得咱们能够串联多个流。对于流 a
(可读),b
和 c
(双向),以及 d
(可写)。咱们能够这样:
a.pipe(b).pipe(c).pipe(d)
# 等价于:
a.pipe(b)
b.pipe(c)
c.pipe(d)
# 在 Linux 中,等价于:
$ a | b | c | d复制代码
pipe
方法是使用流最简单的方式。一般的建议是要么使用 pipe
方法、要么使用事件来读取流,要避免混合使用二者。通常状况下使用 pipe
方法时你就没必要再使用事件了。但若是你想以一种更加自定义的方式使用流,就要用到事件了。
除了从可读流中读取数据写入可写流之外,pipe
方法还自动帮你处理了一些其余状况。例如,错误处理,文件结尾,以及两个流读取/写入速度不一致的状况。
然而,流也能够直接经过事件读取。如下是一段简化的使用事件来模拟 pipe
读取、写入数据的代码:
# readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});复制代码
如下是一些使用可读流或可写流时用到的事件和方法:
截屏来自于个人 Pluralsight 课程 - 高级 Node.js
这些事件和函数是相关的,由于咱们老是把它们组合在一块儿使用。
一个可读流上最重要的两个事件是:
data
事件,任什么时候候当可读流发送数据给它的消费者时,会触发此事件end
事件,当可读流没有更多的数据要发送给消费者时,会触发此事件一个可写流上最重要的两个事件是:
drain
事件,这是一个表示可写流能够接受更多数据的信号.finish
事件,当全部数据都被写入底层系统后会触发此事件。事件和函数能够组合起来使用,以更加定制,优化的方式使用流。对于可读流,咱们可使用 pipe
/unpipe
方法,或是 read
,unshift
,resume
方法。对于可写流,咱们能够把它设置为 pipe
/unpipe
方法的下游,亦或是使用 write
方法写入数据并在写入完成后调用 end
方法。
可读流有两种主要的模式,影响咱们使用它的方式:
这些模式有时也被成为拉取和推送模式。
全部的可读流默认都处于暂停模式。但它们能够按需在流动模式和暂停模式间切换。这种切换有时会自动发生。
当一个可读流处于暂停模式时,咱们可使用 read()
方法按需的读取数据。而对于一个处于流动模式的可读流,数据会源源不断的流动,咱们须要经过事件监听来处理数据。
在流动模式中,若是没有消费者监听事件那么数据就会丢失。这就是为什么在处理流动模式的可读流时咱们须要一个 data
事件回调函数。事实上,经过增长一个 data
事件回调就能够把处于暂停模式的流切换到流动模式;一样的,移除 data
事件回调会把流切回到暂停模式。这么作的一部分缘由是为了和旧的 Node 流接口兼容。
要手动在这两个模式间切换,你可使用 resume()
和 pause()
方法。
截屏来自于个人 Pluralsight 课程 - 高级 Node.js
当使用 pipe
方法时,它会自动帮你处理好这些模式之间的切换,所以你无须关心这些细节。
当咱们讨论 Node.js 中的流时,主要是讨论两项任务:
到目前为止,咱们只讨论了如何使用流。接下来让咱们看看如何实现它!
流的实现者一般都会 require
stream
模块。
要实现一个可写流,咱们须要使用来自 stream 模块的 Writable
类。
const { Writable } = require('streams');复制代码
实现一个可写流有不少种方法。例如,咱们能够继承 Writable
类:
class myWritableStream extends Writable {
}复制代码
然而,我倾向于更简单的构造方法。咱们能够直接给 Writable
构造函数传入配置项来建立一个对象。惟一必须的配置项是一个 write
函数,它用于暴露一个写入数据的接口。
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);复制代码
write 方法接受三个参数。
在 outStream
中,咱们只是单纯的把收到的数据当作字符串 console.log
出来,并经过执行 callback
时不传入错误对象以表示写入成功。这是一个很是简单且没什么用处的回传流。它会回传任何收到的数据。
要使用这个流,咱们能够把它和可读流 process.stdin
配合使用。只需把 process.stdin
经过管道链接到 outStream
。
当咱们运行上面的代码时,任何输入到 process.stdin
中的字符都会被 outStream
中的 console.log
输出回来。
这不是一个很是实用的流实现,由于 Node 已经内置了它的实现。它几乎等同于 process.stdout
。经过把 stdin
和 stdout
链接起来,咱们就能够经过一行代码获得彻底相同的回传效果:
process.stdin.pipe(process.stdout);复制代码
要实现可读流,咱们须要引入 Readable
接口并经过它建立对象:
const { Readable } = require('stream');
const inStream = new Readable({});复制代码
这是一个很是简单的可读流实现。咱们能够经过 push
方法向下游推送数据。
const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // 没有更多数据了
inStream.pipe(process.stdout);复制代码
当咱们 push
一个 null
值,这表示该流后续不会再有任何数据了。
要使用这个可读流,咱们能够把它链接到可写流 process.stdout
。
当咱们执行以上代码时,全部读取自 inStream
的数据都会被显示到标准输出上。很是简单,但并不高效。
在把该流链接到 process.stdout
以前,咱们就已经推送了全部数据。更好的方式是只在使用者要求时按需推送数据。咱们能够经过在可读流配置中实现 read()
方法来达成这一目的:
const inStream = new Readable({
read(size) {
// 某人想要读取数据
}
});复制代码
当可读流上的 read 方法被调用时,流实现能够向队列中推送部分数据。例如,咱们能够从字符编码 65(表示字母 A) 开始,一次推送一个字母,每次都把字符编码加 1:
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inStream.currentCharCode = 65
inStream.pipe(process.stdout);复制代码
当使用者读取该可读流时,read
方法会持续被触发,咱们不断推送字母。咱们须要在某处中止该循环,这就是为什么咱们放置了一个 if 语句以便在 currentCharCode 大于 90(表明 Z) 时推送一个 null 值。
这段代码等价于以前的咱们开始时编写的那段简单代码,但咱们已改成在使用者须要时推送数据。你始终应该这样作。
对于双向流,咱们要在同一个对象上同时现实可读流和可写流。就好像是咱们继承了两个接口。
如下的例子实现了一个综合了前面提到的可读流与可写流功能的双向流:
const { Duplex } = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);复制代码
经过组合这些方法,咱们能够经过该双向流读取从 A 到 Z 的字母还能够利用它的回传特性。咱们把可读的 stdin
流接入这个双向流以利用它的回传特性同时又把它接入可写的 stdout
流以查看字母 A 到 Z。
理解双向流的读取和写入部分是彻底独立的这一点很是重要。它只不过是把两种特性在同一个对象上实现罢了。
变换流是一种更有趣的双向流,由于它的输出是基于输入运算获得的。
对于一个变换流,咱们不须要实现 read
或 write
方法,而是只须要实现一个 transform
方法便可,它结合了两者的功能。它的函数签名和 write
方法一致,咱们也能够经过它 push
数据。
如下是一个把你输入的任何内容转换为大写字母的变换流:
const { Transform } = require('stream');
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);复制代码
在这个变换流中,咱们只实现了 transform()
方法,却达到了前面双向流例子的效果。在该方法中,咱们把 chunk
转换为大写而后经过 push
方法传递给下游。
默认状况下,流接收的参数类型为 Buffer/String。咱们能够经过设置 objectMode
参数使得流能够接受任何 JavaScript 对象。
如下是一个简单的演示。如下变换流的组合用于把一个逗号分割的字符串转变成为一个 JavaScript 对象。传入 "a,b,c,d" 就变成了 {a: b, c: d}
。
const { Transform } = require('stream');
const commaSplitter = new Transform({
readableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(chunk.toString().trim().split(','));
callback();
}
});
const arrayToObject = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk, encoding, callback) {
const obj = {};
for(let i=0; i < chunk.length; i+=2) {
obj[chunk[i]] = chunk[i+1];
}
this.push(obj);
callback();
}
});
const objectToString = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(JSON.stringify(chunk) + '\n');
callback();
}
});
process.stdin
.pipe(commaSplitter)
.pipe(arrayToObject)
.pipe(objectToString)
.pipe(process.stdout)复制代码
咱们给 commaSplitter
传入一个字符串(假设是 "a,b,c,d"
),它会输出一个数组做为可读数据([“a”, “b”, “c”, “d”]
)。在该流上增长 readableObjectMode
标记是必须的,由于咱们在给下游推送一个对象,而不是字符串。
咱们接着把 commaSplitter
输出的数组传递给了 arrayToObject
流。咱们须要设置 writableObjectModel
以便让该流能够接收一个对象。它还会往下游推送一个对象(输入的数据被转换成对象),这就是为何咱们还须要配置 readableObjectMode
标志位。最后的 objectToString
流接收一个对象但却输出一个字符串,所以咱们只需配置 writableObjectMode
便可。传递给下游的只是一个普通字符串。
以上实例代码的使用方法
Node 内置了一些很是有用的变换流。这就是 zlib 和 crypto 流。
下面是一个组合了 zlib.createGzip()
和 fs
可读/可写流来压缩文件的脚本:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'));复制代码
你能够经过该脚本给任何参数中传入的文件进行 gzip 压缩。咱们经过可读流读取文件内容传递给 zlib 内置的变换流,而后经过一个可写流来写入新文件。很简单吧。
使用管道很棒的一点在于,若是有必要,咱们能够把它和事件组合使用。例如,我但愿在脚本执行过程当中给用户一些进度提示,在脚本执行完成后显示一条完成消息。既然 pipe
方法会返回下游流,咱们就能够把注册事件回调的操做级联在一块儿:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => process.stdout.write('.'))
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));复制代码
因此使用 pipe
方法,咱们能够很简单的使用流。当须要时,咱们还能够经过事件来进一步定制和流的交互。
pipe
方法的好处在于,咱们能够用一种更加可读的方式经过若干片断组合咱们的程序。例如,咱们能够经过建立一个变换流来显示进度,而不是直接监听 data
事件。把 .on()
调用换成另外一个 .pipe()
调用:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
const { Transform } = require('stream');
const reportProgress = new Transform({
transform(chunk, encoding, callback) {
process.stdout.write('.');
callback(null, chunk);
}
});
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));复制代码
这个 reportProgress
流是一个简单的直通流,但同时报告了进度信息。请注意我是如何在 transform()
方法中利用 callback()
的第二个参数传递数据的。它等价于使用 push 方法推送数据。
组合流的应用是无止境的。例如,假设咱们须要在压缩文件以前或以后加密它,咱们要作的只不过是在正确的位置引入一个新的变换流。咱们可使用 Node 内置的 crypto
模块:
**const crypto = require('crypto');
**// ...复制代码
const crypto = require('crypto');
// ...
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'a_secret'))
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));复制代码
以上的脚本对给定的文件先压缩再加密,只有知道秘钥的人才能利用生成的文件。咱们不能利用普通的解压工具解压该文件,由于它被加密了。
要能真正的解压任何使用以上脚本压缩过的文件,咱们须要以相反的顺序利用 crypto 和 zlib:
fs.createReadStream(file)
.pipe(crypto.createDecipher('aes192', 'a_secret'))
.pipe(zlib.createGunzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file.slice(0, -3)))
.on('finish', () => console.log('Done'));复制代码
假设传入的文件是压缩后的版本,以上的脚本会建立一个针对该文件的读取流,链接到一个 crypto 模块的 createDecipher()
流(使用相同的密钥),以后将输出传递给一个 zlib 模块的 createGunzip()
流,最后将获得的数据写入一个没有压缩文件扩展名的文件。
以上就是我关于本主题要讨论的所有内容了。感谢阅读!下次再见!
若是你认为这篇文件对你有帮助,请点击下方的💚。关注我以获取更多关于 Node.js 和 JavaScript 的文章。
我为 Pluralsight 和 Lynda 制做在线课程。我最近的课程是 React.js 入门, 高级 Node.js, 和学习全栈 JavaScript。
我还进行线上与现场培训,内容涵盖 JavaScript,Node.js,React.js 和 GraphQL 从初级到高级的所有范围。若是你在寻找一名讲师,请联系我。我将在今年七月份的 Foward.js 上进行 6 场现场讲习班,其中一场是 Node.js 进阶
若是关于本文或任何个人其余文章有疑问,你能够经过这个 slack 帐号找到我并在 #questions 房间里提问。
掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为 掘金 上的英文分享文章。内容覆盖 Android、iOS、React、前端、后端、产品、设计 等领域,想要查看更多优质译文请持续关注 掘金翻译计划。