Node.js的流就是为了在有限的内存中实现咱们操做"海量"数据的目标。node
流是一组有序的,有起点和终点的字节数据传输手段,它是一个抽象的接口,被 Node 中的不少对象所实现。node里不少内容都应用到流,好比HTTP 服务器request和response对象都是流。缓存
它不关心文件的总体内容,只关注是否从文件中读到了数据,以及读到数据以后的处理。bash
Node.js中Stream 有四种流类型。服务器
可读流分为:流动模式(flowing mode)
和暂停模式(paused mode)
异步
可读流在建立时都是暂停模式。暂停模式和流动模式能够互相转换。函数
1) 流动模式(flowing mode)流动模式下,数据会源源不断地生产出来,造成“流动”现象。监听流的data事件即可进入该模式。ui
2) 暂停模式(paused mode)暂停模式下,须要显示地调用read(),触发data事件。this
在初始状态下,监听data事件,会使流进入流动模式。但若是在暂停模式下,监听data事件并不会使它进入流动模式。为了消耗流,须要显示调用read()方法。编码
3)相互转化
若是不存在管道目标,调用readable.resume()可以使流进入流动模式spa
若是存在管道目标,调用 stream.unpipe()并取消'data'事件监听
var rs = fs.createReadStream(path,[options]);复制代码
path读取文件的路径
options
flags 打开文件要作的操做,默认为'r'读取
encoding 默认为null,表明buffer。若是指定utf8编码highWaterMark要大于3个字节
start 开始读取的索引位置
end 结束读取的索引位置(包括结束位置)
highWaterMark 读取缓存区默认的大小64kb
autoClose 读取完毕后是否自动关闭
let fs=require('fs');
let path=require('path');
let rs=fs.createReadStream(path.join(__dirname,'1.txt'),{ //这里的参数通常不会写
flags:'r',//文件的操做是读取操做
encoding:'utf8', // 默认是null null表明的是buffer
autoClose:true, // 读取完毕后自动关闭
highWaterMark:3,// 默认是64k 64*1024b
start:0, //读取的起始位置
end:3 // 读取的结束位置,包前又包后,至关于闭区间
})
//默认状况下 不会将文件中的内容输出
//内部会先建立一个buffer先读取3b
//至关于有盖子的水管,不会流出来,存储在管中
//有两种模式 非流动模式/暂停模式
//由于建立时第二个参数通常不会写,读出来的类型是buffer,这个方法能够指定编码
rs.setEncoding('utf8');
//打开文件
rs.on('open',function(data){
console.log(data)
})
//关闭文件
rs.on('close',function(data){
console.log(data)
})
//有错误就会报错误
rs.on('err',function(data){
console.log(data)
})
//暂停模式->流动模式
//流动模式只要监听了会疯狂的触发data事件,直到读取完毕
rs.on('data',function(data){
console.log(data);
//一打开水龙头就哗哗出水,有个方法可让它暂停
rs.pause(); //暂停方法,表示暂停读取,暂停data事件触发
})
setInterval(function(){
rs.resume(); //恢复data事件的触发,变为流动模式继续读取
},3000)
rs.on('end',function(data){ //先end再close关闭
console.log(data)
})
复制代码
let fs=require('fs');
let path=require('path');
let rs=fs.createReadStream(path.join(__dirname,'1.txt'));
rs.setEncoding('utf8');
// 当我只要建立一个流,就会先把缓存区填满,等待着你本身消费
// 若是当前缓存区被清空后会再次触发readable事件
// 当你消费小于最高水位线时,会自动添加highWater这么多数据
rs.on('readable', () => {
let d = rs.read(1)
console.log(d)
})复制代码
let EventEmitter = require('events');
let fs = require('fs');
class ReadStream extends EventEmitter {
constructor(path,options){
super();
this.path = path;
this.flags = options.flags || 'r';
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark|| 64*1024;
this.start = options.start||0;
this.end = options.end;
this.encoding = options.encoding || null
this.open();//打开文件 fd
this.flowing = null; // null就是暂停模式
// 看是否监听了data事件,若是监听了 就要变成流动模式
// 要创建一个buffer 这个buffer就是要一次读多少
this.buffer = Buffer.alloc(this.highWaterMark);
this.pos = this.start; // pos 读取的位置 可变 start不变的
this.on('newListener',(eventName,callback)=>{
if(eventName === 'data'){
// 至关于用户监听了data事件
this.flowing = true;
// 监听了 就去读
this.read(); // 去读内容了
}
})
}
read(){
// 此时文件还没打开呢
if(typeof this.fd !== 'number'){
// 当文件真正打开的时候 会触发open事件,触发事件后再执行read,此时fd确定有了
return this.once('open',()=>this.read())
}
// 此时有fd了
let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark;
fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
// 读到了多少个 累加
if(bytesRead>0){
this.pos+= bytesRead;
let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead);
this.emit('data',data);
// 当读取的位置 大于了末尾 就是读取完毕了
if(this.pos > this.end){
this.emit('end');
this.destroy();
}
if(this.flowing) { // 流动模式继续触发
this.read();
}
}else{
this.emit('end');
this.destroy();
}
});
}
resume(){
this.flowing = true;
this.read();
}
pause(){
this.flowing = false;
}
destroy(){
// 先判断有没有fd 有关闭文件 触发close事件
if(typeof this.fd ==='number'){
fs.close(this.fd,()=>{
this.emit('close');
});
return;
}
this.emit('close'); // 销毁
};
open(){
// copy 先打开文件
fs.open(this.path,this.flags,(err,fd)=>{
if(err){
this.emit('error',err);
if(this.autoClose){ // 是否自动关闭
this.destroy();
}
return;
}
this.fd = fd; // 保存文件描述符
this.emit('open'); // 文件打开了
});
}
}
module.exports = ReadStream;复制代码
.pipe()
函数是接受一个源头src
并将数据输出到一个可写的流dst
中简单来讲,边读边写东西,读太快,来不及写,就先暂停读,等写完了再继续读。
let fs = require('fs');
let path = require('path');
let ReadStream = require('./ReadStream');
let WriteStream = require('./WriteStream');
let rs = new ReadStream(path.join(__dirname,'./1.txt'),{
highWaterMark:4
});
let ws = new WriteStream(path.join(__dirname,'./2.txt'),{
highWaterMark:1
});
// 读四个,写一个
rs.pipe(ws); // pipe就是读一点写一点复制代码
pipe原理实现,写在ReadStream的方法中
pipe(ws){
this.on('data',(chunk)=>{
let flag = ws.write(chunk);
if(!flag){
this.pause();
}
});
ws.on('drain',()=>{
this.resume();
})
}复制代码
let fs = require('fs');
let EventEmitter = require('events');
//当读取内容大于缓存区,从新计算读取数量n的大小的方法
function computeNewHighWaterMark(n) {
n--;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n++;
return n;
}
class ReadStream extends EventEmitter {
constructor(path, options) {
super();
this.path = path;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.autoClose = options.autoClose || true;
this.start = 0;
this.end = options.end;
this.flags = options.flags || 'r';
this.buffers = []; // 缓存区
this.pos = this.start;
this.length = 0; // 缓存区大小
this.emittedReadable = false;
this.reading = false; // 不是正在读取的
this.open();
this.on('newListener', (eventName) => {
if (eventName === 'readable') {
this.read();
}
})
}
read(n) { // 想取1个
if(n>this.length){
// 更改缓存区大小 读取五个就找 2的几回放最近的
this.highWaterMark = computeNewHighWaterMark(n)
this.emittedReadable = true;
this._read();
}
// 若是n>0 去缓存区中取吧
let buffer=null;
let index = 0; // 维护buffer的索引的
let flag = true;
if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多
// 在缓存区中取 [[2,3],[4,5,6]]
buffer = Buffer.alloc(n); // 这是要返回的buffer
let buf;
while (flag&&(buf = this.buffers.shift())) {
for (let i = 0; i < buf.length; i++) {
buffer[index++] = buf[i];
if(index === n){ // 拷贝够了 不须要拷贝了
flag = false;
this.length -= n;
let bufferArr = buf.slice(i+1); // 取出留下的部分
// 若是有剩下的内容 在放入到缓存中
if(bufferArr.length > 0){
this.buffers.unshift(bufferArr);
}
break;
}
}
}
}
// 当前缓存区 小于highWaterMark时在去读取
if (this.length == 0) {
this.emittedReadable = true;
}
if (this.length < this.highWaterMark) {
if(!this.reading){
this.reading = true;
this._read(); // 异步的
}
}
return buffer
}
// 封装的读取的方法
_read() {
// 当文件打开后在去读取
if (typeof this.fd !== 'number') {
return this.once('open', () => this._read());
}
// 上来我要喝水 先倒三升水 []
let buffer = Buffer.alloc(this.highWaterMark);
fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
if (bytesRead > 0) {
// 默认读取的内容放到缓存区中
this.buffers.push(buffer.slice(0, bytesRead));
this.pos += bytesRead; // 维护读取的索引
this.length += bytesRead;// 维护缓存区的大小
this.reading = false;
// 是否须要触发readable事件
if (this.emittedReadable) {
this.emittedReadable = false; // 下次默认不触发
this.emit('readable');
}
} else {
this.emit('end');
this.destroy();
}
})
}
destroy() {
if (typeof this.fd !== 'number') {
return this.emit('close')
}
fs.close(this.fd, () => {
this.emit('close')
})
}
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
this.emit('error', err);
if (this.autoClose) {
this.destroy();
}
return
}
this.fd = fd;
this.emit('open');
});
}
}
module.exports = ReadStream;复制代码
var ws = fs.createWriteStream(path,[options]);
path写入的文件路径
options
flags打开文件要作的操做,默认为'w'
encoding默认为utf8
highWaterMark写入缓存区的默认大小16kb
let fs=require('fs');
let path=require('path');
//写的时候文件不存在,会建立文件
let ws = fs.createWriteStream('./1.txt',{
flags:'w',
mode:0o666,
autoClose:true,
highWaterMark:3, // 默认写是16k
encoding:'utf8',
start:0
});
//第一个参数写入的数据必须是字符串或者Buffer
//第二个参数写入以什么编码写进去
//第三个参数callback
//有返回值,表明是否能继续写,写的时候,有个缓存区的概念。可是返回false,也不会丢失,就是会把内容放到内存中
let flag=ws.write(1+'','utf8',()=>{})//这是异步的方法
//传入的参数,写完后也会写入文件内
ws.end('ok'); //当写完后,就不能再继续写了
//抽干方法,当写入完后会触发drain方法
//缓存区必须满了,满了清空后才会触发drain
//若是调用end后,再调用这个方法没有意义了
ws.on('drain',function(){
console.log('drain')
})
复制代码
let EventEmitter = require('events');
let fs = require('fs');
class WriteStream extends EventEmitter{
constructor(path,options){
super();
this.path = path;
this.highWaterMark = options.highWaterMark||16*1024;
this.autoClose = options.autoClose||true;
this.mode = options.mode;
this.start = options.start||0;
this.flags = options.flags||'w';
this.encoding = options.encoding || 'utf8';
// 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中
// 在源码中是一个链表 => []
this.buffers = [];
// 标识 是否正在写入
this.writing = false;
// 是否知足触发drain事件
this.needDrain = false;
// 记录写入的位置
this.pos = 0;
// 记录缓存区的大小
this.length = 0;
this.open();
}
destroy(){
if(typeof this.fd !=='number'){
return this.emit('close');
}
fs.close(this.fd,()=>{
this.emit('close')
})
}
open(){
fs.open(this.path,this.flags,this.mode,(err,fd)=>{
if(err){
this.emit('error',err);
if(this.autoClose){
this.destroy();
}
return
}
this.fd = fd;
this.emit('open');
})
}
write(chunk,encoding=this.encoding,callback=()=>{}){
chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
// write 返回一个boolean类型
this.length+=chunk.length;
let ret = this.length<this.highWaterMark; // 比较是否达到了缓存区的大小
this.needDrain = !ret; // 是否须要触发needDrain
// 判断是否正在写入 若是是正在写入 就写入到缓存区中
if(this.writing){
this.buffers.push({
encoding,
chunk,
callback
}); // []
}else{
// 专门用来将内容 写入到文件内
this.writing = true;
this._write(chunk,encoding,()=>{
callback();
this.clearBuffer();
}); // 8
}
return ret;
}
clearBuffer(){
let buffer = this.buffers.shift();
if(buffer){
this._write(buffer.chunk,buffer.encoding,()=>{
buffer.callback();
this.clearBuffer()
});
}else{
this.writing = false;
if(this.needDrain){ // 是否须要触发drain 须要就发射drain事件
this.needDrain = false;
this.emit('drain');
}
}
}
_write(chunk,encoding,callback){
if(typeof this.fd !== 'number'){
return this.once('open',()=>this._write(chunk,encoding,callback));
}
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
this.length -= byteWritten;
this.pos += byteWritten;
callback(); // 清空缓存区的内容
});
}
}
module.exports = WriteStream;复制代码
啊~~文章彷佛太长太啰嗦了,看来怎么把给本身看的笔记整理成一个好的文章也是一门学问!