最近看到SICP 3.5: Stream,里面介绍了惰性求值,以及如何经过组合与抽象来操做惰性数据流。让我感觉最深的几点:html
接下来,让咱们用JavaScript来本身实现惰性求值,在这个过程当中我会解释上面的几条领悟。python
假设原始的数据是[1, 1000]
的整数:git
Array.from(Array(1000)).map((x,i)=>i+1)
假设咱们如今想找出其中的偶数:github
Array.from(Array(1000)).map((x,i)=>i+1) .filter(x=>x%2===0)
假设咱们如今想找出其中的3的倍数:编程
Array.from(Array(1000)).map((x,i)=>i+1) .filter(x=>x%2===0) .filter(x=>x%3===0)
假设咱们只须要找出第10个这样的数:数组
Array.from(Array(1000)).map((x,i)=>i+1) .filter(x=>x%2===0) .filter(x=>x%3===0) [9]
这是在js中很是常见的数组管道式处理,借鉴了函数式的思想,将多个处理步骤串联组合,让代码很是清晰简洁。
可是你应该发现了一个很严重的问题:有大量的计算被浪费。咱们只须要第9个偶数,可是前面的处理链路实际上会把整个大数组都处理一遍。在现实编程中,数据源更大、数据处理步骤更多,计算浪费的问题会更严重。虽然管道式处理可以大大简化代码,可是经常有计算浪费的通病。模块化
为了避免作多余计算,咱们每每只能抛弃管道式处理,对于每一个数据,先让它走完整个处理流程,而后循环,将下一个数据推入一样的处理流程:函数
function compute() { let count = 0; for (let i = 1; i <= 1000; i++) { if (i % 2 == 0 && i % 3 == 0) { count++; if (count === 10) return i; } } }
这样确实可以避免计算浪费,可是失去了管道式处理的模块化和简洁性。在循环的方案中:循环代码、处理代码、判断结束的代码糅杂在一块儿了。而在前面的管道式处理中,每种处理都是界线清晰的,很容易作到模块化。
假设如今要实现一个UI界面,让用户经过鼠标拖拽的方式(而不是编写代码)来编排数据处理流程,你必定会选择管道模型,而不是迭代模型。由于管道模型很是容易作到模块化,你只要将不一样的处理功能都实现成模块,用户只须要给这些模块排个顺序就行了。而迭代模型,你须要让用户可以建立迭代,而后在迭代里面加入处理逻辑、跳出逻辑……工具
那么,有没有方式可以保持管道模型的简洁性,同时可以避免计算浪费呢?有,那就是惰性数据流。惰性数据流是一种特殊的数据序列(与之相对应的,数组是一种常见的数据序列),它只有在真正须要的时候才会计算出下一项的值。下面是一个例子,getRangeIterator
能够返回一个惰性数据流,它的数据就是依次返回[min,max]
的整数。测试
function getRangeIterator(min, max) { return () => { const done = min > max; if (done) return { done: true }; return { done: false, val: min, next: getRangeIterator(min + 1, max) }; }; }
若是你不适应这种函数式的编程,能够经过显式维护一个状态变量来记录数据流状态:
function getRangeIterator2(min, max) { let current = min; return function next() { const done = current > max; if (done) return { done: true }; return { done: false, val: current++ }; }; }
下面的文章都会使用函数式的风格,不会使用带反作用的赋值。你能够尝试实现过程式风格的版本,对比哪一种编程风格更加简洁。
为了测试咱们的惰性数据流,咱们写一个工具函数来打印数据流中的全部数据:
function iteratorAll(it, cb) { const { done, val, next } = it(); // 计算下一个数据 if (done) return; cb(val); iteratorAll(next, cb); } iteratorAll(getRangeIterator(20, 30), console.log); // 会依次打印出20~30中的全部数字
使用惰性数据流来进行管道式的数据处理,完整代码:
// 搭建数据流处理管道 const it = takeItUntil( // 第三道处理(中止逻辑) filterIt( // 第二道处理 filterIt( // 第一道处理 getRangeIterator(1, 1000), // 数据源 x => x % 2 === 0 ), x => x % 3 === 0 ), (x, idx) => idx >= 10 ); // 打印数据流的最后一项 console.log(takeLast(it)); // 60 // 生成顺序数字流的方法 function getRangeIterator(min, max) { return () => { const done = min > max; if (done) return { done: true }; return { done: false, val: min, next: getRangeIterator(min + 1, max) }; }; } // 流处理方法,输入是流,输出也是流 function filterIt(it, cb) { return () => { const { done, val, next } = it(); if (done) return { done: true }; if (cb(val)) return { done: false, val, next: filterIt(next, cb) }; return filterIt(next, cb)(); }; } function takeItUntil(it, cb, currentIdx = 0) { return () => { const { done, val, next } = it(); if (done || cb(val, currentIdx)) return { done: true }; return { done: false, val, next: takeItUntil(next, cb, currentIdx + 1) }; }; } // 这个不是流处理,而是消费流的方法,由于它输出的是数字而不是流 function takeLast(it) { const { done, val, next } = it(); if (done) throw new Error("can't takeLast from empty stream"); return helper(next, val); function helper(it, pre) { const { done, val, next } = it(); if (done) return pre; return helper(next, val); } }
下半部分定义的那些方法,所有都是能够复用的流处理工具。
上半部分定义的流处理管道,实际是结构很是一致的,一层套一层,从内到外的处理管道。
咱们还能够优化一下流处理管道的组合方式,把嵌套的组合方式改为链式调用的组合方式,更符合人的阅读习惯:
// 搭建数据流处理管道 const it = chain(getRangeIterator(1, 1000)) // 数据源 .pipe(filter(x => x % 2 === 0)) // 第一道处理 .pipe(filter(x => x % 3 === 0)) // 第二道处理 .pipe(takeUntil((x, idx) => idx >= 10)) // 第三道处理 .unWrap(); // 打印数据流的最后一项 console.log(takeLast(it)); // 60 // 生成顺序数字流的方法 function getRangeIterator(min, max) { return () => { const done = min > max; if (done) return { done: true }; return { done: false, val: min, next: getRangeIterator(min + 1, max) }; }; } // 流处理方法配置方法 function filter(cb) { // 返回流处理函数 return function _filter(it) { // 流处理函数输入一个流,返回一个流 return () => { const { done, val, next } = it(); if (done) return { done: true }; if (cb(val)) return { done: false, val, next: _filter(next) }; // val被filter掉,从流中拿出下一个值 return _filter(next)(); }; }; } function takeUntil(cb) { return it => _takeUntil(it, 0); function _takeUntil(it, currentIdx) { return () => { const { done, val, next } = it(); if (done || cb(val, currentIdx)) return { done: true }; return { done: false, val, next: _takeUntil(next, currentIdx + 1) }; }; } } // takeLast不是流处理,而是消费流的方法(相似于前面的iteratorAll),由于它输出的不是流 function takeLast(it) { const { done, val, next } = it(); if (done) throw new Error("can't takeLast from empty stream"); return helper(next, val); function helper(it, pre) { const { done, val, next } = it(); if (done) return pre; return helper(next, val); } } function chain(it) { return { pipe: transformer => chain(transformer(it)), unWrap: () => it }; }
能够看到,链式调用的惰性数据流处理,与文章最前面的数组链式调用同样简洁。
【计算关系的创建】与【实际计算工做】的时机分离,这个是惰性数据流与普通数组处理的最大不一样:
// 搭建数据流处理管道 const it = chain(getRangeIterator(1, 1000)) // 数据源 .pipe(filter(x => x % 2 === 0)) // 第一道处理 .pipe(filter(x => x % 3 === 0)) // 第二道处理 .pipe(takeUntil((x, idx) => idx >= 10)) // 第三道处理 .unWrap(); // 这个时候实际尚未任何数据拉取、处理,只是创建了计算关系 // 消费下游数据的时候,才会把上游数据抽出来 console.log(takeLast(it));
创建计算关系的代码不会消费和处理数据。
而普通的数组处理则作不到这一点,创建计算关系的代码会马上完成全部工做:
Array.from(Array(1000)).map((x,i)=>i+1) .filter(x=>x%2===0) // 前面已经完成全部数据的计算 .filter(x=>x%3===0) // 前面已经完成全部数据的计算 [9] // 前面已经完成全部数据的计算
若是你仔细观察,会发现不少大型计算系统都是先定义好计算关系,而后才真正开始计算的。好比TensorFlow:
model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10) ])
普通的(急切的)数据流是提早准备好全部数据,所以你没法表示和处理无穷大的数据序列(好比斐波那契数列)。
而惰性数据流则是至关于只保存数列每一项的计算方法,在须要的时候才计算出来。这就是找规律和死记硬背的区别~
举个例子,斐波那契数据流:
// 生成顺序斐波那契流的方法 function getFibStream() { return helper(0, 1); function helper(pre2, pre1) { return () => { const val = pre2 + pre1; return { done: false, val, next: helper(pre1, val) }; }; } }
上游数据流有无穷多个,是常常发生的事情,只要下游转换流、消费者会中止,程序就会终止。举个例子,打印1000之内的斐波那契数:
// 搭建数据流处理管道 const it = chain(getFibStream()) .pipe(takeUntil(x => x >= 1000)) .unWrap(); // 打印小于1000的斐波那契数 iteratorAll(it, console.log); // 生成顺序斐波那契流的方法 function getFibStream() { return helper(0, 1); function helper(pre2, pre1) { return () => { const val = pre2 + pre1; return { done: false, val, next: helper(pre1, val) }; }; } } function takeUntil(cb) { return it => _takeUntil(it, 0); function _takeUntil(it, currentIdx) { return () => { const { done, val, next } = it(); if (done || cb(val, currentIdx)) return { done: true }; return { done: false, val, next: _takeUntil(next, currentIdx + 1) }; }; } } function iteratorAll(it, cb) { const { done, val, next } = it(); if (done) return; cb(val); iteratorAll(next, cb); } function chain(it) { return { pipe: transformer => chain(transformer(it)), unWrap: () => it }; }