可读流(Readable stream)接口是对你正在读取的数据的来源的抽象。换句话说,数据来来自可读流(Readable stream)不会分发数据,直到你代表准备就绪。
可读流(Readable stream) 有2种模式: 流动模式(flowing mode) 和 暂停模式(paused mode). 流动模式(flowing mode)时,尽快的从底层系统读取数据并提供给你的程序。 暂停模式(paused mode)时, 你必须明确的调用 stream.read() 来读取数据。 暂停模式(paused mode) 是默认模式。
能够经过下面几个方法,将流切换到流动模式(flowing mode)。json
let fs = require('fs'); /** * 全部初始工做模式为 paused 的 Readable 流,能够经过下面三种途径切换到 flowing 模式: 监听 'data' 事件 调用 stream.resume() 方法 调用 stream.pipe() 方法将数据发送到 Writable */ let rs = fs.createReadStream('./1.txt',{ highWaterMark:3 }); /* 269 stream.emit('data', chunk); stream.read(0); rs.on('data',function (data) { console.log(data); }); rs.on('end',function () { console.log('end'); });*/ //当你监听 readable事件的时候,会进入暂停模式 //当监听readable事件的时候,可读流会立刻去向底层读取文件,而后把读到文件的文件放在缓存区里const state = this._readableState; //self.read(0); 只填充缓存,可是并不会发射data事件,可是会发射stream.emit('readable');事件 //this._read(state.highWaterMark); 每次调用底层的方法读取的时候是读取3个字节 rs.on('readable',function(){ //length就是指得缓存区数据的大小 // state.length += chunk.length;==3 console.log(rs._readableState.length); //read若是不加参数表示读取整个缓存区数据 //读取一个字段,若是可读流发现你要读的字节小于等于缓存字节大小,则直接返回 let ch = rs.read(1); console.log(ch); console.log(rs._readableState.length); /* ch = rs.read(1); console.log(ch); console.log(rs._readableState.length);*/ //当你读完指定的字节后,若是可读流发现剩下的字节已经比最高水位线小了。则会立马再次读取填满 最高水位线 setTimeout(function(){ console.log(rs._readableState.length); },200) });
这个方法向底层系统写入数据,并在数据处理完毕后调用所给的回调。返回值表示你是否应该继续当即写入。若是数据要缓存在内部,将会返回false。不然返回 true。返回值仅供参考。即便返回 false,你也可能继续写。可是写会缓存在内存里,因此不要作的太过度。最好的办法是等待drain 事件后,再写入数据。缓存
let fs = require('fs'); let ws = fs.createWriteStream('2.txt',{ flags:'w', mode:0o666, start:0, highWaterMark:3 }); let count = 9; function write(){ let flag = true;//缓存区未满 //写入方法是同步的,可是写入文件的过程 异步的。在真正写入文件后还会执行咱们的回调函数 while(flag && count>0){ console.log('before',count); flag = ws.write((count)+'','utf8',(function (i) { return ()=>console.log('after',i); })(count)); count--; } } write();//987 //监听缓存区清空事件 ws.on('drain',function () { console.log('drain'); write();//654 321 }); ws.on('error',function (err) { console.log(err); }); //若是已经再也不须要写入了,能够调用end方法关闭写入流,一旦调用end方法以后则不能再写入 ws.end(); //write after end // ws.write('x');
双工流(Duplex streams)是同时实现了 Readable and Writable 接口。用法详见下文异步
let {Duplex} = require('stream'); let index = 0; let s = Duplex({ read(){ if(index++<3) this.push('a'); else this.push(null); }, write(chunk,encoding,cb){ console.log(chunk.toString().toUpperCase()); cb(); } }); //process.stdin 标准输入流 //proces.stdout标准输出流 process.stdin.pipe(s).pipe(process.stdout);
它的输出是从输入计算得来。 它实现了Readable 和 Writable 接口. 用法详见下文.函数
let {Transform} = require('stream'); //转换流是实现数据转换的 let t = Transform({ transform(chunk,encoding,cb){ this.push(chunk.toString().toUpperCase()); cb(); } }); process.stdin.pipe(t).pipe(process.stdout);
let {Transform} = require('stream'); let fs = require('fs'); let rs = fs.createReadStream('./user.json'); //普通流里的放的是Buffer,对象流里放的对象 let toJSON = Transform({ readableObjectMode:true,//就能够向可读流里放对象 transform(chunk,encoding,cb){ //向可读流里的缓存区里放 this.push(JSON.parse(chunk.toString())); } }); let outJSON = Transform({ writableObjectMode:true,//就能够向可读流里放对象 transform(chunk,encoding,cb){ console.log(chunk); cb(); } }); rs.pipe(toJSON).pipe(outJSON);