若是你正在学习Node,那么流必定是一个你须要掌握的概念。若是你想成为一个Node高手,那么流必定是武功秘籍中不可缺乏的一个部分。
引用自Stream-Handbook。因而可知,流对于深刻学习Node的重要性。javascript
你能够把流理解成一种传输的能力。经过流,能够以平缓的方式,无反作用的将数据传输到目的地。在Node中,Node Stream建立的流都是专用于String和Buffer上的,通常状况下使用Buffer。Stream表示的是一种传输能力,Buffer是传输内容的载体 (能够这样理解,Stream:外卖小哥哥, Buffer:你的外卖)。建立流的时候将ObjectMode设置true ,Stream一样能够传输任意类型的JS对象(除了null,null在流中有特殊用途)。html
如今有个需求,咱们要向客户端传输一个大文件。若是采用下面的方式java
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { fs.readFile('./big.file', (err, data) => { if (err) throw err; res.end(data); }); }); server.listen(8000);
每次接收一个请求,就要把这个大文件读入内存,而后再传输给客户端。经过这种方式可能会产生如下三种后果:git
因此这种方式在传输大文件的状况下,不是一个好的方案。并发量一大,几百个请求过来很容易就将内存耗尽。github
若是采用流呢?服务器
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { const src = fs.createReadStream('./big.file'); src.pipe(res); }); server.listen(8000);
采用这种方式,不会占用太多内存,读取一点就传输一点,整个过程平缓进行,很是优雅。若是想在传输的过程当中,想对文件进行处理,好比压缩、加密等等,也很好扩展(后面会具体介绍)。并发
流在Node中无处不在。从下图中能够看出:异步
Stream分为四大类:函数
可读流中的数据,在如下两种模式下都能产生数据。学习
两种模式下,触发的方式以及消耗的方式不同。
Flowing Mode:数据会源源不断地生产出来,造成“流动”现象。监听流的data
事件即可进入该模式。
Non-Flowing Mode下:须要显示地调用read()
方法,才能获取数据。
两种模式能够互相转换
流的初始状态是Null,经过监听data
事件,或者pipe
方法,调用resume
方法,将流转为Flowing Mode
状态。Flowing Mode
状态下调用pause
方法,将流置为Non-Flowing Mode
状态。Non-Flowing Mode
状态下调用resume
方法,一样能够将流置为Flowing Mode
状态。
下面详细介绍下两种模式下,Readable流的运行机制。
在Flowing Mode状态下,建立的myReadable读流,直接监听data事件,数据就源源不断的流出来进行消费了。
myReadable.on('data',function(chunk){ consume(chunk);//消费流 })
一旦监听data事件以后,Readable内部的流程以下图所示
核心的方法是流内部的read方法,它在参数n为不一样值时,分别触发不一样的操做。下面描述中的hightwatermark表示的是流内部的缓冲池的大小。
图中黄色标识的_read(),是用户实现流所须要本身实现的方法,这个方法就是实际读取流的方式(能够这样理解,外卖平台给你提供外卖的能力,那_read()方法就至关于你下单点外卖)。后面会详细介绍如何实现_read方法。
以上的流程能够描述为:监听data方法,Readable内部就会调用read方法,来进行触发读流操做,经过判断是同步仍是异步读取,来决定读取的数据是否放入缓冲区。若是为异步的,那么就要调用flow方法,来继续触发read方法,来读取流,同时根据size参数断定是否emit('data')来消费流,循环读取。若是是同步的,那就emit('data')来消费流,同时继续触发read方法,来读取流。一旦push方法传入的是null,整个流就结束了。
从使用者的角度来看,在这种模式下,你能够经过下面的方式来使用流
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(chunk){ writeFile1.write(chunk); })
相对于Flowing mode,Non-Flowing Mode要相对简单不少。
消费该模式下的流,须要使用下面的方式
myReadable.on(‘readable’,function(){ const chunk = myReadable.read() consume(chunk);//消费流 })
在Non-Flowing Mode下,Readable内部的流程以下图:
从这个图上看出,你要实现该模式的读流,一样要实现一个_read方法。
整个流程以下:监听readable方法,Readable内部就会调用read方法。调用用户实现的_read方法,来push数据到缓冲池,而后发送emit readable事件,通知用户端消费。
从使用者的角度来看,你能够经过下面的方式来使用该模式下的流
const fs = require('fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('readable',function(chunk) { while (null !== (chunk = myReadable.read())) { writeFile.write(chunk); } });
相对于读流,写流的机制就更容易理解了。
写流使用下面的方式进行数据写入
myWrite.write(chunk);
调用write后,内部Writable的流程以下图所示
相似于读流,实现一个写流,一样须要用户实现一个_write方法。
整个流程是这样的:调用write以后,会首先断定是否要写入缓冲区。若是不须要,那就调用用户实现的_write方法,将流写入到相应的地方,_write会调用一个writeable内部的一个回调函数。
从使用者的角度来看,使用一个写流,采用下面的代码所示的方式。
const fs = require('fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(chunk) { writeFile.write(chunk); })
能够看到,使用写流是很是简单的。
咱们先讲解一下如何实现一个读流和写流,再来看Duplex和Transform是什么,由于了解了如何实现一个读流和写流,再来理解Duplex和Transform就很是简单了。
实现自定义的Readable,只须要实现一个_read方法便可,须要在_read方法中调用push方法来实现数据的生产。以下面的代码所示:
const Readable = require('stream').Readable; class MyReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }); } } // 模拟资源池 const dataSource = { data: new Array(10).fill('-'), makeData() { if (!dataSource.data.length) return null; return dataSource.data.pop(); } }; const myReadable = new MyReadable(dataSource,); myReadable.on('readable', () => { let chunk; while (null !== (chunk = myReadable.read())) { console.log(chunk); } });
实现自定义的writable,只须要实现一个_write方法便可。在_write中消费chunk写入到相应地方,而且调用callback回调。以下面代码所示:
const Writable = require('stream').Writable; class Mywritable extends Writable{ constuctor(options){ super(options); } _write(chunk,endcoding,callback){ console.log(chunk); callback && callback(); } } const myWritable = new Mywritable();
双工流:简单理解,就是讲一个Readable流和一个Writable流绑定到一块儿,它既能够用来作读流,又能够用来作写流。
实现一个Duplex流,你须要同时实现_read和_write方法。
有一点须要注意的是:它所包含的 Readable流和Writable流是彻底独立,互不影响的两个流,两个流使用的不是同一个缓冲区。经过下面的代码能够验证
// 模拟资源池1 const dataSource1 = { data: new Array(10).fill('a'), makeData() { if (!dataSource1.data.length) return null; return dataSource1.data.pop(); } }; // 模拟资源池2 const dataSource2 = { data: new Array(10).fill('b'), makeData() { if (!dataSource2.data.length) return null; return dataSource2.data.pop(); } }; const Readable = require('stream').Readable; class MyReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }) } } const Writable = require('stream').Writable; class MyWritable extends Writable{ constructor(options){ super(options); } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback && callback(); } } const Duplex = require('stream').Duplex; class MyDuplex extends Duplex{ constructor(dataSource,options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }) } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback && callback(); } } const myWritable = new MyWritable(); const myReadable = new MyReadable(dataSource1); const myDuplex = new MyDuplex(dataSource1); myReadable.pipe(myDuplex).pipe(myWritable);
打印的结果是
abababababababababab
从这个结果能够看出,myReadable.pipe(myDuplex)
,myDuplex充当的是写流,写入的内容是a;myDuplex.pipe(myWritable)
,myDuplex充当的是读流,往myWritable写的倒是b;因此说它所包含的 Readable流和Writable流是彻底独立的。
理解了Duplex,就更好理解Transform了。Transform是一个转换流,它既有读的功能又有写的功能,可是它和Duplex不一样的是,它的读流和写流共用同一个缓冲区;也就是说,经过它读入什么,那它就能写入什么。
实现一个Transform,你只须要实现一个_transform方法。好比最简单的Transform:PassThrough,其源代码以下所示
PassThrough就是一个Transform,可是这个转换流,什么也没作,至关于一个透明的转换流。能够看到_transform中什么都没有,只是简单的将数据进行回调。
若是咱们在这个环节作些扩展,只须要在_transform中直接扩展就好了。好比咱们能够对流进行压缩,加密,混淆等等操做。
最后介绍一个流中很是重要的一个概念:背压。要了解这个,咱们首先来看下pipe和highWaterMaker是什么。
首先看下下面的代码
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.pipe(writeFile);
上面的代码和下面是等价的
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(data){ var flag = ws.write(data); if(!flag){ // 当前写流缓冲区已满,暂停读数据 readFile.pause(); } }) writeFile.on('drain',function()){ readFile.resume();// 当前写流缓冲区已清空,从新开始读流 } readFile.on('end',function(data){ writeFile.end();//将写流缓冲区的数据所有写入,而且关闭写入的文件 })
pipe所作的操做就是至关于为写流和读流自动作了速度的匹配。
读写流速度不匹配的状况下,通常状况下不会形成什么问题,可是会形成内存增长。内存消耗增长,就有可能会带来一系列的问题。因此在使用的流的时候,强烈推荐使用pipe。
highWaterMaker说白了,就是定义缓冲区的大小。
背压的概念能够理解为:为了防止读写流速度不匹配而产生的一种调整机制;背压该调整机制的触发时机,受限于highWaterMaker设置的大小。
如上面的代码 var flag = ws.write(data);
,一旦写流的缓冲区满了,那flag
就会置为false,反向促进读流的速度调整。
主要有如下场景
1.文件操做(复制,压缩,解压,加密等)
下面的就很容易就实现了文件复制的功能。
const fs = require('fs'); const readFile = fs.createReadStream('big.file'); const writeFile = fs.createWriteStream('big_copy.file'); readFile.pipe(writeFile);
那咱们想在复制的过程当中对文件进行压缩呢?
const fs = require('fs'); const readFile = fs.createReadStream('big.file'); const writeFile = fs.createWriteStream('big.gz'); const zlib = require('zlib'); readFile.pipe(zlib.createGzip()).pipe(writeFile);
实现解压、加密也是相似的。
2.静态文件服务器
好比须要返回一个html,可使用以下代码。
var http = require('http'); var fs = require('fs'); http.createServer(function(req,res){ fs.createReadStream('./a.html').pipe(res); }).listen(8000);