TLDR;
这篇文章的风格是在致敬 Jim 老师;致敬,致敬,懂吗,不是抄袭,程序员的事怎么能叫抄袭。
固然我对 Node.js 的 stream 也是现学现卖,有使用不当的地方,敬请指出。
原文连接 欢迎 star。javascript
写这篇文章的初衷是年前看 SICP 的时候,第二章介绍构造数据抽象的时候有提到 Lisp 对序列的处理采用相似『信号流』的方式。因此很天然的就想到了 Node.js 中的 pipe 方式,因而就一直想用 pipe 的方式尝试一下。java
同 Jim 老师的这篇 文章 中描述的同样, 我也是懒癌发做,从年尾拖到今年年初,而后在年初又看到了 Jim 老师 的博客,深受启发,终于下定决心要开始码了...... 而后,嗯,又拖到昨天。促使我下定决心要写的主要缘由是昨天部门的年会!反正年会跟我这种死肥宅也没多大关系,在你们 happy 的时候构思了下代码实现,回家用了一夜的时候补上了代码。node
Jim 老师在他的文章里面也说了,JS 的那些数组操做 (map
/ reduce
/filter
) 啥的,每次调用的时候都会进行一次完整的遍历。试想一下若是有一个第一个数是1,长度是 1亿 的递增为 1 的数组,须要把全部的数组都乘 3,再排除其中的奇数,若是用 (map
/filter
) 的方法,只要也须要循环 一亿五千万次;那么若是有其余办法能只循环一亿次,是否是节省了大量的内存资源和循环消耗的时间。git
废话很少说,直接上代码吧。程序员
在编写代码时,咱们应该有一些方法将程序像链接水管同样链接起来 -- 当咱们须要获取一些数据时,能够去经过"拧"其余的部分来达到目的。这也应该是IO应有的方式。 -- Doug McIlroy. October 11, 1964github
关于 node 的 stream 能够看看这篇 文章。数组
下面是代码部分,整个代码我是在边学 pipe 边用一夜的时间仓促写就的,懒癌发做,也不想再重构了,各位相公讲究看吧,求别喷代码。app
const stream = require('stream') const last = Symbol() // 在 selfArray 中接收一个真正的数组 // 返回一个可读流 // 若是再作的精细点,能够作成可读可写流,这样就能经过控制流的大小,来控制内存的大小,别几亿条数据直接撑爆内存了 // 不过对后面 reduce 的处理就比较麻烦 function selfArray(a) { const rs = new stream.Readable({ objectMode: true }) a.forEach((v, index) => { rs.push(v) }) rs.push(last) rs.push(null) return rs }
上面的 selfArray 在流的最后面 push 了一个 Symbol 对象来标志整个流的输入结束,留待为以后 reduce 的使用。性能
Map
/Filter
/Reduce
的实现function forEach(callback) { const ws = new stream.Writable({ objectMode: true }) let index = 0 ws._write = function (chunk, enc, next) { if (chunk !== last) { callback(chunk, index++) next() } } return ws } function filter(callback) { const trans = new stream.Transform({ readableObjectMode: true, writableObjectMode: true }) let index = 0 trans._transform = function (chunk, enc, next) { if (chunk === last) { next(null, last) } else { let condition = callback(chunk, index++) if (condition) { this.push(chunk) } next() } } return trans } function map(callback) { const trans = new stream.Transform({ readableObjectMode: true, writableObjectMode: true }) let index = 0 trans._transform = function (chunk, enc, next) { if (chunk === last) { next(null, last) } else { next(null, callback(chunk, index++)) } } return trans } function reduce(callback, initial) { const trans = new stream.Transform({ readableObjectMode: true, writableObjectMode: true }) let index = 0, current = initial, prev = initial trans._transform = function (chunk, enc, next) { if (chunk === last) { if (index > 1) { prev = callback(prev, current, index - 1) } this.push(prev) this.push(last) return next(null, last) } if (initial === void 0 && index === 0) { prev = chunk } if (index > 0) { prev = callback(prev, current, index - 1) } current = chunk index++ next() } return trans }
上面的代码在 reduce 的实现稍微麻烦了一些,reduce 对没有初始值,原始数组为空的条件下有各类不一样的处理状况,翻看了下 MDN 的解释又本身实现了下。测试
selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4]) .pipe(map(v => v * 3)) .pipe(filter(v => v % 2)) .pipe(reduce((p, c) => p + c, 0)) .pipe(forEach(v => { console.log('pipe 计算最后的结果是:', v) }))
为了好看我故意把各类括号都删掉了。嗯,看起来还挺完美,咱们来测试下
selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4]) .pipe(map(v => { console.log('map:', v) return v * 3 })) .pipe(filter(v => { console.log('filter:', v) return v % 2 })) .pipe(reduce((p, c) => { console.log('reduce:', p, c) return p + c }, 0)) .pipe(forEach(v => { console.log('pipe 计算最后的结果是:', v) })) 加上 log 以后能够看到结算结果是: map: 9 filter: 27 map: 2 filter: 6 map: 6 filter: 18 map: 3 filter: 9 reduce: 0 27 map: 5 filter: 15 reduce: 27 9 map: 6 filter: 18 map: 7 filter: 21 reduce: 36 15 map: 1 filter: 3 reduce: 51 21 map: 4 filter: 12 map: 4 filter: 12 reduce: 72 3 pipe 计算最后的结果是: 75
从上面的 log 能够看到, 第一个数 9 先执行了 map
,而后在 3 以后就直接进入了 filter
,此时第 2 个数 2 也开始被 map
处理,而后被 filter
处理,可是因为 3 以后是偶数不会被 reduce
接收, reduce
会一直等到第二个奇数,也就是 3 进入以后才会被处理... 嗯,直到最终的计算结果是 75, 被 forEach
消耗。
虽然我没有像 Jim 老师同样进行性能测试,可是猜想也知道 pipe 的方式在数量比较小的时候确定要弱于正常方式,pipe 的好处在于数据量比较大的时候,可使用比较小的内存,尽快的处理数组中前置的数据。