文章翻译自:Node.js Streams: Everything you need to knownode
在开发者中广泛认为Node.js流不但难以应用,并且难以理解。如今有一个好消息,Node.js流将不在难以处理。过去几年,为了方便操做Node.js流,开发者开发了许多第三方Node.js包。可是在这篇文章中,我将集中在Node.js原生的流接口应用的介绍。linux
“Streams are Node’s best and most misunderstood idea.”数组
— Dominic Tarr缓存
流就是数据集合----诸如数组或是字符串。不一样之处在于流没必要一次所有使用,它们也没必要适应内存。这两个特色使流在处理大量数据或一次向外部返回一大块数据时很是高效。bash
流代码的组合性,为流处理大量数据,提供了新的力量。就像把微小linux命令组合成功能丰富的组合命令同样,Node.js流经过一样的方式实现数据通道的功能。服务器
const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)
复制代码
许多Node.js内置模块都实现流接口:异步
上面展现的API中,一部分原生Node.js对象既是可读又是可写流,诸如TCP Sockets,Zlib和Crypto流。socket
值得注意的是,对象的部分行为是密切相关的。例如:在客户端HTTP对象是可读流,在服务端HTTP对象是可写流。这是由于在HTTP上,程序从一个对象上读取数据(http.IncomingMessage),而后将读取的数据写到另一个对象上(http.ServerResponse)。ide
理论听起来美妙,但并不能彻底传递流的精妙。让咱们看一个例子,经过这个例子,能够看出是否使用流对于内存占用的不一样影响。函数
让咱们先建立一个大的文件:
const fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
复制代码
在上面的示例代码中,fs模块能够经过流接口实现对文件的读和写。经过循环一百万次可写流,将数据写入到big.file文件中。
执行相应的代码,生成大约400兆的文件。
下面是一个专门用来操做这个大文件的Node服务器代码:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if (err) throw err;
res.end(data);
});
});
server.listen(8000);
复制代码
当服务接收请求,程序将会经过异步fs.readFile函数向请求者发送数据报文。表面上看这样的代码并不会阻塞程序的事件循环,真的是这样吗?
好,当咱们启动这段服务,而后请求这个服务后,让咱们看看内存占用将会有怎样的变化。
当启动服务时,服务占用的内存量是8.7兆。
而后请求这个服务,注意内存占用的状况:
哇 ---- 内存占用忽然间跳到434.8兆。
本质上讲,程序在将大数据文件写入到http响应对象前,会将全部的数据写入内存中。这种代码的效率是很是低效的。
HTTP响应对象也是一个可写流,若是咱们将表明big.file内容可读流与HTTP相应对象的可写流在管道中链接,程序就能够经过两个流管道,在不产生近400兆内存占用的状况下,达到相同的结果。
Node.js中的fs模块经过createReadStream方法,生成读取文件的可读流。而后程序能够经过管道将可读流传到http响应对象中:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
复制代码
当再次请求服务时,一个神奇的事情发生了(注意内存占用):
当客户端请求大文件时,程序一次将一块数据生成流文件,这就意味着咱们不须要将数据缓存到内存中。内存的占用也仅仅上升了25兆。
咱们能够将这个测试用例推到极限。从新生成五百万行而不是一百万行的big.file文件,从新生成的文件将会达到2GB,这将大于Node.js默认的缓存量。
使用fs.readFile实现大内存文件的读取,最好不要修改程序默认的缓存空间。可是若是使用fs.createReadStream,即使请求2GB的数据流也不会有问题。使用第二种方式做为服务程序的内存占用几乎不发生变化。
流在Node.js中有四种:Readable、Writable、Duplex和Transform。
全部流都是EventEmitter模块的实例,触发可读和可写数据的事件。可是,程序可使用pipe函数消费流数据。
下面是一段你值得记忆的魔法代码:
readableSrc.pipe(writableDest)
在这一简单的代码中,将可读流的输出 (数据源) 做为可写流的输入 (目标) 进行管道化。源数据必须是可读流,目标必须是可写流。它们也能够同时是双工流或者转换流。事实上, 若是开发者将双工流传入管道中, 咱们就能够像Linux那样连接到管道调用:
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
复制代码
管道函数返回的是目标流,它能够容许程序作上面的链式调用。下面的代码: 流a是可读流、流b与c是双工流、流c是可写流。
a.pipe(b).pipe(c).pipe(d)
# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Which, in Linux, is equivalent to:
$ a | b | c | d
复制代码
管道(pipe)方法是实现流消费的最简单方式。一般建议使用管道函数(pipe)或者事件消费流,可是避免将它们混合使用。一般当你使用管道(pipe)函数时,你就不须要使用事件。可是若是程序须要定制流的消费,事件能够是一个不错的选择。
除了读取可读流源,并把读取的数据写入到可写的目的地上。管道(pipe)还能够自动管理一些事情。例如:它能够处理异常,当一个流比其它流更快或更慢时结束文件。
可是,流能够经过事件被直接消费。下面是一段等效于管道(pipe)方法的程序,它经过简化的、与事件等效的代码实现数据的读取或写入。
# readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});
复制代码
这里有一系列可用于可读、可写流的事件或函数。
这些事件或函数一般以某种方式相关联,关于可读流的事件有:
关于可写流的重要事件有: -drain事件,可写流接受数据时的信号 -finish事件,全部的数据已经刷新到系统底层时触发
经过事件和函数能够组合在一块儿后自定义流或流的优化。为了消费可读流,程序可使用pipe/unpipe方法,或是read/unshift/resume方法。为了消费可写流,程序能够将它做为pipe/unpipe的目的地,或是经过write方法写入数据,在写入完成后调用end方法。
可读流中存在两种模式影响程序对可读流的使用:
这些模式又是被认为是拉和推模式。
全部的可读流在默认状况下都是从暂停模式开始,在程序须要时,转换成流动模式或者暂停模式。有时这种转换是自发的。
当可读流在暂停(paused)模式时,咱们可使用read方法按需读取流数据。可是,对于处在流动(flowing)模式下的可读流,咱们必须经过监听事件来消费数据。
在流动(flowing)模式下,若是数据没有被消费,数据可能会丢失。这就是当程序中有流动的可读流时,须要data事件处理数据的缘由。事实上,只须要添加data事件就能够将流从暂停转换为流动模式和解除程序与事件监听器的绑定、将流从流动模式转换为暂停模式。其中的一些是为了向后兼容老版本Node流的接口。
开发者可使用resume方法和pause方法,手动实现两种流模式的转换。
当程序使用管道(pipe)方法消费可读流时,开发这就没必要关心流模式的转换了,由于管道(pipe)会自动实现。
##实现流
当咱们Node.js中的流时,有两种不一样的任务:
到目前为止,咱们仅仅讨论着消费流。让咱们实现一些例子吧!
实现流须要咱们在程序中引入流模块
开发者可使用流模块中的Writeable构造器,实现可写流。
const { Writable } = require('stream');
开发者实现可写流有不少种方式。例如:经过继承writable构造器
class myWritableStream extends Writable {
}
复制代码
可是,我更喜欢使用构造器的实现方式。仅仅经过writable接口建立对象并传递一些选项:一个必须函数选项是write函数,传入要写入的数据块。
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
复制代码
write函数有三个参数:
在outStream类中,程序仅仅将数据转换为字符串类型打印出来,并在没有出现异常时调用回调函数,以此来标志程序的成功执行。这是一个简单但不是特别有效的回声流,程序会输出任何输入的数据。
要使用这个流,咱们能够将它与process.stdin一块儿使用,这是一个可读的流,将process.stdin传输到outStream。
当程序执行时,任何经过process.stdin输入的数据都会被outStream中的console.log函数打印出来。
可是这个功能能够经过Node.js内置模块实现,所以这并非一个很是实用的流。它与process.stdout的功能很是相似,咱们使用下面的代码能够实现相同的功能:
process.stdin.pipe(process.stdout);
为了实现一个可读流,开发者须要引入Readable的接口后经过这个接口构建对象:
const { Readable } = require('stream');
const inStream = new Readable({});
复制代码
这是实现可读流的最简单方式,开发者能够直接推送数据以供消费使用。
const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);
复制代码
当程序中推送一个空对象时,这就意味着再也不有数据供给可读流。
开发者能够将可读流经过管道传给process.stdout的方式,供其消费可读流。
执行这段代码,程序能够读取来自可读流的数据,并将数据打印出来。很是简单,可是并不高效。
上段代码的本质是:把数据推送给流,而后将流经过管道传给process.stdout消费。其实程序能够在消费者请求流时,按需推送数据,这种方式比上一种更高效。经过实现readable流中的read函数实现:
const inStream = new Readable({
read(size) {
// there is a demand on the data... Someone wants to read it.
}
});
复制代码
在readable流中调用read函数,程序能够将部分数据传输到队列上。例如:每次向队列中推送一个字母,字母的的序号从65(表明A)开始,每次推送的字母序号都自增1:
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
复制代码
当消费者正在消费可读流时,read函数就会被激活,程序就会推送更多的字母。经过向队列些推送空对象,终止循环。如上代码中,当字母的序号超过90时,终止循环。
这段代码的功能与以前实现的代码是等效的,可是当消费者要读流时,程序能够按需推送数据的效率更优。所以建议使用这种实现方式。
双工流:在同一对象上分别实现可读流和可写流,就像对象继承了两个可读流和可写流接口。
下面是一个双工流,它结合了上面已经实现的可读流、可写流的例子:
const { Duplex } = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
复制代码
经过实现双工流的对象,程序能够读取A-Z的字母,而后按顺序打印出来。开发者将stdin可读流传输到双工流,而后将双工流传输到stdout可写流中,打印出A-Z字母。
在双工流中的可读流与可写流是彻底独立的,双工流仅仅是一个对象同时具备可读流和可写流的功能。理解这一点相当重要。
转换流比双工流更有趣,由于它的结果是根据输入流计算出来的。
对于双工流,并不须要实现read和write函数,开发者仅仅须要实现transform函数,由于transform函数已经实现了read函数和write函数。
下面是将输入的字母转换为大写格式后,而后把转换后的数据传给可写流:
const { Transform } = require('stream');
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
复制代码
在这个例子中,开发者仅仅经过transform函数,就实现了向上面双工流的功能。在transform函数中,程序将数据转换为大写后推送到可写流中。
默认状况下,流只接受Buffer和String的数据。可是开发者能够经过设置objectMode标识的值,可使流接受任何Javascript数据。
下面的例子能够证实这一点。经过一组流将以逗号分隔的字符串转换为Javscript对象,因而"a,b,c,d"转换成{a: b, c : d}。
const { Transform } = require('stream');
const commaSplitter = new Transform({
readableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(chunk.toString().trim().split(','));
callback();
}
});
const arrayToObject = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk, encoding, callback) {
const obj = {};
for(let i=0; i < chunk.length; i+=2) {
obj[chunk[i]] = chunk[i+1];
}
this.push(obj);
callback();
}
});
const objectToString = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(JSON.stringify(chunk) + '\n');
callback();
}
});
process.stdin
.pipe(commaSplitter)
.pipe(arrayToObject)
.pipe(objectToString)
.pipe(process.stdout)
复制代码
commaSplitter转换流将输入的字符串(例如:“a,b,c,d”)转换为数组([“a”, “b”, “c”, “d”])。设置writeObjectMode标识,由于在transform函数中推的数据是对象而不是字符串。
而后将commaSplitter输出的可读流传输到转换流arrayToObject中。因为接收的是对象,一样须要在arrayToObject中须要设置writableObjectMode标识。因为须要在程序中推送对象(将传入的数组转换为对象),这也是程序中设置readableObjectMode标识的缘由。最后,转换流objectToString接收对象,可是输出字符串。这就是程序中只设置writableObjectModel标识的缘由。输出的可读流时正常的字符串(序列化后的数组)。
Node的内置转换流
Node有许多内置转换流,如:lib和crypto流。
下面的代码是使用zlib.createGzip()流与fs模块的可读和可写流相结合,实现压缩文件的代码:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'));
复制代码
程序将读取文件的可读流传输进Node内置的转换流zlib中,最后传输到建立压缩文件的可写流中。所以开发者只要将须要压缩的文件路径做为参数传进程序中,就能够实现任何文件的压缩。
开发者能够将管道函数与事件结合使用,这是选择管道函数另外一个缘由。例如:开发者让程序经过打印出标记符显示脚本正在执行,并在脚本执行完毕后打印出"Done"信息。pipe函数返回的是目标流,程序能够在获取目标流后注册事件链:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => process.stdout.write('.'))
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
复制代码
开发者经过pipe函数能够很容易操做流,甚至在须要时,经过事件对通过pipe函数处理后的目标流作一些定制交互。
管道函数的强大之处在于,使用易理解的方式将多个管道函数联合在一块儿。例如:不一样于上个示例,开发者能够经过传入一个转换流,标识脚本正在执行。
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
const { Transform } = require('stream');
const reportProgress = new Transform({
transform(chunk, encoding, callback) {
process.stdout.write('.');
callback(null, chunk);
}
});
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
复制代码
reportProgress只是一个转换流,在这个流中标识脚本正在执行。值得注意的是,代码中使用callback函数推送transform中的数据。这与先前示例中this.push()的功能是等效的。
组合流的应用场景还有不少。例如:开发者要先加密文件,而后压缩文件或是先压缩后加密。若是要完成这个功能,程序只要将文件按照顺序传入流中,使用crypto模块实现:
const crypto = require('crypto');
// ...
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'a_secret'))
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
复制代码
上面的代码实现了压缩、加密文件,只有知道密码的用户才可使用加密后的文件。由于开发者不能按照普通解压的工具对加密后压缩文件,进行解压。
对于任何经过上面代码压缩的文件,开发者只须要以相反的顺序使用crypto和zlib流,代码以下:
fs.createReadStream(file)
.pipe(crypto.createDecipher('aes192', 'a_secret'))
.pipe(zlib.createGunzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file.slice(0, -3)))
.on('finish', () => console.log('Done'));
复制代码
假设传输进去的文件是压缩后的文件,上面的程序首先会生成可读流,而后传输到crypto的createDecipher()流中,接着将输出的流文件传输到zlib的createGunzip()流中,最后写入到文件中。
上面就是我对这个主题的总结,感谢您的阅读,期待下次与你相遇。