在以前的博客中已经了解了流的基本用法(请看我以前的博客),这篇的重点在于根据可读流的用法对可读流的原理进行分析,实现简易版的 ReadStream
编程
在使用 fs
的 createReadStream
建立可读流时,返回了 ReadStream
对象,上面存在着一些事件和方法,其实咱们在建立这个可读流的时候建立了某一个类的实例,这个实例能够调用类原型上的方法,咱们这里将这个类命名为 ReadStream
。数组
在类原型上的方法内部可能会建立一些事件,在 NodeJS 中,事件是依赖 events
模块的,即 EventEmitter
类,同时类的方法可能会操做文件,会用到 fs
模块,因此也提早引入 fs
。缓存
// 引入依赖模块
const EventEmitter = require("events");
const fs = require("fs");
// 建立 ReadStream 类
class ReadStream extends EventEmitter {
constructor(path, options = {}) {
super();
// 建立可读流参数传入的属性
this.path = path; // 读取文件的路径
this.flags = options.flags || "r"; // 文件标识位
this.encoding = options.encoding || null; // 字符编码
this.fd = options.fd || null; // 文件描述符
this.mode = options.mode || 0o666; // 权限位
this.autoClose = options.autoClose || true; // 是否自动关闭
this.start = options.start || 0; // 读取文件的起始位置
this.end = options.end || null; // 读取文件的结束位置(包含)
this.highWaterMark = options.highWaterMark || 64 * 1024; // 每次读取文件的字节数
this.flowing = false; // 控制当前是不是流动状态,默认为暂停状态
this.buffer = Buffer.alloc(this.highWaterMark); // 存储读取内容的 Buffer
this.pos = this.start; // 下次读取文件的位置(变化的)
// 建立可读流要打开文件
this.open();
// 若是监听了 data 事件,切换为流动状态
this.on("newListener", type => {
if (type === "data") {
this.flowing = true;
// 开始读取文件
this.read();
}
});
}
}
// 导出模块
module.exports = ReadStream;复制代码
使用 fs.createReadStream
时传入了两个参数,读取文件的路径和一个 options
选项,options
上有八个参数,咱们在建立 ReadStream
类的时候将这些参数初始化到了 this
上。bash
建立可读流的时候有两种状态,流动状态和暂停状态,默认建立可读流是暂停状态,只有在触发 data
事件时才会变为流动状态,因此在 this
上挂载了 flowing
存储当前的状态是否为流动状态,值默认为 false
。异步
readable
, 是可读流的另外一种模式,咱们这节讨论的可读流为流动模式。
在读取文件时实际上是操做 Buffer 进行读取的,须要有一个 Buffer 实例用来存储每次读取的数据,因此在 this
上挂载了一个新建立的 Buffer,长度等于 highWaterMark
。函数
当从 start
值的位置开始读取文件,下一次读取文件的位置会发生变化,因此在 this
上挂载了 pos
属性,用于存储下次读取文件的位置。性能
在建立 ReadStream
的实例(可读流)时,应该打开文件并进行其余操做,因此在 this
上挂载了 open
方法并执行。测试
建立实例的同时监听了 newListener
事件,回调在每次使用 on
监听事件时触发,回调内部逻辑是为了将默认的暂停状态切换为流动状态,由于在使用时,流动状态是经过监听 data
事件触发的,在 newListener
的回调中判断事件类型为 data
的时候将 flowing
标识的值更改成 true
,并调用读取文件的 read
方法。ui
在使用 ES6 的类编程时,原型上的方法都是写在 class
内部,咱们下面为了把原型上的方法拆分出来成为单独的代码块,都使用 ReadStream.prototype.open = function...
直接给原型添加属性的方式,但这样的方式和直接写在 class
内有一点区别,就是 class
内部的书写的原型方法都是不可遍历的,添加属性的方式建立的方法都是可遍历的,可是这点区别对咱们代码的执行没有任何影响。this
在使用可读流时,打开时默认是暂停状态,会触发 open
事件,若是打开文件出错会触发 error
事件。
// 打开文件
ReadStream.prototype.open = function() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
this.emit("error", err);
// 若是文件打开了出错,并配置自动关闭,则关掉文件
if (this.autoClose) {
// 关闭文件(触发 close 事件)
this.destroy();
// 再也不继续执行
return;
}
}
// 存储文件描述符
this.fd = fd;
// 成功打开文件后触发 open 事件
this.emit("open");
});
};复制代码
open
方法的逻辑就是在打开文件的时候,将文件描述符存储在实例上方便后面使用,并使用 EventEmitter
的原型方法 emit
触发 open
事件,若是出错就使用 emit
触发 error
事件,若是配置 autoClose
参数为 true
,就关闭文件并触发 close
。
咱们将关闭文件的逻辑抽取出来封装在了 ReadStream
类的 destroy
方法中,下面来实现 destroy
。
文件出错分为两种,第一种文件打开出错,第二种是文件不存在出错(没打开),第二种系统是没有分配文件描述符的。
// 关闭文件
ReadStream.prototype.detroy = function() {
// 判断是否存在文件描述符
if (typeof this.fd === "number") {
// 存在则关闭文件并触发 close 事件
fs.close(fd, () => {
this.emit("close");
});
return;
}
// 不存在文件描述符直接触发 close 事件
this.emit("close");
};复制代码
若是是打开文件后出错须要关闭文件,并触发 close
事件,若是是没打开文件,则直接触发 close
事件,因此上面经过文件描述符来判断该如何处理。
还记得在 ReadStream
类中,监听的 newListener
事件的回调中若是监听了 data
事件则会执行 read
读取文件,接下来就实现读取文件的核心逻辑。
// 读取文件
ReadStream.prototype.read = function() {
// 因为 open 异步执行,read 是在建立实例时同步执行
// read 执行可能早于 open,此时不存在文件描述符
if (typeof this.fd !== "number") {
// 由于 open 用 emit 触发了 open 事件,因此在这是从新执行 read
return this.once("open", () => this.read());
}
// 如过设置告终束位置,读到结束为止就不能再读了
// 若是最后一次读取真实读取数应该小于 highWaterMark
// 因此每次读取的字节数应该和 highWaterMark 取最小值
let howMuchToRead = this.end
? Math.min(this.highWaterMark, this.end - this.pos + 1)
: this.highWaterMark;
// 读取文件
fs.read(
this.fd,
this.buffer,
0,
howMuchToRead,
this.pos,
(err, bytesRead) => {
// 若是读到内容执行下面代码,读不到则触发 end 事件并关闭文件
if (bytesRead > 0) {
// 维护下次读取文件位置
this.pos += bytesRead;
// 保留有效的 Buffer
let realBuf = this.buffer.slice(0, bytesRead);
// 根据编码处理 data 回调返回的数据
realBuf = this.encoding
? realBuf.toString(this.encoding)
: realBuf;
// 触发 data 事件并传递数据
this.emit("data", realBuf);
// 递归读取
if (this.flowing) {
this.read();
}
} else {
this.isEnd = true;
this.emit("end"); // 触发 end 事件
this.detroy(); // 关闭文件
}
}
);
};复制代码
建立 ReadStream
的实例时,执行的 open
方法内部是使用 fs.open
打开文件的,是异步操做,而读取文件方法 read
是在 newListener
回调中同步执行的,这样极可能触发 read
的时候文件尚未被打开(不存在文件描述符),因此在 read
方法中判断了文件描述符是否存在,并在不存在时候使用 once
添加了 open
事件,回调中从新执行了 read
。
因为在 open
方法中使用 emit
触发了 open
事件,因此 read
内用 once
添加的 open
事件的回调也会跟着执行一次,并在回调中从新调用了 read
方法,保证了 read
读取文件的逻辑在文件真正打开后才执行,为了文件打开前执行 read
而不执行读取文件的逻辑,用 once
添加 open
事件时别忘记 return
。
在使用 fs.read
读取文件的时候有一个参数为本次读取几个字符到 Buffer 中,若是在建立可读流的时候设置了读取文件的结束位置 end
参数,则读到 end
位置就不该该再继续读取了,因此在存在 end
参数的时候每次都计算一下读取个数和 highWaterMark
取最小值,保证读取内容小于 highWaterMark
的时候不会多读,由于读取时是包括 end
值做为 Buffer 的索引这一项的,因此计算时多减去的要 +1
加回来,再一次读取这个读取个数计算结果变成了 0
,也就结束了读取。
由于 end
参数的状况,因此在内部读取逻辑前判断了 bytesRead
(实际读取字节数)是否大于 0
,若是不知足条件则在实例添加是否读取结束标识 isEnd
(后面使用),触发 end
事件并关闭文件,若是知足条件,也是经过 bytesRead
对 Buffer 进行截取,保留了有用的 Buffer,而且经过 encoding
编码对 Buffer 进行处理后,触发 data
事件,并将处理后的数据传递给 data
事件的回调。
pause
的目的就是暂停读取,其实就是阻止 read
方法在读取时进行递归,因此只须要更改 flowing
的值便可。
// 暂停读取
ReadStream.prototype.pause = function() {
this.flowing = false;
};复制代码
resume
的目的是恢复读取,在更改 flowing
值得基础上从新执行 read
方法,因为在 pause
调用时 read
内部仍是执行得读取文件得分支,文件并无关闭,读取文件位置的参数也是经过实例上的当前的属性值进行计算的,因此从新执行 read
会继续上一次的位置读取。
// 恢复读取
ReadStream.prototype.resume = function() {
this.flowing = true;
if (!this.isEnd) this.read();
};复制代码
上面在从新执行 read
以前使用 isEnd
标识作了判断,防止在 setInterval
中调用 resume
在读取完成后不断的触发 end
和 close
事件。
接下来咱们使用本身实现的 ReadStream
类来建立可读流,并按照 fs.createReadStream
的用法进行使用并验证。
// 文件 1.txt 内容为 0123456789
const fs = require("fs");
const ReadStream = require("./ReadStream");
// 建立可读流
let rs = new ReadStream("1.txt", {
encoding: "utf8",
start: 0,
end: 5,
highWaterMark: 2
});
rs.on("open", () => console.log("open"));
rs.on("data", data => {
console.log(data, new Date());
rs.pause();
});
rs.on("end", () => console.log("end"));
rs.on("close", () => console.log("close"));
rs.on("error", err => console.log(err));
setInterval(() => rs.resume(), 1000);
// open
// 01 2018-07-04T10:44:20.384Z
// 23 2018-07-04T10:44:21.384Z
// 45 2018-07-04T10:44:22.384Z
// end
// close复制代码
执行上面的代码正常的执行逻辑是先触发 open
事件,而后触发 data
事件,读取一次后暂停,每隔一秒恢复读取一次,再读取完成后触发 end
和 close
事件,经过运行代码结果和咱们但愿的同样。
在 fs
模块中用 createReadStream
建立的可读流中经过监听 readable
事件触发暂停模式(监听 data 事件触发流动模式),经过下面例子感觉暂停模式与流动模式的不一样,如今读取文件 1.txt
,内容为 0~9
十个数字。
// 读取的
const fs = require("fs");
// 建立可读流
let rs = fs.createReadStream("1.txt", {
encoding: "utf8",
start: 0,
hithWaterMark: 3
});
rs.on("readable", () => {
// read 参数为本次读取的个数
let r = rs.read(3);
// 打印读取的数据
console.log(r);
// 打印容器剩余空间
console.log(rs._readableState.length);
});
// 012
// 0
// 345
// 0
// 678
// 0
// null
// 1
// 90
// 0复制代码
通俗的解释,暂停模式的 readable
事件默认会触发一次,监听 readable
事件后就像建立了一个 “容器”,容量为 hithWaterMark
,文件中的数据会自动把容器注满,调用可读流的 read
方法读取时,会从容器中取出数据,若是 read
方法读取的数据小于 hithWaterMark
,则直接暂停,再也不继续读取,若是大于 hithWaterMark
,说明 “容器” 空了,则会触发 readable
事件,不管读取字节数与 hithWaterMark
关系如何,只要 “容器” 内容量剩余小于 hithWaterMark
就会进行 “续杯”,再次向 “容器” 中填入 hithWaterMark
个,因此有些时候真实的容量会大于 hithWaterMark
。
read
方法读取的内容会返回 null
是由于容器内真实的数据数小于了读取数,若是不是最后一次读取,会在屡次读取后将值一并返回,若是是最后一次读取,会把剩余不足的数据返回。
readable
事件的触发条件:“容器” 空了;
hithWaterMark
。
read
返回 null
:“容器” 容器内可悲读取数据没法知足一次读取字节数。
同为可读流,暂停模式与流动模式相同,都依赖 fs
模块和 events
模块的 EventEmitter
类,参数依然为读取文件的路径和 options
。
// 引入依赖
const EventEmitter = require("events");
const fs = require("fs");
class ReadableStream extends EventEmitter {
constructor(path, options = {}) {
super();
this.path = path; // 读取文件的路径
this.flags = options.flags || "r"; // 文件标识位
this.encoding = options.encoding || null; // 字符编码
this.fd = options.fd || null; // 文件描述符
this.mode = options.mode || 0o666; // 权限位
this.autoClose = options.autoClose || true; // 是否自动关闭
this.start = options.start || 0; // 读取文件的起始位置
this.highWaterMark = options.highWaterMark || 64 * 1024; // 每次读取文件的字节数
this.reading = false; // 若是正在读取,则再也不读取
this.emitReadable = false; // 当缓存区的长度等于 0 的时候, 触发 readable
this.arr = []; // 缓存区
this.len = 0; // 缓存区的长度
this.pos = this.start; // 下次读取文件的位置(变化的)
// 建立可读流要打开文件
this.open();
this.on("newListener", type => {
if (type === "readable") {
this.read(); // 监听readable就开始读取
}
});
}
}
// 导出模块
module.exports = ReadableStream;复制代码
在类的添加了 newListener
事件,在回调中判断是否监听了 readable
事件,若是监听了开始从 “容器” 中读取。
打开和关闭文件的方法和流动模式的套路基本类似。
// 打开文件
ReadableStream.prototype.open = function() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
this.emit("error", err);
if (this.autoClose) {
this.destroy();
return;
}
}
this.fd = fd;
this.emit("open");
});
};复制代码
// 关闭文件
ReadableStream.prototype.detroy = function() {
if (typeof this.fd === "number") {
fs.close(fd, () => {
this.emit("close");
});
return;
}
this.emit("close");
};复制代码
read
方法的参数不传时就至关于从 “容器” 读取 highWaterMart
个字节,若是传参表示读取参数数量的字节数。
ReadableStream.prototype.read = function(n) {
// 若是读取大于了 highWaterMark,从新计算 highWaterMark,并从新读取
if (n > this.len) {
// 计算新的 highWaterMark,方法摘自 NodeJS 源码
this.highWaterMark = computeNewHighWaterMark(n);
this.reading = true;
this._read();
}
// 将要返回的数据
let buffer;
// 若是读取的字节数大于 0 小于等于当前缓存 Buffer 的总长度
if (n > 0 && n <= this.len) {
// 则从缓存中取出
buffer = Buffer.alloc(n);
let current; // 存储每次从缓存区读出的第一个 Buffer
let index = 0; // 每次读取缓存 Buffer 的索引
let flag = true; // 是否结束整个 while 循环的标识
// 开始读取
while ((current = this.arr.shift()) && flag) {
for (let i = 0; i < current.length; i++) {
// 将缓存中取到的 Buffer 的内容读到本身定义的 Buffer 中
buffer[index++] = current[i];
// 若是当前索引值已经等于了读取个数,结束 for 循环
if (index === n) {
flag = false;
// 取出当前 Buffer 没有消耗的
let residue = current.slice(i + 1);
// 在读取后维护缓存的长度
this.len -= n;
// 若是 BUffer 真的有剩下的就给塞回到缓存中
if (residue.length) {
this.arr.unshift(residue);
}
break;
}
}
}
}
// 若是当前 读取的 Buffer 为 0,将触发 readable 事件
if (this.len === 0) {
this.emitReadable = true;
}
// 若是当前的缓存区大小小于 highWaterMark,就要读取
if (this.len < this.highWaterMark) {
// 若是不是正在读取才开始读取
if (!this.read) {
this.reading = true;
this._read(); // 正真读取的方法
}
}
// 将 buffer 转回建立可读流设置成的编码格式
if (buffer) {
buffer = this.encoding ? buffer.toString(this.encoding) : buffer;
}
return buffer;
};复制代码
上面的 read
方法的参数大小对比缓存区中取出的 Buffer 长度有两种状况,一种是小于当前缓存区内取出 Buffer 的长度,一种是大于了真个缓存区的 len
的长度。
小于当前缓存区总长度经过循环取出须要的 Buffer 存储了咱们要返回建立的 Buffer 中,剩余的 Buffer 会丢失,因此咱们作了一个小小的处理,将剩下的 Buffer 做为第一个 Buffer 塞回到了缓存区中,在处理这个问题时与流动模式不相同,流动模式处理后直接跳出了,而暂停模式至关于从 “容器” 中读取,若是第一次读取后还有剩余还要接着从容器中继续读取。
大于 len
属性时,规定须要从新计算 highWaterMark
,遵循的原则是将当前 highWaterMark
设定为当前读取字节个数距离最接近的 2
的 n
次方的数值,NodeJS 源码中方法名称为 computeNewHighWaterMark
,为了提升性能是使用位运算的方式进行计算的,源码以下。
function computeNewHighWaterMark(n) {
n--;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n++;
return n;
}复制代码
在调用该方法从新计算 highWaterMark
后更改正在读取状态,从新读取,因为读取逻辑的重复,因此真正读取文件的逻辑抽取成一个 _read
方法来实现,下面呢就来看看 _read
内部都作了什么。
对比可读流(流动模式)的 read
方法,在调用 _read
方法读取时,是在 newListener
中同步执行 _read
,因此为了保证 _read
的逻辑是在 open
方法打开文件之后执行,使用了与 read
相同的处理方式。
ReadableStream.prototype._read = function() {
if (typeof this.fd !== "number") {
return this.once("open", () => _read());
}
// 建立本次读取的 Buffer
let buffer = Buffer.alloc(this.highWaterMark);
// 读取文件
fs.read(
this.fd,
buffer,
0,
this.highWaterMark,
this.pos,
(err, bytesRead) => {
if (bytesRead > 0) {
this.arr.push(buffer); // 缓存
this.len += bytesRead; // 维护缓存区长度
this.pos += bytesRead; // 维护下一次读取位置
this.reading = false; // 读取完毕
// 触发 readable 事件
if (this.emitReadable) {
// 触发后更改触发状态为 false
this.emitReadable = false;
this.emit("readable");
}
} else {
// 若是读完触发结束事件
this.emit("end");
}
}
);
};复制代码
因为缓存区是一个数组,存储的每个 Buffer 是独立存在的,因此不能挂载在实例上共用,若是挂在实例上则引用相同,一动全动,这不是咱们想要的,因此每一次执行 _read
方法时都建立新的 Buffer 实例存入读取的数据后存储在缓存区中,若是读取完成 bytesRead
为 0
,则触发 end
事件。
fs.createReadStream
建立一个可读流,经过监听 data
和 readable
两种不一样的事件来触发两种不一样的模式,而咱们为了模拟,把两种模式拆开成了两个类来实现的,在测试时须要建立不一样类的实例。
为了统一咱们依然读取真正用法中 1.txt
文件,内容为 0~9 十个数字。
// 引入依赖
const fs = require("fs");
const ReadableStream = require("./ReadableStream");
let rs = new ReadableStream("1.txt", {
encoding: "utf8",
start: 0,
highWaterMark: 3
});
rs.on("readable", () => {
let r = rs.read(3);
console.log(r);
console.log(rs.len);
});复制代码
在打印 “容器” 剩余容量时,咱们使用在 ReadableStream
上构造的 len
属性。
流动模式和暂停模式分别有不一样的应用场景,若是只是但愿读取一个文件,并最快的得到结果使用流动模式是很好的选择,若是但愿了解读取文件的具体内容,并进行精细的处理,使用暂停模式更好一些。
在使用 fs
的 createWriteStream
建立可写流时,返回了 WriteStream
对象,上面也存在事件和方法,建立可写流的时也是建立类的实例,咱们将这个类命名为 WriteStream
。事件一样依赖 events
模块的 EventEmitter
类,文件操做一样依赖 fs
模块,因此需提早引入。
// 引入依赖模块
const EventEmitter = require("events");
const fs = require("fs");
// 建立 WriteStream 类
class WriteStream extends EventEmitter {
constructor(path, options = {}) {
super();
// 建立可写流参数传入的属性
this.path = path; // 写入文件的路径
this.flags = options.flags || "w"; // 文件标识位
this.encoding = options.encoding || "utf8"; // 字符编码
this.fd = options.fd || null; // 文件描述符
this.mode = options.mode || 0o666; // 权限位
this.autoClose = options.autoClose || true; // 是否自动关闭
this.start = options.start || 0; // 写入文件的起始位置
this.highWaterMark = options.highWaterMark || 16 * 1024; // 对比写入字节数的标识
this.writing = false; // 是否正在写入
this.needDrain = false; // 是否须要触发 drain 事件
this.buffer = []; // 缓存,正在写入就存入缓存中
this.len = 0; // 当前缓存的个数
this.pos = this.start; // 下次写入文件的位置(变化的)
// 建立可写流要打开文件
this.open();
}
}
// 导出模块 复制代码
module.exports = WriteStream;使用 fs.createWriteStream
建立可写流时传入了两个参数,写入的文件路径和一个 options
选项,options
上有七个参数,咱们在建立 ReadStream
类的时候将这些参数初始化到了 this
上。
建立可写流后须要使用 write
方法进行写入,写入时第一次会真的经过内存写入到文件中,而再次写入则会将内容写到缓存中,注意这里的 “内存” 和 “缓存”,内存是写入文件是的系统内存,缓存是咱们本身建立的数组,第一次写入之后要写入文件的 Buffer 都会先存入这个数组中,这个数组名为 buffer
,挂载在实例上,实例上同时挂载了 len
属性用来存储当前缓存中 Buffer 总共的字节数(长度)。
咱们在可读流上挂载了是否正在写入的状态 writing
属性,只要缓存区中存在未写入的 Buffer,writing
的状态就是正在写入,当写入的字节数大于了 highWaterMark
须要触发 drain
事件,因此又挂载了是否须要触发 drain
事件的标识 needDrain
属性。
当从文件的 start
值对应的位置开始写入,下一次写入文件的位置会发生变化,因此在 this
上挂载了 pos
属性,用于存储下次写入文件的位置。
在 NodeJS 流的源码中缓存是用链表实现的,经过指针来操做缓存中的 Buffer,而咱们为了简化逻辑就使用数组来做为缓存,虽然性能相对链表要差。
在 WriteStream
中,写入文件以前也应该打开文件,在打开文件过程当中出错时也应该触发 error
事件并关闭文件,打开和关闭文件的方法 open
和 detroy
与 ReadStream
的 open
和 detroy
方法的逻辑一模一样,因此这里直接拿过来用了。
// 打开文件
WriteStream.prototype.open = function() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
this.emit("error", err);
if (this.autoClose) {
this.destroy();
return;
}
}
this.fd = fd;
this.emit("open");
});
};复制代码
// 关闭文件
WriteStream.prototype.detroy = function() {
if (typeof this.fd === "number") {
fs.close(fd, () => {
this.emit("close");
});
return;
}
this.emit("close");
};复制代码
write
方法默认支持传入三个参数:
// 写入文件的方法,只要逻辑为写入前的处理
WriteStream.prototype.write = function(
chunk,
encoding = this.encoding,
callback
) {
// 为了方便操做将要写入的数据转换成 Buffer
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
// 维护缓存的长度
this.len += chunk.lenth;
// 维护是否触发 drain 事件的标识
this.needDrain = this.highWaterMark <= this.len;
// 若是正在写入
if (this.writing) {
this.buffer.push({
chunk,
encoding,
callback
});
} else {
// 更改标识为正在写入,再次写入的时候走缓存
this.writing = true;
// 若是已经写入清空缓存区的内容
this._write(chunk, encoding, () => this.clearBuffer());
}
return !this.needDrain;
};复制代码
与可写流的 read
同样,咱们在使用 write
方法将数据写入文件时,也是操做 Buffer,在 write
方法中,首先将接收到的要写入的数据转换成了 Buffer,由于是屡次写入,要知道缓存中 Buffer 字节数的总长度,因此维护了 len
变量。
咱们的 WriteStream
构造函数中,this
挂载了 needDrain
属性,在使用 fs.createWriteStream
建立的可读流时,是写入的字节长度超过 highWaterMark
才会触发 drain
事件,而 needDrain
与 write
的返回值正好相反,因此咱们用 needDrain
取反来做为 write
方法的返回值。
在写入的逻辑中第一次是直接经过内存写入到文件,可是再次写入就须要将数据存入缓存,将数据写入到文件中写入状态 writing
默认为 false
,经过缓存再写入证实应该正在写入中,因此在第一次写入后应更改 writing
的状态为 true
,写入缓存其实就是把转换的 Buffer、编码以及写入成功后要执行的回调挂在一个对象上存入缓存的数组 buffer
中。
咱们把真正写入文件的逻辑抽取成一个单独的方法 _write
,并传入 chunk
(要写入的内容,已经处理成 Buffer)、encoding
(字符编码)、回调函数,在回调函数中执行了原型方法 clearBuffer
,接下来就来实现 _write
和 clearBuffer
。
对比可读流(流动模式)的 read
方法,在调用 _write
方法写入时,是在建立可写流以后的同步代码中执行的,与可读流在 newListener
中同步执行 read
的状况相似,因此为了保证 _write
的逻辑是在 open
方法打开文件之后执行,使用了与 read
相同的处理方式。
// 真正的写入文件操做的方法
WriteStream.prototype._write = function(chunk, encoding, callback) {
// 因为 open 异步执行,write 是在建立实例时同步执行
// write 执行可能早于 open,此时不存在文件描述符
if (typeof this.fd !== "number") {
// 由于 open 用 emit 触发了 open 事件,因此在这是从新执行 write
return this.once("open", () => this._write(chunk, encoding, callback));
}
// 读取文件
fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
// 维护下次写入的位置和缓存区 Buffer 的总字节数
this.pos += bytesWritten;
this.len -= bytesWritten;
callback();
});
};复制代码
在打开文件并写入的时候须要维护两个变量,下次写入的位置 pos
和当前缓存区内 Buffer 所占总字节数 len
,本次写入了多少个字节,下次写入须要在写入位置的基础上加多少个字节,而 len
偏偏相反,本次写入了多少个字节,缓存区中的总长度应该对应的减小多少个字节。
在维护两个变量的值之后调用 callback
,其实 callback
内执行的是 clearBuffer
方法,就如方法名,译为 “清空缓存”,其实就是一次一次的将数据写入文件并从缓存中移除,很明显须要递归调用 _write
方法,咱们将这个递归的逻辑统一放在 clearBuffer
方法中实现。
// 清空缓存方法
WriteStream.prototype.clearBuffer = function() {
// 先写入的在数组前面,从前面取出缓存中的 Buffer
let buf = this.buffer.shift();
// 若是存在 buf,证实缓存还有 Buffer 须要写入
if (buf) {
// 递归 _write 按照编码将数据写入文件
this._write(buf.chunk, buf.encoding, () => this.clearBuffer);
} else {
// 若是没有 buf,说明缓存内的内容已经彻底写入文件并清空,须要触发 drain 事件
this.emit("drain");
// 更改正在写入状态
this.writing = false;
// 更改是否须要触发 drain 事件状态
this.needDrain = false;
}
};复制代码
clearBuffer
方法中获取了缓存区数组的最前面的 Buffer(最前面的是先写入缓存的,也应该先取出来写入文件),存在这个 Buffer 时,递归 _write
方法按照编码将数据写入文件,若是不存在说明缓存区已经清空了,表明内容彻底写入文件中,因此触发 drain
事件,最后更改了 writing
和 needDrain
的状态。
更正 writing
是为了 WriteStream
建立的可读流在下次调用 write
方法时默认第一次真正写入文件,而更正 needDrain
的状态是在缓存区要清空的最后一个 Buffer 的长度小于了 highWaterMark
时,保证 write
方法的返回值是正确的。
第一次是真正写入,其余的都写入缓存,再一个一个的将缓存中存储的 Buffer 写入并从缓存清空,之因此这样设计是为了把写入的内容排成一个队列,假若有 3
我的同时操做一个文件写入内容,只有第一我的是真的写入,其余的人都写在缓存中,再按照写入缓存的顺序依次写入文件,避免冲突和写入顺序出错。
接下来咱们使用本身实现的 WriteStream 类来建立可写流,并按照 fs.createWriteStream
的用法进行使用并验证。
// 向 1.txt 文件中写入 012345
const fs = require("fs");
const WriteStream = require("./WriteStream");
// 建立可写流
let ws = new WriteStream("2.txt", {
highWaterMark: 3
});
let i = 0;
function write() {
let flag = true;
while (i <= 6 && flag) {
i++;
flag = ws.write(i + "", "utf8");
}
}
ws.on("drain", function() {
console.log("写入成功");
write();
});
write();
// true
// true
// false
// 写入成功
// true
// true
// false
// 写入成功复制代码
可使用 fs.createWriteStream
和 WriteStream
类分别执行上面的代码,对比结果,看看是否相同。
可写流和可读流通常是经过 pipe
配合来使用的,pipe
方法是可读流 ReadStream
的原型方法,参数为一个可写流。
// 链接可读流和可写流的方法 pipe
ReadStream.prototype.pipe = function(dest) {
// 开始读取
this.on("data", data => {
// 若是超出可写流的 highWaterMark,暂停读取
let flag = dest.write(data);
if (!flag) this.pause();
});
dest.on("drain", () => {
// 当可写流清空内存时恢复读取
this.resume();
});
this.on("end", () => {
// 在读取完毕后关闭文件
this.destroy();
});
};复制代码
pipe
方法其实就是经过可读流的 data
事件触发流动状态,并用可写流接收读出的数据进行写入,当写入数据超出 highWaterMark
,则暂停可读流的读取,直到可写流的缓存被清空并把内容写进文件后,恢复可读流的读取,当读取结束后关闭文件。
下面咱们实现一个将 1.txt
的内容拷贝 2.txt
中的例子。
// pipe 的使用
const fs = require("fs");
// 引入本身的 ReadStream 类和 WriteStream 类
const ReadStream = rquire("./ReadStream");
const WriteStream = rquire("./WriteStream");
// 建立可读流和可写流
let rs = new ReadStream("1.txt", {
highWaterMark: 3
});
let ws = new WriteStream("2.txt", {
highWaterMark: 2
});
// 使用 pipe 实现文件内容复制
rs.pipe(ws);复制代码
在 NodeJS 源码中,可读流和可写流的内容要比本篇内容多不少,本篇是将源码精简,抽出核心逻辑并针对流的使用方式进行实现,主要目的是帮助理解流的原理和使用,争取作到 “知其然知其因此然”,了解了一些底层再对流使用时,也能游刃有余。