Node.js Stream(流)

流的概念

  • 流是一组有序的,有起点和终点的字节数据传输手段
  • 它不关心文件的总体内容,只关注是否从文件中读到了数据,以及读到数据以后的处理
  • 流(stream)在Node.js中是一个抽象接口,被Node中的不少对象所实现。好比HTTP 服务器request和response对象都是流
  • 流能够是可读的、可写的,或是可读写的。全部的流都是 EventEmitter 的实例。

可读流

1、两种模式

可读流会工做在下面两种模式之一node

  • flowing模式:可读流 自动(不断的) 从底层读取数据(直到读取完毕),并经过EventEmitter 接口的事件尽快将数据提供给应用
  • paused模式:必须显示调用stream.read() 方法来从流中读取数据片断

全部初始工做模式为 paused 的可读流,能够经过下面三种途径切换到 flowing 模式:api

  • 监听 'data' 事件。
  • 调用 stream.resume() 方法。
  • 调用 stream.pipe() 方法将数据发送到 Writable。

可读流能够经过下面途径切换到 paused 模式:缓存

  • 若是不存在管道目标(pipe destination),能够经过调用 stream.pause() 方法实现。
  • 若是存在管道目标,能够经过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除全部管道目标来实现。

下面演示如何从流中读取数据
注:文件1.txt中的内容是1234567890bash

let fs = require("fs");
let rs = fs.createReadStream('1.txt',{   //这些参数是可选的,不须要精细控制能够不设置
    flags:'r',      //文件的操做是读取操做
    encoding:'utf8',//默认是null,null表明buffer,会按照encoding输出内容
    highWaterMark:3,//单位是字节,表示一次读取多少字节,默认是64k
    autoClose:true,//读完是否自动关闭
    start:0,       //读取的起始位置
    end:9          //读取的结束位置,包括9这个位置的内容
})
//rs.setEncoding('utf8');  //能够设置编码方式

rs.on('open',function(){
    console.log('open');
})

rs.on('data',function(data){
     console.log('data');
})

rs.on('error',function(){
    console.log('error');
})
rs.on('end',function(){
    console.log('end');
})
rs.on('close',function(){
    console.log('close');
})
复制代码

执行结果
服务器

open
123
456
789
0
end
close
复制代码

一、fs.createReadStream建立可读流实例时,默认打开文件,触发open事件(并非每一个流都会触发open事件),但此时并不会将文件中的内容输出(由于处于‘暂停模式’,没有事件消费),而是将数据存储到内部的缓冲器buffer,buffer的大小取决于highWaterMark参数,读取大小达到highWaterMark指定的阈值时,流会暂停从底层资源读取数据,直到当前缓冲器的数据被消费
异步

二、这里的rs能够理解为流的消费者,当消费者监听了'data'事件时,就开始消费数据,可读流会从paused切换到flowing“流动模式”,不断的向消费者提供数据,直到没有数据
函数

三、从打印结果能够看出,可读流每次读取highWaterMark个数据,交给消费者,因此先打印123,再打印456 ... ...
ui

四、当读完文件,也就是数据被彻底消费后,触发end事件
编码

五、最后流或者底层资源文件关闭后,这里就是1.txt这个文件关闭后,触发close事件
spa

六、error事件一般会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。

七、fs.createReadStream第二个参数是可选的,可不填,或只设置部分,好比编码,不须要精细控制能够不设置

模式切换

rs.on('data',function(data){ // 暂停模式 -> 流动模式
    console.log(data);
    rs.pause(); // 暂停方法 表示暂停读取,暂停data事件触发
});
setTimeout(function () {
    rs.resume(); //恢复data事件触发,变为流动模式
},1000)
//结果 open  123  456
复制代码

一、上例当监听data事件时,可读流处于flowing模式,调用了pause()方法,会暂停data事件的触发,切换到paused模式
二、resume()能够恢复data事件触发,再切换到flowing模式
三、上例中,setTimeout中切换流到flowing模式后,data事件触发,但又遇到pause(),因此暂停了输出,只打印到6

注意: 若是可读流切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 好比, 调用了可读流的resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种状况。

2、readable事件

let fs = require('fs');
let rs = fs.createReadStream('1.txt',{
    highWaterMark:2
});

rs.on('readable',function(){
    console.log('begin');
    let result = rs.read(2);
    console.log('result '+result);
});
复制代码

'readable' 事件将在流中有数据可供读取时触发
上面已经说过,当咱们建立可读流时,就会先把缓存区填满(highWaterMark为指定的单次缓存区大小),等待消费
若是缓存区被清空(消费)后,会触发readable事件
到达流数据尾部时,readable事件也会触发,触发顺序在end事件以前

rs.read(size)

  • 该方法从内部缓冲区中收取并返回一些数据,若是没有可读数据,返回null
  • size是可选的,指定要读取size个字节,若是没有指定,内部缓冲区所包含的全部数据将返回
  • 若是size字节不可读,返回null,若是此时流没有结束(除非流已经结束),会将全部保留在内部缓冲区的数据将被返回。好比:文件中有1个可读字节,可是指定size为2,这时调用read(2)会返回null,若是流没有结束,那么会再次触发readable事件,将已经读到内部缓冲区中的那一个字节也返回
  • rs.read()方法只应该在暂停模式下的可读流上运行,在流动模式下,read会自动调用,直到内部缓冲区数据彻底耗尽

因此,上例中,若是文件1.txt中内容是 a,输出结果

begin
result null
begin
result a
复制代码

说明:highWaterMark是2,但文件只有a,因此只有1个字节在缓存区,而size指定了2,2个字节被认为是不可读的,返回null;再次触发readable,将缓存区内容所有返回

若是内容是ab

begin
result ab
begin
result null
复制代码

说明:highWaterMark是2,因此一开始缓存中有2个字节,size指定了2,因此将ab所有读取,缓存清空——>继续缓存,发现到文件末尾,因而触发readable返回null

若是内容是abc,输出

begin
result ab
begin
result null
begin
result c
复制代码

说明:一开始缓存了2个,被消费掉,继续缓存c,并触发readable,再次read(2),此时没有2个字节的数据,被认为是不可读的,返回null,而且再次触发readable将缓存中剩余数据读取返回

若是内容是abcd,输出

begin
result ab
begin
result cd
begin
result null
复制代码

说明:先读完2个字节,即ab输出,缓存区被清空,因此会再次触发readable事件,再read(2)读出cd,继续自动缓存,发现到了文件末尾,又会触发readable,返回null

在某些状况下,为 'readable' 事件添加回调将会致使一些数据被读取到内部缓存中

这句话个人理解是,当消费数据大小 < 缓存区大小,可读流会自动添加highWaterMark个数据到缓存,那么新添加的数据和以前缓存区中未被消费的数据加一块儿,有可能超过了highWaterMark大小,即缓存区大小增长了

下面将highWaterMark改成3,read(1)再来看看怎么执行的

let rs = fs.createReadStream('1.txt',{
    highWaterMark:3
});
rs.on('readable',function(){
    console.log('begin');
    let result = rs.read(1);
    console.log('result '+result);
});
复制代码

当1.txt内容是 a,输出

begin
result a
begin
result null
复制代码

说明:缓存中只有a,也只读了一个(read(1)),消费后,缓存区清空,再去读取时,已经到了文件末尾,返回null

当1.txt内容是 ab,输出

begin
result a
begin
result b
复制代码

说明:缓存中有ab,当读完a后,继续缓存,发现到了文件末尾,触发readable,而此时缓存中还有b,所以将b返回

当读取个数size > 缓存区个数,会去更改缓存区的大小highWaterMark(规则为找知足>=size的最小的2的几回方)

let rs = fs.createReadStream('1.txt',{
    highWaterMark:3
});

rs.on('readable',function(){
    console.log('begin');
    let result = rs.read(4);
    console.log('result '+result);
});
复制代码

当1.txt中内容是abcdefgh,输出

begin
result null
begin
result abcd
复制代码

说明:读取的size(4)>缓存,认为是不可读的,size返回null;这时会从新计算highWaterMark大小,离4最近的是2的2次方,为4,因此highWaterMark此时等于4,返回了abcd;继续缓存efgh

但若是1.txt内容是abcdefg,输出

begin
result null
begin
result abcd
begin
result efg
复制代码

同上,但当返回abcd继续自动缓存4个时,发现读到文件末尾,将缓存数据返回,因此efg也输出

可写流

可写流是对数据写入'目的地'的一种抽象。

可写流基本用法

let fs = require('fs');
let ws = fs.createWriteStream('./1.txt',{
    flags:'w',
    mode:0o666,
    autoClose:true,
    highWaterMark:3, // 默认是16k ,而createReadStream是64k
    encoding:'utf8',//默认是utf8
    start:0
});
for(let i = 0;i<4;i++){
    let flag =  ws.write(i+'');
    console.log(flag)
}
ws.end("ok");// 标记文件末尾

ws.on('open',function(){
    console.log('open')
});

ws.on('error',function(err){
    console.log(err);
});

ws.on('finish',function(err){
    console.log('finish');
});

ws.on('close',function(){
    console.log('close')
});
复制代码

打印结果

true
true
false
false
open
finish
close
复制代码

写入文件1.txt的结果 0123ok

一、fs.createWriteStream建立可写流,一样默认会打开文件

二、可写流经过反复调用 ws.write(chunk) 方法将数据放到内部缓冲器
写入的数据chunk必须是字符串或者buffer
write虽然是个异步方法,但有返回值,这个返回值flag的含义,不是文件是否写入,而是表示可否继续写入
即缓冲器总大小 < highWaterMark时,能够继续写入,flag为true;
一旦内部缓冲器大小达到或超过highWaterMark,flag返回false;
注意,即便flag为flase,写入的内容也不会丢失

三、上例中指定的highWaterMark是3,调用write时一次写入了一个字节,当调用第三次write方法时,缓冲器中的数据大小达到3这个阈值,开始返回flase,因此先打印了两次true,后打印了两次false

四、ws.end("ok"); end方法用来标记文件末尾,表示接下来没有数据要写入可写流;
能够传入可选的 chunk 和 encoding 参数,在关闭流以前再写入一段数据;
若是传入了可选的 callback 函数,它将做为 'finish' 事件的回调函数。因此'ok'会被写入文件末尾。
注意,ws.write()方法必须在ws.end()方法以前调用

五、在调用了 ws.end() 方法,且缓冲区数据都已经传给底层系统(这里是文件1.txt)以后, 'finish' 事件将被触发。

六、'close' 事件将在流或其底层资源(好比一个文件)关闭后触发。'close'事件触发后,该流将不会再触发任何事件。不是全部 可写流/可读流 都会触发 'close' 事件。

drain事件

若是调用 stream.write(chunk) 方法返回 false,'drain' 事件会在适合恢复写入数据到流的时候触发。

drain触发条件

  • 缓冲器满了,即write返回false
  • 缓冲器的数据都写入到流,即数据都被消费掉后,才会触发

将上例中for循环改成以下

let i = 8;
function write(){
    let flag = true;
    while(i>0&&flag){
        flag = ws.write(--i+'','utf8',()=>{});
        console.log(flag)
    }

    if(i <= 0){
        ws.end("ok");
    }
 }
 write();
 // drain只有当缓存区充满后 ,而且被消费后触发
 ws.on('drain',function(){
   console.log('drain');
   write();
 });
复制代码

打印

true
true
false
open
drain
true
true
false
drain
true
true
finish
close
复制代码

文件1.txt写入 76543210ok

上例当write返回为false,即缓冲器满了时,中止while循环,等待;当缓冲器数据都写入1.txt以后,会触发drain事件,这时继续write,直到写到0,中止写入,调用end,在文件末尾写入ok,关闭文件

管道流 & pipe事件

管道提供了一个输出流到输入流的机制。一般咱们用于从一个流中获取数据并将数据传递到另一个流中

以下,将1.txt的内容,按照读一点,写一点的方式 写入2.txt

let fs = require('fs');
let rs = fs.createReadStream('1.txt',{
    highWaterMark:4
});
let ws = fs.createWriteStream('2.txt',{
    highWaterMark:3
});
rs.pipe(ws);   //可读流上调用pipe()方法,pipe方法就是读一点写一点
复制代码

这段代码工做原理相似于下面这段代码

rs.on('data',function(chunk){ // chunk 读到的内容
    let flag = ws.write(chunk);
    if(!flag){  //若是缓冲器满了,写不下了,就中止读
        rs.pause();
    }
});
ws.on('drain',function(){ //当缓存都写到文件了,恢复读
    console.log('写一点');
    rs.resume();
});
复制代码

参考资料 一、nodejs.cn/api/stream.…
二、www.runoob.com/nodejs/node…

相关文章
相关标签/搜索