浅析node中流应用(一) 可读流(fs.createReadStream)

为何要须要流?

  • 当咱们学习新知识的时候,首先咱们知道为何要学习,那咱们为何要学习流?由于在在node中读取文件的方式有来两种,一个是利用fs模块,一个是利用流来读取。若是读取小文件,咱们可使用fs读取,fs读取文件的时候,是将文件一次性读取到本地内存。而若是读取一个大文件,一次性读取会占用大量内存,效率很低,这个时候须要用流来读取。流是将数据分割段,一段一段的读取,能够控制速率,效率很高,不会占用太大的内存。gulp的task任务,文件压缩,和http中的请求和响应等功能的实现都是基于流来实现的。所以,系统学习下流仍是颇有必要的

可读流用法(先把用法学会)

  • node中读是将内容读取到内存中,而内存就是Buffer对象
  • 流都是基于原生的fs操做文件的方法来实现的,经过fs建立流。全部的 Stream 对象都是 EventEmitter 的实例。经常使用的事件有:
  • open -打开文件
  • data -当有数据可读时触发。
  • error -在读收和写入过程当中发生错误时触发。
  • close -关闭文件
  • end - 没有更多的数据可读时触发

建立可读流

  • 统一下 1.txt中的内容 1234567890
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3, //文件一次读多少字节,默认 64*1024
    flags:'r', //默认 'r'
    autoClose:true, //默认读取完毕后自动关闭
    start:0, //读取文件开始位置
    end:3, //流是闭合区间 包含start也含end
    encoding:'utf8' //默认null
});
复制代码
  • 注意: 默认建立一个流 是非流动模式,默认不会读取数据
  • 具体参数说明,咱们能够参考下node官网详细介绍
    http://nodejs.cn/api/fs.html#fs_fs_createreadstream_path_options

监听open事件

rs.on("open",()=>{
   console.log("文件打开")
});
复制代码

监听data事件

  • 可读流这种模式它默认状况下是非流动模式(暂停模式),它什么也不作,就在这等着html

  • 监听了data事件的话,就能够将非流动模式转换为流动模式node

  • 流动模式会疯狂的触发data事件,直到读取完毕git

  • 直接上代码github

//1.txt中内容为1234567890
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3, //文件一次读多少字节,默认 64*1024
    flags:'r', //默认 'r'
    autoClose:true, //默认读取完毕后自动关闭
    start:0, //读取文件开始位置
    end:3, //流是闭合区间 包含start也含end
    encoding:'utf8' //默认null
});
rs.on("open",()=>{
   console.log("文件打开")
});
//疯狂触发data事件 直到读取完毕
rs.on('data',(data)=>{
    console.log(data); //共读4个字节,可是highWaterMark为3,因此触发2次data事件,分别打印123  4
});
复制代码

监听err/end/close事件

rs.on("err",()=>{
    console.log("发生错误")
});
rs.on('end',()=>{ //文件读取完毕后触发
    console.log("读取完毕");
});
rs.on("close",()=>{ //最后文件关闭触发
    console.log("关闭")
});
复制代码

不要急,最后把方法介绍完统一写个例子,你们一看便一目了之算法

最后介绍两个方法就大功告成啦

  • rs.pause() 暂停读取,会暂停data事件的触发,将流动模式转变非流动模式
  • rs.resume()恢复data事件,继续读取,变为流动模式

终于把可读流的全部API讲完了,火烧眉毛的写个完整的案例来体验下,说干就干gulp

手写可读流

1、准备工做,构建可读流构造函数

  • 记住Stream 对象都是 EventEmitter 的实例,内部是经过发布订阅模式实现的。直接贴代码
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter { //建立可读流类,继承 EventEmitter
    constructor(path, options = {}) { //options默认空对象
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose || true;
        this.start = options.start || 0;
        this.pos = this.start; //pos会随着读取的位置改变
        this.end = options.end || null;
        this.encoding = options.encoding || null;
        this.flags = options.flags || 'r';
        this.flowing = null; //非流动模式
        //声明一个buffer表示都出来的数据
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.open(); //打开文件 fd
    }
复制代码
  • 其实只是赋值了不少默认值,没有什么难点,接下来就要写this.open()方法,即打开文件

2、在ReadStream原型中写open方法

  • 废话很少说,直接上代码,代码中有详细的代码解释
//打开文件用
    open() {
        fs.open(this.path, this.flags, (err, fd) => { //fd标识的就是当前this.path这个文件,从3开始(number类型)
            if (err) { 
                if (this.autoClose) { //若是须要自动关闭我再去销毁fd
                    this.destroy(); //关闭文件(触发关闭事件)
                }
                this.emit('error', err); //打开文件发生错误,发布error事件
            }
            this.fd = fd; //保存文件描述符
            this.emit('open', this.fd) //触发文件open方法
        })
    }
复制代码
  • 想下,打开文件咱们作了两件事,
  • 一、若是发生错误,关闭文件,同时发射 "error"事件
  • 二、若是没有错误,保存fd,而后发射 "open"事件
  • 先来实现下this.destroy()关闭文件的方法

3、实现destroy()方法

destroy() {
        if (typeof this.fd != 'number') { //文件未打开,也要关闭文件且触发close事件
            return this.emit('close');
        }
        fs.close(this.fd, () => {  //若是文件打开过了 那就关闭文件而且触发close事件
            this.emit("close");
        })
    }
复制代码
  • 这样一来,rs.on('open')已经实现了,咱们来测试下吧

4、实现主要的read方法真的读文件,于rs.on('data')方法对应

  • 一、确保真的拿到fd(文件描述符,默认3,number类型)
  • 二、确保拿到fd后,对fs.read中howMuchToRead有一个绕的算法,多举几个例子理解更好,若是对fs.read不了解,戳这里,fs.read()方法介绍
  • 三、异步递归去读文件,读完为止。
  • 四、说了这么多,直接干。
read() {
        //此时文件还没打开
        if (typeof this.fd != 'number') {
            //当文件真正打开的时候 会触发open事件,触发事件后再执行read,此时fd 就有了
            return this.once('open', () => this.read())
        }
        //此时有fd了 开始读取文件了
        //this.pos是变量,开始时this.pos = this.start,在上面定义过了
        //算法有点绕,源码中是这样实现的。举个例子 end=3,pos=0,highWaterMark=3, howMuchToRead = 3, 1.txt内容1234 就会读123  4 
        let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, byteRead) => {
            // byteRead真实读到的个数
            this.pos += byteRead;
            // this.buffer默认三个 
            let b = this.buffer.slice(0, byteRead);
            //对读到的b进行编码
            b = this.encoding ? b.toString(this.encoding) : b;
            //把读取到的buffer发射出去
            this.emit('data', b);
            if ((byteRead === this.highWaterMark) && this.flowing) {
                return this.read();
            }
            //这里没有更多逻辑了
            if (byteRead < this.highWaterMark) {
                //没有更多了
                this.emit('end'); //读取完毕
                this.destroy();   //销毁完毕
            }
        })
    }
复制代码

你们会发现,此时咱们尚未监听 rs.on('data')事件,来触发read方法,此时咱们须要修改下 第一步建立构造函数的代码api

constructor(path, options = {}) {
        //省略.... 代码和第一步同样,下面是新添加
    

       // 看是否监听了data事件,若是监听了就要变成流动模式
        this.on('newListener', (eventName, callback) => {
            if (eventName === 'data') {
                //至关于用户监听了data事件
                this.flowing = true;
                // 监听了就去读
                this.read(); //去读内容
            }
        })
    }

复制代码

若是能看到这里,就基本大功告成,就只剩下pause和resume 暂停和恢复暂停方法。那就一写到底bash

5、添加pause暂停 和resume恢复暂停方法

  • 两个方法很是简单,就直接贴代码
pause() {
        this.flowing = false;
    }
    resume() {
        this.flowing = true;
        //恢复暂停,在去无限读
        this.read();
    }
复制代码

终于大功告成,写的对不对呢,赶忙测试下吧,期待的搓手手异步

end

  • 咱们已经实现了可读流实现,后续还会有可写流实现。api虽然枯燥,但愿你们仍是多写写源码
  • 对源码感兴趣,我把源码放在github上 ,供你们参考
相关文章
相关标签/搜索