在前端工程化中产生了不少工具,例如grunt,gulp,webpack,babel...等等,这些工具都是经过node中的stream实现。 在node中stream也是很是很是很是重要的模块,好比咱们经常使用的console就是基于stream的实例,还有net,http等核心模块都是基于stream来实现的,可见stream是多么的重要。javascript
是一种数据传输手段,从一个地方传输到另外一个地方。
在写node的时候会存在读取文件,好比如今咱们有一个很是大的文件,50G吧前端
const fs = require('fs');
// test文件50个G
fs.readFileSync('./test.text');
复制代码
这个时候须要消耗大量的时候去读取这个文件,然而咱们可能关心的并非文件全部内容,还会存在直接读取失败。stream就是为了解决这些问题而产生,咱们读一些数据处理一些数据,当读到所关心数据的时候,则能够再也不继续读取。java
stream翻译成中文‘流’,就像水同样,从水龙头流向水杯。node
stream继承于EventEmitter,拥有事件触发和事件监听功能。主要分为4种基本流类型:
webpack
在流中存在一个重要的概念,缓存区,就像拿水杯去接水,水杯就是缓存区,当水杯满,则会关闭水龙头,等把水杯里面的水消耗完毕,再打开水龙头去接水。
stream默认缓存区大小为16384(16kb),能够经过highWaterMark参数设置缓存区大小,但设置encoding后,以设置的字符编码为单位衡量。web
首先建立一个可读流,可接收5个参数:gulp
可读流中分为2种模式流动模式和暂停模式。
监听data事件,触发流动模式,会源源不断生产数据触发data事件:前端工程化
const { Readable } = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
// 这里传入的read方法,会被写入_read()
read: (size) => {
// size 为highWaterMark大小
// 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),若是没有数据了,push(null)结束流
if (i < 10) {
rs.push(`当前读取数据: ${i++}`);
} else {
rs.push(null);
}
},
// 源代码,可覆盖
destroy(err, cb) {
rs.push(null);
cb(err);
}
});
rs.on('data', (data) => {
console.log(data);
// 每次push数据则触发data事件
// 当前读取数据: 0
// 当前读取数据: 1
// 当前读取数据: 2
// 当前读取数据: 3
// 当前读取数据: 4
// 当前读取数据: 5
// 当前读取数据: 6
// 当前读取数据: 7
// 当前读取数据: 8
// 当前读取数据: 9
})
复制代码
监听readable事件,触发暂停模式,当流有了新数据或到了流结束以前触发readable事件,须要显示调用read([size])读取数据:数组
const { Readable } = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
highWaterMark: 9,
// 这里传入的read方法,会被写入_read()
read: (size) => {
// size 为highWaterMark大小
// 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),若是没有数据了,push(null)结束流
if (i < 10) {
// push实际上是把数据放入缓存区
rs.push(`当前读取数据: ${i++}`);
} else {
rs.push(null);
}
}
});
rs.on('readable', () => {
const data = rs.read(9);
console.log(data);
//
})
复制代码
read([size]) size参数:缓存
这里的缓存区数据不是指highWaterMark,获取缓存区数据大小rs._readableState.length。
流的模式能够自由切换: 经过rs._readableState.flowing的值获取当前状态
rs.pause()切换到暂停模式 rs.resume()切换到流动模式
在可读流里面还能够监听其余事件:
rs.on('close', () => {
// 流关闭时或文件关闭时触发
})
rs.on('end', () => {
// 在流中没有数据可供消费时触发
})
rs.on('error', (err) => {
// 发生错误时候
})
复制代码
可写流可接受参数:
在实现流除了用上面直接传入参数的方式,还能够用继承类
class WS extends stream.Writable {
constructor() {
super({
highWaterMark: 1
});
}
_write(chunk, encoding, cb) {
console.log(this._writableState.length);
// chunk 为须要写入的数据
// encoding 字符编码
// cb 回调函数, 若是写入成功须要调用cb去执行下一次写入,若是发生错误,能够cb(new Error([错误信息]))
if (chunk.length < 4) {
fs.writeFileSync('./2.text', chunk, {
flag: 'a'
});
cb();
} else{
cb(new Error('超出4个字节'));
}
}
}
const ws = new WS();
let i = 0;
function next() {
let flag = true;
// write() 会返回boolean false -> 缓存区没满 true —> 已满,须要暂停写入数据
while(i < 10 && flag) {
flag = ws.write(`${i++}`);
console.log('flag', flag);
}
}
next();
// 当全部缓存区数据已经成功写入底层数据,缓存区没有数据了,触发drain事件
ws.on('drain', () => {
console.log('drain');
// 继续写入缓存区数据
next();
})
复制代码
可写流的end事件,一旦触发end事件,后续不能再写入数据.
ws.write('start');
ws.end('end');
ws.wrtie('test'); // 报错 write after end
复制代码
finish事件:
ws.write('start');
ws.end('end');
ws.on('finish', () => {
console.log('调用end方法后,而且全部数据已经写入底层')
})
复制代码
cork()与uncork(),强制全部数据先写入缓存区,直到调用uncork()或end(),这时一并写入底层:
const ws = stream.Writable({
writev(chunks, encoding, cb) {
// 这时chunks为一个数组,包含全部的chunk
// 如今length为10
console.log(chunk.length);
}
});
// 写入数据以前,强制写入数据放入缓存区
ws.cork();
// 写入数据
for (let i = 0; i < 10; i++) {
ws.write(i.toString());
}
// 写入完毕,能够触发写入底层
ws.uncork();
复制代码
读写流,该方法继承了可写流和可读流,但相互之间没有关系,各自独立缓存区,拥有Writable和Readable全部方法和事件,同时实现_read()和_write()方法。
const fs = require('fs');
const stream = require('stream');
const duplex = stream.Duplex({
write(chunk, encoding, cb) {
console.log(chunk.toString('utf8')); // 写入
},
read() {
this.push('读取');
this.push(null);
}
});
console.log(duplex.read(6).toString('utf8')); // 读取
duplex.write('写入');
复制代码
转换流,这个流在前端工程化中用到最多,从一个地方读取数据,转换数据后输出到一个地方,该流继承于Duplex。
const fs = require('fs');
const stream = require('stream');
const transform = stream.Transform({
transform(chunk, encoding, cb){
// 把数据转换成大写字母,而后push到缓存区
this.push(chunk.toString().toUpperCase());
cb();
}
});
transform.write('a');
console.log(transform.read(1).toString()); // A
复制代码
可读流和可写流都须要咱们去实现父类的方法,那么fs这个模块帮咱们作了这件事情,fs里面实现了高效而且可靠的可读/可写流,提供快速建立流,再也不去实现父类_write()或_read()。下面咱们来看看如何使用:
const fs = require('fs');
/** * 建立可读流 * * 第一个参数文件路径 * * 第二个参数为options * flags?: string; encoding?: string; 字符编码 fd?: number; 文件打开后的标识符 mode?: number; 文件的权限 autoClose?: boolean; 读取完毕后,是否自动关闭文件 start?: number; 从哪一个位置开始读取 end?: number; 读到何时结束 highWaterMark?: number; 最高水位线 */
const rs = fs.createReadStream('1.text');
rs.on('data', data => {
console.log(data);
})
/** * 建立可写流 * * 第一个参数文件路径 * * 第二个参数为options * flags?: string; encoding?: string; 字符编码 fd?: number; 文件打开后的标识符 mode?: number; 文件的权限 autoClose?: boolean; 写入完毕后,是否自动关闭文件 start?: number; 从什么位置开始写入 */
const ws = fs.createWriteStream('2.text');
ws.write('123');
复制代码
在流中搭建一条管道,从可读流中到可写流。
可读流中有pipe()方法,在可写流中能够监听pipe事件,下面实现了从可读流中经过管道到可写流:
const fs = require('fs');
const stream = require('stream');
const rs = stream.Readable({
read() {
this.push(fs.readFileSync('./1.text')); // 文件内容 test
this.push(null);
}
});
const ws = stream.Writable({
write(chunk, encoding, cb) {
// chunk为test buffer
fs.writeFileSync('./2.text', chunk.toString());
cb();
}
});
ws.on('pipe', data => {
// 触发pipe事件
console.log(data);
});
rs.pipe(ws);
复制代码
流分为四种基本类型,两种模式。流中的数据不是直接写入或读取,有缓存区的概念。