Node.js的stream模块是有名的应用困难,更别说理解了。那如今能够告诉你,这些都不是问题了。node
多年来,开发人员在那里建立了大量的软件包,其惟一目的就是使用stream使用起来更简单,可是在这篇文章里,咱们专一于介绍原生的Node.js Steam Api。linux
"Stream 是Node.js中最好的却最容易被误解的部分" ----- Dominic Tarr程序员
Streams是数据的集合,就跟数组和字符串同样。不一样点就在于Streams可能不是马上就所有可用,而且不会所有载入内存。这使得他很是适合处理大量数据,或者处理每隔一段时间有一个数据片断传入的状况。web
可是,Stream并不只仅适用于处理大数据(大块的数据。。。)。使用它,一样也有利于组织咱们大代码。就像咱们使用管道去和合并强大的Linux命令。在Node.js中,咱们也能够作一样的事情。数组
const grep = ... // A stream for the grep output const wc = ... // A stream for the wc input grep.pipe(wc)
Node.js的不少内置模块都实现了Stream接口服务器
上面例子里面的Node.js对象列表包括了可读流和可写流,有一些对象既是可读流也是可写流,像TCP sockets, zlib 和 crypto streams。微信
注意这些对象是有很密切的关联的。当一个客户端的HTTP 响应对象是一个可读流,那么在服务器端这就是一个可写流。由于在HTTP例子中,咱们一般是从一个对象(http.IncomingMessage
)读取再写入到另一个对象(http.ServerResponse
)中去。异步
还要注意,当涉及到子进程时,stdio
流(stdin
,stdout
,stderr
)具备逆流类型。这就容许咱们很是方便的使用管道从主进程链接子进程的Streams
。socket
理论都是很好的,但事实究竟是怎么样子的呢?让咱们看一些例子示范代码Streams
在内存使用方面的比较。ide
咱们先建立一个大文件
const fs = require('fs'); const file = fs.createWriteStream('./big.file'); for(let i=0; i<= 1e6; i++) { file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n'); } file.end();
看看我使用什么建立文件的?一个可写流嘛
fs
模块能够经过Stream
接口来读取和写入文件。在上面的例子中,咱们在循环中经过可写流向big.file
写入了1百万行数据。
运行上面的代码会生成一个大概400M的文件
这是一个简单的Node web服务器,专门为big.file
提供服务:
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { fs.readFile('./big.file', (err, data) => { if (err) throw err; res.end(data); }); }); server.listen(8000);
当server
收到请求,它会使用异步方法fs.readFile
处理这个big file
。可是这并不表明咱们会打断事件循环机制。一切都是正确的吗??
那如今当咱们启动server
,看看内存监视器都发生了什么。
如今访问这个服务器,看看内存的使用状况。
内存占用马上飙升到434.8 MB。
在咱们把文件内容输出到客户端以前,咱们就把整个文件读入了内存。这是很低效的。
HTTP response对象(上文中的res
对象)也是一个可写流,这就意味着若是咱们有一个表明着big file
的可读流,咱们能够经过管道把他们俩链接起来实现一样的功能,而不须要使用400M内存。
Node的fs
模块给咱们提供了一个能够操做任何文件的可读流,经过createReadStream
方法建立。咱们能够把它和response对象链接起来。
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { const src = fs.createReadStream('./big.file'); src.pipe(res); }); server.listen(8000);
如今再去访问server的时候,使人惊讶的事情发生了(看内存监视器)
发生了什么?
当咱们访问服务器的时候,咱们经过流每次使用一段数据,这意味着咱们不是把所有的数据都加载到内存中。内存使用量只上升了不到25M。
能够把上面的例子用到极致,生成5百万行数据而不是1百万行。这样子的话,这个文件的大小会超过2GB,这实际上大于Node中的默认缓冲区限制。
若是你想在server上使用fs.readFile
,这在默认状况下是行不通的,除非你改了Node.js的默认缓冲区限制。可是使用fs.createReadStream
,把2 GB的数据返回给客户端根本不存在问题,甚至内存使用量都没有任何变化。
准备好学习Steam了吗?
在Node.js中有4中基本的流类型:Readable, Writable, Duplex, and Transform streams。
fs.createReadStream
方法fs.createWriteStream
方法zlib.createGzip
使用gzip压缩数据。你能够把Transform streams当成是一个传入可读流,返回一个可写流的函数。它还有一个别名through streams
全部的Stream都是EventEmitter
的实例对象。当流读和写的时候都会触发相应的事件。可是还有一个更简单的使用方法,那就是使用pipe
。
要记住下面这个魔幻方法
readableSrc.pipe(writableDest)
在这一行里面,咱们经过管道把可读流(源)输出到一个可写流里面去(目标),源必须是一个可写流,目标必须是可写流。固然,他们也均可以是duplex/Transform。事实上,当咱们使用管道链接流的时候,咱们能够像在linux中同样使用链式链接。
readableSrc .pipe(transformStream1) .pipe(transformStream2) .pipe(finalWrtitableDest)
pipe
方法返回目标流,这保证了咱们可使用链式调用。对于streams a(可读流),b,c(可读可写流),d可写流,咱们可使用:
a.pipe(b).pipe(c).pipe(d) # Which is equivalent to: a.pipe(b) b.pipe(c) c.pipe(d) # Which, in Linux, is equivalent to: $ a | b | c | d
pipe
方法是使用流最简便的方法。一般经过管道和事件的方法使用流,可是要尽可能避免二者混用。一般当你使用pipe
方法就不须要使用事件了。可是当你须要更多定制的操做的话,使用事件的方式会更好。
除了从可读流读取数据传输到可写流,pipe
方法还自动处理一些其余事情。好比处理错误,处理文件结束操做,流之间速度快慢问题。
同时,流也能够直接使用事件操做。如下是和管道相等的经过事件操做流的方法。
# readable.pipe(writable) readable.on('data', (chunk) => { writable.write(chunk); }); readable.on('end', () => { writable.end(); });
下面是一些重要流的事件和方法。
这些事件和方法在某种程度上是相关的,由于它们一般被一块儿使用。
可读流上的最重要的事件是
data
事件,当可读流传输了一段数据的时候会触发end
事件,当没有数据被传输时触发可写流上的最重要的事件是
drain
事件,当可写流能够接收事件的时候被触发finish
事件,当全部数据被接收时被触发事件和方法能够结合起来,以便定制和优化流的使用。读取可读流,咱们可使用pipe/unpipe
方法,或者read/unshift/resume
方法。使用可写流,咱们能够可写流做为pipe/unpipe
方法的参数,或者使用write
方法写入,使用end
方法关闭。
可读流有两个很重要的模式影响了咱们使用的方式。
这些模式有时候被称为拉和推模式
全部的可读流开始的时候都是默认暂停模式,可是它们能够轻易的被切换成流动模式,当咱们须要的时候又能够切换成暂停模式。有时候这个切换是自动的。
当一个可读流是暂停模式的时候,咱们可使用read
方法从流中读取。可是当一个流是流动模式的时候,数据是持续的流动,咱们须要使用事件去监听数据的变化。
在流动模式中,若是可读流没有监听者,可读流的数据会丢失。这就是为何当可读流逝流动模式的时候,咱们必须使用data
事件去监听数据的变化。事实上,只需添加一个数据事件处理程序便可将暂停的流转换为流模式,删除数据事件处理程序将流切换回暂停模式。 其中一些是为了与旧的Node Stream接口进行向后兼容。
可使用resume()
和pause()
方法在这两种模式之间切换。
当咱们使用pipe
方法操做可读流的时候是不须要担忧上面的这些操做的,由于pipe
方法会自动帮咱们处理这些问题。
当咱们讨论Node.js中的流时,有两项重要的任务:
咱们到如今为止讨论的都是如何使用流,那下面来看看如何建立吧!
Streams的建立一般使用stream
模块。
为了建立一个可写流,咱们须要使用stream
模块里面的Writable
类。
const { Writable } = require('stream');
咱们能够有不少种方式实现一个可写流。例如,咱们能够继承Writable
类。
class myWritableStream extends Writable { }
可是我更喜欢使用构造函数的方式建立。经过给Writable
传递一些参数来建立一个对象。惟一必需要传的选项时write
方法,它须要暴漏须要写入的数据块。
const { Writable } = require('stream'); const outStream = new Writable({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } }); process.stdin.pipe(outStream);
write
方法接收3个参数
chunk
一般是一个buffer对象,咱们能够经过配置修改encoding
在这种状况下就须要了,不过一般状况是能够忽略的callback
是当咱们处理完这个数据块的时候须要调用的函数。这是一个写入是否成功的信号。若是失败了,给这个回调传递一个Error
对象在outStream
中,咱们简单的把chunk
打印出来,由于并无发生错误,咱们直接调用了callback
方法。这是这是简单并不实用的打印
流。它会打印接收到的全部值。
为了使用这个流,咱们能够简单的process.stdin
这个可读流。经过pipe
方法链接起来。
当咱们运行上面的例子,任何咱们在控制台输入的内容都会被console.log
打印出来。
这不是一个很是实用的流的实现,可是它已经被Node.js内置实现了。outStream
功能和process.stdout
基本相似。咱们也能够经过pipe
方法把stdin
和stdout
链接起来并实现一样的功能。
process.stdin.pipe(process.stdout);
建立可读流,咱们须要Readable
类
const { Readable } = require('stream'); const inStream = new Readable({});
建立一个可读流很是简单。可使用push
方法推入数据给其余流使用
const { Readable } = require('stream'); const inStream = new Readable(); inStream.push('ABCDEFGHIJKLM'); inStream.push('NOPQRSTUVWXYZ'); inStream.push(null); // No more data inStream.pipe(process.stdout);
当咱们push
一个null
对象进去的时候,这就标志着咱们要终止传输了。
咱们能够简单的把这个流经过pipe
方法链接到一个可写流process.stdout
运行上面的代码,会获取全部的inStream
的数据并打印出来。很是简单但有效。
咱们在经过pipe
链接以前,就会把全部的数据推送到流里面。更好的方法是在消费者要求时按需推送数据。能够经过修改可读流配置里面的read()
方法实现。
const inStream = new Readable({ read(size) { // there is a demand on the data... Someone wants to read it. } });
当读取方法在可读流上被调用时,该实现能够将部分数据推送到队列。 例如,咱们能够一次推一个字母,从字符代码65(表示A)开始,并在每次推送时递增:
const inStream = new Readable({ read(size) { this.push(String.fromCharCode(this.currentCharCode++)); if (this.currentCharCode > 90) { this.push(null); } } }); inStream.currentCharCode = 65; inStream.pipe(process.stdout);
当有流在读取可读流的数据的时候,read
方法会持续执行,这样就会一直推出更多的字符。咱们须要在某个时刻终止它,这就是为何咱们设置了一个终止条件推入了null
。
咱们应该始终按需推送数据。
使用Duplex流,咱们经过同一个对象实现可读流和可写流。这相似同时实现了两个接口。
下面这个例子就结合了上面两个可读流和可写流的综合例子。
const { Duplex } = require('stream'); const inoutStream = new Duplex({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); }, read(size) { this.push(String.fromCharCode(this.currentCharCode++)); if (this.currentCharCode > 90) { this.push(null); } } }); inoutStream.currentCharCode = 65; process.stdin.pipe(inoutStream).pipe(process.stdout);
经过合并这些方法,咱们可使用这个duplex
流读取从A-Z的字母也一样可使用它的打印功能。咱们把stdin
流链接到这个duplex
上去使用它的打印功能,再把这个duplex
流自己链接到stdout
上去就在控制台看到了A-Z。
双工流的可读写的两侧彻底独立运行。就像一个对象上两种独立的功能。
transform
流是一种更有趣的duplex
流。由于它的输出来源于她的输入。
对于一个transform
流,咱们不须要实现read
和write
方法,咱们仅仅须要实现transform
方法,这个方法合并了它们两个。它具备写入方法的功能,也能够用它推送数据。
这是一个简单的transform
例子,把任何输入转换成大写。
const { Transform } = require('stream'); const upperCaseTr = new Transform({ transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); callback(); } }); process.stdin.pipe(upperCaseTr).pipe(process.stdout);
在这个transform
stream里面,像上个例子中双工流同样。可是咱们只实现了transform()
方法。咱们把chunk
转换成大写,再把大写字母做为可读流的输入。
默认,流会接收 Buffer/String 类型的数据。还有个字段 objectMode
设置,可让stream 接收任意类型的对象。
下面是一个这种类型的例子。如下变换流的组合使得将逗号分隔值的字符串映射为JavaScript对象的功能。 因此“a,b,c,d”成为{a:b,c:d}。
const { Transform } = require('stream'); const commaSplitter = new Transform({ readableObjectMode: true, transform(chunk, encoding, callback) { this.push(chunk.toString().trim().split(',')); callback(); } }); const arrayToObject = new Transform({ readableObjectMode: true, writableObjectMode: true, transform(chunk, encoding, callback) { const obj = {}; for(let i=0; i < chunk.length; i+=2) { obj[chunk[i]] = chunk[i+1]; } this.push(obj); callback(); } }); const objectToString = new Transform({ writableObjectMode: true, transform(chunk, encoding, callback) { this.push(JSON.stringify(chunk) + '\n'); callback(); } }); process.stdin .pipe(commaSplitter) .pipe(arrayToObject) .pipe(objectToString) .pipe(process.stdout)
咱们经过commasplitter传递输入字符串(例如,“a,b,c,d”
),它将数组做为其可读数据([“a”,“b”,“c”,“d”]
))。 在该流上添加可读的ObjectMode
标志是必要的,由于咱们正在将对象推送到其上,而不是字符串。
而后咱们把数组导入到arrayToObject
数据流中,咱们须要把writableObjectMode
设置为 true
,以表示arrayToObject
会接收一个对象。另外它还会推送一个对象出去,因此还要把他的readableObjectMode
为true
。最后一个objectToString
接收一个对象可是输出字符串,因此就只须要设置一个writableObjectMode
。
Node有一些很是有用的内置transform streams对象。这包括zlib
和crypto
。
下面这个例子使用了zlib.createGzip()
结合了额fs
readable/writable streams实现了文件压缩。
const fs = require('fs'); const zlib = require('zlib'); const file = process.argv[2]; fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(fs.createWriteStream(file + '.gz'));
你可使用上面的脚本压缩任何你传入的参数文件。咱们把文件的可读流传入了zlib
的内置转换流。再写入到新的.gz文件中。
使用管道还有一个很酷的事情,就是能够和事件结合起来。好比我想用户看到进度,并在结束的时候发个消息。由于pipe
方法会返回目标流,咱们也能够经过链式注册事件。
const fs = require('fs'); const zlib = require('zlib'); const file = process.argv[2]; fs.createReadStream(file) .pipe(zlib.createGzip()) .on('data', () => process.stdout.write('.')) .pipe(fs.createWriteStream(file + '.zz')) .on('finish', () => console.log('Done'));
因此使用管道方法,咱们能够轻松地操做流,可是咱们还可使用须要的事件进一步定制与这些流的交互。
管道方法的好处是,咱们能够用它来以一种可读的方式逐一构成咱们的程序。 例如,咱们能够简单地建立一个变换流来报告进度,而不用监听上面的数据事件,并用另外一个.pipe()
调用替换 .on()
调用:
const fs = require('fs'); const zlib = require('zlib'); const file = process.argv[2]; const { Transform } = require('stream'); const reportProgress = new Transform({ transform(chunk, encoding, callback) { process.stdout.write('.'); callback(null, chunk); } }); fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(reportProgress) .pipe(fs.createWriteStream(file + '.zz')) .on('finish', () => console.log('Done'));
reportProgress
流是一个简单的pass-through
流,可是也跟标准事件同样报告进度。注意callback()
函数的第二个参数,这至关于把数据推送出去。
结合流的应用是无止境的。例如,若是咱们须要在咱们gzip以前或以后加密文件,咱们须要作的就是按照咱们须要的确切顺序来管理另外一个转换流。使用Node的crypto
模块处理这个事情。
const crypto = require('crypto'); // ... fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(crypto.createCipher('aes192', 'a_secret')) .pipe(reportProgress) .pipe(fs.createWriteStream(file + '.zz')) .on('finish', () => console.log('Done'));
上面的脚本压缩而后加密传递的文件,只有具备密码的人才可使用文件。 咱们没法使用正常的解压缩实用程序解压缩此文件,由于它已被加密。
为了可以解压缩文件,咱们须要使用彻底相反的操做,这也很简单。
fs.createReadStream(file) .pipe(crypto.createDecipher('aes192', 'a_secret')) .pipe(zlib.createGunzip()) .pipe(reportProgress) .pipe(fs.createWriteStream(file.slice(0, -3))) .on('finish', () => console.log('Done'));
假设传递的文件是压缩版本,上面的代码将建立一个读取流,将其传输到crypto createDecipher()流中(使用相同的秘密),将其输出管道输入到zlib createGunzip()流中, 而后将文件写回到没有扩展名的文件中。
以上就是所有了,谢谢阅读!!
翻译自Node.js Streams: Everything you need to know
建立了一个程序员交流微信群,你们进群交流IT技术
若是已过时,能够添加博主微信号15706211347,拉你进群