node中stream模块是很是,很是,很是重要的一个模块,由于不少模块都是这个模块封装的:node
因为node原码的可读流有将近一千行的代码,其中有大量的异常处理,debug调试,各类可读流的兼容处理,加码解码处理等,因此这里采起一个简化版的实现,源码中使用链表做为buffer,这里采用数组进行简化,主要是阐述可读流的处理过程。web
const EE = require('events');
const util = require('util');
const fs = require('fs');
function Readable(path,options) {//这个参数是源码没有的,这里主要是为了读取fs为案例加的
EE.call(this);//构造函数继承EventEmiter
this.path = path;
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark || 64 * 1024;//64k
this.encoding = options.encoding || null;
this.flags = options.flags || 'r';//// 这个源码没有的,这里主要是为了fs读取案例加的
this.needEmitReadable = false;// 须要触发readable事件,默认不须要
this.position = 0;// 偏移量
this.cache = []; // 缓存区
this.reading = false;// 是否正在从缓存中读取,消费者消耗中
this.length = 0; // 缓存区大小,控制长度
this.open(); // 这个源码没有的,这里主要是为了fs读取案例加的
this.on('newListener', (type) => {
if (type === 'readable') { // 看一下是不是'readable'事件模式
this.read();//消耗者,从buffer读取数据
}
})
}
util.inherits(Readable, EE);//原型继承EventEmiter
复制代码
下面这个函数在Readable没有,可是在ReadStream中存在,这里为了利用fs读取操做说明流,简化实现版本上添加了这个方法,后面说明ReadStream模块和Readable的继承关系数组
Readable.prototype.open = function(){//这里是异步打开的操做
fs.open(this.path, this.flags, (err, fd) => {
if (err) { // 销毁文件
if (this.autoClose) { // 若是须要自动关闭触发一下销毁事件
this.destroy(); // 它销毁文件
}
return this.emit('error', err);
}
this.fd = fd;
this.emit('open', fd);
});
}
//源码中的destory不是这样的,这里只是ReadStream中的destory,源码中作了各类可读流的兼容组合处理
Readable.prototype.destroy = function() {
if (typeof this.fd != 'number') {
this.emit('close');
} else {
fs.close(this.fd, () => {
this.emit('close');
})
}
}
复制代码
Readable.prototype.read = function(n) {
let buffer = null;
if(n>this.len){// 若是缓存区中有数据不够此次读取,则调整highWaterMark而且补充缓存区
this.highWaterMark = computeNewHighWaterMark(n);//从新计算调整内存2的次方 exp: 5 => 8
this.needEmitReadable = true;
this.reading = true;
this._read();
}
if (n > 0 && n < this.len) { // 若是缓存区中有数据够此次读取,则从缓存区中读取
buffer = Buffer.alloc(n);
let current;
let index = 0;
let flag = true;
//这里的代码就是源码中fromList()的功能,对buffer进行调整,exp:123 456 789读取12 => 3 456 789
while (flag && (current = this.cache.shift())) {//current是一个buffer
for (let i = 0; i < current.length; i++) {
buffer[index++] = current[i];//将缓存区中的chunk内容copy到buffer中
if (index == n) {//n个数据读取完毕
flag = false;
this.length -= n; //缓存区长度更新
let c = current.slice(i + 1);//获取完的chunk exp:123 => 3
if (c.length) {
this.cache.unshift(c);//将没有取完的chunk放回 exp: 3
}
break;
}
}
}
}
if(this.length === 0){//缓存中没有数据
this.needEmitReadable = true; 须要触发'readable'
}
if (this.length < this.highWaterMark) {//缓存区没有满,补充缓存区
this.reading = true;
this._read();
}
return buffer;//read()返回值为一个buffer
}
//第一次读取是内置的自动读取到缓存区
//而后触发readable是从缓存区中读取消耗的同时,而且也会补充缓存区
Readable.prototype._read = function(n) {
if (typeof this.fd !== 'number') {
return this.once('open', () => this._read());//由于fs.open是异步函数,当执行read必需要在open以后
}
let buffer = Buffer.alloc(this.highWaterMark);
//源码中经过Readable.prototype.push()调用readableAddChunk()再调用addChunk()
//这里经过fs.read来调用addChunk(this,bytesRead)
fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
addChunk(this,bytesRead);
})
}
//源码中经过Readable.prototype.push()调用readableAddChunk()再调用addChunk()
function addChunk(stream, chunk) {
stream.length += bytesRead; // 增长缓存的个数
stream.position += bytesRead;//记录文件读取的位置
stream.reading = false;
stream.cache.push(buffer);//数据放入到缓存中
if (stream.needEmitReadable) {
stream.needEmitReadable = false;
stream.emit('readable');
}
}
//源码中这个函数是经过howMatchToRead()调用的,由于howMatchToRead()在其余的流中也会用到,因此兼容了其余状况
function computeNewHighWaterMark(n) {
n--;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n++;
return n;
}
复制代码
const fs = require('fs');
const rs = fs.createReadStream('./1.txt',{//123456789
flags:'r',
autoClose:true,
highWaterMark:3,
encoding:null
});
rs.on('readable',function () {
// 若是缓存区没这么大会返回null
let r = rs.read(1);
console.log(r);
console.log(rs._readableState.length);
rs.read(1);
setTimeout(() => {//由于补充是异步的
console.log(rs._readableState.length);
}, 1000);
});
复制代码
ReadStream实际上是Readable的子类,它继承了Readable,以fs.createReadStream为例(node/lib/internal/fs/streams.js): 缓存
const EE = require('events');
const util = require('util');
const fs = require('fs');
function ReadStream (path,options) {
this.path = path;
this.flags = options.flags || 'r'; //用来标识打开文件的模式
this.encoding = options.encoding || null;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.start = options.start || 0; //读取(文件)的开始位置
this.end = options.end || null; //读取(文件)的结束位置
this.autoClose = options.autoClose || true;
this.flowing = null; // 默认非流动模式
this.position = this.start // 记录读取数据的位置
this.open(); // 打开文夹
this.on('newListener', function (type) {
if (type === 'data') { // 用户监听了data事件
this.flowing = true;
this.read();
}
})
}
ReadStream.prototype.read = function (){
if (typeof this.fd !== 'number') {// open操做是异步的,因此必须等待文件打开this.fd存在说明打开文件
return this.once('open', () => this.read());
}
let buffer = Buffer.alloc(this.highWaterMark); // 把数据读取到这个buffer中
//判断每次读取的数据是多少exp:数据源1234567890 highWaterMark=3
//最后一次读取长度为1
let howMuchToRead = Math.min(this.end - this.pos + 1, this.highWaterMark);
fs.read(this.fd, buffer, 0, howMuchToRead, this.position, (err, byteRead) => {
if (byteRead > 0) {
this.emit('data', buffer.slice(0, byteRead));
this.position += byteRead;//更新读取的起点
if (this.flowing) {//处在flowing模式中就一直读
this.read();
}
}else{//读取完毕
this.flowing = null;
this.emit('end');
if(this.autoClose){
this.destroy();
}
}
}
//经过flowing控制暂停仍是继续读取
ReadStream.prototype.pause = function(){
this.flowing = false;
}
ReadStream.prototype.resume = function(){
this.flowing = true;
this.read();
}
ReadStream.prototype.destroy = function () {
if (typeof this.fd != 'number') {
this.emit('close');
} else {
fs.close(this.fd, () => {
this.emit('close');
})
}
};
ReadStream.prototype.open = function() {
fs.open(this.path, this.flags, (err, fd) => {// fd文件描述符 只要文件打开了就是number
if (err) {
if (this.autoClose) { // 若是须要自动关闭 触发一下销毁事件
this.destroy(); // 销毁文件
}
return this.emit('error', err);
}
this.fd = fd;
this.emit('open', fd);
});
};
复制代码
let fs = require('fs');
let ReadStream = require('./ReadStream')
let rs = fs.createReadStream('1.txt',{//1234567890
encoding:null,
flags:'r+',
highWaterMark:3,
autoClose:true,
start:0,
end:3
});
let arr = [];
rs.on('open',function () {
console.log(' 文件开启了')
});
rs.on('data',function (data) {
console.log(data);
arr.push(data);
});
rs.on('end',function () { // 只有目标文件读取完毕后才触发
console.log('结束了');
console.log(Buffer.concat(arr).toString());
});
rs.pause()
setTimeout(function () {
rs.resume(); // 恢复的是data事件的触发
},1000)
rs.on('error',function (err) {
console.log('出错了')
})
rs.on('close',function () {
console.log('close')
});
复制代码
但愿这篇文章可以让各位看官对Stream熟悉,由于这个模块是node中的核心,不少模块都是继承这个模块实现的,若是熟悉了这个模块,对node的使用以及koa等框架的使用将大有好处,接下来会逐步介绍writable等流模式本文参考:bash