用JavaScript本身实现惰性数据流、数据流操做符

最近看到SICP 3.5: Stream,里面介绍了惰性求值,以及如何经过组合与抽象来操做惰性数据流。让我感觉最深的几点:html

  • 基于流的编程方式可以将数据处理的代码组织成很是模块化的方式,可以经过组合与抽象实现及其复杂的行为,同时最大程度地控制项目代码的复杂度
  • 在JavaScript的世界中,ES6的iterable和generator的背后的思想其实就是惰性数据流。对比如今人们对这个思想的诠释(迭代器模式)和40年前的诠释,感受很是奇妙,加深了我对iterable和generator的理解。
  • 对比惰性数据流和rxjs事件流,感受就像是来到了镜中世界,几乎能一一对应,却又彻底不一样。

接下来,让咱们用JavaScript来本身实现惰性求值,在这个过程当中我会解释上面的几条领悟。python

从急切求值到惰性求值

假设原始的数据是[1, 1000]的整数:git

Array.from(Array(1000)).map((x,i)=>i+1)

假设咱们如今想找出其中的偶数:github

Array.from(Array(1000)).map((x,i)=>i+1)
    .filter(x=>x%2===0)

假设咱们如今想找出其中的3的倍数:编程

Array.from(Array(1000)).map((x,i)=>i+1)
    .filter(x=>x%2===0)
    .filter(x=>x%3===0)

假设咱们只须要找出第10个这样的数:数组

Array.from(Array(1000)).map((x,i)=>i+1)
    .filter(x=>x%2===0)
    .filter(x=>x%3===0)
    [9]

这是在js中很是常见的数组管道式处理,借鉴了函数式的思想,将多个处理步骤串联组合,让代码很是清晰简洁。
可是你应该发现了一个很严重的问题:有大量的计算被浪费。咱们只须要第9个偶数,可是前面的处理链路实际上会把整个大数组都处理一遍。在现实编程中,数据源更大、数据处理步骤更多,计算浪费的问题会更严重。虽然管道式处理可以大大简化代码,可是经常有计算浪费的通病模块化

为了避免作多余计算,咱们每每只能抛弃管道式处理,对于每一个数据,先让它走完整个处理流程,而后循环,将下一个数据推入一样的处理流程:函数

function compute() {
  let count = 0;
  for (let i = 1; i <= 1000; i++) {
    if (i % 2 == 0 && i % 3 == 0) {
      count++;
      if (count === 10) return i;
    }
  }
}

这样确实可以避免计算浪费,可是失去了管道式处理的模块化和简洁性。在循环的方案中:循环代码、处理代码、判断结束的代码糅杂在一块儿了。而在前面的管道式处理中,每种处理都是界线清晰的,很容易作到模块化。
假设如今要实现一个UI界面,让用户经过鼠标拖拽的方式(而不是编写代码)来编排数据处理流程,你必定会选择管道模型,而不是迭代模型。由于管道模型很是容易作到模块化,你只要将不一样的处理功能都实现成模块,用户只须要给这些模块排个顺序就行了。而迭代模型,你须要让用户可以建立迭代,而后在迭代里面加入处理逻辑、跳出逻辑……工具

那么,有没有方式可以保持管道模型的简洁性,同时可以避免计算浪费呢?有,那就是惰性数据流。惰性数据流是一种特殊的数据序列(与之相对应的,数组是一种常见的数据序列),它只有在真正须要的时候才会计算出下一项的值。下面是一个例子,getRangeIterator能够返回一个惰性数据流,它的数据就是依次返回[min,max]的整数。测试

function getRangeIterator(min, max) {
  return () => {
    const done = min > max;
    if (done) return { done: true };
    return {
      done: false,
      val: min,
      next: getRangeIterator(min + 1, max)
    };
  };
}

若是你不适应这种函数式的编程,能够经过显式维护一个状态变量来记录数据流状态:

function getRangeIterator2(min, max) {
  let current = min;
  return function next() {
    const done = current > max;
    if (done) return { done: true };
    return {
      done: false,
      val: current++
    };
  };
}

下面的文章都会使用函数式的风格,不会使用带反作用的赋值。你能够尝试实现过程式风格的版本,对比哪一种编程风格更加简洁。
为了测试咱们的惰性数据流,咱们写一个工具函数来打印数据流中的全部数据:

function iteratorAll(it, cb) {
  const { done, val, next } = it(); // 计算下一个数据
  if (done) return;
  cb(val);
  iteratorAll(next, cb);
}

iteratorAll(getRangeIterator(20, 30), console.log);
// 会依次打印出20~30中的全部数字

使用惰性数据流来进行管道式的数据处理,完整代码:

// 搭建数据流处理管道
const it = takeItUntil(
  // 第三道处理(中止逻辑)
  filterIt(
    // 第二道处理
    filterIt(
      // 第一道处理
      getRangeIterator(1, 1000), // 数据源
      x => x % 2 === 0
    ),
    x => x % 3 === 0
  ),
  (x, idx) => idx >= 10
);
// 打印数据流的最后一项
console.log(takeLast(it)); // 60

// 生成顺序数字流的方法
function getRangeIterator(min, max) {
  return () => {
    const done = min > max;
    if (done) return { done: true };
    return {
      done: false,
      val: min,
      next: getRangeIterator(min + 1, max)
    };
  };
}
// 流处理方法,输入是流,输出也是流
function filterIt(it, cb) {
  return () => {
    const { done, val, next } = it();
    if (done)
      return {
        done: true
      };
    if (cb(val))
      return {
        done: false,
        val,
        next: filterIt(next, cb)
      };
    return filterIt(next, cb)();
  };
}

function takeItUntil(it, cb, currentIdx = 0) {
  return () => {
    const { done, val, next } = it();
    if (done || cb(val, currentIdx)) return { done: true };
    return {
      done: false,
      val,
      next: takeItUntil(next, cb, currentIdx + 1)
    };
  };
}

// 这个不是流处理,而是消费流的方法,由于它输出的是数字而不是流
function takeLast(it) {
  const { done, val, next } = it();
  if (done) throw new Error("can't takeLast from empty stream");
  return helper(next, val);

  function helper(it, pre) {
    const { done, val, next } = it();
    if (done) return pre;
    return helper(next, val);
  }
}

下半部分定义的那些方法,所有都是能够复用的流处理工具。
上半部分定义的流处理管道,实际是结构很是一致的,一层套一层,从内到外的处理管道。

链式调用

咱们还能够优化一下流处理管道的组合方式,把嵌套的组合方式改为链式调用的组合方式,更符合人的阅读习惯:

// 搭建数据流处理管道
const it = chain(getRangeIterator(1, 1000)) // 数据源
  .pipe(filter(x => x % 2 === 0)) // 第一道处理
  .pipe(filter(x => x % 3 === 0)) // 第二道处理
  .pipe(takeUntil((x, idx) => idx >= 10)) // 第三道处理
  .unWrap();

// 打印数据流的最后一项
console.log(takeLast(it)); // 60

// 生成顺序数字流的方法
function getRangeIterator(min, max) {
  return () => {
    const done = min > max;
    if (done) return { done: true };
    return {
      done: false,
      val: min,
      next: getRangeIterator(min + 1, max)
    };
  };
}
// 流处理方法配置方法
function filter(cb) {
  // 返回流处理函数
  return function _filter(it) {
    // 流处理函数输入一个流,返回一个流
    return () => {
      const { done, val, next } = it();
      if (done)
        return {
          done: true
        };
      if (cb(val))
        return {
          done: false,
          val,
          next: _filter(next)
        };
      // val被filter掉,从流中拿出下一个值
      return _filter(next)();
    };
  };
}

function takeUntil(cb) {
  return it => _takeUntil(it, 0);
  function _takeUntil(it, currentIdx) {
    return () => {
      const { done, val, next } = it();
      if (done || cb(val, currentIdx)) return { done: true };
      return {
        done: false,
        val,
        next: _takeUntil(next, currentIdx + 1)
      };
    };
  }
}

// takeLast不是流处理,而是消费流的方法(相似于前面的iteratorAll),由于它输出的不是流
function takeLast(it) {
  const { done, val, next } = it();
  if (done) throw new Error("can't takeLast from empty stream");
  return helper(next, val);

  function helper(it, pre) {
    const { done, val, next } = it();
    if (done) return pre;
    return helper(next, val);
  }
}

function chain(it) {
  return {
    pipe: transformer => chain(transformer(it)),
    unWrap: () => it
  };
}

能够看到,链式调用的惰性数据流处理,与文章最前面的数组链式调用同样简洁。

特性一:计算关系的创建与实际计算工做分离

【计算关系的创建】与【实际计算工做】的时机分离,这个是惰性数据流与普通数组处理的最大不一样:

// 搭建数据流处理管道
const it = chain(getRangeIterator(1, 1000)) // 数据源
  .pipe(filter(x => x % 2 === 0)) // 第一道处理
  .pipe(filter(x => x % 3 === 0)) // 第二道处理
  .pipe(takeUntil((x, idx) => idx >= 10)) // 第三道处理
  .unWrap();

// 这个时候实际尚未任何数据拉取、处理,只是创建了计算关系

// 消费下游数据的时候,才会把上游数据抽出来
console.log(takeLast(it));

创建计算关系的代码不会消费和处理数据。
而普通的数组处理则作不到这一点,创建计算关系的代码会马上完成全部工做:

Array.from(Array(1000)).map((x,i)=>i+1)
    .filter(x=>x%2===0) // 前面已经完成全部数据的计算
    .filter(x=>x%3===0) // 前面已经完成全部数据的计算
    [9]                 // 前面已经完成全部数据的计算

若是你仔细观察,会发现不少大型计算系统都是先定义好计算关系,而后才真正开始计算的。好比TensorFlow

model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10)
])

特性二:惰性数据流能够是无穷多的

普通的(急切的)数据流是提早准备好全部数据,所以你没法表示和处理无穷大的数据序列(好比斐波那契数列)。

而惰性数据流则是至关于只保存数列每一项的计算方法,在须要的时候才计算出来。这就是找规律和死记硬背的区别~
举个例子,斐波那契数据流:

// 生成顺序斐波那契流的方法
function getFibStream() {
  return helper(0, 1);

  function helper(pre2, pre1) {
    return () => {
      const val = pre2 + pre1;
      return {
        done: false,
        val,
        next: helper(pre1, val)
      };
    };
  }
}

上游数据流有无穷多个,是常常发生的事情,只要下游转换流、消费者会中止,程序就会终止。举个例子,打印1000之内的斐波那契数:

// 搭建数据流处理管道
const it = chain(getFibStream())
  .pipe(takeUntil(x => x >= 1000))
  .unWrap();

// 打印小于1000的斐波那契数
iteratorAll(it, console.log);

// 生成顺序斐波那契流的方法
function getFibStream() {
  return helper(0, 1);

  function helper(pre2, pre1) {
    return () => {
      const val = pre2 + pre1;
      return {
        done: false,
        val,
        next: helper(pre1, val)
      };
    };
  }
}

function takeUntil(cb) {
  return it => _takeUntil(it, 0);
  function _takeUntil(it, currentIdx) {
    return () => {
      const { done, val, next } = it();
      if (done || cb(val, currentIdx)) return { done: true };
      return {
        done: false,
        val,
        next: _takeUntil(next, currentIdx + 1)
      };
    };
  }
}

function iteratorAll(it, cb) {
  const { done, val, next } = it();
  if (done) return;
  cb(val);
  iteratorAll(next, cb);
}

function chain(it) {
  return {
    pipe: transformer => chain(transformer(it)),
    unWrap: () => it
  };
}

敬请期待

相关文章
相关标签/搜索