若是你对NodeJs系列感兴趣,欢迎关注微信公众号:前端神盾局或 github NodeJs系列文章html
流从早先的unix初出茅庐,在过去的几十年的时间里,它被证实是一种可依赖的编程方式,它能够将一个大型的系统拆成一些很小的部分,而且让这些部分之间完美地进行合做。前端
在node中,流的身影几乎无处不在,不管是操做文件、建立本地服务器仍是简单的console
,都极有可能涉及到流。node
Node.js 中有四种基本的流类型:git
假设咱们须要使用node来实现一个简单的静态文件服务器:github
const http = require('http');
const fs = require('fs');
http.createServer((req,res)=>{
fs.readFile('./test.html',function(err,data){
if(err){
res.statusCode = 500;
res.end();
}else{
res.end(data);
}
})
}).listen(3000)
复制代码
上述代码简单实现了静态文件的读取和发送,逻辑上是彻底可行的。可是因为readFile
是一次性将读取的文件存放在内存中的,假设test.html
文件很是大或者访问量增多的状况下,服务器内存颇有可能耗尽。这时咱们就须要使用流的方式进行改进:编程
const http = require('http');
const fs = require('fs');
http.createServer((req,res)=>{
fs.createReadStream('./test.html').pipe(res);
}).listen(3000);
复制代码
fs.createReadStream
建立一个可读流,逐次读取文件内容供给下游消费,这种逐步读取和消费的方式,有效减缓了内存的消耗。api
咱们能够把 Readable Stream拆分红两个阶段:push阶段和pull阶段,在push阶段,经过实现_read
方法将数据从底层数据资源池中推送到缓存池中,这是数据的生产阶段,而pull阶段,则是将缓存池的数据拉出,供下游使用,这是数据的消费阶段。缓存
在开始进一步讲解以前,咱们先来介绍几个字段,这些字段来源于node源码:bash
state.buffer
: Array
缓存池,每一个元素对应push(data)中的datastate.length
: Number
缓存池中的数据量,在objectMode
模式下,state.length === state.buffer.length
,不然,其值是state.buffer
中数据字节数的总和state.ended
: Boolean
表示底层数据池没有可读数据了(this.pull(null)
)state.flowing
: Null|Boolean
表示当前流的模式,其值有三种状况:null
(初始状态)、true
(流动模式)、false
(暂停模式)state.needReadable
: Boolean
是否须要触发readable
事件state.reading
: Boolean
是否正在读取底层数据state.sync
: Boolean
是否当即触发data
/readable
事件,false
为当即触发、true
下一个tick再触发(process.nextTick
)可读流存在两种模式:流动模式(flowing)和暂停模式(paused),在源码中使用state.flowing
来标识。服务器
两种模式其基本流程都遵循上图中的push和pull阶段,区别在于pull阶段的自主性。对于流动模式而言,只要缓存池还有未消耗的数据,那么数据便会不断的被提取,咱们能够把它想象成一个自动的水泵,只要通电了,不抽干水池的水它是不会停下来的。而对于暂停模式,它更像是打水桶,须要的时候再从水池里面打点水出来。
全部可读流都开始于暂停模式,能够经过如下方式切换到流动模式:
data
事件句柄(前提是state.flowing === null
)stream.resume()
stream.pipe()
可读流也能够经过如下方式切换回暂停模式:
readable
事件句柄stream.pause()
。stream.unpipe()
能够移除多个管道目标。read
开始对于可读流而言,消费驱动生产,只有经过调用pull阶段的read
函数,才能唤醒push阶段的数据产生,从而带动整个流的运动。因此对于可读流而言read
是一切的起点。
这是根据源码整理的一个简单的流程图,后面将对一些环节加以说明。
howMuchToRead
调用read(n)
过程当中,node会根据实际状况调整读取的数量,实际值由howMuchRead
决定
function howMuchToRead(n,state){
// 若是size <= 0或者不存在可读数据
if (n <= 0 || (state.length === 0 && state.ended))
return 0;
// objectMode模式下 每次制度一个单位长度的数据
if (state.objectMode)
return 1;
// 若是size没有指定
if (Number.isNaN(n)) {
// 执行read()时,因为流动模式下数据会不断输出,
// 因此每次只输出缓存中第一个元素输出,而非流动模式则会将缓存读空
if (state.flowing && state.length)
return state.buffer.head.data.length;
else
return state.length;
}
if (n > state.highWaterMark)
// 更新highWaterMark
state.highWaterMark = computeNewHighWaterMark(n);
// 若是缓存中的数据量够用
if (n <= state.length)
return n;
// 若是缓存中的数据不够用,
// 且资源池还有可读取的数据,那么这一次先不读取缓存数据
// 留着下一次数据量足够的时候再读取
// 不然读空缓存池
if (!state.ended) {
state.needReadable = true;
return 0;
}
return state.length;
}
复制代码
end
事件在read
函数调用过程当中,node会择机断定是否触发end
事件,断定标准主要是如下两个条件:
if (state.length === 0 && state.ended) endReadable(this);
复制代码
state.ended
为true
,经过调用
pull(null)
表示底层数据当前已经没有可读数据了
state.length === 0
本事件在调用read([size])
时触发(知足上述条件时)
doRead
doRead
用于判断是否读取底层数据
// 若是当前是暂停模式`state.needReadable`
var doRead = state.needReadable;
// 若是当前缓存池是空的或者没有足够的缓存
if (state.length === 0 || state.length - n < state.highWaterMark){
doRead = true;
}
if (state.ended || state.reading) {
doRead = false;
} else if (doRead) {
// ...
this._read(state.highWaterMark);
// ...
}
复制代码
state.reading
标志上次从底层取数据的操做是否已完成,一旦push
方法被调用,就会设置为false
,表示这次_read()
结束
data
事件在官方文档中提到:添加data
事件句柄,可使Readable Stream的模式切换到流动模式,但官方没有提到的是这一结果成立的条件-state.flowing
的值不为null
,即只有在初始状态下,监听data事件,会使流进入流动模式。举个例子:
const { Readable } = require('stream');
class ExampleReadable extends Readable{
constructor(opt){
super(opt);
this._time = 0;
}
_read(){
this.push(String(++this._time));
}
}
const exampleReadable = new ExampleReadable();
// 暂停 state.flowing === false
exampleReadable.pause();
exampleReadable.on('data',(chunk)=>{
console.log(`Received ${chunk.length} bytes of data.`);
});
复制代码
运行这个例子,咱们发现终端没有任何输出,为何会这样呢?缘由咱们能够从源码中看出端倪
if (state.flowing !== false)
this.resume();
复制代码
由此咱们能够把官方表述再完善一些:在可读流初始化状态下(state.flowing === null
),添加data
事件句柄会使流进入流动模式。
只能被可读流的实现调用,且只能在 readable._read() 方法中调用。
push是数据生产的核心,消费方经过调用read(n)
促使流输出数据,而流经过_read()使底层调用push方法将数据传给流。
在这个过程当中,push方法有可能将数据存放在缓存池内,也有可能直接经过data
事件输出。下面咱们一一分析。
若是当前流是流动的(state.flowing === true
),且缓存池内没有可读数据, 那么数据将直接由事件data
输出
// node 源码
if (state.flowing && state.length === 0 && !state.sync){
state.awaitDrain = 0;
stream.emit('data', chunk);
}
复制代码
咱们举个例子:
const { Readable } = require('stream');
class ExampleReadable extends Readable{
constructor(opt){
super(opt);
this.max = 100;
this.time = 0;
}
_read(){
const seed = setTimeout(()=>{
if(this.time > 100){
this.push(null);
}else{
this.push(String(++this.time));
}
clearTimeout(seed);
},0)
}
}
const exampleReadable = new ExampleReadable({ });
exampleReadable.on('data',(data)=>{
console.log('from data',data);
});
复制代码
readable
事件exampleReadable.on('readable',()=>{
....
});
复制代码
当咱们注册一个readable
事件后,node就会作如下处理:
state.flowing = false;
state.needReadable = true;
复制代码
readable
,stream.emit('readable');
复制代码
self.read(0);
state.flow === false
当前处于暂停模式state.length || state.ended
return !state.ended &&
(state.length < state.highWaterMark || state.length === 0);
复制代码