做为前端,咱们经常会和 Stream 有着频繁的接触。好比使用 gulp 对项目进行构建的时候,咱们会使用 gulp.src 接口将匹配到的文件转为 stream(流)的形式,再经过 .pipe() 接口对其进行链式加工处理;html
或者好比咱们经过 http 模块建立一个 HTTP 服务:前端
const http = require('http'); http.createServer( (req, res) => { //... }).listen(3000);
此处的 req 和 res 也属于 Stream 的消费接口(前者为 Readable Stream,后者为 Writable Stream)。node
事实上像上述的 req/res,或者 process.stdout 等接口都属于 Stream 的实例,所以较少存在状况,是须要咱们手动引入 Stream 模块的,例如:git
//demo1.js 'use strict'; const Readable = require('stream').Readable; const rs = Readable(); const s = 'VaJoy'; const l = s.length; let i = 0; rs._read = ()=>{ if(i == l){ rs.push(' is my name'); return rs.push(null) } rs.push(s[i++]) }; rs.pipe(process.stdout);
若是不太能读懂上述代码,或者对 Stream 的概念感到模糊,那么能够放轻松,由于本文会进一步地对 Stream 进行剖析,而且谈谈直接使用它可能会存在的一些问题(这也是为什么 gulp 要使用 through2 的缘由)。github
另外本文的示例都可在个人 github 仓库(https://github.com/VaJoy/stream/)获取到,读者能够自行下载和调试。npm
一. Stream的做用gulp
在介绍 Stream(流)以前,咱们先来看一个例子 —— 模拟服务器把本地某个文件内容吐给客户端:api
//demo2 var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, data) { res.end(data); }); }); server.listen(3000);
这段代码虽然能够正常执行,但存在一个显著的问题 —— 对于每个客户端的请求,fs.readFile 接口都会把整个文件都缓存到内存中去,而后才开始把数据吐给用户。那么当文件体积很大、请求也较多(且特别当请求来自慢速用户)的时候,服务器须要消耗很大的内存,致使性能低下。缓存
然而这个问题,则正是 stream 发挥所长的地方。如前文说起的,res 是流对象,那咱们正好能够将其利用起来:服务器
var server2 = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server2.listen(4000);
在上方代码段里,fs.createReadStream 建立了 data.txt 的可读流(Readable Stream)。这里须要事先了解的是,流能够简单地分为“可读的(readable)”、“可写的(writable)”,或者“读写都可”三种类型,且全部的流都属于 EventEmitter 的实例。
回到代码,对于建立的可读流,咱们经过 .pipe() 接口来监听其 data 和 end 事件,并把 data.txt (的可读流)拆分红一小块一小块的数据(chunks),像流水同样源源不断地吐给客户端,而再也不须要等待整个文件都加载到内存后才发送数据。
其中 .pipe 能够视为流的“管道/通道”方法,任何类型的流都会有这个 .pipe 方法去成对处理流的输入与输出。
为了方便理解,咱们把上述两种方式(不使用流/使用流)处理为以下的情景(卧槽我好好一个前端为啥要P这么萌的图):
⑴ 不使用流:
⑵ 使用流:
由此能够得知,使用流(stream)的形式,能够大大提高响应时间,又能有效减轻服务器内存的压力。
二. Stream的分类
在上文咱们曾说起到,stream 能够按读写权限来简单地分作三类,不过这里咱们再细化下,能够把 stream 归为以下五个类别:
⑴ Readable Streams
⑵ Writable Streams
⑶ Transform Streams
⑷ Duplex Streams
⑸ Classic Streams
其中 Transform Streams 和 Duplex Streams 都属于便可读又可写的流,而最后一个 Classic Streams 是对 Node 古早版本上的 Stream 的一个统称。咱们将照例对其进行逐一介绍。
2.1 Readable Streams
便可读流,经过 .pipe 接口能够将其数据传递给一个 writable、transform 或者 duplex流:
readableStream.pipe(dst)
常见的 Readable Streams 包括:
例如在前面 demo2 的代码段中,咱们就使用了 fs.createReadStream 接口来建立了一个 fs read stream:
var server2 = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server2.listen(4000);
这里有个有趣的地方 —— 虽然 Readable Streams 称为可读流,但在将其传入一个消耗对象以前,它都是可写的:
var Readable = require('stream').Readable; var rs = new Readable; rs.push('servers '); rs.push('are listening on\n'); rs.push('3000 and 4000\n'); rs.push(null); rs.pipe(process.stdout);
执行结果:
在这段代码中,咱们经过 readStream.push(data) 的形式往可读流里注入数据,并以 readStream.push(null) 来结束可读流。
不过这种写法有个弊端 —— 从使用 .push() 将数据注入 readable 流中开始,直到另外一个东西(process.stdout)来消耗数据以前,这些数据都会存在缓存中。
这里有个内置接口 ._read() 能够用来处理这个问题,它是从系统底层开始读取数据流时才会不断调用自身,从而减小缓存冗余。
咱们能够回过头来看 demo1 的例子:
'use strict'; const Readable = require('stream').Readable; const rs = Readable(); const s = 'VaJoy'; const l = s.length; let i = 0; rs._read = ()=>{ if(i == l){ rs.push(' is my name'); return rs.push(null) } rs.push(s[i++]) }; rs.pipe(process.stdout);
咱们是在 ._read 方法中才使用 readStream.push(data) 往可读流里注入数据供下游消耗(也会流经缓存),从而提高流处理的性能。
这里也有个小问题 —— 上一句话所提到的“供下游消耗”,这个下游一般又会以怎样的形式来消耗可读流的呢?
首先,可使用咱们熟悉的 .pipe() 方法将可读流推送给一个消耗对象(writable、transform 或者 duplex流):
//ext1 const fs = require('fs'); const zlib = require('zlib'); const r = fs.createReadStream('data.txt'); const z = zlib.createGzip(); const w = fs.createWriteStream('data.txt.gz'); r.pipe(z).pipe(w);
其次,也能够经过监听可读流的“data”事件(别忘了文章前面提到的“全部的流都属于 EventEmitter 的实例”)来实现消耗处理 —— 在首次监听其 data 事件后,readStream 便会持续不断地调用 _read(),经过触发 data 事件将数据输出。当数据所有被消耗时,则触发 end 事件。
示例:
//demo3 const Readable = require('stream').Readable; class ToReadable extends Readable { constructor(iterator) { super(); this.iterator = iterator } _read() { const res = this.iterator.next(); if (res.done) { // 迭代结束,顺便结束可读流 this.push(null) } setTimeout(() => { // 将数据添加到流中 this.push(res.value + '\n') }, 0) } } const gen = function *(a){ let count = 5, res = a; while(count--){ res = res*res; yield res } }; const readable = new ToReadable(gen(2)); // 监听`data`事件,一次获取一个数据 readable.on('data', data => process.stdout.write(data)); // 可读流消耗完毕 readable.on('end', () => process.stdout.write('readable stream ends~'));
执行结果为:
这里须要留意的是,在使用 .push() 往可读流里注入数据的代码段,咱们使用了 setTimeout 将其包裹起来,这是为了让系统能有足够时间优先处理接收流结束信号的事务。固然你也能够改写为:
if (res.done) { // 直接 return return this.push(null) } this.push(res.value + '\n')
2.2 Writable Streams
Writable(可写)流接口是对写入数据的目标的抽象:
src.pipe(writableStream)
常见的 Writable Streams 包括:
可写流有两个重要的方法:
上方两方法的 encoding 参数表示编码字符串(chunk为String时才能够用)。
write 方法的 callback 回调参数会在 chunk 被消费后(从缓存中移除后)被触发;end 方法的 callback 回调参数则在 Stream 结束时触发。
另外,如同经过 readable._read() 方法能够处理可读流,咱们能够经过 writable._write(chunk, enc, next) 方法在系统底层处理流写入的逻辑中,对数据进行处理。
其中参数 chunk 表明写进来的数据;enc 表明编码的字符串;next(err) 则是一个回调函数,调用它能够告知消费者进行下一轮的数据流写入。
示例:
//demo4 const Writable = require('stream').Writable; const writable = Writable(); writable._write = (chunck, enc, next) => { // 输出打印 process.stdout.write(chunck.toString().toUpperCase()); // 写入完成时,调用`next()`方法通知流传入下一个数据 process.nextTick(next) }; // 全部数据均已写入底层 writable.on('finish', () => process.stdout.write('DONE')); // 将一个数据写入流中 writable.write('a' + '\n'); writable.write('b' + '\n'); writable.write('c' + '\n'); // 再无数据写入流时,须要调用`end`方法 writable.end();
执行以下:
2.3 Duplex Streams
Duplex 是双工的意思,所以很容易猜到 Duplex 流就是既能读又能写的一类流,它继承了 Readable 和 Writable 的接口。
常见的 Duplex Streams 有:
示例:
//demo5 const Duplex = require('stream').Duplex; const duplex = Duplex(); duplex._read = function () { var date = new Date(); this.push( date.getFullYear().toString() ); this.push(null) }; duplex._write = function (buf, enc, next) { console.log( buf.toString() + '\n' ); next() }; duplex.on('data', data => console.log( data.toString() )); duplex.write('the year is'); duplex.end();
执行结果:
2.4 Transform Streams
Transform Stream 是在继承了 Duplex Streams 的基础上再进行了扩展,它能够把写入的数据和输出的数据,经过 ._transform 接口关联起来。
常见的 Transform Streams 有:
示例:
//demo6 const Transform = require('stream').Transform; class SetName extends Transform { constructor(name, option) { super(option || {}); this.name = name || '' } // .write接口写入的数据,处理后直接从 data 事件的回调中可取得 _transform(buf, enc, next) { var res = buf.toString().toUpperCase(); this.push(res + this.name + '\n'); next() } } var transform = new SetName('VaJoy'); transform.on('data', data => process.stdout.write(data)); transform.write('my name is '); transform.write('here is '); transform.end();
执行结果:
其中的 _transform 是 Transform Streams 的内置方法,全部 Transform Streams 都须要使用该接口来接收输入和处理输出,且该方法只能由子类来调用。
_transform 接口格式以下:
transform._transform(chunk, encoding, callback)
第一个参数表示被转换(transformed)的数据块(chunk),除非构造方法 option 参数(可选)传入了 “decodeString : false”,不然其类型均为 Buffer;
第二个参数用于设置编码,但只有当 chunck 为 String 格式(即构造方法传入 “decodeString : false”参数)的时候才可配置,不然默认为“buffer”;
第三个参数 callback 用于在 chunk 被处理后调用,通知系统进入下一轮 _transform 调用。该回调方法接收两个可选参数 —— callback([error, data]),其中的 data 参数能够将 chunck 写入缓存中(供更后面的消费者去消费):
transform.prototype._transform = function(data, encoding, callback){ this.push(data); callback() }; ///////等价于 transform.prototype._transform = function(data, encoding, callback){ callback(null, data) };
另外 Transform Streams 还有一个 _flush(callback) 内置方法,它会在没有更多可消耗的数据时、在“end”事件以前被触发,并且会清空缓存数据并结束 Stream。
该内置方法一样只容许由子类来调用,并且执行后,不能再调用 .push 方法。
关于 Transform Streams 的更多细节还能够参考这篇文章,推荐阅读。
2.5 Classic Streams
在较早版本的 NodeJS 里,Stream 的实现相较简陋,例如上文说起的“Stream.Readable”接口均是从 Node 0.9.4 开始才有,所以咱们每每须要对其进行屡次封装扩展才能更好地用来开发。
而 Classic Streams 即是对这种古旧模式的 Stream 接口的统称。
须要留意的是,只要往任意一个 stream 注册一个“data”事件监听器,它就会自动切换到“classic”模式,并按照旧的 API 去执行。
classic 流能够看成一个带有 .pipe 接口的事件发射器(event emitter),当它要为消耗者提供数据时会发射“data”事件,当要结束生产数据时,则发射“end”事件。
另外只有当设置 Stream.readable 为 true 时,.pipe 接口才会将当前流视做可读流:
//demo7 var Stream = require('stream'); var stream = new Stream(); stream.readable = true; //告诉 .pipe 这是个可读流 var c = 64; var iv = setInterval(function () { if (++c >= 75) { clearInterval(iv); stream.emit('end'); } else stream.emit('data', String.fromCharCode(c)); }, 100); stream.pipe(process.stdout);
另外,Classic readable streams 还有 .pause() 和 .resume() 两个接口可用于暂停/恢复流的读取:
createServer(function(q,s) { // ADVISORY only! q.pause() session(q, function(ses) { q.on('data', handler) q.resume() }) })
3. Object Mode
对于可读流来讲,push(data) 时,data 的类型只能是 String 或Buffer,且消耗时 data 事件输出的数据类型都为 Buffer;
对于可写流来讲,write(data) 时,data 的类型也只能是 String 或 Buffer,_write(data) 调用时所传进来的 data 类型都为 Buffer。
示例:
//demo8 writable._write = (chunck, enc, next) => { // 输出打印 console.log(chunck); //Buffer //console.log(chunck.toString()); //转为String process.nextTick(next) }; writable.write('Happy Chinese Year'); writable.end();
执行结果:
不过,为了加强数据类型的灵活性,不管是可读流或是可写流,只须要往其构造函数里传入配置参数“{ objectMode: true }”,即可往流里传入/获取任意类型(null除外)的数据:
const objectModeWritable = Writable({ objectMode: true }); objectModeWritable._write = (chunck, enc, next) => { // 输出打印 console.log(typeof chunck); console.log(chunck); process.nextTick(next) }; objectModeWritable.write('Happy Chinese Year'); objectModeWritable.write( { year : 2017 } ); objectModeWritable.end( 2017 );
执行结果:
4. Stream的兼容问题
在前文咱们介绍了 classic streams,它属于陈旧版本的 Node 上的 Stream 接口,能够把它称为 Streams1。而从 Node 0.10 开始,Stream 新增了系列实用的新接口,能够作更多除了 .pipe() 以外的事情,咱们把其归类为 Streams2(事实上,在 Node 0.11+开始,Stream有些许新的变更,从该版本开始的 Stream 也可称为 Streams3)。
那么这里存在一个问题 —— 那些使用了 Stream1 的项目(特别是 npm 包),想升级使用环境的 Node 版本到 0.10+,会否致使兼容问题呢?
还好 Streams2 虽然改头换面,但本质上是设计为向后兼容的。
打个比方,若是你同时推送了一条 Streams2 流和一条旧格式的、基于事件发射器的流,Stream2 将降级为旧模式(shim mode)来向后兼容。
可是,若是咱们的开发环境使用的是 Node 0.8(且由于某些缘由不能升级),但又想使用 Streams2 的API怎么办呢?或者好比 npm 上的某些开源的工具包,想要拥抱 Streams2 的便利,又想保持对使用 Node 0.8 的用户进行兼容处理,这样又得怎么处理?
针对上述问题,早在 Node 0.10 释放以前,Issacs 就把 Node-core 中操做 Stream 的核心接口独立拷贝了一份出来,开源到了 npm 上并持续更新,它就是 readable-stream。
经过使用 readable-stream,咱们就能够在那些核内心没有 Streams2/3 的低版本 Node 中,直接使用 Streams2/3:
var Readable = require('stream').Readable || require('readable-stream').Readable
readable-stream 如今有 v1.0.x 和 v1.1.x 两个主要版本,前者跟进 Streams2 的迭代,后者跟进 Streams3 的迭代,用户能够根据需求使用对应版本的包。
5. through2
readable-stream 虽然提供了一个 Streams 的兼容方案,但咱们也但愿能对 Stream 复杂的API进行精简。
而 through2 便基于 readable-stream 对 Stream 接口进行了封装,并提供了更简单和灵活的方法。
through2 会为你生成 Transform Streams(貌似旧版本是 Duplex Streams)来处理任意你想使用的流 —— 如前文介绍,相比其它流,Transform 流处理起数据会更加灵活方便。
来看下 through2 的示例:
//demo9 const fs = require('fs'); const through2 = require('through2'); fs.createReadStream('data.txt') .pipe(through2(function (chunk, enc, callback) { for (var i = 0; i < chunk.length; i++) if (chunk[i] == 97) chunk[i] = 122; // 把 'a' 替换为 'z' this.push(chunk); callback() })) .pipe(fs.createWriteStream('out.txt')) .on('finish', ()=> { console.log('DONE') });
使用 through2.obj 接口操做 Object Mode 下的流:
//demo10 const fs = require('fs'); const through2 = require('through2'); const csv2 = require('csv2'); let all = []; fs.createReadStream('list.csv') .pipe(csv2()) // through2.obj(fn) 是 through2({ objectMode: true }, fn) 的简易封装 .pipe(through2.obj(function (chunk, enc, callback) { var data = { name: chunk[0], sex: chunk[1], addr: chunk[2] }; this.push(data); callback() })) .on('data', function (data) { all.push(data) }) .on('end', function () { console.log(all) });
对比原生的 Stream API,through2 简洁了很多,加上有 readable-stream 依赖加持,也很好理解为什么像 gulp 及其插件都会使用 through2 来操做和处理 stream 了。
顺便贴下对 through2 的源码注释:
var Transform = require('readable-stream/transform'), inherits = require('util').inherits, xtend = require('xtend'); //构造方法,继承了Transform function DestroyableTransform(opts) { Transform.call(this, opts); this._destroyed = false } inherits(DestroyableTransform, Transform); //原型接口 destroy,用于关闭当前流 DestroyableTransform.prototype.destroy = function (err) { if (this._destroyed) return; this._destroyed = true; var self = this; process.nextTick(function () { if (err) self.emit('error', err); self.emit('close') }) }; // a noop _transform function function noop(chunk, enc, callback) { callback(null, chunk) } // 闭包,用于返回对外接口方法 function through2(construct) { //最终返回此匿名函数 return function (options, transform, flush) { if (typeof options == 'function') { flush = transform transform = options options = {} } if (typeof transform != 'function') transform = noop if (typeof flush != 'function') flush = null return construct(options, transform, flush) } } // 出口,执行 throuh2 闭包函数,返回一个 DestroyableTransform 的实例(t2) module.exports = through2(function (options, transform, flush) { //t2 为 Transform Stream 对象 var t2 = new DestroyableTransform(options); //Transform Streams 的内置接口 _transform(chunk, encoding, next) 方法 t2._transform = transform; if (flush) t2._flush = flush; return t2 }); // 对外暴露一个能够直接 new (或者不加 new)来建立实例的的构造函数 module.exports.ctor = through2(function (options, transform, flush) { function Through2(override) { if (!(this instanceof Through2)) return new Through2(override) this.options = xtend(options, override) DestroyableTransform.call(this, this.options) } inherits(Through2, DestroyableTransform) Through2.prototype._transform = transform if (flush) Through2.prototype._flush = flush return Through2 }) //Object Mode接口的简易封装 module.exports.obj = through2(function (options, transform, flush) { var t2 = new DestroyableTransform(xtend({objectMode: true, highWaterMark: 16}, options)) t2._transform = transform if (flush) t2._flush = flush return t2 })
以上是本文对 Stream 的一个介绍,但事实上 Stream 还有许多未露面的 API,感兴趣的同窗能够直接阅读官方 API文档作进一步了解。
本篇文章是对后续 gulp 源码解析系列的一个基础铺垫,想了解更多 gulp 相关内容的话能够留意个人博客。最后恭祝你们鸡年大吉!共勉~
Reference
⑴ Stream API Doc - https://nodejs.org/api/stream.html
⑵ stream-handbook - https://github.com/substack/stream-handbook
⑶ Node.js Stream - 基础篇 - http://www.cnblogs.com/zapple/p/5759670.html
⑷ Why I don't use Node's core 'stream' module - https://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html