let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3, //文件一次读多少字节,默认 64*1024
flags:'r', //默认 'r'
autoClose:true, //默认读取完毕后自动关闭
start:0, //读取文件开始位置
end:3, //流是闭合区间 包含start也含end
encoding:'utf8' //默认null
});
复制代码
rs.on("open",()=>{
console.log("文件打开")
});
复制代码
可读流这种模式它默认状况下是非流动模式(暂停模式),它什么也不作,就在这等着html
监听了data事件的话,就能够将非流动模式转换为流动模式node
流动模式会疯狂的触发data事件,直到读取完毕git
直接上代码github
//1.txt中内容为1234567890
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3, //文件一次读多少字节,默认 64*1024
flags:'r', //默认 'r'
autoClose:true, //默认读取完毕后自动关闭
start:0, //读取文件开始位置
end:3, //流是闭合区间 包含start也含end
encoding:'utf8' //默认null
});
rs.on("open",()=>{
console.log("文件打开")
});
//疯狂触发data事件 直到读取完毕
rs.on('data',(data)=>{
console.log(data); //共读4个字节,可是highWaterMark为3,因此触发2次data事件,分别打印123 4
});
复制代码
rs.on("err",()=>{
console.log("发生错误")
});
rs.on('end',()=>{ //文件读取完毕后触发
console.log("读取完毕");
});
rs.on("close",()=>{ //最后文件关闭触发
console.log("关闭")
});
复制代码
不要急,最后把方法介绍完统一写个例子,你们一看便一目了之算法
终于把可读流的全部API讲完了,火烧眉毛的写个完整的案例来体验下,说干就干gulp
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter { //建立可读流类,继承 EventEmitter
constructor(path, options = {}) { //options默认空对象
super();
this.path = path;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.autoClose = options.autoClose || true;
this.start = options.start || 0;
this.pos = this.start; //pos会随着读取的位置改变
this.end = options.end || null;
this.encoding = options.encoding || null;
this.flags = options.flags || 'r';
this.flowing = null; //非流动模式
//声明一个buffer表示都出来的数据
this.buffer = Buffer.alloc(this.highWaterMark);
this.open(); //打开文件 fd
}
复制代码
//打开文件用
open() {
fs.open(this.path, this.flags, (err, fd) => { //fd标识的就是当前this.path这个文件,从3开始(number类型)
if (err) {
if (this.autoClose) { //若是须要自动关闭我再去销毁fd
this.destroy(); //关闭文件(触发关闭事件)
}
this.emit('error', err); //打开文件发生错误,发布error事件
}
this.fd = fd; //保存文件描述符
this.emit('open', this.fd) //触发文件open方法
})
}
复制代码
destroy() {
if (typeof this.fd != 'number') { //文件未打开,也要关闭文件且触发close事件
return this.emit('close');
}
fs.close(this.fd, () => { //若是文件打开过了 那就关闭文件而且触发close事件
this.emit("close");
})
}
复制代码
read() {
//此时文件还没打开
if (typeof this.fd != 'number') {
//当文件真正打开的时候 会触发open事件,触发事件后再执行read,此时fd 就有了
return this.once('open', () => this.read())
}
//此时有fd了 开始读取文件了
//this.pos是变量,开始时this.pos = this.start,在上面定义过了
//算法有点绕,源码中是这样实现的。举个例子 end=3,pos=0,highWaterMark=3, howMuchToRead = 3, 1.txt内容1234 就会读123 4
let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, byteRead) => {
// byteRead真实读到的个数
this.pos += byteRead;
// this.buffer默认三个
let b = this.buffer.slice(0, byteRead);
//对读到的b进行编码
b = this.encoding ? b.toString(this.encoding) : b;
//把读取到的buffer发射出去
this.emit('data', b);
if ((byteRead === this.highWaterMark) && this.flowing) {
return this.read();
}
//这里没有更多逻辑了
if (byteRead < this.highWaterMark) {
//没有更多了
this.emit('end'); //读取完毕
this.destroy(); //销毁完毕
}
})
}
复制代码
你们会发现,此时咱们尚未监听 rs.on('data')事件,来触发read方法,此时咱们须要修改下 第一步建立构造函数的代码api
constructor(path, options = {}) {
//省略.... 代码和第一步同样,下面是新添加
// 看是否监听了data事件,若是监听了就要变成流动模式
this.on('newListener', (eventName, callback) => {
if (eventName === 'data') {
//至关于用户监听了data事件
this.flowing = true;
// 监听了就去读
this.read(); //去读内容
}
})
}
复制代码
若是能看到这里,就基本大功告成,就只剩下pause和resume 暂停和恢复暂停方法。那就一写到底bash
pause() {
this.flowing = false;
}
resume() {
this.flowing = true;
//恢复暂停,在去无限读
this.read();
}
复制代码
终于大功告成,写的对不对呢,赶忙测试下吧,期待的搓手手异步