http://segmentfault.com/a/1190000003479884html
|
来表示流;node中经过pipe方法在node stream中能够看到第一段的描述:node
A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. Streams are readable, writable, or both. All streams are instances of EventEmitter
对上面一段话进行解析,能够获得以下几点:git
Readable Stream
是提供数据的Stream,外部来源的数据均会存储到内部的buffer数组内缓存起来。github
writeable Stream
是消费数据的Stream,从readable stream
中获取数据,而后对获得的chunk块数据进行处理,至于如何处理,就依赖于具体实现(也就是_write的实现)。segmentfault
首先看看Readdable Stream
与writeable stream
两者之间的流动关系:api
stream内部是如何从readable stream流到writeable stream里面呢?有两种方法:数组
a) pipe 链接两个stream缓存
先看一个简单地demo工具
var Read = require('stream').Readable; var Write = require('stream').Writable; var r = new Read(); var w = new Write(); r.push('hello '); r.push('world!'); r.push(null) w._write = function (chunk, ev, cb) { console.log(chunk.toString()); cb(); } r.pipe(w);
pipe是一种最简单直接的方法链接两个stream,内部实现了数据传递的整个过程,在开发的时候不须要关注内部数据的流动:ui
Readable.prototype.pipe = function (dest, pipeOpts) { var src = this; ... src.on('data', ondata); function ondata(chunk) { var ret = dest.write(chunk); if (false === ret) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; src.pause(); } } ... }
b) 事件data + 事件drain联合实现
var Read = require('stream').Readable; var Write = require('stream').Writable; var r = new Read(); var w = new Write(); r.push('hello '); r.push('world!'); r.push(null) w._write = function (chunk, ev, cb) { console.log(chunk.toString()); cb(); } r.on('data', function (chunk) { if (!w.write(chunk)) { r.pause(); } }) w.on('drain', function () { r.resume(); }) // hello // world!
Readable Stream
存在两种模式(flowing mode 与 paused mode),这两种模式决定了chunk数据流动的方式---自动流动仍是手工流动。那如何触发这两种模式呢:
让咱们再深刻一些,看看里面具体是如何实现的:
// data事件触发flowing mode Readable.prototype.on = function(ev, fn) { ... if (ev === 'data' && false !== this._readableState.flowing) { this.resume(); } ... } // resume触发flowing mode Readable.prototype.resume = function() { var state = this._readableState; if (!state.flowing) { debug('resume'); state.flowing = true; resume(this, state); } return this; } // pipe方法触发flowing模式 Readable.prototype.resume = function() { if (!state.flowing) { this.resume() } }
结论
a. paused mode
在paused mode下,须要手动地读取数据,而且能够直接指定读取数据的长度:
var Read = require('stream').Readable; var r = new Read(); r.push('hello'); r.push('world'); r.push(null); console.log('输出结果为: ', r.read(1).toString()) // 输出结果为: 'h'
还能够经过监听事件readable,触发时手工读取chunk数据:
var Read = require('stream').Readable; var r = new Read(); r.push('hello'); r.push('world'); r.push(null); r.on('readable', function () { var chunk = r.read(); console.log('get data by readable event: ', chunk.toString()) }); // get data by readable event: hello world!
须要注意的是,一旦注册了readable事件,必须手工读取read数据,不然数据就会流失,看看内部实现:
function emitReadable_(stream) { debug('emit readable'); stream.emit('readable'); flow(stream); } function flow(stream) { var state = stream._readableState; debug('flow', state.flowing); if (state.flowing) { do { var chunk = stream.read(); } while (null !== chunk && state.flowing); } } Readable.prototype.read = function (n) { ... var res = fromList(n, state); if (!util.isNull(ret)) { this.emit('data', ret); } ... }
flow方法直接read数据,将获得的数据经过事件data交付出去,然而此处没有注册data事件监控,所以,获得的chunk数据并无交付给任何对象,这样数据就白白流失了,因此在触发emit('readable')时,须要提早read数据。
b. flowing mode
经过注册data、pipe、resume能够自动获取所须要的数据,看看内部实现:
// 事件data方式 var Read = require('stream').Readable; var r = new Read(); r.push('hello '); r.push('world!'); r.push(null) r.on('data', function (chunk) { console.log('chunk :', chunk.toString()) }) // chunk : hello // chunk : world!
// 经过pipe方式 var r = new Read(); r.push('hello '); r.push('world!'); r.push(null) r.pipe(process.stdout) // hello world!
c. 两种mode的总结
用过browserify的人都知道,browserify是一种基于stream的模块打包工具,里面存在browserify.prototype.transform(tr)方法,其中的tr就要求是transform stream,且browserify内部经过through2
构建了不少tranform stream。也能够说browserify是创建在transform stream的基础上。那么具有readable、writeablestream的transform stream内部是如何工做的呢?
自定义stream很简单,只要实现相应的内部待实现方法就能够了,具体来讲:
// 自定义readable stream的实现 var Stream = require('stream'); var Read = Stream.Readable; var util = require('util'); util.inherits(MyReadStream, Read); function MyReadStream(data, opt) { Read.call(this, opt); this.data = data || []; } MyReadStream.prototype._read = function () { var _this = this; this.data.forEach(function (d) { _this.push(d); }) this.push(null); } var data = ['aa', 'bb', 'cc']; var r = new MyReadStream(data); r.on('data', function (chunk) { console.log(chunk.toString()); })