手写node可读流之流动模式

node的可读流基于事件node

可读流之流动模式,这种流动模式会有一个"开关",每次当"开关"开启的时候,流动模式起做用,若是将这个"开关"设置成暂停的话,那么,这个可读流将不会去读取文件,直到将这个"开关"从新置为流动。缓存

读取文件流程

读取文件内容的流程,主要为:bash

  1. 打开文件,打开文件成功,将触发open事件,若是打开失败,触发error事件和close事件,将文件关闭。
  2. 开始读取文件中的内容,监听data事件,数据处于流动状态,可经过修改开关的状态来暂停读取。
  3. 每次读取到的内容放入缓存中,并经过data事件将数据发布出去。
  4. 当文件中的内容读取完毕以后,将文件关闭。

这一系列动做都是基于事件来进行操做的,而node中的事件咱们都知道是一种发布订阅模式来实现的。服务器

下面咱们来看一看,node是如何使用可读流来读取文件中的内容?异步

node 可读流参数

首先咱们经过fs模块来建立一个可读流,可读流接受两个参数:函数

  • 第一个参数是要读取的文件地址,在这里指明你要读取哪一个文件。
  • 第二个参数是可选项,这个参数是一个对象,用来指定可读流的一些具体的参数。

以下几个参数咱们来一一说明:ui

  • highWaterMark:设置高水位线,这个参数主要用于在读取文件时,可读流会将文件中的内容读取到缓存当中,而这里咱们须要建立一个buffer来缓存这些数据,因此这个参数是用来设置buffer的大小,若是不对这个参数进行设置的话,可读流默认的配置64k。this

  • flags:这个参数主要用于设置文件的执行模式,好比说咱们具体的操做适用于读取文件仍是写入文件等这些操做。若是是写入文件的话那咱们,使用的是w。若是是读取文件的话那这个操做符就应该是r。编码

下面这张表格就说明了不一样的符号表明不一样含义:spa

符号 含义
r 读文件,文件不存在报错
r+ 读取并写入,文件不存在报错
rs 同步读取文件并忽略缓存
w 写入文件,不存在则建立,存在则清空
wx 排它写入文件
w+ 读取并写入文件,不存在则建立,存在则清空
wx+ 和w+相似,排他方式打开
a 追加写入
ax 与a相似,排他方式写入
a+ 读取并追加写入,不存在则建立
ax+ 做用与a+相似,可是以排他方式打开文件
  • autoClose:这个参数主要用于,对文件的关闭的一些控制。若是文件再打开的过程或者其余操做的过程当中出现了错误的状况下,须要将文件进行关闭。那这个参数是设置文件是否自动关闭的功能。

  • encoding:node中用buffer来读取文件操做的东西二进制数据。这些数据展示出来的话咱们是一堆乱码,因此须要,要咱们对这个数据指定一个具体的编码格式。而后将会对这些数据进行编码转化,这样转化出来的数据就是咱们能看懂的数据。

  • starts:这个参数主要用于指定从什么位置开始读取文件中的内容,默认的话是从零开始。

  • ends:这个参数主要用于指定定具体要读取文件多长的数据,这里须要说明一下,这个参数是包括自己的位置,也就是所谓的包前和包后。

下面咱们来看看可读流具体例子:

let fs = require("fs");
let rs = fs.createReadStream("./a.js", {
    highWaterMark: 3,
    encoding: "utf8",
    autoClose: true,
    start: 0,
    end: 9
});
rs.on("open", () => {console.log("open");});
rs.on("close", () => {console.log("close");});
rs.on("data", data => {
    console.log(data);
    rs.pause();//暂停读取 此时流动模式为暂停模式
});
setInterval(() => {
    rs.resume();//从新设置为流动模式,开始读取数据
}, 1000);
rs.on("end", () => { console.log("end"); });
rs.on("error", err => { console.log(err); });
复制代码

手写可读流第一步

上面咱们说过,node可读流是基于node的核心模块事件来完成的,因此在实现咱们本身的可读流时须要继承events模块,代码以下:

let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {

}
复制代码

继承了EventEmitter类,咱们就可使用EventEmitter类中的各个方法,而且一样是采用发布订阅的模式了处理事件。

第二步:处理可读流配置的参数

上面咱们提到,node中建立可读流时能够对这个流配置具体的参数,好比

let rs = fs.createReadStream("./a.js", {
    highWaterMark: 3,
    encoding: "utf8",
    autoClose: true,
    start: 0,
    end: 9
});
复制代码

那么对于这些参数,咱们本身实现的可读流类也须要对这些参数进行处理,那么这些参数该如何进行处理呢?

constructor(path, options = {}) {
    super();
    this.path = path; //指定要读取的文件地址
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.autoClose = options.autoClose || true; //是否自动关闭文件
    this.start = options.start || 0; //从文件哪一个位置开始读取
    this.pos = this.start; // pos会随着读取的位置改变
    this.end = options.end || null; // null表示没传递
    this.encoding = options.encoding || null;// buffer编码
    this.flags = options.flags || 'r';

    this.flowing = null; // 模式开关
    this.buffer = Buffer.alloc(this.highWaterMark);// 根据设置建立一个buffer存储读出来的数
    this.open();
}
复制代码

一般配置的原则是以用户配置的参数为准,若是用户没有对这个参数进行设置的话,就采用默认的配置。

实现可读流第三步:打开文件

这里原理是使用node模块fs中的open方法。首先咱们来回顾下fs.open()方法的使用。

fs.open(filename,flags,[mode],callback);
//实例
fs.open('./1,txt','r',function(err,fd){});
复制代码

这里须要说明下,回调函数callback中有2个参数:

  • 第一个是error,node中异步回调都会返回的一个参数,用来讲明具体的错误信息
  • 第二个参数是fd,是文件描述符,用来标识文件,等价于open函数的第一个参数

好了,如今咱们来看看咱们本身的可读流的open方法该如何实现吧:

open() {
    fs.open(this.path, this.flags, (err, fd) => { 
        //fd标识的就是当前this.path这个文件,从3开始(number类型)
        if (err) {
            if (this.autoClose) { // 若是须要自动关闭则去关闭文件
                this.destroy(); // 销毁(关闭文件,触发关闭事件)
            }
            this.emit('error', err); // 若是有错误触发error事件
            return;
        }
        this.fd = fd; // 保存文件描述符
        this.emit('open', this.fd); // 触发文件的打开的方法
    });
}
复制代码

从代码上咱们能够看出:

  • fs.open函数是异步函数,也就是说callback是异步执行的,在成功打开文件的状况下,fd这个属性也是异步获取到的,这点须要注意。

  • 另外重要的一点是,若是在打开文件发生错误时,则代表打开文件失败,那么此时就须要将文件关闭。

实现可读流第四步:读取文件内容

上面咱们详细说过,可读流自身定义了一个"开关",当咱们要读取文件中的内容的时候,咱们须要将这个"开关"打开,那么node可读流自己是如何来打开这个"开关"的呢?

监听data事件

node可读流经过监听data事件来实现这个"开关"的开启:

rs.on("data", data => {
    console.log(data);
});
复制代码

当用户监听data事件的时候,"开关"开启,不停的从文件中读取内容。那么node是怎么监听data事件的呢? 答案就是 事件模块的newListener

这是由于node可读流是基于事件的,而事件中,服务器就能够经过newListener事件监听到从用户这边过来的全部事件,每一个事件都有对应的类型,当用户监听的是data事件的时候,咱们就能够获取到,而后就能够去读取文件中的内容了,那咱们本身的可读流该如何实现呢?

// 监听newListener事件,看是否监听了data事件,若是监听了data事件的话,就开始启动流动模式,读取文件中的内容
this.on("newListener", type => {
    if (type === "data") {
        //  开启流动模式,开始读取文件中的内容
        this.flowing = true;
        this.read();
    }
});
复制代码

好了,知道了这个"开关"是如何打开的,那么这个时候就到了真正读取文件中内容的关键时候了,先上代码先:

read() {
    // 第一次读取文件的话,有可能文件是尚未打开的,此时this.fd可能尚未值
    if (typeof this.fd !== "number") {
        // 若是此时文件仍是没有打开的话,就触发一次open事件,这样文件就真的打开了,而后再读取
        return this.once("open", () => this.read());
    }
    // 具体每次读取多少个字符,须要进行计算,由于最后一次读取倒的可能比highWaterMark小
    let howMuchRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
    fs.read(this.fd, this.buffer, 0, howMuchRead, this.pos, (err, byteRead) => {
        // this.pos 是每次读取文件读取的位置,是一个偏移量,每次读取会发生变化
        this.pos += byteRead;
        // 将读取到的内容转换成字符串串,而后经过data事件,将内容发布出去
        let srr = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);
        // 将内容经过data事件发布出去
        this.emit("data", srr);
        // 当读取到到内容长度和设置的highWaterMark一致的话,而且仍是流动模式的话,就继续读取
        if ((byteRead === this.highWaterMark) && this.flowing) {
            return this.read();
        }
        // 没有更多的内容了,此时表示文件中的内容已经读取完毕
        if (byteRead < this.highWaterMark) {
            // 读取完成,发布end方法,并关闭文件
            this.emit("end");
            this.destory();
        }
    });
}
复制代码

这里咱们特别要注意的是:

  • 文件是否已经打开,是否获取到fd,若是没有打开的话,则再次触发open方法
  • 分批次读取文件内容,每次读取的内容是变化的,因此位置和偏移量是要动态计算的
  • 控制读取中止的条件。

实现可读流第五步:关闭文件

好了,到如今,基础的读取工做已经完成,那么就须要将文件关闭了,上面的open和read方法里面都调用了一个方法:destory,没错,这个就是关闭文件的方法,好了,那么咱们来看看这个方法该如何实现吧

destory() {
    if (typeof this.fd !== "number") {
        // 发布close事件
        return this.emit("close");
    }
    // 将文件关闭,发布close事件
    fs.close(this.fd, () => {
        this.emit("close");
    });
}
复制代码

固然这块的原理就是调用fs模块的close方法啦。

实现可读流第六步:暂停和恢复

既然都说了,node可读流有一个神奇的"开关",就像大坝的阀门同样,能够控制水的流动,一样也能够控制水的暂停啦。固然在node可读流中的暂停是中止对文件的读取,恢复就是将开关打开,继续读取文件内容,那么这两个分别对应的方法就是pause()和resume()方法。

那么咱们本身的可读流类里面该如何实现这两个方法的功能呢?很是简单: 咱们在定义类的私有属性的时候,定义了这样一个属性flowing,当它的值为true时表示开关打开,反之关闭。

pause() {
    this.flowing = false;// 将流动模式设置成暂停模式,不会读取文件
}
resume() {
    this.flowing = true;//将模式设置成流动模式,能够读取文件
    this.read();// 从新开始读取文件
}
复制代码

好了,关于node可读流的实现咱们就写到这里,快快敲起代码,动手实现一个你本身的可读流吧!

相关文章
相关标签/搜索