在开发中直接接触Transform流的状况不是不少,每每是使用相对成熟的模块或者封装的API来完成流的处理,最为特殊的莫过于through2模块和gulp流操做。那么,Transform流到底有什么特色呢?html
从名称上说,Transform意为处理,相似于生产流水线上的每一道工序,每道工序针对到来的产品做相应的处理;从结构上看,Transform是一个双工流,通俗的解释它既能够做为可读流,也可做为可写流。可是,node却对Transform流针对其特性作了更为特殊的定制,使Transform不是单纯的Duplex流。node
Transform流因为包含了Readable和Writeable特性,所以Transform在实际使用中有着多种方式:它既能够只做为消费者消费数据,也可同时做为生产者和消费者完成数据中间处理。下面将逐渐深刻内部阐述Transform的运行机理及使用技巧。c++
上图表示一个Transform实例的组成部分:Readable部分缓冲(数组)、内部_read函数、Writeable部分缓冲(链表)、内部_write函数、Transform实例必须实现的内部_transform函数以及系统提供的回调函数afterTransform。因为Transform实例同时拥有两部分缓冲,所以2个缓冲的存储、消耗的顺序也就须要了解,这对于后面使用原生Transform编写代码有很大的指导意义。gulp
传统意义的流(即Readable和Writeable)的实现者都须要实现对应的内部函数_read()和_write(),对于Readable实例而言,_read函数用于准备从源文件中获取数据并添加到读缓冲中;对于Writeable实例_write函数则从写缓冲链表中一次刷入到磁盘中。它们分别对应了读写流程的首尾步骤,具体能够关注node中的Stream一文。数组
而Transform中的_read和_write函数的实现大有不一样,因为须要兼顾流的处理,所以着重分析Transform的内部函数执行流程。缓存
示例demo: readable.pipe(transform);
以上段示例代码为例,transform做为消费者消费readable。
Transform的实例transform拥有transormState和readableState属性,保存了相关属性,如tranform状态信息、回调函数存储和编码等。transform做为消费者,会在其write函数中消费数据,在node中的Stream文中介绍了write函数的实现细节,经过内部调用_write函数实现数据的写入。而在Transform中_write函数已经重写:架构
若是一切顺利,readable的数据会顺利执行transform的**write->_write->_read**,那么本来负责填充读缓冲的_read在Transform中发生了哪些改变呢?函数
Transform.prototype._read = function(n) { var ts = this._transformState; if (ts.writechunk !== null && ts.writecb && !ts.transforming) { ts.transforming = true; this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform); } else { // mark that we need a transform, so that any data that comes in // will get processed, now that we've asked for it. ts.needTransform = true; } };
可见,_read的实现很是简单,根据条件选择执行_transform函数。须要注意的是_read的参数n并未有使用,由于是否插入数据至读缓冲是由开发者在_transform中来决定。相信你们对_transform函数并不陌生,node规定Transform实例必须提供_transform函数,而该函数正是在_read中调用。oop
_transform有三个参数,第一个为待处理的chunk数据,第二个为编码,第三个为回调函数。前两个参数很好理解,咱们能够在_transform中尽情的处理数据,最后调用回调函数完成处理。那么,这个回调函数到底是什么? 它就是Transform架构图中的afterTransform函数,它有几个功能:ui
可见,在afterTransform函数执行后,才基本宣告transform第一阶段的结束。为什么是第一阶段呢?由于transform才完成了做为消费者(即Writeable)的做用,若是用户在_transform中传入了数据到读缓冲区,那么此时transform也同时是一个生产者,提供数据让后面的消费者消费数据,这就涉及到了Transform使用上的问题。
const stream = require('stream') var c = 0; const readable = stream.Readable({ highWaterMark: 2, read: function () { var data = c < 26 ? String.fromCharCode(c++ + 97) : null; console.log('push', data); this.push(data); } }) const transform = stream.Transform({ highWaterMark: 2, transform: function (buf, enc, next) { console.log('transform', buf.toString()); next(null, buf); } }) readable.pipe(transform);
示例代码很简单,建立了一个可读流,向消费者提供a-z的小写字母;建立了一个转换流,在_transform函数中针对数据并不作处理仅做打点输出,并向回调函数传递数据至读缓冲区。咱们的目的是经过transform输出26个小写字母,可是当前程序执行的结果并不让人满意:
执行结果: push a push b transform a push c transform b push d push e push f
tranform仅仅处理到字母b,readable也仅仅提供了a-f的数据便戛然而止,这是为什么?
这一切都归结于transform对象。认真读过上文后咱们知道,全部的Transform实例同时有两个缓冲区,其中写缓冲区用来接收生产者的数据进行转换操做,读缓冲区则缓存数据给消费者使用。而在当前的实现中,transform._transform函数输出了待处理数据,同时执行next(null, buf);。该函数上文已有分析,即afterTransform函数,第一个参数为Error实例,第二个则为存入读缓冲区的数据。在本例中,执行完_transform后将处理后的数据存入读缓冲区,等待后面的消费者消费读缓冲区的数据。但是,transform后面没有消费者了,所以transform在处理完字母b存入读缓冲区后,读缓冲区已经满了(设定highWaterMark为2,即读写缓冲区的最大值均为2字节)。当字母c、d也执行到tranform._write后,因为不知足执行transform._read的条件没法执行transform._transform函数,更没法执行afterTransform函数,致使没法刷新写缓冲区的数据,形成字母c、d贮存在写缓冲区。而字母e、f则因为transform的写缓冲区满(transform.write()返回false),只有存储在readable的读缓冲区中,等待消费。这就形成了死循环,readable和transform全部的缓冲区都满了,流也就中止了。
解决这个问题的方法很简单,有两种不一样方案:
其实本质上都是让transform的读缓冲区获得消耗。
第一种方案:
保证transform的读缓冲区为空: const transform = stream.Transform({ highWaterMark: 2, transform: function (buf, enc, next) { console.log('transform', buf.toString()) next(null, null) } })
只需向next函数传入null便可,这样transform消费完数据后即宣告数据处理结束,读缓冲区始终为空。
第二种方案:
添加消费者: const transform = stream.Transform({ highWaterMark: 2, transform: function (buf, enc, next) { console.log('transform', buf.toString()) next(null, buf) } }) readable.pipe(transform).pipe(process.stdout);
transform实现不变,只是添加了消费者process.stdout。这样也同时保证了transform的读缓冲区处于可添加状态,也给了afterTransform函数刷新写缓冲区的机会,开启新的数据处理流程。
through2的重头戏在于Transform流,使用through2的API可方便的建立一个Transform实例,完成数据流的处理。
function through2 (construct) { return function (options, transform, flush) { if (typeof options == 'function') { flush = transform transform = options options = {} } if (typeof transform != 'function') transform = noop if (typeof flush != 'function') flush = null return construct(options, transform, flush) } } module.exports = through2(function (options, transform, flush) { var t2 = new DestroyableTransform(options) t2._transform = transform if (flush) t2._flush = flush return t2 })
可见,through2模块仅仅是封装了Transform的构造函数,并封装了更为易用的objectMode模式。之因此建议使用through2建立Transform对象,不只仅是由于其提供了方便的API,更主要的是为了兼容性。Transform对象是属于Stream2.0的特性,早先版本的node并无实现,而经过through2建立的Transform实例在以前版本的node下仍可正常使用,这是因为through2并未引用node默认提供的stream模块,而是使用社区中较为流行的“readable-stream”模块。
本文旨在深刻through2中的使用的Transform流进行探究,并做为上一篇文章node中的stream的回顾和应用。经过文末简单的示例了解Transform在开发中可能出现的问题,学会随意切换Transform的生产者和消费者的身份,更好的指导实际开发。