流是一种数据传输手段,是有顺序的,有起点和终点,比如你要把数据从一个地方传到另一个地方。
流非常重要:gulp、webpack、HTTP 里的请求和响应、HTTP 里的 socket 都是流,后面的压缩、加密等也都基于流。流为什么这么好用、这么重要呢?
流是一个抽象接口,被 Node 中很多对象所实现,比如 HTTP 服务器的 request 和 response 对象都是流。Node.js 中有四种基本的流类型:Readable(可读流)、Writable(可写流)、Duplex(双工流)和 Transform(转换流)。
可以通过 require('stream') 加载 Stream 基类,其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基类。
可读流(Readable streams)是对提供数据的 源头(source)的抽象
可读流的例子包括:客户端的 HTTP 响应、服务器的 HTTP 请求、fs 的读取流、zlib 流、crypto 流、TCP socket、子进程的 stdout 与 stderr、process.stdin。
所有的 Readable 都实现了 stream.Readable 类定义的接口。
通过流读取数据
下面简单举个可读流的例子:
const fs = require('fs');

// Create a readable stream over bytes 3..8 of 1.txt.
const rs = fs.createReadStream('./1.txt', {
  flags: 'r',        // how the file is opened (fs.open flags)
  mode: 0o666,       // permission bits; only matter if the file gets created
  encoding: 'utf8',  // emit strings instead of Buffers (the default)
  start: 3,          // first byte offset to read
  end: 8,            // last byte offset to read; unlike most APIs this is inclusive
  highWaterMark: 3,  // read at most 3 bytes per chunk
});

rs.on('open', () => {
  console.log('文件打开');
});

// Pause after every chunk and resume 2 s later to demonstrate
// pause()/resume(): no 'data' events are emitted while paused.
rs.on('data', (data) => {
  console.log(data);
  rs.pause();
  setTimeout(() => {
    rs.resume();
  }, 2000);
});

// Emitted if opening or reading the file fails; report the actual cause.
rs.on('error', (err) => {
  console.log('error', err);
});

// Emitted once all requested bytes have been consumed.
rs.on('end', () => {
  console.log('读完了');
});

rs.on('close', () => {
  console.log('文件关闭');
});

/*
 * Example: if 1.txt contains "0123456789", bytes 3..8 are read as two
 * 3-byte chunks and the script prints:
 * 文件打开
 * 345
 * 678
 * 读完了
 * 文件关闭
 */
const ReadStream = require('./ReadStream');

// Construct with `new`: ReadStream is an ES class, and calling a class as a
// plain function throws a TypeError.
const rs = new ReadStream('./1.txt', {
  flags: 'r',
  encoding: 'utf8',
  start: 3,
  end: 7, // inclusive
  highWaterMark: 3,
});

rs.on('open', () => {
  console.log('open');
});
rs.on('data', (data) => {
  console.log(data);
});
rs.on('end', () => {
  console.log('end');
});
rs.on('close', () => {
  console.log('close');
});

/*
 * Expected output for a 1.txt containing "0123456789" (bytes 3..7, at most
 * 3 bytes per chunk), assuming ./ReadStream mirrors fs.ReadStream:
 * open
 * 345
 * 67
 * end
 * close
 */
const fs = require('fs');
const EventEmitter = require('events');
const { StringDecoder } = require('string_decoder');

/**
 * Minimal re-implementation of fs.ReadStream for teaching purposes.
 *
 * Consumers can use flowing mode ('data' listener, resume(), pipe()) or
 * paused mode ('readable' listener + read(n)).
 *
 * Events: 'open' (fd), 'data' (chunk), 'readable', 'end', 'close', 'error'.
 *
 * @param {string} path - File to read.
 * @param {object} [options]
 * @param {string} [options.flags='r'] - fs.open flags.
 * @param {string} [options.encoding] - Decode chunks to strings; Buffers otherwise.
 * @param {number} [options.mode=0o666] - fs.open mode.
 * @param {number} [options.start=0] - First byte offset to read.
 * @param {number} [options.end] - Last byte offset to read (inclusive); defaults to EOF.
 * @param {number} [options.highWaterMark=65536] - Max bytes per fs.read.
 * @param {boolean} [options.autoClose=true] - Close the fd after 'end' or an error.
 */
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || 'r';
    this.encoding = options.encoding;
    this.mode = options.mode || 0o666;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    // Only an explicit `false` disables auto-closing.
    this.autoClose = options.autoClose !== false;
    this.start = options.start || 0;
    // `end` is inclusive. Infinity stands for "read to end of file" and keeps
    // end === 0 meaningful (a truthiness check would ignore it).
    this.end = options.end === undefined ? Infinity : options.end;
    this.pos = this.start; // file offset of the next fs.read
    this.bytesRead = 0;

    this.fd = undefined;
    this.buffers = [];     // chunks read ahead but not yet consumed
    this.length = 0;       // total bytes held in this.buffers
    this.flowing = null;   // null: no consumer yet; true: flowing; false: paused
    this.reading = false;  // an fs.read is in flight
    this.ended = false;    // nothing more will come from the file
    this.endEmitted = false;
    this.closed = false;
    // Decode incrementally so multi-byte characters split across chunk
    // boundaries are not garbled.
    this.decoder = this.encoding ? new StringDecoder(this.encoding) : null;

    this.on('end', () => {
      if (this.autoClose) {
        this.destroy();
      }
    });
    this.on('newListener', (type) => {
      if (type === 'data' && this.flowing !== false) {
        // 'newListener' fires before the listener is attached, so start
        // flowing on the next tick; otherwise the first chunk would be lost.
        this.flowing = true;
        process.nextTick(() => this._flow());
      } else if (type === 'readable') {
        this.read(0);
      }
    });
    this.open();
  }

  /** Open the file; emits 'open' with the fd, or 'error' on failure. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this._fail(err);
        return;
      }
      if (this.closed) {
        // destroy() was called while opening: don't leak the descriptor.
        fs.close(fd, () => {});
        return;
      }
      this.fd = fd;
      this.emit('open', fd);
      // Consumers may have attached before the file was open.
      if (this.flowing) {
        this._flow();
      } else if (this.listenerCount('readable') > 0) {
        this._fill();
      }
    });
  }

  /**
   * Paused-mode read.
   * @param {number} [n] - Bytes wanted; omitted/NaN means everything buffered.
   * @returns {Buffer|string|null} The data, or null when not enough is buffered yet.
   */
  read(n) {
    let size = parseInt(n, 10);
    if (Number.isNaN(size)) {
      size = this.length;
    }
    // Once the file is exhausted, hand out whatever is left.
    if (this.ended && size > this.length) {
      size = this.length;
    }

    let chunk = null;
    if (size > 0 && size <= this.length) {
      chunk = this._take(size);
    }

    if (this.ended) {
      if (this.length === 0) {
        this._maybeEnd();
      }
    } else if (this.length < this.highWaterMark) {
      this._fill(); // refill up to the high-water mark in the background
    }
    return chunk === null ? null : this._decode(chunk);
  }

  /** Stop emitting 'data' until resume() is called. */
  pause() {
    this.flowing = false;
    return this;
  }

  /** Switch (back) to flowing mode and deliver buffered data. */
  resume() {
    this.flowing = true;
    this._flow();
    return this;
  }

  /**
   * Pipe into a writable `dest`, honouring backpressure: pause when
   * dest.write() returns false, resume on 'drain', end dest on 'end'.
   * @returns {object} dest, so pipes can be chained.
   */
  pipe(dest) {
    this.on('data', (data) => {
      if (!dest.write(data)) {
        this.pause();
      }
    });
    dest.on('drain', () => {
      this.resume();
    });
    this.on('end', () => {
      dest.end();
    });
    return dest;
  }

  /** Close the file descriptor (if any) and emit 'close' exactly once. */
  destroy() {
    if (this.closed) {
      return;
    }
    this.closed = true;
    if (typeof this.fd !== 'number') {
      // Nothing to close (open failed or is still pending).
      process.nextTick(() => this.emit('close'));
      return;
    }
    fs.close(this.fd, (err) => {
      if (err) {
        this.emit('error', err);
      }
      this.emit('close');
    });
  }

  /** Start one fs.read of up to highWaterMark bytes unless one is already running. */
  _fill() {
    if (this.reading || this.ended || this.closed || typeof this.fd !== 'number') {
      return;
    }
    const howMany = Math.min(this.end - this.pos + 1, this.highWaterMark);
    if (howMany <= 0) {
      this.ended = true;
      this._afterFill();
      return;
    }
    this.reading = true;
    // A fresh buffer per read: chunks still queued in this.buffers must not be
    // overwritten by the next read.
    const buf = Buffer.alloc(howMany);
    fs.read(this.fd, buf, 0, howMany, this.pos, (err, bytesRead) => {
      this.reading = false;
      if (this.closed) {
        return;
      }
      if (err) {
        this._fail(err);
        return;
      }
      if (bytesRead > 0) {
        this.buffers.push(buf.subarray(0, bytesRead));
        this.length += bytesRead;
        this.pos += bytesRead;
        this.bytesRead += bytesRead;
      }
      if (bytesRead === 0 || this.pos > this.end) {
        this.ended = true;
      }
      this._afterFill();
    });
  }

  /** Deliver newly buffered data to whichever consumer mode is active. */
  _afterFill() {
    if (this.flowing) {
      this._flow();
    } else if (this.listenerCount('readable') > 0) {
      this.emit('readable');
      if (this.ended && this.length === 0) {
        this._maybeEnd();
      }
    }
  }

  /** Flowing mode: emit every buffered chunk, then read more or finish. */
  _flow() {
    while (this.flowing && this.buffers.length > 0) {
      const chunk = this.buffers.shift();
      this.length -= chunk.length;
      const data = this._decode(chunk);
      // A decoder may hold back a partial character and yield ''.
      if (data.length > 0) {
        this.emit('data', data);
      }
    }
    if (!this.flowing) {
      return;
    }
    if (this.ended) {
      this._maybeEnd();
    } else {
      this._fill();
    }
  }

  /** Copy `size` bytes (size <= this.length) out of the read-ahead buffers. */
  _take(size) {
    const out = Buffer.alloc(size);
    let filled = 0;
    while (filled < size) {
      const head = this.buffers[0];
      const count = Math.min(head.length, size - filled);
      head.copy(out, filled, 0, count);
      filled += count;
      if (count === head.length) {
        this.buffers.shift();
      } else {
        this.buffers[0] = head.subarray(count);
      }
    }
    this.length -= size;
    return out;
  }

  /** Turn a raw Buffer into what consumers receive (string when encoding is set). */
  _decode(buf) {
    return this.decoder ? this.decoder.write(buf) : buf;
  }

  /** Emit 'end' once, after the file is exhausted and the buffer is empty. */
  _maybeEnd() {
    if (this.endEmitted || !this.ended || this.length > 0) {
      return;
    }
    this.endEmitted = true;
    this.emit('end');
  }

  /** Report an I/O error, closing the file first when autoClose is on. */
  _fail(err) {
    if (this.autoClose) {
      this.destroy();
    }
    this.emit('error', err);
  }
}

module.exports = ReadStream;
为了实现自定义可读流,引用 Readable 接口并用它构造新对象:
const stream = require('stream');

/**
 * A readable stream that produces the strings "1", "2", "3" and then ends.
 * Written with class/extends instead of util.inherits.
 */
class Counter extends stream.Readable {
  constructor(options) {
    super(options);
    this._index = 0;
  }

  // Called by the stream machinery whenever it wants more data.
  _read() {
    if (this._index++ < 3) {
      this.push(String(this._index));
    } else {
      this.push(null); // signal end of stream
    }
  }
}

const counter = new Counter();
counter.on('data', (data) => {
  console.log(`读到数据: ${data}`);
});
// 'end' carries no arguments.
counter.on('end', () => {
  console.log('读完了');
});
Readable Stream 存在两种模式(flowing mode 与 paused mode),这两种模式决定了 chunk 数据流动的方式——自动流动还是手动流动。那如何触发这两种模式呢:
如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。比如,调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。
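可读流的三种状态
在任意时刻,任意可读流应确切处于下面三种状态之一:
- readable._readableState.flowing === null
- readable._readableState.flowing === false
- readable._readableState.flowing === true
两种模式取决于可读流的 flowing 状态: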
flowing mode
通过注册 data 事件、调用 pipe 或 resume 可以自动获取所需要的数据,我们来看下源码的实现:
// Excerpts from Node.js lib/_stream_readable.js showing how each of the three
// entry points switches a readable stream into flowing mode.

// 1. Adding a 'data' listener triggers flowing mode (via resume()).
Readable.prototype.on = function(ev, fn) {
  const res = Stream.prototype.on.call(this, ev, fn);

  if (ev === 'data') {
    // Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }

  return res;
};

// 2. resume() sets state.flowing = true and schedules the actual flow.
Readable.prototype.resume = function() {
  var state = this._readableState;
  if (!state.flowing) {
    debug('resume');
    state.flowing = true;
    resume(this, state);
  }
  return this;
};

// 3. pipe() ends by calling resume() when the source is not flowing yet
//    (the data/drain/end listener wiring in the middle is omitted here).
Readable.prototype.pipe = function(dest, pipeOpts) {
  var src = this;
  var state = this._readableState;
  // ... listener wiring omitted ...

  // start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};
flowing mode 的三种方法最后都是通过 resume 方法将状态变为 true:state.flowing = true。
paused mode
在 paused mode 下,需要手动地读取数据,并且可以直接指定读取数据的长度。
可以通过监听 readable 事件,在触发时手动读取 chunk 数据:
const fs = require('fs');

const rs = fs.createReadStream('./1.txt', { highWaterMark: 3 });

// Paused mode: each 'readable' event pulls data out by hand with read(n).
rs.on('readable', () => {
  const state = rs._readableState;
  console.log(state.length);

  // read() with no argument would return the whole internal buffer; with a
  // byte count that is already buffered, it returns that many bytes at once.
  const chunk = rs.read(1);
  console.log(chunk);
  console.log(state.length);

  // After consuming, a buffer below the high-water mark is refilled right
  // away; check the buffered length again a little later.
  setTimeout(() => {
    console.log(state.length);
  }, 200);
});
注意:一旦注册了 readable 事件,必须手动调用 read 读取数据,否则数据就会流失。我们来看下源码的实现:
// Excerpt from Node.js lib/_stream_readable.js (paused-mode internals).
// The excerpt ends partway through Readable.prototype.read.

// Queue a 'readable' event for the next tick; emittedReadable ensures at most
// one emitReadable_ call is pending at a time.
function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

// Emit 'readable' when there is buffered data or the stream has ended, then
// call flow(). In flowing mode flow() keeps calling read() until it returns
// null (in Node, read() hands data out via 'data' events; not shown here).
function emitReadable_(stream) {
  var state = stream._readableState;
  debug('emit readable');
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit('readable');
  }
  state.needReadable = !state.flowing && !state.ended;
  flow(stream);
}

// While flowing, drain the buffer by calling read() until it returns null.
function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

// Mark the stream ended and schedule the 'end' notification (only once).
function endReadable(stream) {
  var state = stream._readableState;
  debug('endReadable', state.endEmitted);
  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

// Beginning of Readable.prototype.read(n).
Readable.prototype.read = function(n) {
  debug('read', n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;
  // Any non-zero read allows a later 'readable' to be emitted again.
  if (n !== 0)
    state.emittedReadable = false;
  // read(0) with a full buffer (or after end) does not consume data: it only
  // triggers 'readable' (or ends the stream) and returns null.
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  // howMuchToRead (not shown) decides how many bytes this call may return.
  n = howMuchToRead(n, state);
  // Nothing can be returned and the stream has ended: finish up.
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }
  // NOTE(review): the excerpt is truncated here; the rest of read() is not shown.
flow 方法直接 read 数据,将得到的数据通过 data 事件交付出去;然而此处没有注册 data 事件监听,因此得到的 chunk 数据并没有交付给任何对象,这样数据就白白流失了。所以在触发 emit('readable') 时,需要提前 read 数据。
可写流是对数据写入'目的地'的一种抽象
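Writable:可写流的例子包括了:客户端的 HTTP 请求、服务器的 HTTP 响应、fs 的写入流、zlib 流、crypto 流、TCP socket、子进程的 stdin、process.stdout 与 process.stderr。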
下面举个可写流的简单例子
const fs = require('fs');

// Writable file stream: 2.txt is opened with 'w' and data is written
// starting at byte offset 3.
const ws = fs.createWriteStream('./2.txt', {
  flags: 'w',
  mode: 0o666,
  start: 3,
  highWaterMark: 3, // default is 16 KiB
});
// write() returns true while the buffered byte count is below highWaterMark
// (3 here) and false once it reaches it.
for (const ch of ['1', '2', '3', '4']) {
  const accepted = ws.write(ch);
  console.log(accepted); // '1', '2' -> true; '3', '4' -> false
}
'drain' 事件
如果调用 stream.write(chunk) 方法返回 false,流将在适当的时机触发 'drain' 事件,这时才可以继续向流中写入数据。当一个流不处在 drain 的状态时,对 write() 的调用会缓存数据块,并且返回 false。一旦当前所有缓存的数据块都排空了(被操作系统接受来进行输出),'drain' 事件就会被触发。
建议:一旦 write() 返回 false,在 'drain' 事件触发前,不要写入任何数据块。
举个简单的例子说明一下:
const fs = require('fs');

const ws = fs.createWriteStream('2.txt', {
  flags: 'w',
  mode: 0o666,
  start: 0,
  highWaterMark: 3,
});

let count = 9;

/**
 * Write the remaining numbers (count, count - 1, ...) until write() reports a
 * full buffer, then return and wait for 'drain'. After the last number the
 * stream is ended so the data is flushed and the file gets closed.
 */
function write() {
  let flag = true; // buffer not full yet
  // write() returns synchronously; the actual file write happens later, and
  // the per-chunk callback runs once that chunk has been written.
  while (flag && count > 0) {
    console.log('before', count);
    const current = count;
    flag = ws.write(String(current), 'utf8', () => console.log('after', current));
    count--;
    if (count === 0) {
      ws.end(); // nothing left to write: flush and close the file
    }
  }
}

write(); // 9 8 7

// The buffer has been flushed: continue writing.
ws.on('drain', () => {
  console.log('drain');
  write(); // 6 5 4, then 3 2 1
});

ws.on('error', (err) => {
  console.log(err);
});

/*
 * Output starts with:
 * before 9
 * before 8
 * before 7
 * followed by 'after' lines, 'drain', and the next batches. The exact
 * interleaving of 'after' and 'drain' lines differs between Node.js versions.
 */
如果已经不再需要写入了,可以调用 end 方法关闭写入流;一旦调用 end 方法之后就不能再写入。
比如在 ws.end() 之后再调用 ws.write('x'),会报错 write after end。
pipe 方法
pipe 借鉴了 Linux 经典的管道概念:前者的输出是后者的输入。pipe 是连接两个 stream 最简单直接的方法,内部实现了数据传递的整个过程,开发时不需要关注内部数据的流动。
pipe方法的原理
const fs = require('fs');

const ws = fs.createWriteStream('./2.txt');
const rs = fs.createReadStream('./1.txt');

// Manual equivalent of rs.pipe(ws): stop reading while the destination's
// buffer is full, continue once it drains, and end it when the source ends.
rs.on('data', (data) => {
  const accepted = ws.write(data);
  if (!accepted) {
    rs.pause();
  }
});
ws.on('drain', () => rs.resume());
rs.on('end', () => ws.end());
下面举个简单的例子说明一下pipe的用法:
const fs = require('fs');

const rs = fs.createReadStream('./1.txt', { highWaterMark: 3 });
const ws = fs.createWriteStream('./2.txt', { highWaterMark: 3 });

// Copy 1.txt into 2.txt; pipe() handles backpressure and ends ws when rs ends.
rs.pipe(ws);

// Detach the destination later. Calling unpipe() immediately after pipe()
// would detach ws before any data had flowed. unpipe() does not end ws, so
// close it explicitly.
setTimeout(() => {
  rs.unpipe(ws);
  ws.end();
}, 1000);
// Forward each chunk; pause the source while the destination buffer is full.
rs.on('data', (chunk) => {
  console.log(chunk);
  if (!ws.write(chunk)) {
    rs.pause();
  }
});

// Destination buffer emptied: let the source flow again.
ws.on('drain', () => {
  console.log('drain');
  rs.resume();
});
unpipe
readable.unpipe() 方法将之前通过 stream.pipe() 方法绑定的流分离。
简单举例说明下 unpipe 的用法:
const fs = require('fs');

const from = fs.createReadStream('./1.txt');
const to = fs.createWriteStream('./2.txt');
from.pipe(to);

setTimeout(() => {
  console.log('关闭向2.txt的写入');
  // Detach the writable that was attached with pipe() above.
  from.unpipe(to);
  console.log('手工关闭文件流');
  // unpipe() does not end the destination, so close it explicitly.
  to.end();
}, 1000);
const ReadStream = require('./ReadStream');

// Construct with `new`: ReadStream is an ES class, and calling a class as a
// plain function throws a TypeError.
const rs = new ReadStream('./1.txt', {
  flags: 'r',
  encoding: 'utf8',
  highWaterMark: 3,
});

const FileWriteStream = require('./WriteStream');

// `new` works whether or not the constructor also supports plain calls.
const ws = new FileWriteStream('./2.txt', {
  flags: 'w',
  encoding: 'utf8',
  highWaterMark: 3,
});

rs.pipe(ws);
/**
 * Pipe this stream into a writable `dest`, honouring backpressure: pause when
 * dest.write() returns a falsy value, resume on 'drain', end dest on 'end'.
 * @param {object} dest - Writable exposing write(), end() and a 'drain' event.
 * @returns {object} dest, so pipes can be chained like stream.pipe().
 */
ReadStream.prototype.pipe = function (dest) {
  this.on('data', (data) => {
    const flag = dest.write(data);
    if (!flag) {
      this.pause();
    }
  });
  dest.on('drain', () => {
    this.resume();
  });
  this.on('end', () => {
    dest.end();
  });
  return dest;
};

/** Leave flowing mode; returns the stream for chaining. */
ReadStream.prototype.pause = function () {
  this.flowing = false;
  return this;
};

/** Enter flowing mode and call read() to continue; returns the stream. */
ReadStream.prototype.resume = function () {
  this.flowing = true;
  this.read();
  return this;
};
const stream = require('stream');

let index = 0;

// Readable that produces "1", "2", ... asynchronously, one value per _read call.
const readable = new stream.Readable({
  highWaterMark: 2,
  read() {
    process.nextTick(() => {
      index += 1;
      console.log('push', index);
      this.push(String(index));
    });
  },
});

// `next` is never called, so only the first chunk reaches this function;
// later chunks stay buffered and the pipe stalls once the buffers fill up
// (backpressure).
const writable = new stream.Writable({
  highWaterMark: 2,
  write(chunk, encoding, next) {
    console.log('写入:', chunk.toString());
  },
});

readable.pipe(writable);
const FileWriteStream = require('./FileWriteStream');

const ws = FileWriteStream('./2.txt', {
  flags: 'w',
  encoding: 'utf8',
  highWaterMark: 3,
});

let i = 10;

/**
 * Write "1" repeatedly until write() returns false (buffer full) or the
 * counter reaches zero. Each return value is logged, and each write's
 * callback logs the counter value captured when that write was queued.
 */
function write() {
  for (let flag = true; i && flag; ) {
    const label = i;
    flag = ws.write('1', 'utf8', () => console.log(label));
    i--;
    console.log(flag);
  }
}

write();

// The buffer has been flushed: queue the next batch.
ws.on('drain', () => {
  console.log('drain');
  write();
});

/*
 * Callback and drain lines, leaving out the logged return values (the exact
 * interleaving depends on ./FileWriteStream):
 * 10 9 8 drain 7 6 5 drain 4 3 2 drain 1
 */
const EventEmitter = require('events');
const util = require('util');
const fs = require('fs');

util.inherits(WriteStream, EventEmitter);

/**
 * Minimal re-implementation of fs.WriteStream for teaching purposes.
 *
 * write() queues chunks, and they are written to the file one at a time and
 * in order. write() returns false once the queued bytes reach highWaterMark;
 * 'drain' is emitted when such a backlog has been fully written.
 *
 * Events: 'open' (fd), 'drain', 'finish', 'close', 'error'.
 *
 * @param {string} path - File to write.
 * @param {object} [options]
 * @param {number} [options.fd] - Already-open descriptor; skips fs.open.
 * @param {string} [options.encoding='utf8'] - Default encoding for string chunks.
 * @param {string} [options.flags='w'] - fs.open flags.
 * @param {number} [options.mode=0o666] - fs.open mode.
 * @param {boolean} [options.autoClose=true] - Close the fd after finish or an error.
 * @param {number} [options.start=0] - File offset of the first write.
 * @param {number} [options.highWaterMark=16384] - Backpressure threshold in bytes.
 */
function WriteStream(path, options) {
  // Support calling without `new`. This must happen before `this` is touched:
  // without `new`, `this` is not a WriteStream instance.
  if (!(this instanceof WriteStream)) {
    return new WriteStream(path, options);
  }
  EventEmitter.call(this);
  const opts = options || {};

  this.path = path;
  this.fd = opts.fd;
  this.encoding = opts.encoding || 'utf8';
  this.flags = opts.flags || 'w';
  this.mode = opts.mode || 0o666;
  // Only an explicit `false` disables auto-closing.
  this.autoClose = opts.autoClose !== false;
  this.start = opts.start || 0;
  this.pos = this.start; // file offset of the next write
  this.highWaterMark = opts.highWaterMark || 16 * 1024;

  this.buffers = [];      // queued { chunk, cb } entries, oldest first
  this.length = 0;        // bytes queued but not yet written
  this.writing = false;   // a write chain is currently running
  this.needDrain = false; // a write() returned false since the last 'drain'
  this.ending = false;    // end() has been called
  this.finished = false;  // 'finish' has been emitted
  this.closed = false;

  if (typeof this.fd !== 'number') {
    this.open();
  }
}

/** Open the file; emits 'open' with the fd and starts any queued writes. */
WriteStream.prototype.open = function () {
  fs.open(this.path, this.flags, this.mode, (err, fd) => {
    if (err) {
      this._fail(err);
      return;
    }
    if (this.closed) {
      // destroy() was called while opening: don't leak the descriptor.
      fs.close(fd, () => {});
      return;
    }
    this.fd = fd;
    this.emit('open', fd);
    this._maybeWrite();
  });
};

/**
 * Queue a chunk for writing.
 * @param {Buffer|string} chunk
 * @param {string|Function} [encoding] - Encoding for string chunks, or the callback.
 * @param {Function} [cb] - Called once this chunk has been written.
 * @returns {boolean} false when the queue has reached highWaterMark; wait for 'drain'.
 */
WriteStream.prototype.write = function (chunk, encoding, cb) {
  const callback = typeof encoding === 'function' ? encoding : cb;
  const enc = typeof encoding === 'function' ? undefined : encoding;

  if (this.ending) {
    const err = new Error('write after end');
    process.nextTick(() => {
      if (callback) {
        callback(err);
      }
      this.emit('error', err);
    });
    return false;
  }

  const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, enc || this.encoding);
  this.buffers.push({ chunk: buf, cb: callback });
  this.length += buf.length;

  const belowLimit = this.length < this.highWaterMark;
  if (!belowLimit) {
    // Remember the caller was told to wait; 'drain' is owed once we catch up.
    this.needDrain = true;
  }
  this._maybeWrite();
  return belowLimit;
};

/**
 * Signal that no more data will be written. Optionally writes a final chunk.
 * 'finish' is emitted after everything queued has been written.
 * @returns {WriteStream} this
 */
WriteStream.prototype.end = function (chunk, encoding, cb) {
  let callback = cb;
  let enc = encoding;
  let data = chunk;
  if (typeof data === 'function') {
    callback = data;
    data = undefined;
    enc = undefined;
  } else if (typeof enc === 'function') {
    callback = enc;
    enc = undefined;
  }
  if (data !== undefined && data !== null) {
    this.write(data, enc);
  }
  if (!this.ending) {
    this.ending = true;
    if (callback) {
      this.once('finish', callback);
    }
    this._finish(); // no-op unless the queue is already empty and the file is open
  }
  return this;
};

/** Start the write chain unless one is running or the file isn't open yet. */
WriteStream.prototype._maybeWrite = function () {
  if (this.writing || this.closed || typeof this.fd !== 'number') {
    return;
  }
  this.writing = true;
  this._write();
};

/** Write queued chunks one after another; the next fs.write starts only after the previous one completes. */
WriteStream.prototype._write = function () {
  if (this.closed) {
    this.writing = false;
    return;
  }
  const part = this.buffers[0];
  if (!part) {
    this.writing = false;
    // Emit 'drain' only if some write() returned false, and not while ending.
    if (this.needDrain && !this.ending) {
      this.needDrain = false;
      this.emit('drain');
    }
    if (this.ending) {
      this._finish();
    }
    return;
  }
  fs.write(this.fd, part.chunk, 0, part.chunk.length, this.pos, (err, bytesWritten) => {
    if (err) {
      this._fail(err);
      return;
    }
    this.pos += bytesWritten;
    this.length -= bytesWritten;
    if (bytesWritten < part.chunk.length) {
      // Short write: retry with the remainder of the same chunk.
      part.chunk = part.chunk.subarray(bytesWritten);
    } else {
      this.buffers.shift();
      if (part.cb) {
        part.cb();
      }
    }
    this._write();
  });
};

/** Emit 'finish' once end() was called and every queued chunk is written. */
WriteStream.prototype._finish = function () {
  if (this.finished || this.writing || this.buffers.length > 0 || typeof this.fd !== 'number') {
    return;
  }
  this.finished = true;
  this.emit('finish');
  if (this.autoClose) {
    this.destroy();
  }
};

/** Report an I/O error, closing the file first when autoClose is on. */
WriteStream.prototype._fail = function (err) {
  this.writing = false;
  if (this.autoClose) {
    this.destroy();
  }
  this.emit('error', err);
};

/** Close the file descriptor (if any) and emit 'close' exactly once. */
WriteStream.prototype.destroy = function () {
  if (this.closed) {
    return;
  }
  this.closed = true;
  if (typeof this.fd !== 'number') {
    process.nextTick(() => this.emit('close'));
    return;
  }
  fs.close(this.fd, (err) => {
    if (err) {
      this.emit('error', err);
    }
    this.emit('close');
  });
};

/** Close the file; same as destroy(). */
WriteStream.prototype.close = function () {
  this.destroy();
};

module.exports = WriteStream;
为了实现可写流,我们需要使用流模块中的 Writable 构造函数。我们只需给 Writable 构造函数传递一些选项并创建一个对象。唯一需要的选项是 write 函数,该函数指明数据块要往哪里写。
const stream = require('stream');

const stock = [];

/**
 * A writable stream that stores each chunk (as a UTF-8 string) in `stock`
 * after a 500 ms delay. Written with class/extends instead of util.inherits.
 */
class Writer extends stream.Writable {
  constructor(opt) {
    super(opt);
  }

  _write(chunk, encoding, callback) {
    setTimeout(() => {
      stock.push(chunk.toString('utf8'));
      console.log('增长: ' + chunk);
      callback(); // signal that this chunk is done so the next one can be written
    }, 500);
  }
}

const w = new Writer();
for (let i = 1; i <= 5; i++) {
  w.write('项目:' + i, 'utf8');
}
// The end() callback runs after every chunk, including this last one, is written.
w.end('结束写入', () => {
  console.log(stock);
});
Duplex 流是同时实现了 Readable 和 Writable 接口的流
双工流的可读和可写操作完全独立于彼此,这仅仅是将两个特性组合成一个对象。Duplex 流的实例包括了:TCP socket、zlib 流、crypto 流。
下面简单实现双工流:
const { Duplex } = require('stream');

// Writable side prints whatever it receives; readable side produces
// "1", "2", "3", "4" and then ends.
const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },
  read(size) {
    this.index += 1;
    this.push(String(this.index));
    if (this.index > 3) {
      this.push(null);
    }
  },
});

inoutStream.index = 0;

process.stdin.pipe(inoutStream).pipe(process.stdout);
变换流(Transform streams)是一种 Duplex 流。它的输出与输入是通过某种方式关联的。和所有 Duplex 流一样,变换流同时实现了 Readable 和 Writable 接口。转换流的输出是从输入中计算出来的。
对于转换流,我们不必实现 read 或 write 方法,只需要实现一个 transform 方法,将两者结合起来。它有 write 方法的意思,我们也可以用它来 push 数据。变换流的实例包括:zlib 流、crypto 流。
下面简单实现转换流:
const { Transform } = require('stream');

// Upper-cases all text passing through; passing the result to the callback
// pushes it downstream.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    const shouted = chunk.toString().toUpperCase();
    callback(null, shouted);
  },
});

process.stdin.pipe(upperCase).pipe(process.stdout);
默认情况下,流处理的数据是 Buffer/String 类型的值。有一个 objectMode 标志,我们可以设置它让流可以接受任何 JavaScript 对象。
const { Transform } = require('stream');
const fs = require('fs');

const rs = fs.createReadStream('./users.json');
rs.setEncoding('utf8');

// Parses the whole JSON document and pushes the resulting value as a single
// object. Chunks are collected first: a file larger than one chunk would
// otherwise be split mid-document and JSON.parse would throw.
let raw = '';
const toJson = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    raw += chunk;
    callback();
  },
  flush(callback) {
    let value;
    try {
      value = JSON.parse(raw);
    } catch (err) {
      // Report malformed JSON as a stream error instead of throwing.
      callback(err);
      return;
    }
    this.push(value);
    callback();
  },
});

// Receives parsed objects and prints them.
const jsonOut = new Transform({
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    console.log(chunk);
    callback();
  },
});

// pipe() does not forward errors, so each source of errors gets a handler.
const onError = (err) => {
  console.error(err);
};
rs.on('error', onError);
toJson.on('error', onError);

rs.pipe(toJson).pipe(jsonOut);