若是你了解 RxJs,在响应式编程中,Observable 和 Obsever 是 push 模型,与之对应的,还有一个 pull 模型:html
f(): B
):返回一个值。f(x: A): void
):响应式的,当有值产生时,会发出一个事件,并携带上这个值。订阅了该事件的观察者(Observer)将得到反馈。JavaScript 中的 Math.random()
、window.outerHeight
等都是 pull 模型:git
const height = window.outerHeight();
// 或者是迭代器写法
function* getWindowHeight() {
while(true) {
yield window.outerHeight;
}
}
var iter = getWindowHeight()
iter.next()
复制代码
pull 模型包含两个部分:github
在 pull 模型中,数据是按需索取的。typescript
再经过 RxJs 看一个 push 模型的例子:编程
Rx.Observable
.fromEvent(document, 'click')
.map(event => `Event time: ${event.timeStamp}`)
.subscribe(function observer(val) {
console.log(val);
})
复制代码
push 模型的组成包含了两个部分:bash
与 pull 模型不一样,观察者不能主动索取数据,而是观察数据源,当数据源有数据时,才可消费和使用。框架
push 模型有这么一些优势:dom
Cycle.js 的做者 Andre Staltz 长久以来面对一个问题,Cycle.js 及其推荐使用的响应式编程库 xstream 都是 push 模型的,这让框架的模型和业务代码都受益于 push 模型的优势。可是,实际项目中,咱们仍是有很多 pull 模型下的需求,Andre Staltz 也开了一个 issue ,讨论如何更好的使用代码描述 pull 模型。函数
stalz 看到,咱们的 Observable 和 Observer:优化
interface Observer {
next(x): void;
error(e): void;
complete(): void;
}
interface Observable {
subscribe(observer): Subscription;
unsubsribe(): void;
}
复制代码
能够经过函数进行描述:
function observable(msgType, msgPayload) {}
复制代码
msgType == 0
:payload 是 observer,意味着 observer 向 observable 问好,须要订阅这个 observerble。(subscribe)msgType == 1
:意味着 observer 将取消对 observable 的订阅。(unsubscribe)function observer(msgType, msgPayload) {}
复制代码
当:
msgType == 1
:对应 observer.next(payload)
,即 observable 交付数据给 observer,此时 payload 携带了数据。msgType == 2
且 payload 为 undefined
:对应于 observer.complete()
。msgType == 2
且 payload 含有值:对应于 observer.error(payload)
,此时 payload 描述了错误。进一步归纳就是:
Observer:
observer(1, data): void
复制代码
observer(2, err): void
复制代码
observer(2): void
复制代码
Observable:
observable(0, observer): void
复制代码
observable(2): void
复制代码
这么归纳下来,咱们发现,pull 模型也能够进行相似的归纳:
Consumer:
consumer(0, producer): void
复制代码
consumer(1, data): void
复制代码
consumer(2, err): void
复制代码
consumer(2): void
复制代码
Producer:
producer(0, consumer): void
复制代码
producer(1, data): void
复制代码
producer(2): void
复制代码
综上,咱们发现,push 和 pull 模型是同型的(具备同样的角色和函数签名),所以,能够经过一个规范同时定义两者。
staltz 为 push 和 pull 模型建立了一个名为 callbag 的规范,这个规范的内容以下:
(type: number, payload?: any) => void
复制代码
(type: 0 | 1 | 2, payload?: any) => void
0
为第一个参数被调用,咱们就说 该 callbag 被问好了
。此时函数执行的操做是: “向这个 callbag 问好”。1
为第一个参数被调用,咱们就说 “这个 callbag 正被交付数据”。此时函数执行的操做是:“交付数据给这个 callbag”。2
为第一个参数被调用,咱们就说 “这个 callbag 被终止了”。此时函数执行的操做是:“终止这个 callbag”。问好(Greets): (type: 0, cb: Callbag) => void
当第一个参数是 0
,而第二个参数是另一个 callbag(即一个函数)的时候,这个 callbag 就被问好了。
握手(Handshake)
当一个 source 被问好,并被做为 payload 传递给了某个 sink,sink 必须使用一个 callbag payload 进行问好,这个 callbag 能够是他本身,也能够是另外的 callbag。换言之,问好是相互的。相互间的问好被称为握手。
终止(Termination): (type: 2, err?: any) => void
当第一个参数是 0
,而第二个参数要么是 undefined(因为成功引发的终止),要么是任何的真实值(因为失败引发的终止),这个 callbag 就被终止了。
在握手以后,source 可能终止掉 sink,sink 也可能会终止掉 source。若是 source 终止了 sink,则 sink 不该当终止 source,反之亦然。换言之,终止行为不该该是相互的。
数据交付(Data delivery) (type: 1, data: any) => void
交付次数:
有效交付的窗口:
callbag 的组成能够简单概括为:
定义问好过程:在问好阶段,能够知道在和谁对讲:
function sink(type, data) {
if (type === 0) {
// sink 收到了来自 source 的问好
// 问好的时候肯定 source 和 sink 的对讲方式
const talkback = data;
// 3s 后,sink 终止和 source 的对讲
setTimeout(() => talkback(2), 3000);
}
}
复制代码
定义数据处理过程
function sink(type, data) {
if (type === 0) {
const talkback = data;
setTimeout(() => talkback(2), 3000);
}
if (type === 1) {
console.log(data);
}
}
复制代码
定义结束过程
let handle;
function sink(type, data) {
if (type === 0) {
const talkback = data;
setTimeout(() => talkback(2), 3000);
}
if (type === 1) {
console.log(data);
}
if (type === 2) {
clearTimeout(handle);
}
}
复制代码
能够再用工厂函数让代码干净一些:
function makeSink() {
let handle;
return function sink(type, data) {
if (type === 0) {
const talkback = data;
handle = setTimeout(() => talkback(2), 3000);
}
if (type === 1) {
console.log(data);
}
if (type === 2) {
clearTimeout(handle);
}
}
}
复制代码
puller sink 则能够向 source 主动请求数据:
let handle;
function sink(type, data) {
if (type === 0) {
const talkback = data;
setInterval(() => talkback(1), 1000);
}
if (type === 1) {
console.log(data);
}
if (type === 2) {
clearTimeout(handle);
}
}
复制代码
定义问好过程:
function source(type, data) {
if (type === 0) {
// 若是 source 收到 sink 的问好,
// 则 payload 即为 sink,source 能够向 sink 发送数据了
const sink = data;
setInterval(() => {
sink(1, null);
}, 1000);
}
// 让 source 也和 sink 问好,完成一次握手
sink(0, /* talkback callbag here */)
}
复制代码
当 sink 想要中止观察,须要让 source 有处理中止的能力,另外,listenable 的 source 不会理会 sink 主动的数据索取。所以,咱们这么告知 sink 沟通方式:
function source(type, data) {
if (type === 0) {
const sink = data;
let handle = setInterval(() => {
sink(1, null);
}, 1000);
}
const talkback = (type, data) => {
if (type === 2) {
clearInterval(handle);
}
}
sink(0, talkback);
}
复制代码
优化一下代码可读性:
function source(start, sink) {
if (start !== 0) return;
let handle = setInterval(() => {
sink(1, null);
}, 1000);
const talkback = (t, d) => {
if (t === 2) clearInterval(handle);
};
sink(0, talkback);
}
复制代码
pullable source 中,值时按照 sink 的须要获取的,所以,只有在 sink 索取值时,source 才须要交付数据:
function source(start, sink) {
if (start !== 0) retrun;
let i = 10;
const talkback = (t, d) => {
if (t == 1) {
if (i <= 20) sink(1, i++);
else sink(2);
}
}
sink(0, talkback)
}
复制代码
借助于 operator,可以不断的构建新的 source,operator 的通常范式为:
const myOperator = args => inputSource => outputSource
复制代码
借助于管道技术,咱们能一步步的声明新的 source:
pipe(
source,
myOperator(args),
iterate(x => console.log(x))
)
// same as...
pipe(
source,
inputSource => outputSource,
iterate(x => console.log(x))
)
复制代码
下面咱们建立了一个乘法 operator:
const multiplyBy = factor => inputSource => {
return function outputSource(start, outputSink) {
if (start !== 0) return;
inputSource(start, (type, data) => {
if (type === 1) {
outputSink(1, data * factor);
} else {
outputSink(1, data * factor);
}
})
}
}
复制代码
使用:
function source(start, sink) {
if (start !== 0) return;
let i = 0;
const handle = setInterval(() => sink(1, i++), 3000);
const talkback = (type, data) => {
if (type === 2) {
clearInterval(handle);
}
}
sink(0, talkback);
}
let timeout;
function sink(type, data) {
if (type === 0) {
const talkback = data;
timetout = setTimeout(() => talback(2), 9000);
}
if (type === 1) {
console.log('data is', data);
}
if (type === 2) {
clearTimeout(handle);
}
}
const newSource = multiplyBy(3)(source);
newSource(0, sink);
复制代码
经过 callbag ,咱们能够近乎一致的处理数据源和数据源的消费:
例如,下面是 listenable 数据源,咱们用 forEach
消费:
const {forEach, fromEvent, map, filter, pipe} = require('callbag-basics');
pipe(
fromEvent(document, 'click'),
filter(ev => ev.target.tagName === 'BUTTON'),
map(ev => ({x: ev.clientX, y: ev.clientY})),
forEach(coords => console.log(coords))
);
复制代码
下面则是 pullable 数据源,咱们仍能够用 forEach
进行消费:
const {forEach, fromIter, take, map, pipe} = require('callbag-basics');
function* getRandom() {
while(true) {
yield Math.random();
}
}
pipe(
fromIter(getRandom()),
take(5),
forEach(x => console.log(x))
);
复制代码