node.js中的流

node.js中的流是一种数据传输手段,流是有顺序的。流不关心总体流程,只管取出数据,获取数据后的操做。
流有四种基本的类型
Readable - 可读的流 (例如 fs.createReadStream()). 
Writable - 可写的流 (例如 fs.createWriteStream()).
Duplex - 可读写的流 (例如 net.Socket). 
Transform - 在读写过程当中能够修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).
可读流有两种模式
flowing 流动模式
paused 暂停模式

流动模式:flowing 没有缓存区。读一点数据,发射一点数据。当数据所有读完了触发一个end事件。例如:pipe(),resume()方法不走缓存.
data事件,当你一旦开始监听data事件的时候,流就能够读文件的内容而且发射data 。默认请况下,当你监听data事件以后,会不停的读数据,而后触发data事件,触发完data事件后再次读数据。
html

let rs=fs.createReadStream('./11.txt',{
    highWaterMark:3
});
rs.setEncoding('utf8');rs.on('data',function(data){
    //data获取到的是个buffer,要想获取字符须要设置编码
    console.log(data);
});
rs.on('end',function(){
    console.log('文件读完了');
});复制代码

pipe是可读流 的方法node

ReadStream.prototype.pipe = function (dest) {
    this.on('data', (data)=>{
        let flag = dest.write(data);//写入数据,返回true,说明缓存区没满还能够继续写。返回
        false暂停一下。监听drain事件,等到触发drain事件说明数据消化完了,再继续读取数据

        if(!flag){
            this.pause();
        }
    });
    dest.on('drain', ()=>{
        this.resume();
    });
    this.on('end', ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}
复制代码

dest 数据写入目标
能够在单个可读流上绑定多个可写流。
复制代码

const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);复制代码

暂停模式:paused (初始化模式) 内部设置一个缓存区,缓存区默认大小64kb.实际大小以highWaterMark的值为准。当你监听 readable事件的时候,会进入暂停模式。读取highWaterMark的值放入缓存区,触发readable事件。
api

let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3
});
rs.on('readable',()=>{
   onsole.log(rs._readableState.length);//3
   let ch = rs.read(1);
   //当你读了一个字节后,发现只剩下2个字节,不够highWaterMark,会再次读取highWaterMark个字节并填到
缓存区内
       console.log(rs._readableState.length);//2
       let ch = rs.read(3);
       setTimeout(()=>{
           console.log(rs._readableState.length);//5
       },200)
});
复制代码


可写流就是往里面写数据

当你往可写流里写数据的时候,不是会马上写入文件的,而是会先写入缓存区,缓存区的大小就是highWaterMark的值,默认是16k。 而后等缓存区满了以后再次真正的写入文件里。
缓存

let fs=require('fs');
let ws=fs.createWriteStream('22.txt',{
    flags:'w',
    mode:0o666,
    start:0,
    highWaterMark:3
});
//若是缓存区已满,返回false.若是缓存区没满返回true.
//若是能接着写,返回true.若是不能写返回false.
//按理说若是返回了false,就不能再往里面写了。可是若是继续往里面写,也不会丢失,会缓存在内存里。
等缓存区清空以后再从内存里读出来let flag=ws.write('1');
console.log(flag);//true

flag=ws.write('2');
console.log(flag);//true

flag=ws.write('3');
console.log(flag);//false复制代码
自定义流
let {Writable,Readable,Duplex,Transform} = require('stream');复制代码
自定义可读流
为了实现 可读流,引用Readable接口并用它构造新对象。
  • 咱们能够直接把供使用的数据push出去。
  • 当push一个null对象就意味着咱们想发出信号——这个流没有更多数据了。


var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
    stream.Readable.call(this, options);
    this._index = 0;
}
Counter.prototype._read = function() {
    if(this._index++<3){
        this.push(this._index+'');
    }else{
        this.push(null);
    }
};
var counter = new Counter();

counter.on('data', function(data){
    console.log("读到数据: " + data.toString());//no maybe
});
counter.on('end', function(data){
    console.log("读完了");
});
复制代码

自定义可写流

为了实现可写流,咱们须要使用流模块中的Writable构造函数。 咱们只需给Writable构造函数传递一些选项并建立一个对象。惟一须要的选项是write函数,该函数揭露数据块要往哪里写。bash

  • chunk一般是一个buffer,除非咱们配置不一样的流。
  • encoding是在特定状况下须要的参数,一般咱们能够忽略它。
  • callback是在完成处理数据块后须要调用的函数。这是写数据成功与否的标志。若要发出故障信号,请用错误对象调用回调函数

var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
    stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
    setTimeout(()=>{
        stock.push(chunk.toString('utf8'));
        console.log("增长: " + chunk);
        callback();
    },500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
    w.write("项目:" + i, 'utf8');
}
w.end("结束写入",function(){
    console.log(stock);
});
复制代码

双工流
双工流(可读可写流)是 可读流可写流的实现。例如:net.Socket

let {Duplex} = require('stream');
let index = 0;
let s = Duplex({
    read(){
        if(index++<3)
          this.push('a'); 
          else 
       this.push(null);   
    },
    write(chunk,encoding,cb){
       console.log(chunk.toString().toUpperCase());
       cb();
    }
});
//process.stdin 标准输入流
//proces.stdout标准输出流
process.stdin.pipe(s).pipe(process.stdout);复制代码

Transform转换流
转换流是实现数据转换的,( 可读流 可写流)只能实现一种。

let {Transform}  = require('stream');
let t = Transform({
    transform(chunk,encoding,cb){
        this.push(chunk.toString().toUpperCase());
        cb();
    }
});
process.stdin.pipe(t).pipe(process.stdout);
复制代码
相关文章
相关标签/搜索