学习本无底,前进莫徬徨node
今天跟你们分享的是node.js中的流(stream
)。它的做用你们应该都在日常使用node的时候看到过,好比:express
gulp
中的pipe就是流的一种方法,经过可写流和可读流的配合,达到不占用多余缓存的一种读写方式。可写流
,req是可读流
,他们都是经过封装node中的net模块的socket(双工流
,便可写、可读流)而来的。可能不少时候你们都知道怎么用,但不了解它的原理,很尴尬,就像这样gulp
Readable
(可读流)、Writable
(可写流)、Duplex
(双工流)、Transform
(转换流)二进制模式
:每一个分块都是buffer、string对象。对象模式
:流内部处理的是一系列普通对象。可读流分为
flowing
和paused
两种模式windows
path
:读取的文件的路径option
:
highWaterMark
:水位线,一次可读的字节,通常默认是64k
flags
:标识,打开文件要作的操做,默认是r
encoding
:编码,默认为bufferstart
:开始读取的索引位置end
:结束读取的索引位置(包括结束位置)autoClose
:读取完毕是否关闭,默认为truelet ReadStream = require('./ReadStream')
//读取的时候默认读64k
let rs = new ReadStream('./a.txt',{
highWaterMark: 2,//一次读的字节 默认64k
flags: 'r', //标示 r为读 w为写
autoClose: true, //默认读取完毕后自动关闭
start: 0,
end: 5, //流是闭合区间包start,也包end 默认是读完
encoding: 'utf8' //默认编码是buffer
})
复制代码
data
:切换到流动模式,能够流出数据rs.on('data', function (data) {
console.log(data);
});
复制代码
open
:流打开文件的时候会触发此监听rs.on('open', function () {
console.log('文件被打开');
});
复制代码
error
:流出错的时候,监听错误信息rs.on('error', function (err) {
console.log(err);
});
复制代码
end
:流读取完成,触发endrs.on('end', function (err) {
console.log('读取完成');
});
复制代码
close
:关闭流,触发rs.on('close', function (err) {
console.log('关闭');
});
复制代码
pause
:暂停流(改变流的flowing,不读取数据了);resume
:恢复流(改变流的flowing,继续读取数据)//流经过一次后,中止流动,过了2s后再动
rs.on('data', function (data) {
rs.pause();
console.log(data);
});
setTimeout(function () {
rs.resume();
},2000);
复制代码
fs.read()
:可读流底层调用的就是这个方法,最原生的读方法//fd文件描述符,通常经过fs.open中获取
//buffer是读取后的数据放入的缓存目标
//0,从buffer的0位置开始放入
//BUFFER_SIZE,每次放BUFFER_SIZE这么长的长度
//index,每次从文件的index的位置开始读
//bytesRead,真实读到的个数
fs.read(fd,buffer,0,BUFFER_SIZE,index,function(err,bytesRead){
})
复制代码
可爱
的读流吧!let fs = require('fs')
let EventEmitter = require('events')
class ReadStream extends EventEmitter{
constructor(path,options = {}){
super()
this.path = path
this.highWaterMark = options.highWaterMark || 64*1024
this.flags = options.flags || 'r'
this.start = options.start || 0
this.pos = this.start //会随着读取的位置改变
this.autoClose = options.autoClose || true
this.end = options.end || null
//默认null就是buffer
this.encoding = options.encoding || null
//参数的问题
this.flowing = null //非流动模式
//建立个buffer用来存储每次读出来的数据
this.buffer = Buffer.alloc(this.highWaterMark)
//打开这个文件
this.open()
//此方法默认同步调用 每次设置on监听事件时都会调用以前全部的newListener事件
this.on('newListener',(type)=>{// 等待着他监听data事件
if(type === 'data'){
this.flowing = true
//开始读取 客户已经监听的data事件
this.read()
}
})
}
//默认第一次调用read方法时fd还没获取 因此不能直接读
read(){
if(typeof this.fd != 'number'){
//等待着触发open事件后fd确定拿到了 再去执行read方法
return this.once('open',()=>{this.read()})
}
//每次读的时候都要判断一下下次读几个 若是没有end就根据highWaterMark来(读全部的) 若是有且大于highWaterMark就根据highWaterMark来 若是小于highWaterMark就根据end来
let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark
fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,byteRead)=>{
this.pos += byteRead
let b = this.encoding?this.buffer.slice(0,byteRead).toString(this.encoding):this.buffer.slice(0,byteRead)
this.emit('data',b)
//若是读取到的数量和highWaterMark同样 说明还得继续读
if((byteRead === this.highWaterMark)&&this.flowing){
this.read()
}
if(byteRead < this.highWaterMark){
this.emit('end')
this.destory()
}
})
}
destory(){
if(typeof this.fd != 'number'){
return this.emit('close')
}
//若是文件被打开过 就关闭文件而且触发close事件
fs.close(this.fd,()=>{
this.emit('close')
})
}
pause(){
this.flowing = false
}
resume(){
this.flowing = true
this.read()
}
open(){
//fd表示的就是当前this.path的这个文件,从3开始(number类型)
fs.open(this.path,this.flags,(err,fd)=>{
//有可能fd这个文件不存在 须要作处理
if(err){
//若是有自动关闭 则帮他销毁
if(this.autoClose){
//销毁(关闭文件,触发关闭文件事件)
this.destory()
}
//若是有错误 就会触发error事件
this.emit('error',err)
return
}
//保存文件描述符
this.fd = fd
//当文件打开成功时触发open事件
this.emit('open',this.fd)
})
}
}
复制代码
这个方法是可读流的一种
暂停模式
,他的模式能够参考为读流是往水杯倒水的人,Readable是喝水的人,他们之间存在着一种联系,只要Readable喝掉一点水,读流就会继续往里倒
。数组
流会监听,若是有人读过流(喝过水),而且减小,就会再去读一次(倒点水)
行读取器(LineReader)
let fs = require('fs')
let read = require('./ReadableStream')
let rs = fs.createReadStream('./a.txt', {
//每次读7个
highWaterMark: 7
})
//若是读流第一次所有读下来而且小于highWaterMark,就会再读一次(再触发一次readable事件)
//若是rs.read()不加参数,一次性读完,会从缓存区再读一次,为null
//若是readable每次都恰好读完(即rs.read()的参数恰好和highWaterMark相等),就会一直触发readable事件,若是最后不足他想喝的数,他就会先触发一次null,最后把剩下的喝完
//一开始缓存区为0的时候也会默认调一次readable事件
rs.on('readable', () => {
let result = rs.read(2)
console.log(result)
})
复制代码
实战:行读取器(日常咱们的文件可能有回车、换行,此时若是要每次想读一行的数据,就得用到readable
)缓存
let EventEmitter = require('events')
//若是要将内容所有读出就用on('data'),精确读取就用on('readable')
class LineReader extends EventEmitter {
constructor(path) {
super()
this.rs = fs.createReadStream(path)
//回车符的十六进制
let RETURN = 0x0d
//换行符的十六进制
let LINE = 0x0a
let arr = []
this.on('newListener', (type) => {
if (type === 'newLine') {
this.rs.on('readable', () => {
let char
//每次读一个,当读完的时候会返回null,终止循环
while (char = this.rs.read(1)) {
switch (char[0]) {
case RETURN:
break;
//Mac下只有换行符,windows下是回车符和换行符,须要根据不一样的转换。由于我这里是Mac
case LINE:
//若是是换行符就把数组转换为字符串
let r = Buffer.from(arr).toString('utf8')
//把数组清空
arr.length = 0
//触发newLine事件,把获得的一行数据输出
this.emit('newLine', r)
break;
default:
//若是不是换行符,就放入数组中
arr.push(char[0])
}
}
})
}
})
//以上只能取出以前的换行符前的代码,最后一行的后面没有换行符,因此须要特殊处理。当读流读完须要触发end事件时
this.rs.on('end', () => {
//取出最后一行数据,转成字符串
let r = Buffer.from(arr).toString('utf8')
arr.length = 0
this.emit('newLine', r)
})
}
}
let lineReader = new LineReader('./a.txt')
lineReader.on('newLine', function (data) {
console.log(data)
})
复制代码
那么Readable究竟是怎样的存在呢?咱们接下来实现他的源码,看看内部到底怎么回事服务器
let fs = require('fs')
let EventEmitter = require('events')
class ReadStream extends EventEmitter{
constructor(path,options = {}){
super()
this.path = path
this.highWaterMark = options.highWaterMark || 64*1024
this.flags = options.flags || 'r'
this.start = options.start || 0
this.pos = this.start //会随着读取的位置改变
this.autoClose = options.autoClose || true
this.end = options.end || null
//默认null就是buffer
this.encoding = options.encoding || null
//参数的问题
this.reading = false //非流动模式
//建立个buffer用来存储每次读出来的数据
this.buffers = []
//缓存区长度
this.len = 0
//是否要触发readable事件
this.emittedReadable = false
//触发open获取文件的fd标识符
this.open()
//此方法默认同步调用 每次设置on监听事件时都会调用以前全部的newListener事件
this.on('newListener',(type)=>{// 等待着他监听data事件
if(type === 'readable'){
//开始读取 客户已经监听的data事件
this.read()
}
})
}
//readable真正的源码中的方法,计算出和n最接近的2的幂次数
computeNewHighWaterMark(n) {
n--;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n++;
return n;
}
read(n){
//当读的数量大于水平线,会经过取2的幂次取比他大和最接近的数
if(this.len < n){
this.highWaterMark = this.computeNewHighWaterMark(n)
//从新触发readbale的callback,因此第一次会触发null
this.emittedReadable = true
//从新读新的水位线
this._read()
}
//真正读取到的
let buffer = null
//说明缓存里有这么多,取出来
if(n>0 && n<=this.len){
//定义一个buffer
buffer = Buffer.alloc(n)
let buf
let flag = true
let index = 0
//[buffer<1,2,3,4>,buffer<1,2,3,4>,buffer<1,2,3,4>]
//每次取出缓存前的第一个buffer
while(flag && (buf = this.buffers.shift())){
for(let i=0;i<buf.length;i++){
//把取出的一个buffer中的数据放入新定义的buffer中
buffer[index++] = buf[i]
//当buffer的长度和n(参数)长度同样时,中止循环
if(index === n){
flag = false
//维护缓存,由于可能缓存中的buffer长度大于n,当取出n的长度时,还会剩下其他的buffer,咱们须要切割buf而且放到缓存数组以前
this.len -= n
let r = buf.slice(i+1)
if(r.length){
this.buffers.unshift(r)
}
break
}
}
}
}
//若是缓存区没有东西,等会读完须要触发readable事件
//这里会有一种情况,就是若是每次Readable读取的数量正好等于highWaterMark(流读取到缓存的长度),就会每次都等于0,每次都触发Readable事件,就会每次读,读到没有为止,最后还会触发一下null
if(this.len === 0){
this.emittedReadable = true
}
if(this.len < this.highWaterMark){
//默认,一开始的时候开始读取
if(!this.reading){
this.reading = true
//真正多读取操做
this._read()
}
}
return buffer&&buffer.toString()
}
_read(){
if(typeof this.fd != 'number'){
//等待着触发open事件后fd确定拿到了 再去执行read方法
return this.once('open',()=>{this._read()})
}
//先读这么多buffer
let buffer = Buffer.alloc(this.highWaterMark)
fs.read(this.fd,buffer,0,buffer.length,this.pos,(err,byteRead)=>{
if(byteRead > 0){
//当第一次读到数据后,改变reading的状态,若是触发read事件,可能还会在触发第二次_read
this.reading = false
//每次读到数据增长缓存取得长度
this.len += byteRead
//每次读取以后,会增长读取的文件的读取开始位置
this.pos += byteRead
//将读到的buffer放入缓存区buffers中
this.buffers.push(buffer.slice(0,byteRead))
//触发readable
if(this.emittedReadable){
this.emittedReadable = false
//能够读取了,默认开始的时候杯子填满了
this.emit('readable')
}
}else{
//没读到就出发end事件
this.emit('end')
}
})
}
destory(){
if(typeof this.fd != 'number'){
return this.emit('close')
}
//若是文件被打开过 就关闭文件而且触发close事件
fs.close(this.fd,()=>{
this.emit('close')
})
}
open(){
//fd表示的就是当前this.path的这个文件,从3开始(number类型)
fs.open(this.path,this.flags,(err,fd)=>{
//有可能fd这个文件不存在 须要作处理
if(err){
//若是有自动关闭 则帮他销毁
if(this.autoClose){
//销毁(关闭文件,触发关闭文件事件)
this.destory()
}
//若是有错误 就会触发error事件
this.emit('error',err)
return
}
//保存文件描述符
this.fd = fd
//当文件打开成功时触发open事件
this.emit('open',this.fd)
})
}
}
复制代码
let rs = fs.createReadStream('./a.txt')
rs.on('data',(data)=>{
console.log(data)
})
//由于上面的data事件把数据读了,清空缓存区。因此致使下面的readable读出为null
rs.on('readable',()=>{
let result = r.read(1)
console.log(result)
})
复制代码
由于createReadStream
内部调用了ReadStream
类,ReadStream
又实现了Readable
接口,ReadStream
实现了_read()
方法,因此咱们经过自定义一个类继承stream
模块的Readable
,并在原型
上自定义一个_read()
就能够自定义本身的可读流koa
let { Readable } = require('stream')
class MyRead extends Readable{
//流须要一个_read方法,方法中push什么,外面就接收什么
_read(){
//push方法就是上面_read方法中的push同样,把数据放入缓存区中
this.push('100')
//若是push了null就表示没有东西可读了,中止(若是不写,就会一直push上面的值,死循环)
this.push(null)
}
}
复制代码
path
:写入的文件的路径option
:
highWaterMark
:水位线,一次可写入缓存中的字节,通常默认是64k
flags
:标识,写入文件要作的操做,默认是w
encoding
:编码,默认为bufferstart
:开始写入的索引位置end
:结束写入的索引位置(包括结束位置)autoClose
:写入完毕是否关闭,默认为truelet ReadStream = require('./ReadStream')
//读取的时候默认读64k
let rs = new ReadStream('./a.txt',{
highWaterMark: 2,//一次读的字节 默认64k
flags: 'r', //标示 r为读 w为写
autoClose: true, //默认读取完毕后自动关闭
start: 0,
end: 5, //流是闭合区间包start,也包end 默认是读完
encoding: 'utf8' //默认编码是buffer
})
复制代码
write
let fs = require('fs')
let ws = fs.createWriteStream('./d.txt',{
flags: 'w',
encoding: 'utf8',
start: 0,
//write的highWaterMark只是用来触发是否是干了
highWaterMark: 3 //写是默认16k
})
//返回boolean 每当write一次都会在ws中吃下一个馒头 当吃下的馒头数量达到highWaterMark时 就会返回false 吃不下了会把其他放入缓存 其他状态返回true
//write只能放string或者buffer
flag = ws.write('1','utf8',()=>{
console.log(i)
})
复制代码
drain
//drain只有嘴塞满了 吃完(包括内存中的,就是地下的)才会触发 这里是两个条件 一个是必须是吃下highWaterMark个馒头 而且在吃完的时候才会callback
ws.on('drain',()=>{
console.log('干了')
})
复制代码
fs.write()
:可读流底层调用的就是这个方法,最原生的读方法//wfd文件描述符,通常经过fs.open中获取
//buffer,要取数据的缓存源
//0,从buffer的0位置开始取
//BUFFER_SIZE,每次取BUFFER_SIZE这么长的长度
//index,每次写入文件的index的位置
//bytesRead,真实写入的个数
fs.write(wfd,buffer,0,bytesRead,index,function(err,bytesWrite){
})
复制代码
let fs = require('fs')
let EventEmitter = require('events')
//只有第一次write的时候直接用_write写入文件 其他都是放到cache中 可是len超过了highWaterMark就会返回false告知须要drain 很占缓存
//从第一次的_write开始 回去一直经过clearBuffer递归_write写入文件 若是cache中没有了要写入的东西 会根据needDrain来判断是否触发干点
class WriteStream extends EventEmitter{
constructor(path,options = {}){
super()
this.path = path
this.highWaterMark = options.highWaterMark || 64*1024
this.flags = options.flags || 'r'
this.start = options.start || 0
this.pos = this.start
this.autoClose = options.autoClose || true
this.mode = options.mode || 0o666
//默认null就是buffer
this.encoding = options.encoding || null
//打开这个文件
this.open()
//写文件的时候须要哪些参数
//第一次写入的时候 是给highWaterMark个馒头 他会硬着头皮写到文件中 以后才会把多余吃不下的放到缓存中
this.writing = false
//缓存数组
this.cache = []
this.callbackList = []
//数组长度
this.len = 0
//是否触发drain事件
this.needDrain = false
}
clearBuffer(){
//取缓存中最上面的一个
let buffer = this.cache.shift()
if(buffer){
//有buffer的状况下
this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer(),buffer.callback)
}else{
//没有的话 先看看需不须要drain
if(this.needDrain){
//触发drain 并初始化全部状态
this.writing = false
this.needDrain = false
this.callbackList.shift()()
this.emit('drain')
}
this.callbackList.map(v=>{
v()
})
this.callbackList.length = 0
}
}
_write(chunk,encoding,clearBuffer,callback){
//由于write方法是同步调用的 因此可能还没获取到fd
if(typeof this.fd != 'number'){
//直接在open的时间对象上注册一个一次性事件 当open被emit的时候会被调用
return this.once('open',()=>this._write(chunk,encoding,clearBuffer,callback))
}
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWrite)=>{
this.pos += byteWrite
//每次写完 相应减小内存中的数量
this.len -= byteWrite
if(callback) this.callbackList.push(callback)
//第一次写完
clearBuffer()
})
}
//写入方法
write(chunk,encoding=this.encoding,callback){
//判断chunk必须是字符串或者buffer 为了统一都变成buffer
chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding)
//维护缓存的长度 3
this.len += chunk.length
let ret = this.len < this.highWaterMark
if(!ret){
//表示要触发drain事件
this.needDrain = true
}
//正在写入的应该放到内存中
if(this.writing){
this.cache.push({
chunk,
encoding,
callback
})
}else{
//这里是第一次写的时候
this.writing = true
//专门实现写的方法
this._write(chunk,encoding,()=>this.clearBuffer(),callback)
}
// console.log(ret)
//能不能继续写了 false表明下次写的时候更占内存
return ret
}
destory(){
if(typeof this.fd != 'number'){
return this.emit('close')
}
//若是文件被打开过 就关闭文件而且触发close事件
fs.close(this.fd,()=>{
this.emit('close')
})
}
open(){
//fd表示的就是当前this.path的这个文件,从3开始(number类型)
fs.open(this.path,this.flags,(err,fd)=>{
//有可能fd这个文件不存在 须要作处理
if(err){
//若是有自动关闭 则帮他销毁
if(this.autoClose){
//销毁(关闭文件,出发关闭文件事件)
this.destory()
}
//若是有错误 就会触发error事件
this.emit('error',err)
return
}
//保存文件描述符
this.fd = fd
//当文件打开成功时触发open事件
this.emit('open',this.fd)
})
}
}
复制代码
由于createWriteStream
内部调用了WriteStream
类,WriteStream
又实现了Writable
接口,WriteStream
实现了_write()
方法,因此咱们经过自定义一个类继承stream
模块的Writable
,并在原型
上自定义一个_write()
就能够自定义本身的可写流socket
let { Writable } = require('stream')
class MyWrite extends Writable{
_write(chunk,encoding,callback){
//write()的第一个参数,写入的数据
console.log(chunk)
//这个callback,就至关于咱们上面的clearBuffer方法,若是不执行callback就不会继续从缓存中取出写
callback()
}
}
let write = new MyWrite()
write.write('1','utf8',()=>{
console.log('ok')
})
复制代码
管道流,是可读流上的方法,至于为何放到这里,主要是由于须要2个流的基础知识,是可读流配合可写流的一种
传输方式
。若是用原来的读写,由于写比较耗时
,因此会多读少写
,耗内存
,但用了pipe
就不会了,始终用规定
的内存。学习
let fs = require('fs')
//pipe方法叫管道 能够控制速率
let rs = fs.createReadStream('./d.txt',{
highWaterMark: 4
})
let ws = fs.createWriteStream('./e,txt',{
highWaterMark: 1
})
//会监听rs的on('data')将读取到的数据,经过ws.write的方法写入文件
//调用写的一个方法 返回boolean类型
//若是返回false就调用rs的pause方法 暂停读取
//等待可写流 写入完毕在监听drain resume rs
rs.pipe(ws) //会控制速率 防止淹没可用内存
复制代码
let fs = require('fs')
//这两个是上面本身写的ReadStream和WriteStream
let ReadStream = require('./ReadStream')
let WriteStream = require('./WriteStream')
//若是用原来的读写,由于写比较耗时,因此会多读少写,耗内存
ReadStream.prototype.pipe = function(dest){
this.on('data',(data)=>{
let flag = dest.write(data)
//若是写入的时候嘴巴吃满了就不继续读了,暂停
if(!flag){
this.pause()
}
})
//若是写的时候嘴巴里的吃完了,就会继续读
dest.on('drain',()=>{
this.resume()
})
this.on('end',()=>{
this.destory()
//清空缓存中的数据
fs.fsync(dest.fd,()=>{
dest.destory()
})
})
}
复制代码
有了双工流,咱们能够在同一个对象上
同时实现可读和可写
,就好像同时继承这两个接口。 重要的是双工流的可读性和可写性操做彻底独立
于彼此。这仅仅是将两个特性组合成一个对象。
let { Duplex } = require('stream')
//双工流,可读可写
class MyDuplex extends Duplex{
_read(){
this.push('hello')
this.push(null)
}
_write(chunk,encoding,clearBuffer){
console.log(chunk)
clearBuffer()
}
}
let myDuplex = new MyDuplex()
//process.stdin是node自带的process进程中的可读流,会监听命令行的输入
//process.stdout是node自带的process进程中的可写流,会监听并输出在命令行中
//因此这里的意思就是在命令行先输出hello,而后咱们输入什么他就出来对应的buffer(先做为可读流出来)
process.stdin.pipe(myDuplex).pipe(process.stdout)
复制代码
转换流的输出是从输入中
计算
出来的。对于转换流,咱们没必要实现read
或write
的方法,咱们只须要实现一个transform
方法,将二者结合起来。它有write
方法的意思,咱们也能够用它来push
数据。
let { Transform } = require('stream')
class MyTransform extends Transform{
_transform(chunk,encoding,callback){
console.log(chunk.toString().toUpperCase())
callback()
}
}
let myTransform = new MyTransform()
class MyTransform2 extends Transform{
_transform(chunk,encoding,callback){
console.log(chunk.toString().toUpperCase())
this.push('1')
// this.push(null)
callback()
}
}
let myTransform2 = new MyTransform2()
//此时myTransform2被做为可写流触发_transform,输出输入的大写字符后,会经过可读流push字符到下一个转换流中
//当写入的时候才会触发transform的值,此时才会push,因此后面的pipe拿到的chunk是前面的push的值
process.stdin.pipe(myTransform2).pipe(myTransform)
复制代码
嘴
真正的吃满了,而且等到把嘴里的和地上的馒头(缓存中的)都吃下了才会触发drain
事件隔离
的_transform
方法中。跟双工流的区别就是,他的可读可写是在一块儿
的。OK,讲完收工,今后你就是流
魔王