浅析node中流应用(二) 可写流(fs.createWriteStream)

上篇文章写了可读流的用法和源码实现,能够先去看下,其中有类似的地方,重复的地方就很少作介绍了,就直接写用法。有一点值得一提就是可写流有缓存区的概念html

可写流的用法

建立可写流

let fs = require('fs');
let ws = fs.createWriteStream('2.txt', {
    flags: 'w',         // 文件的操做, 'w'写入文件,不存在则建立
    mode: 0o666,
    autoClose: true,
    highWaterMark: 3,   // 默认写是16*1024
    encoding: 'utf8'
});

复制代码

ws.write()方法和drain事件

  • 每次写入后会有一个标识 flag,当写入的内容的长度超过highWaterMark就会返回false
  • true表示能够继续写入,若是返回false,表示缓存区满了,咱们应当中止读取数据以免消耗过多内存。
  • 缓存区满后,文件写入一直在进行,不一下子会把缓存区的内容所有写入,缓存区处于清空状态,这时会触发可写流的‘drain’事件

不太明白?不要紧,咱们看下面这个例子帮助咱们理解node

let fs = require("fs");
let ws = fs.createWriteStream('1.txt',{
    flags:'w',
    encoding:'utf8',
    start:0,
    highWaterMark:3
});
let i =9;
function write() {
    let flag = true;
    while (flag && i>=0){
      flag =   ws.write(i-- +'');//往1.txt写入9876543210
        console.log(flag);
    }
}
ws.on('drain',()=>{ //缓存区充满并被写入完成,处于清空状态时触发
    console.log("干了");
    write(); //当缓存区清空后咱们在继续写
})
write(); //第一次调用write方法


复制代码

看到这里,咱们应该基本明白用法,下面咱们开始写源码实现,鉴于与可读流有不少类似写法上篇文章已详细写过,重复的就很少说了,说干就干git

可写流实现原理

一、声明WriteStream的构造函数(准备工做)

let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter{
    constructor(path,options={}){
        super();
        this.path = path;
        this.flags = options.flags || 'w';
        this.encoding = options.encoding || 'utf8';
        this.start = options.start || 0;
        this.pos = this.start;
        this.mode = options.mode || 0o666;
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark || 16 * 1024;

        //第一次写入是真的往文件里写
        this.writing = false; //默认第一次不是正在写入

        // 可写流  要有一个缓存区,当正在写入文件时,内容要写入到缓存区
        this.cache = [];

        // 记录缓存区大小
        this.len =0;

        // 是否触发drain事件
        this.needDrain = false;


        this.open(); //目的是拿到fd 异步,触发一个open事件后fd确定存在啦
    }
  }
复制代码
  • 把变量声明好,下面用到的时候就知道怎么回事了
  • 值得主要的是,咱们第一次往文件里写入的时候是真的往文件写,下次要写的内容先放到缓存区,写入完成后从缓存区去取内容

二、实现open方法和destroy方法

  • 这两个方法与可读流同样,不作介绍,直接上代码
open(){
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
            if(err){
                this.emit('error', err); //打开文件发生错误,发布error事件
                this.emit('error');
                if (this.autoClose) { //若是须要自动关闭我再去销毁fd
                    this.destroy(); //关闭文件(触发关闭事件)
                }
                return;
            }
            this.fd = fd; //保存文件描述符
            this.emit('open', this.fd) //触发文件open方法
        })
    }
    destroy() {
        if (typeof this.fd != 'number') { //文件未打开,也要关闭文件且触发close事件
            return this.emit('close');
        }
        fs.close(this.fd, () => {  //若是文件打开过了 那就关闭文件而且触发close事件
            this.emit("close");
        })
    }
复制代码

三、实现write方法,与客户端调用ws.write()方法对应

write(chunk,encoding = this.encoding,callback){ //客户调用的是write
        //chunk必须是buffer或者字符串, 为了统一,若是传递的是字符串也要转成buffer
        chunk = Buffer.isBuffer(chunk)?chunk : Buffer.from(chunk,encoding);
        this.len +=chunk.length; //维护写入的缓存的长度
        let ret = this.len <this.highWaterMark; //一个标识 比较是否达成了缓存区的大小
        this.needDrain = !ret; //是否须要触发needDrain
        if(this.writing){ //默认为false上面定义的,判断是否正在写入  若是是正在写入 就写入到缓存区中
            this.cache.push({chunk,encoding,callback})
        }else { //第一次写
            this.writing = true;
            this._write(chunk,encoding,()=>this.clearBuffer()); //专门实现写的方法

        }

        return ret; //能不能继续写了 false表示下次写的时候就要占用内存

    }
复制代码
  • _write()方法 重点, 咱们专门用来实现真正的写入方法

4 、实现重头戏 _write()方法

_write(chunk,encoding,clearBuffer){
        if(typeof this.fd!= 'number'){
            //由于write方法是同步调用,此时fd尚未获取到,因此等待获取到再执行write
            return this.once('open',()=>this._write(chunk,encoding,clearBuffer));
        }
        //确保有fd
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
            this.pos +=byteWritten; //偏移量,默认为0
            this.len -=byteWritten; //每次写入后就要在内存中减小下
            this.writing = false; //正在写入就不要再写拉,放缓存区
            clearBuffer(); //清空缓存区
            
        })
    }
复制代码

五、清空缓存区

let buffer = this.cache.shift(); //取缓存区第一个内容
        if(buffer){ //缓存里有
            this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer())
        }else { //缓存里没有了
            if(this.needDrain){ //须要触发drain事件
                this.writing = false; //告诉下次直接写就能够了  不须要写到内存中
                this.needDrain = false;
                this.emit('drain');
            }
        }
复制代码
  • 分析下
  • 若是是正在写入,就先把内容放到缓存区里,就是this.cache,默认[]
  • 给数组里存入一个对象,分别对应chunk, encoding, callback即(()=>this.clearBuffer())
  • 每一次写完后都须要把cache(缓存区)里的内容清空掉
  • 当缓存区cache数组里是空的时候就会触发drain事件了

写到这里,基本就写完了,源码有3000多行,这个只是简单实现,看不太懂的时候就多写几遍(ps 我也写了好多遍)。测试下看行不行吧github

相关文章
相关标签/搜索