解读Node核心模块Stream系列一(Writable和pipe)

node中的流

  • node中stream模块是很是,很是,很是重要的一个模块,由于不少模块都是这个模块封装的:
  • Readable:可读流,用来读取数据,好比 fs.createReadStream()。
  • Writable:可写流,用来写数据,好比 fs.createWriteStream()。
  • Duplex:双工流,可读+可写,好比 net.Socket()。
  • Transform:转换流,在读写的过程当中,能够对数据进行修改,好比 zlib.createDeflate()(数据压缩/解压)。

系列连接

Writable

Writable的例子

  • 客户端上的 HTTP 请求
  • 服务器上的 HTTP 响应
  • fs 写入的流
  • zlib 流
  • crypto 流
  • TCP socket
  • 子进程 stdin
  • process.stdout、process.stderr

Writable的特色和简化实现

特色

  1. Writable拥有一个缓存数据的buffer,同时有一个length来记录buffer的长度
  2. Writable拥有一个highWaterMark来标明buffer的最大容量,若是length小于highWaterMark,则返回 true,不然返回 false
  3. Writable拥有writing来标识生产者正在增长length
  4. Writable拥有write()从写入缓存区数据的同时也会根据标志判断是否调用消费者消耗缓存区
  5. Writable经过clearBuffer来消费缓存区
  6. Writable订阅'drain'事件当一旦全部当前被缓冲的数据块都被排空了(被操做系统接受来进行输出)触发

构造函数

  1. Writable拥有一个缓存数据的buffer,同时有一个length来记录buffer的长度
  2. Writable拥有一个highWaterMark来标明buffer的最大容量,若是length小于highWaterMark,则返回 true,不然返回 false
  3. Writable拥有writing来标识生产者正在增长length
const EE = require('events');
const util = require('util');
const fs = require('fs');

function Writable(path,options) {//这个参数是源码没有的,这里主要是为了读取fs为案例加的
    EE.call(this);//构造函数继承EventEmiter
    
    this.path = path;
    this.autoClose = options.autoClose || true;
    this.highWaterMark = options.highWaterMark || 64 * 1024;//64k
    this.encoding = options.encoding || null;
    this.flags = options.flags || 'w';//// 这个源码没有的,这里主要是为了fs读取案例加的
    this.needEmitDrain = false;// 须要触发drain事件,默认不须要
    this.position = 0;// 偏移量
    this.cache = []; // 缓存区
    this.writing = false;// 是否正在从缓存中读取,生产者增长
    this.length = 0; // 缓存区大小,控制长度
    this.open(); // 这个源码没有的,这里主要是为了fs读取案例加的
}
util.inherits(Writable, EE);//原型继承EventEmiter
复制代码

write和_write

  1. Writable拥有write()从写入缓存区数据的同时也会根据标志判断是否调用消费者消耗缓存区
  2. Writable经过clearBuffer来消费缓存区
  3. Writable订阅'drain'事件当一旦全部当前被缓冲的数据块都被排空了(被操做系统接受来进行输出)触发
Writable.prototype.write = function (chunk, encoding=this.encoding, callback=()=>{}) {
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    //第一次虽然数据没有放入到缓存,可是因为后面会调用_write会将这个长度减去,因此先加上,保证length的正确性
    this.length += chunk.length;
    if (this.length >= this.highWaterMark ) {//消耗缓存的长度大于缓存的最大容量触发drain
        this.needDrain = true; 
    }
    if (this.writing) {//若是正在执行写操做,则后面要写入目标的数据先存入缓存
        this.cache.push({
            chunk, encoding, callback
        })
    } else {// 没有执行写操做则执行写操做
        this.writing = true; 
        //源码中在这里调用dowrite()而后调用_write()和__writev()
        this._write(chunk, encoding, () => {callback();this.clearBuffer()});
    }
    return this.length < this.highWaterMark //若是缓存区的内容大于了highWaterMark 那就返回false
  }
  
// 源码中在write()中调用dowrite()而后调用_write()和__writev()来进行读操做
Writable.prototype._write = function (chunk, encoding, callback) {
    if (typeof this.fd !== 'number') {//这里是异步打开的操做,要保证有fd,没有则绑定once等文件open再触发
        return this.once('open', () => this._write(chunk, encoding, callback));
    }
    
    // 源码中clearBuffer()调用dowrite()来消耗缓存
    // 源码中dowrite()再调用onwriteStateUpdate()对length进行更新
    // 因此源码中这里不须要调用clearBuffer
    {
        this.position += bytesWritten // 位置增长便宜
        this.length -= bytesWritten;// 缓存长度更新
        callback();//里面包含了clearBuffer()    
    }
}

//源码中clearBuffer()实是在end的时候调用的,
//源码中clearBuffer()调用dowrite()而后调用_write()和__writev()来消耗内存
//源码中dowrite()再调用onwriteStateUpdate()对缓存length进行更新
//这里只是为了简化
function clearBuffer(){ 
    let obj = this.cache.shift(); 
    if(obj){
        this._write(obj.chunk,obj.encoding,()=>{obj.callback();this.clearBuffer()});
    }else{
        if(this.needDrain){
            this.writing = false;
            this.needDrain = false;
            this.emit('drain'); // 触发drain事件
        }
    }
 }
复制代码

WriteStream

WriteStream和writable的关系

WriteStream实际上是writabl的子类,它继承了writabl,以fs.createWriteStream为例(node/lib/internal/fs/streams.js) node

fs/streams
而后对上面的_write方法进行了覆盖:
fs/streams._write
以及对_writev方法进行了覆盖:
fs/streams._writev
而且在其上扩展了open和close:
fs/streams.read
fs/streams.read

WriteStream简化实现

只须要对上面的Writable进行showier的修改web

const EE = require('events');
const util = require('util');
const fs = require('fs');

function Writable(path,options) {
    EE.call(this);
    
    this.path = path;
    this.autoClose = options.autoClose || true;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.encoding = options.encoding || null;
    this.flags = options.flags || 'w';
    this.needEmitDrain = false;
    this.position = 0;
    this.cache = []; 
    this.writing = false;
    this.length = 0; 
    this.open(); 
}
util.inherits(Writable, EE);

Writable.prototype.write = function (chunk, encoding=this.encoding, callback=()=>{}) {
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    this.length += chunk.length;
    if (this.length >= this.highWaterMark ) {
        this.needDrain = true; 
    }
    if (this.writing) {
        this.cache.push({
            chunk, encoding, callback
        })
    } else {
        this.writing = true; 
        this._write(chunk, encoding, () => {callback();this.clearBuffer()});
    }
    return this.length < this.highWaterMark 
  }
  
Writable.prototype._write = function (chunk, encoding, callback) {
    if (typeof this.fd !== 'number') {//这里是异步打开的操做,要保证有fd,没有则绑定once等文件open再触发
        return this.once('open', () => this._write(chunk, encoding, callback));
    }
    
    //将_write和fs.write结合
    //源码中是覆盖_write和_writev
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
        this.pos += bytesWritten 
        this.len -= bytesWritten;
        callback();
    });
}

Writable.prototype.destroy = function () {
    if (typeof this.fd != 'number') {
        this.emit('close');
    } else {
        fs.close(this.fd, () => {
            this.emit('close');
        })
    }
}
Writable.prototype.open = function () {
    fs.open(this.path, this.flags, (err, fd) => { // fd文件描述符 只要文件打开了就是number
        if (err) { // 销毁文件
            if (this.autoClose) { // 若是须要自动关闭 触发一下销毁事件
            this.destroy(); 
            }
            return this.emit('error', err);
        }
        this.fd = fd;
        this.emit('open', fd);
    });
};
function clearBuffer(){ 
    let obj = this.cache.shift(); 
    if(obj){
        this._write(obj.chunk,obj.encoding,()=>{obj.callback();this.clearBuffer()});
    }else{
        if(this.needDrain){
            this.writing = false;
            this.needDrain = false;
            this.emit('drain'); // 触发drain事件
        }
    }
 }
复制代码

pipe

pipe的使用

const fs = require('fs');
const ReadStream = require('./ReadStream');
const WriteStream = require('./WriteStream');
let rs = new ReadStream('./1.txt',{
    highWaterMark:4
});
let ws = new WriteStream('./3.txt',{
    highWaterMark:1
});
rs.pipe(ws);
复制代码

pipe的实现

因为pipe方法是在ReadStream上调用的,因此咱们能够修改上篇的ReadStream来实现,源码中Readable和Writable都有pipe的实现缓存

const EE = require('events');
const util = require('util');
const fs = require('fs');
function ReadStream (path,options) {
    this.path = path;
    this.flags = options.flags || 'r'; //用来标识打开文件的模式
    this.encoding = options.encoding || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.start = options.start || 0; //读取(文件)的开始位置
    this.end = options.end || null; //读取(文件)的结束位置
    this.autoClose = options.autoClose || true;
    this.flowing = null; // 默认非流动模式
    this.position = this.start // 记录读取数据的位置
    this.open(); // 打开文夹
    this.on('newListener', function (type) {
        if (type === 'data') { // 用户监听了data事件
            this.flowing = true;
            this.read();
        }
    })
}
ReadStream.prototype.read = function (){
    if (typeof this.fd !== 'number') {// open操做是异步的,因此必须等待文件打开this.fd存在说明打开文件
        return this.once('open', () => this.read());
    }
    let buffer = Buffer.alloc(this.highWaterMark); // 把数据读取到这个buffer中
    //判断每次读取的数据是多少exp:数据源1234567890 highWaterMark=3
    //最后一次读取长度为1
    let howMuchToRead = Math.min(this.end - this.pos + 1, this.highWaterMark);
    fs.read(this.fd, buffer, 0, howMuchToRead, this.position, (err, byteRead) => {
    if (byteRead > 0) {
        this.emit('data', buffer.slice(0, byteRead));
        this.position += byteRead;//更新读取的起点
        if (this.flowing) {//处在flowing模式中就一直读
            this.read();
        }
    }else{//读取完毕
        this.flowing = null;
        this.emit('end');
        if(this.autoClose){
            this.destroy();
        }
    }
}
//经过flowing控制暂停仍是继续读取
ReadStream.prototype.pause = function(){
    this.flowing = false;
}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}
ReadStream.prototype.pipe = function (ws){
    this.on('data', (data)=> {
        let flag = ws.write(data);//读完以后写,根据flag判断不须要读操做来增长缓存的长度
        if (!flag) {
            this.pause();
        }
    });
    ws.on('drain',()=> {//当写完缓存以后,lenght=0,发射drain来恢复读取往缓存中添加内容
        this.resume();
    })
  }
ReadStream.prototype.destroy = function () {
    if (typeof this.fd != 'number') {
        this.emit('close');
    } else {
        fs.close(this.fd, () => {
        this.emit('close');
        })
    }
};

ReadStream.prototype.open = function() {
    fs.open(this.path, this.flags, (err, fd) => {// fd文件描述符 只要文件打开了就是number
        if (err) {
            if (this.autoClose) { // 若是须要自动关闭 触发一下销毁事件
            this.destroy(); // 销毁文件
        }
        return this.emit('error', err);
    }
    this.fd = fd;
    this.emit('open', fd);
    });
};
复制代码

结语:

但愿这篇文章可以让各位看官对Stream熟悉,由于这个模块是node中的核心,不少模块都是继承这个模块实现的,若是熟悉了这个模块,对node的使用以及koa等框架的使用将大有好处,接下来会逐步介绍其余流模式本文参考:bash

  1. 深刻理解Node Stream内部机制
  2. node API
  3. node 源码
相关文章
相关标签/搜索