callbag,一个有趣的规范

push 和 pull 模型

若是你了解 RxJs,在响应式编程中,Observable 和 Obsever 是 push 模型,与之对应的,还有一个 pull 模型:html

  • Pull(f(): B:返回一个值。
  • Push(f(x: A): void:响应式的,当有值产生时,会发出一个事件,并携带上这个值。订阅了该事件的观察者(Observer)将得到反馈。

JavaScript 中的 Math.random()window.outerHeight 等都是 pull 模型:git

const height = window.outerHeight();
// 或者是迭代器写法
function* getWindowHeight() {
    while(true) {
        yield window.outerHeight;
    }
}
var iter = getWindowHeight()
iter.next()
复制代码

pull 模型包含两个部分:github

  • 生产者:负责生产数据,是数据源
  • 消费者:负责消费数据,是数据的使用方

在 pull 模型中,数据是按需索取的。typescript

再经过 RxJs 看一个 push 模型的例子:编程

Rx.Observable
    .fromEvent(document, 'click')
	.map(event => `Event time: ${event.timeStamp}`)
    .subscribe(function observer(val) {
    	console.log(val);
	})
复制代码

push 模型的组成包含了两个部分:bash

  • 可观察(可监听)对象:是数据来源
  • 观察者(监听者):是数据的使用方

与 pull 模型不一样,观察者不能主动索取数据,而是观察数据源,当数据源有数据时,才可消费和使用。框架

push 模型有这么一些优势:dom

  • 高度复用的可观察对象:经过对源可观察对象使用不一样的运算子,可构建出新的可观察对象。
  • 延迟执行:可观察对象只有被观察者订阅,才会派发数据。
  • 声明式、描述将来的代码:咱们只用声明数据源和数据消费方式,而不用关心数据交付时的细节。

Cycle.js 的做者 Andre Staltz 长久以来面对一个问题,Cycle.js 及其推荐使用的响应式编程库 xstream 都是 push 模型的,这让框架的模型和业务代码都受益于 push 模型的优势。可是,实际项目中,咱们仍是有很多 pull 模型下的需求,Andre Staltz 也开了一个 issue ,讨论如何更好的使用代码描述 pull 模型。函数

push 与 pull 能够是同型的

stalz 看到,咱们的 Observable 和 Observer:优化

interface Observer {
  next(x): void;
  error(e): void;
  complete(): void;
}

interface Observable {
  subscribe(observer): Subscription;
  unsubsribe(): void;
}
复制代码

能够经过函数进行描述:

function observable(msgType, msgPayload) {}
复制代码
  • msgType == 0:payload 是 observer,意味着 observer 向 observable 问好,须要订阅这个 observerble。(subscribe)
  • msgType == 1:意味着 observer 将取消对 observable 的订阅。(unsubscribe)
function observer(msgType, msgPayload) {}
复制代码

当:

  • msgType == 1:对应 observer.next(payload),即 observable 交付数据给 observer,此时 payload 携带了数据。
  • msgType == 2 且 payload 为 undefined:对应于 observer.complete()
  • msgType == 2 且 payload 含有值:对应于 observer.error(payload),此时 payload 描述了错误。

进一步归纳就是:

Observer:

  • observer(1, data): void
    复制代码
    • 数据交付 :observable 将数据交付给 observer
  • observer(2, err): void
    复制代码
    • 出错:observable 将错误告知 observer
  • observer(2): void
    复制代码
    • 完成:observable 再也不有数据,告知 observer 任务完成

Observable:

  • observable(0, observer): void
    复制代码
    • 问好:observer 订阅了 observable
  • observable(2): void
    复制代码
    • 结束:observer 取消对 observable 的订阅

这么归纳下来,咱们发现,pull 模型也能够进行相似的归纳:

Consumer

  • consumer(0, producer): void
    复制代码
    • 问好:在 pull 模型中,producer 须要向 consumer 问好,告诉 consumer 有须要时,从哪里取值
  • consumer(1, data): void
    复制代码
    • 数据交付:producer 将数据交付给 consumer
  • consumer(2, err): void
    复制代码
    • 出错:producer 将错误告知 consumer
  • consumer(2): void
    复制代码
    • 完成:producer 告知 consumer 任务已完成

Producer

  • producer(0, consumer): void
    复制代码
    • 问好:consumer 肯定和哪一个 producer 交互
  • producer(1, data): void
    复制代码
    • 数据交付:在 pull 模型中,consumer 须要主动向 producer 取值
  • producer(2): void
    复制代码
    • 结束:consumer 结束了和 producer 的交互

综上,咱们发现,push 和 pull 模型是同型的(具备同样的角色和函数签名),所以,能够经过一个规范同时定义两者。

callbag

staltz 为 push 和 pull 模型建立了一个名为 callbag 的规范,这个规范的内容以下:

(type: number, payload?: any) => void
复制代码

定义(Defination)

  • Callbag:一个函数,函数签名为: (type: 0 | 1 | 2, payload?: any) => void
  • Greet:若是一个 callbag 以 0 为第一个参数被调用,咱们就说 该 callbag 被问好了。此时函数执行的操做是: “向这个 callbag 问好”。
  • Deliver:若是一个 callbag 以 1 为第一个参数被调用,咱们就说 “这个 callbag 正被交付数据”。此时函数执行的操做是:“交付数据给这个 callbag”。
  • Terminate:若是一个 callbag 以 2 为第一个参数被调用,咱们就说 “这个 callbag 被终止了”。此时函数执行的操做是:“终止这个 callbag”。
  • Source:一个负责交付数据的 callbag。
  • Sink:一个负责接收(消费)数据的 callbag。

协议(Protocal)

问好(Greets): (type: 0, cb: Callbag) => void

当第一个参数是 0,而第二个参数是另一个 callbag(即一个函数)的时候,这个 callbag 就被问好了。

握手(Handshake)

当一个 source 被问好,并被做为 payload 传递给了某个 sink,sink 必须使用一个 callbag payload 进行问好,这个 callbag 能够是他本身,也能够是另外的 callbag。换言之,问好是相互的。相互间的问好被称为握手

终止(Termination): (type: 2, err?: any) => void

当第一个参数是 0,而第二个参数要么是 undefined(因为成功引发的终止),要么是任何的真实值(因为失败引发的终止),这个 callbag 就被终止了。

在握手以后,source 可能终止掉 sink,sink 也可能会终止掉 source。若是 source 终止了 sink,则 sink 不该当终止 source,反之亦然。换言之,终止行为不该该是相互的。

数据交付(Data delivery) (type: 1, data: any) => void

交付次数:

  • 一个 callbag(source 或者 sink)可能会被一次或屡次交付数据

有效交付的窗口:

  • 一个 callbag 必定不能在被问好以前被交付数据
  • 一个 callbag 必定不能在终止后被交付数据
  • 一个 sink 必定不能在其终止了它的 source 后被交付数据

建立本身的 callbag

callbag 的组成能够简单概括为:

  • handshake:一次握手过程,source 和 sink 如何握手
  • talkback:对讲对象,sink 和 source 正在和谁沟通

listener(observer)sink

  • 定义问好过程:在问好阶段,能够知道在和谁对讲:

    function sink(type, data) {
      if (type === 0) {
        // sink 收到了来自 source 的问好
        // 问好的时候肯定 source 和 sink 的对讲方式
        const talkback = data;
        // 3s 后,sink 终止和 source 的对讲
        setTimeout(() => talkback(2), 3000);
      }
    }
    复制代码
  • 定义数据处理过程

    function sink(type, data) {
        if (type === 0) {
            const talkback = data;
            setTimeout(() => talkback(2), 3000);
        }
        if (type === 1) {
            console.log(data);
        }
    }
    复制代码
  • 定义结束过程

    let handle;
    function sink(type, data) {
        if (type === 0) {
            const talkback = data;
            setTimeout(() => talkback(2), 3000);
        }
        if (type === 1) {
            console.log(data);
        }
        if (type === 2) {
            clearTimeout(handle);
        }
    }
    复制代码

    能够再用工厂函数让代码干净一些:

    function makeSink() {
      let handle;
      return function sink(type, data) {
        if (type === 0) {
          const talkback = data;
          handle = setTimeout(() => talkback(2), 3000);
        } 
        if (type === 1) {
          console.log(data);
        }
        if (type === 2) {
          clearTimeout(handle);
        }
      }
    }
    复制代码

puller(consumer)sink

puller sink 则能够向 source 主动请求数据:

let handle;
function sink(type, data) {
    if (type === 0) {
        const talkback = data;
        setInterval(() => talkback(1), 1000);
    }
    if (type === 1) {
        console.log(data);
    }
    if (type === 2) {
        clearTimeout(handle);
    }
}
复制代码

listenable(observable)source

  • 定义问好过程:

    function source(type, data) {
      if (type === 0) {
        // 若是 source 收到 sink 的问好,
        // 则 payload 即为 sink,source 能够向 sink 发送数据了
        const sink = data;
        setInterval(() => {
          sink(1, null);
        }, 1000);
      }
      // 让 source 也和 sink 问好,完成一次握手
      sink(0, /* talkback callbag here */)
    }
    复制代码
  • 当 sink 想要中止观察,须要让 source 有处理中止的能力,另外,listenable 的 source 不会理会 sink 主动的数据索取。所以,咱们这么告知 sink 沟通方式:

    function source(type, data) {
        if (type === 0) {
            const sink = data;
            let handle = setInterval(() => {
                sink(1, null);
            }, 1000);
        }
        const talkback = (type, data) => {
            if (type === 2) {
                clearInterval(handle);
            } 
        }
        sink(0, talkback);
    }
    复制代码
  • 优化一下代码可读性:

    function source(start, sink) {
      if (start !== 0) return;
      let handle = setInterval(() => {
        sink(1, null);
      }, 1000);
      const talkback = (t, d) => {
        if (t === 2) clearInterval(handle);
      };
      sink(0, talkback);
    }
    复制代码

pullable(iterable)source

pullable source 中,值时按照 sink 的须要获取的,所以,只有在 sink 索取值时,source 才须要交付数据:

function source(start, sink) {
    if (start !== 0) retrun;
    let i = 10;
    const talkback = (t, d) => {
        if (t == 1) {
            if (i <= 20) sink(1, i++);
            else sink(2);
        }
    }
    sink(0, talkback)
}
复制代码

建立运算子

借助于 operator,可以不断的构建新的 source,operator 的通常范式为:

const myOperator = args => inputSource => outputSource
复制代码

借助于管道技术,咱们能一步步的声明新的 source:

pipe(
  source,
  myOperator(args),
  iterate(x => console.log(x))
)
// same as...
pipe(
  source,
  inputSource => outputSource,
  iterate(x => console.log(x))
)
复制代码

下面咱们建立了一个乘法 operator:

const multiplyBy = factor => inputSource => {
    return function outputSource(start, outputSink) {
        if (start !== 0) return;
        inputSource(start, (type, data) => {
            if (type === 1) {
                outputSink(1, data * factor);
            } else {
                outputSink(1, data * factor);
            } 
        })
    }
}
复制代码

使用:

function source(start, sink) {
    if (start !== 0) return;
    let i = 0;
    const handle = setInterval(() => sink(1, i++), 3000);
    const talkback = (type, data) => {
        if (type === 2) {
            clearInterval(handle);
        }
    }
    sink(0, talkback);
}

let timeout;
function sink(type, data) {
    if (type === 0) {
        const talkback = data;
        timetout = setTimeout(() => talback(2), 9000);
    }
    if (type === 1) {
        console.log('data is', data);
    }
    if (type === 2) {
        clearTimeout(handle);
    }
}

const newSource = multiplyBy(3)(source);
newSource(0, sink);
复制代码

总结

经过 callbag ,咱们能够近乎一致的处理数据源和数据源的消费

例如,下面是 listenable 数据源,咱们用 forEach 消费:

const {forEach, fromEvent, map, filter, pipe} = require('callbag-basics');

pipe(
  fromEvent(document, 'click'),
  filter(ev => ev.target.tagName === 'BUTTON'),
  map(ev => ({x: ev.clientX, y: ev.clientY})),
  forEach(coords => console.log(coords))
);
复制代码

下面则是 pullable 数据源,咱们仍能够用 forEach 进行消费:

const {forEach, fromIter, take, map, pipe} = require('callbag-basics');

function* getRandom() {
  while(true) {
    yield Math.random();
  }
}

pipe(
  fromIter(getRandom()),
  take(5),
  forEach(x => console.log(x))
);
复制代码

参考资料

相关文章
相关标签/搜索