https://github.com/kumavis/obj-multiplexhtml
simple stream multiplexing for objectMode
node
其实就是一个多路复用流可以使用name来区分各个子流,以达到一个parent流下其实有多个子流在运行,能够经过多个子流来读入写出数据,效率更高。并且parent流结束了,则全部子流也会被销毁git
// create multiplexer const mux = new ObjMultiplex() // setup substreams const streamA = mux.createStream('hello') const streamB = mux.createStream('world') // pipe over transport (and back) mux.pipe(transport).pipe(mux) // send values over the substreams streamA.write({ thisIsAn: 'object' }) streamA.write(123) // or pipe together normally streamB.pipe(evilAiBrain).pipe(streamB)
obj-multiplex/index.jsgithub
const { Duplex } = require('readable-stream') const endOfStream = require('end-of-stream')//看博客mafintosh/end-of-stream const once = require('once') const noop = () => {} const IGNORE_SUBSTREAM = {} class ObjectMultiplex extends Duplex { constructor(_opts = {}) { const opts = Object.assign({}, _opts, { objectMode: true,//流可传各种形式数据 }) super(opts)//生成这个流 this._substreams = {} } createStream (name) {//就是建立两个流,一个是这个流,另外一个是parent是这个流的一个子流,并返回子流 // validate name if (!name) throw new Error('ObjectMultiplex - name must not be empty')//name不能为空 if (this._substreams[name]) throw new Error('ObjectMultiplex - Substream for name "${name}" already exists')//name不能重复 // create substream const substream = new Substream({ parent: this, name: name }) this._substreams[name] = substream // listen for parent stream to end anyStreamEnd(this, (err) => {//定义当parent流结束,则相应的全部子流也要被销毁 substream.destroy(err)//substream被destroy,若是出错返回的错误信息即err }) return substream } // ignore streams (dont display orphaned data warning) ignoreStream (name) {//就是将以前建立的name的子流的内容清空 // validate name if (!name) throw new Error('ObjectMultiplex - name must not be empty') if (this._substreams[name]) throw new Error('ObjectMultiplex - Substream for name "${name}" already exists') // set this._substreams[name] = IGNORE_SUBSTREAM } // stream plumbing //下面就是parent流可以作的一系列读写操做 _read () {} _write(chunk, encoding, callback) {//当调用 时,数据会被缓冲在可写流中。 // parse message,就是当parent流write时,将根据其传入的name来决定该数据是写到哪一个子流上的 const name = chunk.name const data = chunk.data if (!name) {//name不能为空,不然不知道是哪一个子流 console.warn(`ObjectMultiplex - malformed chunk without name "${chunk}"`) return callback() } // get corresponding substream const substream = this._substreams[name]//而后根据name获得子流 if (!substream) {//若是为空则warn console.warn(`ObjectMultiplex - orphaned data for stream "${name}"`) return callback() } // push data into substream if (substream !== IGNORE_SUBSTREAM) {//只有当子流不为{}时,才将data压入 substream.push(data)//当调用 时,数据会被缓冲在可读流中。 若是流的消费者没有调用 ,则数据会保留在内部队列中直到被消费 } callback() }//_write }//class class Substream extends Duplex { constructor ({ parent, name }) { super({ objectMode: true, }) this._parent = parent this._name = name } _read () {}//读入的操做即Duplex的定义 _write (chunk, enc, callback) {//当子流被写入时,实际上是将数据压入流parent中 this._parent.push({ name: this._name, data: chunk, }) callback()//而后调用回调函数 } } module.exports = ObjectMultiplex // util function anyStreamEnd(stream, _cb) {//就是当stream结束的时候就会调用cb回调函数 const cb = once(_cb) endOfStream(stream, { readable: false }, cb) endOfStream(stream, { writable: false }, cb) }writable.write(chunk)stream.push(chunk)stream.read()
经过测试学习该库的使用:函数
obj-multiplex/test/index.jsoop
const test = require('tape') const once = require('once') const { PassThrough, Transform } = require('readable-stream')//PassThrough本质也是Transform流,是最简单的Transform流,只是将数据今后处传过
// a passthrough stream.
// basically just the most minimal sort of Transform stream.
// Every written chunk gets output as-is
const endOfStream = require('end-of-stream') const pump = require('pump') const ObjMultiplex = require('../index.js') test('basic - string', (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() bufferToEnd(outStream, (err, results) => { t.error(err, 'should not error') t.deepEqual(results, ['haay', 'wuurl'], 'results should match') t.end() }) // pass in messages inStream.write('haay') inStream.write('wuurl') // simulate disconnect setTimeout(() => inTransport.destroy()) }) test('basic - obj', (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() bufferToEnd(outStream, (err, results) => { t.error(err, 'should not error') t.deepEqual(results, [{ message: 'haay' }, { message: 'wuurl' }], 'results should match') t.end() }) // pass in messages inStream.write({ message: 'haay' }) inStream.write({ message: 'wuurl' }) // simulate disconnect setTimeout(() => inTransport.destroy()) }) test('roundtrip', (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() const doubler = new Transform({ objectMode: true, transform (chunk, end, callback) {//对流内数据进行*2计算 // console.log('doubler!', chunk) const result = chunk * 2 callback(null, result) } }) pump(//即将从outStream处获得的数据进行*2处理后再传回outStream outStream, doubler, outStream ) bufferToEnd(inStream, (err, results) => { t.error(err, 'should not error') t.deepEqual(results, [20, 24], 'results should match') t.end() }) // pass in messages inStream.write(10) inStream.write(12) // simulate disconnect setTimeout(() => outTransport.destroy(), 100) }) // util function basicTestSetup() { // setup multiplex and Transport const inMux = new ObjMultiplex()//定义了两个parent流 const outMux = new ObjMultiplex() const inTransport = new PassThrough({ objectMode: true }) const outTransport = new PassThrough({ objectMode: true }) // setup substreams const inStream = inMux.createStream('hello')//分别在两个parent流中各自定义一个name为hello的子流 const outStream = outMux.createStream('hello') pump(//造成一个pipe流 inMux, inTransport, outMux, outTransport, inMux ) return { inTransport, outTransport, inMux, outMux, inStream, outStream, } } function bufferToEnd(stream, callback) { const results = [] endOfStream(stream, (err) => callback(err, results))//定义了流结束后的回调 stream.on('data', (chunk) => results.push(chunk))//并监听data事件,用于将数据压入流 }