可写流是对数据写入'目的地'的一种抽象。node
可写流的原理其实与可读流相似,当数据过来的时候会写入缓存池,当写入的速度很慢或者写入暂停时候,数据流便会进入到队列池缓存起来,固然即便缓存池满了,剩余的数据也是存在内存json
可写流的简单用法以下代码缓存
let fs = require('fs'); let path = require('path'); let ws = fs.createWriteStream(path.join(__dirname,'1.txt'),{ highWaterMark:3, autoClose:true, flags:'w', encoding:'utf8', mode:0o666, start:0, }); let i = 9; function write(){ let flag = true; while(i>0&&flag){ flag = ws.write(--i+'','utf8',()=>{console.log('ok')}); console.log(flag) } } write(); // drain只有当缓存区充满后 而且被消费后触发 ws.on('drain',function(){ console.log('抽干') write(); });
如今就让咱们来实现一个简单的可写流,来研究可写流的内部原理,可写流有不少方法与可读流相似,这里不在重复了首先要有一个构造函数来定义一些基本选项属性,而后调用一个open放法打开文件,而且有一个destroy方法来处理关闭逻辑函数
let EventEmitter = require('events'); let fs = require('fs'); class WriteStream extends EventEmitter { constructor(path,options) { super(); this.path = path; this.highWaterMark = options.highWaterMark || 16 * 1024; this.autoClose = options.autoClose || true; this.mode = options.mode; this.start = options.start || 0; this.flags = options.flags || 'w'; this.encoding = options.encoding || 'utf8'; // 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中 // 在源码中是一个链表 => [] this.buffers = []; // 标识 是否正在写入 this.writing = false; // 是否知足触发drain事件 this.needDrain = false; // 记录写入的位置 this.pos = 0; // 记录缓存区的大小 this.length = 0; this.open(); } destroy() { if (typeof this.fd !== 'number') { return this.emit('close'); } fs.close(this.fd, () => { this.emit('close') }); } open() { fs.open(this.path, this.flags, this.mode, (err,fd) => { if (err) { this.emit('error', err); if (this.autoClose) { this.destroy(); } return; } this.fd = fd; this.emit('open'); }) } } module.exports = WriteStream;
接着咱们实现write方法来让可写流对象调用,在write方法中咱们首先将数据转化为buffer,接着实现一些事件的触发条件的逻辑,若是如今没有正在写入的话咱们就要真正的进行写入操做了,这里咱们实现一个_write方法来实现写入操做,不然则表明文件正在写入,那咱们就将流传来的数据先放在缓存区中,保证写入数据不会同时进行。ui
write(chunk,encoding=this.encoding,callback=()=>{}){ chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding); // write 返回一个boolean类型 this.length+=chunk.length; let ret = this.length<this.highWaterMark; // 比较是否达到了缓存区的大小 this.needDrain = !ret; // 是否须要触发needDrain // 判断是否正在写入 若是是正在写入 就写入到缓存区中 if(this.writing){ this.buffers.push({ encoding, chunk, callback }); // [] }else{ // 专门用来将内容 写入到文件内 this.writing = true; this._write(chunk,encoding,()=>{ callback(); this.clearBuffer(); }); // 8 } return ret; } _write(chunk,encoding,callback){ if(typeof this.fd !== 'number'){ return this.once('open',()=>this._write(chunk,encoding,callback)); } fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{ this.length -= byteWritten; this.pos += byteWritten; callback(); // 清空缓存区的内容 }); }
_write写入以后的回调中咱们会调用传入回调函数clearBuffer,这个方法会去buffers中继续递归地把数据取出,而后继续调用_write方法去写入,直到所有buffer中的数据取出后,这样就清空了buffers。this
clearBuffer(){ let buffer = this.buffers.shift(); if(buffer){ this._write(buffer.chunk,buffer.encoding,()=>{ buffer.callback(); this.clearBuffer() }); }else{ this.writing = false; if(this.needDrain){ // 是否须要触发drain 须要就发射drain事件 this.needDrain = false; this.emit('drain'); } } }
最后附上完整的代码编码
let EventEmitter = require('events'); let fs = require('fs'); class WriteStream extends EventEmitter{ constructor(path,options){ super(); this.path = path; this.highWaterMark = options.highWaterMark||16*1024; this.autoClose = options.autoClose||true; this.mode = options.mode; this.start = options.start||0; this.flags = options.flags||'w'; this.encoding = options.encoding || 'utf8'; // 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中 // 在源码中是一个链表 => [] this.buffers = []; // 标识 是否正在写入 this.writing = false; // 是否知足触发drain事件 this.needDrain = false; // 记录写入的位置 this.pos = 0; // 记录缓存区的大小 this.length = 0; this.open(); } destroy(){ if(typeof this.fd !=='number'){ return this.emit('close'); } fs.close(this.fd,()=>{ this.emit('close') }) } open(){ fs.open(this.path,this.flags,this.mode,(err,fd)=>{ if(err){ this.emit('error',err); if(this.autoClose){ this.destroy(); } return } this.fd = fd; this.emit('open'); }) } write(chunk,encoding=this.encoding,callback=()=>{}){ chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding); // write 返回一个boolean类型 this.length+=chunk.length; let ret = this.length<this.highWaterMark; // 比较是否达到了缓存区的大小 this.needDrain = !ret; // 是否须要触发needDrain // 判断是否正在写入 若是是正在写入 就写入到缓存区中 if(this.writing){ this.buffers.push({ encoding, chunk, callback }); // [] }else{ // 专门用来将内容 写入到文件内 this.writing = true; this._write(chunk,encoding,()=>{ callback(); this.clearBuffer(); }); // 8 } return ret; } clearBuffer(){ let buffer = this.buffers.shift(); if(buffer){ this._write(buffer.chunk,buffer.encoding,()=>{ buffer.callback(); this.clearBuffer() }); }else{ this.writing = false; if(this.needDrain){ // 是否须要触发drain 须要就发射drain事件 this.needDrain = false; this.emit('drain'); } } } _write(chunk,encoding,callback){ if(typeof this.fd !== 'number'){ return this.once('open',()=>this._write(chunk,encoding,callback)); } fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{ this.length -= byteWritten; this.pos += byteWritten; callback(); // 清空缓存区的内容 }); } } module.exports = WriteStream;
前面咱们了解了可读流与可写流,那么怎么让两者结合起来使用呢,node给咱们提供好了方法--Pipe管道,流顾名思义,就是在可读流与可写流中间加入一个管道,实现一边读取,一边写入,读一点写一点。code
Pipe的使用方法以下orm
let fs = require('fs'); let path = require('path'); let ReadStream = require('./ReadStream'); let WriteStream = require('./WriteStream'); let rs = new ReadStream(path.join(__dirname, './1.txt'), { highWaterMark: 4 }); let ws = new WriteStream(path.join(__dirname, './2.txt'), { highWaterMark: 1 }); // 4 1 rs.pipe(ws);
Pipe的原理比较简单,简单说监听可读流的data事件来持续获取文件中的数据,而后咱们就会去调用写流的write方法。若是可写流缓存区已满,那么当咱们获得调用可读流的pause方法来暂停读取,而后等到写流的缓存区已经所有写入而且触发drain事件时,咱们就会调用resume从新开启读取的流程。上代码对象
pipe(ws) { this.on('data', (chunk) => { let flag = ws.write(chunk); if (!flag) { this.pause(); } }); ws.on('drain', () => { this.resume(); }) }
Node容许咱们自定义流,读流继承于Readable接口,写流则继承于Writable接口,因此咱们实际上是能够自定义一个流模块,只要继承stream模块对应的接口便可。
若是咱们要自定义读流的话,那咱们就须要继承Readable,Readable里面有一个read()方法,默认调用_read(),因此咱们只要复写了_read()方法就可实现读取的逻辑,同时Readable中也提供了一个push方法,调用push方法就会触发data事件,push中的参数就是data事件回调函数的参数,当push传入的参数为null的时候就表明读流中止,上代码
let { Readable } = require('stream'); // 想实现什么流 就继承这个流 // Readable里面有一个read()方法,默认掉_read() // Readable中提供了一个push方法你调用push方法就会触发data事件 let index = 9; class MyRead extends Readable { _read() { // 可读流何时中止呢? 当push null的时候中止 if (index-- > 0) return this.push('123'); this.push(null); } } let mr = new MyRead(); mr.on('data', function(data) { console.log(data); });
与自定义读流相似,自定义写流须要继承Writable接口,而且实现一个_write()方法,这里注意的是_write中能够传入3个参数,chunk, encoding, callback,chunk就是表明写入的数据,一般是一个buffer,encoding是编码类型,一般不会用到,最后的callback要注意,它并非咱们用这个自定义写流调用write时的回调,而是咱们上面讲到写流实现时的clearBuffer函数。
let { Writable } = require('stream'); // 可写流实现_write方法 // 源码中默认调用的是Writable中的write方法 class MyWrite extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); // clearBuffer } } let mw = new MyWrite(); mw.write('111', 'utf8', () => { console.log(1); }) mw.write('222', 'utf8', () => { console.log(1); });
双工流其实就是结合了上面咱们说的自定义读流和自定义写流,它既能读也能写,同时能够作到读写之间互不干扰
let { Duplex } = require('stream'); // 双工流 又能读 又能写,并且读取能够不要紧(互不干扰) let d = Duplex({ read() { this.push('hello'); this.push(null); }, write(chunk, encoding, callback) { console.log(chunk); callback(); } }); d.on('data', function(data) { console.log(data); }); d.write('hello');
转换流的本质就是双工流,惟一不一样的是它并不须要像上面提到的双工流同样实现read和write,它只须要实现一个transform方法用于转换
let { Transform } = require('stream'); // 它的参数和可写流同样 let tranform1 = Transform({ transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); // 将输入的内容放入到可读流中 callback(); } }); let tranform2 = Transform({ transform(chunk, encoding, callback){ console.log(chunk.toString()); callback(); } }); // 等待你的输入 // rs.pipe(ws); // 但愿将输入的内容转化成大写在输出出来 process.stdin.pipe(tranform1).pipe(tranform2); // 对象流 可读流里只能放buffer或者字符串 对象流里能够放对象
默认状况下,流处理的数据是Buffer/String类型的值。对象流的特色就是它有一个objectMode标志,咱们能够设置它让流能够接受任何JavaScript对象。上代码
const { Transform } = require('stream'); let fs = require('fs'); let rs = fs.createReadStream('./users.json'); rs.setEncoding('utf8'); let toJson = Transform({ readableObjectMode: true, transform(chunk, encoding, callback) { this.push(JSON.parse(chunk)); callback(); } }); let jsonOut = Transform({ writableObjectMode: true, transform(chunk, encoding, callback) { console.log(chunk); callback(); } }); rs.pipe(toJson).pipe(jsonOut);