上篇文章写了可读流的用法和源码实现,能够先去看下,其中有类似的地方,重复的地方就很少作介绍了,就直接写用法。有一点值得一提就是可写流有缓存区的概念html
let fs = require('fs');
let ws = fs.createWriteStream('2.txt', {
flags: 'w', // 文件的操做, 'w'写入文件,不存在则建立
mode: 0o666,
autoClose: true,
highWaterMark: 3, // 默认写是16*1024
encoding: 'utf8'
});
复制代码
不太明白?不要紧,咱们看下面这个例子帮助咱们理解node
let fs = require("fs");
let ws = fs.createWriteStream('1.txt',{
flags:'w',
encoding:'utf8',
start:0,
highWaterMark:3
});
let i =9;
function write() {
let flag = true;
while (flag && i>=0){
flag = ws.write(i-- +'');//往1.txt写入9876543210
console.log(flag);
}
}
ws.on('drain',()=>{ //缓存区充满并被写入完成,处于清空状态时触发
console.log("干了");
write(); //当缓存区清空后咱们在继续写
})
write(); //第一次调用write方法
复制代码
看到这里,咱们应该基本明白用法,下面咱们开始写源码实现,鉴于与可读流有不少类似写法上篇文章已详细写过,重复的就很少说了,说干就干git
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter{
constructor(path,options={}){
super();
this.path = path;
this.flags = options.flags || 'w';
this.encoding = options.encoding || 'utf8';
this.start = options.start || 0;
this.pos = this.start;
this.mode = options.mode || 0o666;
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark || 16 * 1024;
//第一次写入是真的往文件里写
this.writing = false; //默认第一次不是正在写入
// 可写流 要有一个缓存区,当正在写入文件时,内容要写入到缓存区
this.cache = [];
// 记录缓存区大小
this.len =0;
// 是否触发drain事件
this.needDrain = false;
this.open(); //目的是拿到fd 异步,触发一个open事件后fd确定存在啦
}
}
复制代码
open(){
fs.open(this.path,this.flags,this.mode,(err,fd)=>{
if(err){
this.emit('error', err); //打开文件发生错误,发布error事件
this.emit('error');
if (this.autoClose) { //若是须要自动关闭我再去销毁fd
this.destroy(); //关闭文件(触发关闭事件)
}
return;
}
this.fd = fd; //保存文件描述符
this.emit('open', this.fd) //触发文件open方法
})
}
destroy() {
if (typeof this.fd != 'number') { //文件未打开,也要关闭文件且触发close事件
return this.emit('close');
}
fs.close(this.fd, () => { //若是文件打开过了 那就关闭文件而且触发close事件
this.emit("close");
})
}
复制代码
write(chunk,encoding = this.encoding,callback){ //客户调用的是write
//chunk必须是buffer或者字符串, 为了统一,若是传递的是字符串也要转成buffer
chunk = Buffer.isBuffer(chunk)?chunk : Buffer.from(chunk,encoding);
this.len +=chunk.length; //维护写入的缓存的长度
let ret = this.len <this.highWaterMark; //一个标识 比较是否达成了缓存区的大小
this.needDrain = !ret; //是否须要触发needDrain
if(this.writing){ //默认为false上面定义的,判断是否正在写入 若是是正在写入 就写入到缓存区中
this.cache.push({chunk,encoding,callback})
}else { //第一次写
this.writing = true;
this._write(chunk,encoding,()=>this.clearBuffer()); //专门实现写的方法
}
return ret; //能不能继续写了 false表示下次写的时候就要占用内存
}
复制代码
_write(chunk,encoding,clearBuffer){
if(typeof this.fd!= 'number'){
//由于write方法是同步调用,此时fd尚未获取到,因此等待获取到再执行write
return this.once('open',()=>this._write(chunk,encoding,clearBuffer));
}
//确保有fd
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
this.pos +=byteWritten; //偏移量,默认为0
this.len -=byteWritten; //每次写入后就要在内存中减小下
this.writing = false; //正在写入就不要再写拉,放缓存区
clearBuffer(); //清空缓存区
})
}
复制代码
let buffer = this.cache.shift(); //取缓存区第一个内容
if(buffer){ //缓存里有
this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer())
}else { //缓存里没有了
if(this.needDrain){ //须要触发drain事件
this.writing = false; //告诉下次直接写就能够了 不须要写到内存中
this.needDrain = false;
this.emit('drain');
}
}
复制代码
写到这里,基本就写完了,源码有3000多行,这个只是简单实现,看不太懂的时候就多写几遍(ps 我也写了好多遍)。测试下看行不行吧github