流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 能够很容易地来构建实现流接口的对象。api
Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。缓存
流能够是可读的、可写的,或是可读写的。全部的流都是 EventEmitter 的实例。异步
这里咱们举一个简单的例子:函数
咱们打算读取一个文件,使用 fs.readFileSync 同步读取一个文件,程序会被阻塞,全部的数据都会被读取到内存中。ui
换用 fs.readFile 读取文件,程序不会被阻塞,可是全部的数据依旧会被一次性所有被读取到内存中。this
当处理大文件压缩、归档、媒体文件和巨大的日志文件的时候,内存使用就成了问题,如今你们通常家用机内存大多数都是8G、16G,软件包还在日益增大,在这种状况下,流的优点就体现出来了。spa
流被设计为异步的方式,在内存中只开启一个固定的空间,将文件化整为零,以流动的方式进行传输操做,解决了以上问题。设计
Node.js 中有四种基本的流类型:日志
Readable - 可读的流 (例如 fs.createReadStream()).code
Writable - 可写的流 (例如 fs.createWriteStream()).
Duplex - 可读写的流 (例如 net.Socket).
Transform - 在读写过程当中能够修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).
可读流有两种模式:
一、流动模式(flowing):可读流自动读取数据,经过EventEmitter接口的事件尽快将数据提供给应用。
二、暂停模式(paused):必须显式调用stream.read()方法来从流中读取数据片断。
能够经过三种途径切换到流动模式:
流动模式切换到暂停模式的api有:
可读流事件:'data','readable','error','close','end'
咱们能够想象下家用热水器的模型,热水器的水箱(buffer缓存区)里面存着热水(数据),在咱们用热水的时候,开启水龙头,自来水会不断的进入水箱,再从水箱由水龙头流出来供咱们使用。这就是进入了“flowing”模式。当咱们关闭水龙头时候,水箱则会暂停进水,水龙头也会暂停出水,这是就进入了“paused”模式。
const fs = require('fs') const path = require('path') const rs = fs.createReadStream(path.join(__dirname, './1.txt')) rs.setEncoding('utf8') rs.on('data', (data) => { console.log(data) })
const fs = require('fs') const path = require('path') const rs = fs.createReadStream(path.join(__dirname, './1.txt')) rs.setEncoding('utf8') rs.on('readable', () => { let d = rs.read(1) console.log(d) })
咱们来实现一个简单的流动模式下的可读流介绍其原理,由NODEJS官方文档可知,流继承自EventEmitter模块,而后咱们定义一些默认参数、缓存区、模式:
let EventEmitter = require('events'); let fs = require('fs'); class ReadStream extends EventEmitter { constructor(path,options) { super(); this.path = path; this.flags = options.flags || 'r'; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark|| 64*1024; this.start = options.start||0; this.end = options.end; this.encoding = options.encoding || null this.buffer = Buffer.alloc(this.highWaterMark);//定义缓存区大小 this.pos = this.start; // pos 读取的位置 可变 start不变的 this.flowing = null; // null就是暂停模式 } } module.exports = ReadStream;
接着在咱们须要定义一个打开文件的方法用于打开文件。还有一个一个destroy方法,用于在文件操做出错或者读完以后关闭文件。
open(){ fs.open(this.path,this.flags,(err,fd)=>{ if(err){ this.emit('error',err); if(this.autoClose){ // 是否自动关闭 this.destroy(); } return; } this.fd = fd; // 保存文件描述符 this.emit('open'); // 文件打开了 }); } destroy(){ // 先判断有没有fd 有关闭文件 触发close事件 if(typeof this.fd ==='number'){ fs.close(this.fd,()=>{ this.emit('close'); }); return; } this.emit('close'); // 销毁 }
接着要在构造函数中调用open方法,当用户绑定data监听时,修改可读流的模式:
constructor(path,options){ super(); this.path = path; this.flags = options.flags || 'r'; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark|| 64*1024; this.start = options.start||0; this.end = options.end; this.encoding = options.encoding || null this.flowing = null; this.buffer = Buffer.alloc(this.highWaterMark); this.pos = this.start; this.open();//打开文件 fd this.on('newListener',(eventName,callback)=>{ if(eventName === 'data'){ // 至关于用户监听了data事件 this.flowing = true; // 监听了 就去读 this.read(); // 去读内容了 } }) }
接下来咱们实现最总要的read方法,首先要保证文件已经打开,接着镀组文件进入缓存,触发data事件传入数据,若是处于流动模式,继续读取直到读完文件。
read(){ // 此时文件还没打开呢 if(typeof this.fd !== 'number'){ // 当文件真正打开的时候 会触发open事件,触发事件后再执行read,此时fd确定有了 return this.once('open',()=>this.read()) } // 此时有fd了 // 应该填highWaterMark? // 想读4个 写的是3 每次读3个 // 123 4 let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark; fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{ // 读到了多少个 累加 if(bytesRead>0){ this.pos+= bytesRead; let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead); this.emit('data',data); // 当读取的位置 大于了末尾 就是读取完毕了 if(this.pos > this.end){ this.emit('end'); this.destroy(); } if(this.flowing) { // 流动模式继续触发 this.read(); } }else{ this.emit('end'); this.destroy(); } }); }
剩下的pause和resume方法,很简单
resume() { this.flowing = true; this.read(); } pause() { this.flowing = false; }
简单的流实现完成了,看一下完整代码
let EventEmitter = require('events'); let fs = require('fs'); class ReadStream extends EventEmitter { constructor(path, options) { super(); this.path = path; this.flags = options.flags || 'r'; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark|| 64*1024; this.start = options.start||0; this.end = options.end; this.encoding = options.encoding || null this.open(); this.flowing = null; // null就是暂停模式 this.buffer = Buffer.alloc(this.highWaterMark); this.pos = this.start; this.on('newListener', (eventName,callback) => { if (eventName === 'data') { this.flowing = true; this.read(); } }) } read(){ if (typeof this.fd !== 'number') { return this.once('open', () => this.read()) } let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos+1) : this.highWaterMark; fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err,bytesRead) => { if (bytesRead > 0) { this.pos += bytesRead; let data = this.encoding ? this.buffer.slice(0, bytesRead).toString(this.encoding) : this.buffer.slice(0, bytesRead); this.emit('data', data); if(this.pos > this.end){ this.emit('end'); this.destroy(); } if(this.flowing) { this.read(); } }else{ this.emit('end'); this.destroy(); } }); } resume() { this.flowing = true; this.read(); } pause() { this.flowing = false; } destroy() { if(typeof this.fd === 'number') { fs.close(this.fd, () => { this.emit('close'); }); return; } this.emit('close'); }; open() { fs.open(this.path, this.flags, (err,fd) => { if (err) { this.emit('error', err); if (this.autoClose) { this.destroy(); } return; } this.fd = fd; this.emit('open'); }); } } module.exports = ReadStream;
以上是流动模式的可读流实现原理,暂停模式的可读流原理与流动模式的主要区别在于监听readable事件的绑定与read方法,先实现监听绑定readable事件回调函数时,调用read方法读取数据到缓存区,定义一个读取方法_read
constructor(path, options) { super(); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; this.start = 0; this.end = options.end; this.flags = options.flags || 'r'; this.buffers = []; // 缓存区 this.pos = this.start; this.length = 0; // 缓存区大小 this.emittedReadable = false; this.reading = false; // 不是正在读取的 this.open(); this.on('newListener', (eventName) => { if (eventName === 'readable') { this.read(); } }) } read(n) { if (this.length == 0) { this.emittedReadable = true; } if (this.length < this.highWaterMark) { if(!this.reading) { this.reading = true; this._read(); } } } _read() { if (typeof this.fd !== 'number') { return this.once('open', () => this._read()); } let buffer = Buffer.alloc(this.highWaterMark); fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => { if (bytesRead > 0) { this.buffers.push(buffer.slice(0, bytesRead)); this.pos += bytesRead; this.length += bytesRead; this.reading = false; if (this.emittedReadable) { this.emittedReadable = false; this.emit('readable'); } } else { this.emit('end'); this.destroy(); } }) }
由api可知,暂停模式下的可读流手动调用read方法参数能够大于highWaterMark,为了处理这种状况,咱们先写一个函数computeNewHighWaterMark,取到大于等于n的最小2的n次方的整数
function computeNewHighWaterMark(n) { n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; return n; }
而后写read方法,要考虑全n的各类状况,上代码
read(n) { if(n>this.length){ // 更改缓存区大小 读取五个就找 2的几回放最近的 this.highWaterMark = computeNewHighWaterMark(n) this.emittedReadable = true; this._read(); } // 若是n>0 去缓存区中取吧 let buffer=null; let index = 0; // 维护buffer的索引的 let flag = true; if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多 // 在缓存区中取 [[2,3],[4,5,6]] buffer = Buffer.alloc(n); // 这是要返回的buffer let buf; while (flag&&(buf = this.buffers.shift())) { for (let i = 0; i < buf.length; i++) { buffer[index++] = buf[i]; if(index === n){ // 拷贝够了 不须要拷贝了 flag = false; this.length -= n; let bufferArr = buf.slice(i+1); // 取出留下的部分 // 若是有剩下的内容 在放入到缓存中 if(bufferArr.length > 0){ this.buffers.unshift(bufferArr); } break; } } } } // 当前缓存区 小于highWaterMark时在去读取 if (this.length == 0) { this.emittedReadable = true; } if (this.length < this.highWaterMark) { if(!this.reading){ this.reading = true; this._read(); // 异步的 } } return buffer }
附上可读流暂停模式的完整实现原理代码
let fs = require('fs'); let EventEmitter = require('events'); function computeNewHighWaterMark(n) { n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; return n; } class ReadStream extends EventEmitter { constructor(path, options) { super(); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; this.start = 0; this.end = options.end; this.flags = options.flags || 'r'; this.buffers = []; // 缓存区 this.pos = this.start; this.length = 0; // 缓存区大小 this.emittedReadable = false; this.reading = false; // 不是正在读取的 this.open(); this.on('newListener', (eventName) => { if (eventName === 'readable') { this.read(); } }) } read(n) { if(n>this.length){ // 更改缓存区大小 读取五个就找 2的几回放最近的 this.highWaterMark = computeNewHighWaterMark(n) this.emittedReadable = true; this._read(); } // 若是n>0 去缓存区中取吧 let buffer=null; let index = 0; // 维护buffer的索引的 let flag = true; if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多 // 在缓存区中取 [[2,3],[4,5,6]] buffer = Buffer.alloc(n); // 这是要返回的buffer let buf; while (flag&&(buf = this.buffers.shift())) { for (let i = 0; i < buf.length; i++) { buffer[index++] = buf[i]; if(index === n){ // 拷贝够了 不须要拷贝了 flag = false; this.length -= n; let bufferArr = buf.slice(i+1); // 取出留下的部分 // 若是有剩下的内容 在放入到缓存中 if(bufferArr.length > 0){ this.buffers.unshift(bufferArr); } break; } } } } // 当前缓存区 小于highWaterMark时在去读取 if (this.length == 0) { this.emittedReadable = true; } if (this.length < this.highWaterMark) { if(!this.reading){ this.reading = true; this._read(); // 异步的 } } return buffer } // 封装的读取的方法 _read() { // 当文件打开后在去读取 if (typeof this.fd !== 'number') { return this.once('open', () => this._read()); } // 上来我要喝水 先倒三升水 [] let buffer = Buffer.alloc(this.highWaterMark); fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => { if (bytesRead > 0) { // 默认读取的内容放到缓存区中 this.buffers.push(buffer.slice(0, bytesRead)); this.pos += bytesRead; // 维护读取的索引 this.length += bytesRead;// 维护缓存区的大小 this.reading = false; // 是否须要触发readable事件 if (this.emittedReadable) { this.emittedReadable = false; // 下次默认不触发 this.emit('readable'); } } else { this.emit('end'); this.destroy(); } }) } destroy() { if (typeof this.fd !== 'number') { return this.emit('close') } fs.close(this.fd, () => { this.emit('close') }) } open() { fs.open(this.path, this.flags, (err, fd) => { if (err) { this.emit('error', err); if (this.autoClose) { this.destroy(); } return } this.fd = fd; this.emit('open'); }); } } module.exports = ReadStream;