through2原理解析

写在前面

through2常常被用于处理nodestream,假如使用过gulp的话,对于这个包必定不会陌生,如:node

gulp.task('rewrite', () => {
  return gulp.src('./through/enter.txt')
    .pipe(through2.obj(function(chunk, enc, callback) {
      const { contents } = chunk;
      for (var i = 0; i < contents.length; i++) {
        if (contents[i] === 97) {
          contents[i] = 122;
        }
      }

      chunk.contents = contents;
      this.push(chunk);

      callback();
    }))
    .pipe(gulp.dest('./dist'));
});

这里将文件中全部的字符a转换为字符z,在写gulp插件时必定会应用到这个包,下面就来窥探一下这个使用率很是高的包。git

Transform stream

through2的源码仅仅就100多行,本质上就是对于node原生的transform流进行的封装,先来看下Transform streamTransform是一个双工流,既可读,也可写,可是与Duplex仍是有着一些区别,Duplex的写和读能够说是没有任何的关联,是两个缓冲区和管道互补干扰,而Transform将其输入和输出是存在相互关联的,中间作了处理。具体差异能够参考下面图片对比:github

Duplex stream:gulp

Duplex 图示

Transform stream:api

transform 图示

Transform stream的两个缓存区相互关联,对于每一个缓冲区来讲,highWaterMark为阈值,超过阈值后,将会中止读或者写操做,如:缓存

let i = 0;
const readable = Readable({
  highWaterMark: 2,
  read: function () {
    var data = i < 26 ? String.fromCharCode(i++ + 97) : null;
    console.log('push', data);
    this.push(data);
  }
});

const transform = Transform({
  highWaterMark: 2,
  transform: function (buf, enc, next) {
    console.log('transform', buf.toString());
    next(null, buf);
  }
})

readable.pipe(transform);

stream流向为:函数

因为阈值为2,因此只能pushf,这时readable的缓存区已满,transform的读缓存区和写缓存区已经满了(因为transform的两个缓存区的阈值为2,因此写缓存区在写入b以后就已经满了,后续不能继续写入),所有满以后,天然中止了读取,最终e,f存在A中,c,d存在B中,a,b存在C中,想要解决很简单,在添加一个流向就能够:oop

readable.pipe(transform).pipe(process.stdout);

through2源码

在了解Transform stream以后,through2的源码很是的简单,就是对于其的一层封装,暴露出三个api(through2through2.objthrough2.ctor)并且三者接收的参数一致,由于都是由一个工厂方法创造出的:ui

function through2 (construct) {
  return function (options, transform, flush) {
    // 作了一些参数整理
    if (typeof options == 'function') {
      flush     = transform
      transform = options
      options   = {}
    }

    if (typeof transform != 'function')
      transform = noop

    if (typeof flush != 'function')
      flush = null

    return construct(options, transform, flush)
  }
}

来看一下through2对于Transform stream的再加工,也就是源码中的DestroyableTransform,与其名字同样,就是一个替咱们实现好了destory方法的Transform streamthis

DestroyableTransform.prototype.destroy = function(err) {
  if (this._destroyed) return
  this._destroyed = true

  var self = this
  // 触发destory后,close掉流
  process.nextTick(function() {
    if (err)
      self.emit('error', err)
    self.emit('close')
  })
}

through2through2.obj所有是创造出一个再加工后的Transform,区别以下:

  • 后者开启了对象模式(objectMode属性为true),写入的参数不单单限制在string or uint8Array
  • 后者下降了阈值(highWaterMark16,而不是默认的16kb),这样作的缘由,是为了和node的默认保持一致,具体能够参见这里

through2.ctor能够用来再次定制,其返回的是一个构造函数,用法能够参考下面:

const Tran = through.ctor(function(chunk, enc, callback) {
  console.log('transform', chunk.toString());
  callback(null, chunk);
});
const transform = new Tran();

写在最后

streamnode中有着很是普遍的应用,可是它使用起来却不是那么友好,throgh2的出现能够减小使用上的麻烦,其原理也很是的简单;以上内容均为本人理解,若有错误还请指出,不胜感激~

相关文章
相关标签/搜索