版本:8.1.3javascript
stream
模块的API被设计成可以很容的使用javascript
的原型继承模式来实现streams
。
首先,stream的开发者必须先声明一个新的javascript类,而且继承如下四个基础stream类中的一个,并确保他们适当的调用父类的构造函数。java
const { Writable } = require('stream'); class MyWritable extends Writable { constructor(options) { super(options); // ... } }
而且新的stream类必须实现所继承的类的一个或者更多的特定方法。具体以下表所示:es6
Use-case | Class | Method(s) to implement |
---|---|---|
Reading only | Readable | _read |
Writing only | Writable | _write,_writev, _final |
Reading and Writing | Duplex | _read, _write,_writev, _final |
Operate on written data, then read the result | Transform | _transform, _flush, _final |
注意实现stream的代码毫不能调用stream"public"
方法,由于那是为了给消费者使用的。若是那样可能会对应用在在使用流时致使不利的影响。segmentfault
虽然有不少简单的实例,可是也可以直接构造一个stream
而不依赖任何继承。可以直接使用stream.Writable
,stream.Readable
,stream.Duplex
或者stream.Transform
来建立实例对象而且传递适当的方法做为构造函数的options。
例如:缓存
const { Writable } = require('stream'); const myWritable = new Writable({ write(chunk, encoding, callback) { // ... } });
stream.Writable
被用来扩展实现一个Writable
流。安全
自定义一个Writable
流必须调用new stream.Writable([options])
构造函数而且实现writable._write()
方法。writable._wirtev()
方法也许也要实现。app
options <Object>
异步
highWaterMark <number>
stream.write()
开始返回false
的最高Buffer
限制。默认值为16384(16kb)
,对象模式是16
。socket
decodeStrings <boolean>
传入方法stream._write()
以前是否对字符串解码,默认是true
。函数
objectMode <boolean>
stream.write(anyOjb)
方法是否有效。当stream
实现支持,而且设置该值为true
,那么将可以写进一个javascript
的值而不是字符串,Bufer
,或者Uint8Array
。默认值为false
。
write <Function>
实现stream._write()
方法。
writev <Function>
实现stream._writev()
方法。
destroy <Function>
实现stream._destroy()
方法。
final <Function>
实现stream._final()
方法。
实例:
const { Writable } = require('stream'); class MyWritable extends Writable { constructor(options) { // Calls the stream.Writable() constructor super(options); // ... } }
chunk <Buffer>
|<string>
|<any>
被写入的chunk
,在不设置decodeStrings
项为false
的状况下将会是Buffer
。或者stream
在对象模式下工做。
encoding <string>
若是chunk
是一个字符串,那么encoding
是该字符串的格式。若是chunk
是Buffer
,或者stream
是对象模式,encoding
将会被忽视。
callback <Function>
(会有一个error
参数),在chunk
被彻底提供时,调用这个函数。
一个Writable stream
必须实现和提供一个writable._write()
方法来发送数据给潜在的源。
注意 Transform stream
提供他们本身的该方法writable._write()
的实现。
注意该方法不可以被应用直接调用,它在子类中实现,而且只能被Writable
类内部的方法调用。
回调函数必须被调用,用来讲明数据写入成功仍是出现了错误。第一个参数必须是Error
,若是写入失败那么他是一个错误对象,若是成功那么是null
。
咱们应该着重注意到,writable.write()
的调用发生在writable._write()
被调用和回调函数被调用之时,此时写入的数据将被缓存。一旦回掉函数 被调用,stream
将会发出drain
事件,若是stream
被实现为可以一次处理多chunk
的功能,那么writable._writev()
方法应该被实现。
若是decodeStrings
属性在构造函数的options
中设置,那么chunk
应该是一个字符串而不是Buffer
,此时encoding
将会检测字符串字符的编码。这个能够用来优化被传入的字符串已知其编码格式。若是decodeStrings
明确设置为false
,那么encoding
参数会被安全的忽视,而且chunk
将被保持为同一个对象而后传入.write()
。
writable._write()
被设置为有一个前置下划线,由于这是类内部定义的,而且不可以被外部程序直接调用。
chunks <Array> 被写入的chunk,每一个chunk有这样的格式:{chunk:..., encoding:...}。
callback <Function> (会有一个error
参数),在chunk
被彻底提供时,调用这个函数。
注意该方法不可以被应用直接调用,它在子类中实现,而且只能被Writable
类内部的方法调用。
在实现了writable._write()方法后也相互会实现writable._writev()方法,用来实现一次处理多个chunk。
err
<Error>
一个错误。
callback
<Function>
一个带有可操做的错误的参数,在writable被毁掉时调用。
callback
<Function>
当你结束等待的书局时调用。(可添加一个可选的错误参数。)
注意 _final()
不可以被直接调用,在实现子类时,他只能被内部的Writable类调用。
可选的函数会在流结束以前调用,finish事件会在回调函数执行以后触发。这在关闭源或者在流结束前写入缓存数据时是有用的。
建议在使用writable._write()以及writable._writev()时在回调函数的第一个参数写入err,以防执行时发生错误。这个会致使error
时间触发。把错误抛弃可能会致使依赖流的一些行为发生意外或矛盾。使用回掉函数保证处理可遇到的错误。
const { Writable } = require('stream'); const myWritable = new Writable({ write(chunk, encoding, callback) { if (chunk.toString().indexOf('a') >= 0) { callback(new Error('chunk is invalid')); } else { callback(); } } });
stream.Readable
类被用来实现一个Readable1
流。
自定义的Readable流必须调用new stream.Readable([options])
构造函数,而且实现readable._read()
方法。
options <Object>
highWaterMark <number>
在内部buffer存储的最大字节数,数据为在打包读取底层数据源以前。默认为16384(16kb)
,objectMode
模式为16。
encoding <string>
若是设置,那么buffer将被格式化为指定格式的字符串。默认值为null
。
objectMode <boolean>
设置这个流的表现形式是流仍是对象。决定着stream.read(n)
返回的是单一的值仍是一个必定大小的Buffer。默认是false。
read <Function>
stream._read()
方法的实现。
destory <Function>
stream._destroy()
方法的实现。
例子:
es6
const {Readable} = require('stream') class MyReadable extends Readable{ constructor(options){ //调用stream.Readable(options)构造函数 super(options) //。。。 } }
或者ES6版本以前的构造函数语法风格es5
const {Readable} = require('stream') const util = require('util') function MyReadable(options){ if(!(this instanceof MyReadable)) return new MyReadable(options) Readable.call(this, options) } util.inherits(MyReadable, Readable)
size
<number>
异步读取的字节大小。
注意 这个函数必定不能被应用的代买直接调用,他应该在子类中实现,而且只可以被内部Readable类方法调用。
全部的Readable流的实现必须提供一个readable._read()
方法的实现,用来取得来自底部的数据源数据。
当调用readable._read()
若是自源的数据可用,那么应该使用readable.push(dataChunk)
开始把数据推动读取队列。_read()
应该持续凑够源读取数据而后推动队列,直至readable.push()
返回false
。只有再次调用readable._read()
那么原先中止的流才会再次恢复添加数据到队列。
注意一旦readable._read()
方法被调用,那么在readable.push()
被调用前是不用再次调用的。
size
参数是一个公告,在read是个单一操做,那么返回的数据可使用size来限制大小。其余的实现也许会忽视这个参数,而且仅在数据可读取是才提供。调用stream.push(chunk)
时不必等待size
足够再调用。
readable._read()
是底层源的体现设置,由于只可以在类的内部定义它,因此永远不能在其余用户的程序里直接调用。
chunk <Buffer>
|<Uint8Array>
|<string>
|<null>
|<any>
Chunk的数据要被推动读取队列。chunk必须是一个字符串,Buffer或者Uint8Array。若是是对象模式,那么chunk或许是个js值。
encoding <string>
对chunk的字符串编码。不准是一个有效的Buffer编码格式,例如'utf8'
或者'ascii'
。
returns <boolean>
`true 若是添加的chunk可以继续push;
false`反之。
若是chunk
是一个Buffer
,Uint8Array
或者string
,那么chunk
将被添加到内部队列等待流用户来消费。传递的chunk
是一个null
,说明到了流的结束,以后便不会有数据被写入。
当Readable在paused
模式下操做时,readable.push()
方法可以添加从readable.read()(被
readable方法触发)
方法中读取的数据。
当Readable在flowin
g模式下操做时,readable.push()
方法更够在data
事件触发下传递数据。
readable.push()
被设计的很灵活,例如,当封装从pause/resume机制或者回到函数的chunk提供的一些数据这些底层源可以 被自定义的readale实例来封装,以下:
// source is an object with readStop() and readStart() methods, // and an `ondata` member that gets called when it has data, and // an `onend` member that gets called when the data is over. class SourceWrapper extends Readable { constructor(options) { super(options); this._source = getLowlevelSourceObject(); // Every time there's data, push it into the internal buffer. this._source.ondata = (chunk) => { // if push() returns false, then stop reading from source if (!this.push(chunk)) this._source.readStop(); }; // When the source ends, push the EOF-signaling `null` chunk this._source.onend = () => { this.push(null); }; } // _read will be called when the stream wants to pull more data in // the advisory size argument is ignored in this case. _read(size) { this._source.readStart(); } }
注意readable.push()
只可以被Readable的实现者调用,而且只可以在readable._read()
里面。
建议在使用readable._read()
方法发生错误时触发error
事件,并且不是抛弃。在readable_read()
里面抛弃错误会在操做流是否依赖flowing
或者parse
模式形成矛盾,以及意外。使用error
事件确保一致以及处理可预测的错误。
const { Readable } = require('stream'); const myReadable = new Readable({ read(size) { if (checkSomeErrorCondition()) { process.nextTick(() => this.emit('error', err)); return; } // do some work } });
const { Readable } = require('stream'); class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000000; this._index = 1; } _read() { const i = this._index++; if (i > this._max) this.push(null); else { const str = '' + i; const buf = Buffer.from(str, 'ascii'); this.push(buf); } } }
一个Duplex
流及实现了可读流又实现了可写流,例如TCP socket
链接(这样翻译顺点)。
因为javaScript
不能多个继承,stream.Duplex
类被用来实现双向流/Duplex
的一个类(毕竟同时继承stream.Readable
和stream.Writable
类是相抵触的)。
注意 stream.Duplex
是原型继承stream.Readable
类,寄生继承stream.Writable
,可是instanceof
将会正确表示基于两者,由于该类覆盖了stream.Writable
的Symbole.hasInstance
方法。
自定义的Duplex
流必须调用new stream.Duplex([options])
构造函数以及实现readable._read()
和writable._write()
方法。
options <object>
同时传给可读流和和谐刘的构造函数,而且额外有下面的域。
allowHalfOpen <boolean>
默认true
,若是设置为false
,流会在可写流结束时中止可读流,反之亦然。
readableObjectMode <boolean>
默认false
。 设置流的可读流模式为objectMode
,若是为true
则没有效果。
writableObjectMode <boolean>
默认false
。 设置流的可写流模式为objectMode
,若是为true则没有效果。
例子:
lang:ES6
const { Duplex } = require('stream'); class MyDuplex extends Duplex { constructor(options) { super(options); // ... } }
或者使用es6版本以前的构造函数风格。
const { Duplex } = require('stream'); const util = require('util'); function MyDuplex(options) { if (!(this instanceof MyDuplex)) return new MyDuplex(options); Duplex.call(this, options); } util.inherits(MyDuplex, Duplex);
下面是一个Duplex
流的简单示例,虽然Node.js
的streams
的API是不兼容的,可是咱们向这个包了个底层源对象写入数据,同时也可用从其中读取数据。该例子演示向Writable
接口中写入数据,从Readable
接口中读取数据。
lang:ES6
const { Duplex } = require('stream'); const kSource = Symbol('source'); class MyDuplex extends Duplex { constructor(source, options) { super(options); this[kSource] = source; } _write(chunk, encoding, callback) { // The underlying source only deals with strings if (Buffer.isBuffer(chunk)) chunk = chunk.toString(); this[kSource].writeSomeData(chunk); callback(); } _read(size) { this[kSource].fetchSomeData(size, (data, encoding) => { this.push(Buffer.from(data, encoding)); }); } }
这个语法形式中最重要的是尽管可读流和可写流共存于一个对象实例,可是对它们进行的相对对立的操做。
在Duplex流中,能够根据设置中的readableObjectMode
或者writableObjectMode
项把可读流或者可写流其中之一设置为对象模式。
下面的例子,是一个Transform
流(Duplex流的一种)的例子,其建立的可写流是一个对象模式,接受一个可读流传来的数据,该可读流接受javaScript数字而且转换成十六进制的字符串。
lang:ES6
const { Transform } = require('stream'); // All Transform streams are also Duplex Streams const myTransform = new Transform({ writableObjectMode: true, transform(chunk, encoding, callback) { // Coerce the chunk to a number if necessary chunk |= 0; // Transform the chunk into something else. const data = chunk.toString(16); // Push the data onto the readable queue. callback(null, '0'.repeat(data.length % 2) + data); } }); myTransform.setEncoding('ascii'); myTransform.on('data', (chunk) => console.log(chunk)); myTransform.write(1); // Prints: 01 myTransform.write(10); // Prints: 0a myTransform.write(100); // Prints: 64
一个Transfrom
流是一个把输入从新计算过的Duplex
流。包括可以压缩的[zlib][]
流或者对数据加密解密的crypto流。
注意 streams
没有输入和输出的数据必须大小一致,chuank
的数量一致,或者同时送达,的规范要求。例如,一个Hash
流在输入结束时只会有一个输出chunk
。一个zlib
流产生的输出可能比输入的大也可能小。
stream.Transform
类用来扩展实现Transform
类。
stream.Transform
类原型继承于stream.Duplex
类,而且须要实现本身的writable._write()
和readable._read()
方法。自定义的Transform类还必须实现transform._transform()
方法,必要时也须要实现transform._flush()
方法。
注意 在使用Transfrom流,往流中写入数据时要特别当心,由于若是Reable一方没有消费者是可能到致使Writable一方的流暂停。
options <Object>
传递给Writable
和Readalbe
构造函数,同时也包含如下的域。
transform <Function>
实现stream._transform()
方法。
flush <Function>
实现stream._flush()
方法。
例子:
ES6
const { Transform } = require('stream'); class MyTransform extends Transform { constructor(options) { super(options); // ... } }
es6以前版本的构造函数风格。
const { Transform } = require('stream'); const util = require('util'); function MyTransform(options) { if (!(this instanceof MyTransform)) return new MyTransform(options); Transform.call(this, options); } util.inherits(MyTransform, Transform);
finish
和end
事件finish
和end
来自继承的stream.Writable
以及stream.Readable
类。finish
在stream.end()
调用是触发,此时全部chunk
已经被stream._transform()
处理完毕。end
事件在全部的数据被输出,执行transform._flusn()
方法的回调函数时触发。
callback <Function>
回调函数(另外有可选的error,和data参数),当数据流完时执行。
注意 这个方法绝对不能应用的代码直接调用。其只应该被继承的子类的Readable
类中的方法调用。
在一些状况下,一个transform操做须要发出一些额外的数据在流的结尾。例如,一个zlib
压缩流会保存一些有关内部状态的数据来优化压缩输出。当流结束,然而额外的信息须要被发出来,至此整个压缩过程才算结束。
自定义的Transform
类实现也许会实现transform._flush()
方法。当没有更多的数据须要被写入时会被触发,可是会在end
事件发出告诉Readable
流结束以前。
实现transform._flush()
方法,可能会在内部适当调用readable.push()
方法零次或屡次。在flush
操做结束后必须调用回调函数。
transform._flush()
方法有一个前置的下划线,代表这是一个类内部定义的,永远不该该被用户的程序直接调用。
chunk <Buffer>
|<string>
|<any>
须要被转换的chunk。在不设置decodeString
参数为false
或者不为对象模式状况下,用仍是buffer.
encoding <string>
若是chunk是字符串,那么该参数设置其编码类型。若是chunk是buffer,那么这个值是'buffer'
,这种状况下会忽略。
calback <Function>
一个回调函数(带有可选的error和data参数),提供的chunk
被处理后会被调用。
注意 这个方法绝对不能应用的代码直接调用。其只应该被继承的子类的Readable
类中的方法调用。
全部的Transofrom流的实现都必须提供一个_transform()
方法类接受输入,产生输出。transfrom._transform()
方法实现处理被写入的字节,计算输出,而后从readable部分使用readable.push()
传出。
transform.push()
方法可能会被屡次调用,根据input chunk来生成output。取决于chunk能产生多少output结果。
有时也可能发生不管什么样的数据chunk也产生不了output。
当前的chunk被消耗完了以后必须调用回调函数。回调函数的第一个参数必须是Error
对象,若是发生错误该对象会被传递,不然能够是null
。若是有第二个参数,那么它将被推荐到readable.push()
方法。简单来讲,以下:
transform.prototype._transform = function(data, encoding, callback) { this.push(data); callback(); }; transform.prototype._transform = function(data, encoding, callback) { callback(null, data); };
此外,该方法也是一个类的内部方法,不该该被用户程序直接调用。
stream.PassThough
类是一个Transform
流的一个琐碎实现,可以简单的传递input字节到output。原本的目的是检验和测试。可是也有一些场景,例如整理小说章节分类。