[译] 经过构建 Observable 来学习 Observable

原文连接: medium.com/@benlesh/le…
本文为 RxJS 中文社区 翻译文章,如需转载,请注明出处,谢谢合做!
若是你也想和咱们一块儿,翻译更多优质的 RxJS 文章以奉献给你们,请点击【这里】javascript

经过社交媒体或在活动现场,我常常会被问到关于“热的” vs “冷的” observables,或者 observable 到底是“多播”仍是“单播”。对于 Rx.Observable,人们以为它内部的工做原理彻底是黑魔法,这令他们感到十分困惑。当被问及如何描述 observable 时,人们会说:“他们是流”或“他们相似于 promises ”。事实上,我在不少场合甚至在公开演讲中都谈论过这些内容。java

与 promises 进行比较是必要的,但也是不幸的。鉴于 promieses 和 observables 都是异步基本类型 ( async primitives ),而且 promises 已被 JavaScript 社区普遍使用和熟悉,这一般是一个很好的起点。将 promise 的 then 与 observable 的 subscribe 进行比较,promise 是当即执行,而 observable 是惰性执行,是可取消、可复用的,等等。这是向初学者介绍 observables 的理想方式。git

但这样有个问题: Observables 与 promises 的不一样点要远多于它们之间的相同点。Promises 永远是多播的。Promise 的解析 ( resolution ) 和拒绝 ( rejection ) 永远是异步的。当人们处理 observables 时,仿佛就是在处理 promises,因此他们指望二者的行为也是类似的,但这不并老是对的。Observables 有时是多播的。Observables 一般是异步的。我也有些自责,由于我滋长了这种误解的蔓延。github

Observables 只是一个函数,它接收 observer 并返回函数

若是你真的想要理解 observable,你能够本身写个简单的。这并无听上去那么困难,真心的。将 observable 归结为最精简的部分,无外乎就是某种特定类型的函数,该函数有其针对性的用途。编程

模型:

  • 函数
  • 接收 observer: observer 是有 nexterrorcomplete 方法的对象
  • 返回一个可取消的函数

目的:

将观察者 ( observer ) 与生产者 ( producer ) 链接,并返回一种手段来拆解与生产者之间的链接。观察者其实是处理函数的注册表,处理函数能够随时间推移推送值。数组

基本实现:

function myObservable(observer) {
    const datasource = new DataSource();
    datasource.ondata = (e) => observer.next(e);
    datasource.onerror = (err) => observer.error(err);
    datasource.oncomplete = () => observer.complete();
    return () => {
        datasource.destroy();
    };
}复制代码

(你能够点击这里进行在线调试)promise

如你所见,并无太多东西,只是一个至关简单的契约。安全

安全的观察者: 让观察者变得更好

当谈论到 RxJS 或响应式编程时,一般 observables 出现的频率是最高的。但实际上,观察者的实现才是这类响应式编程的中流砥柱。Observables 是惰性的。它们只是函数而已。它们什么也不作直到你 subscribe 它们,它们装配好了观察者,而后就完事了,与沉闷的老式函数并没有差异,等待着被调用而已。另外一方面,观察者保持活跃状态并监听来自生产者的事件。异步

你可使用任何有 nexterrocomplete 方法的简单 JavaScript 对象 (POJO) 来订阅 observable,但你所用来订阅 observable 的 POJO 观察者真的只是个开始。在 RxJS 5中,咱们须要为你提供一些保障。下面罗列了一些重要的保障:async

观察者保障

  1. 若是你传递的观察者彻底没有以上所述的三个方法,也是能够的。
  2. 你不想在 completeerror 以后调用 next
  3. 若是取消订阅了,那么你不想任何方法被调用。
  4. 调用 completeerror 须要调用取消订阅逻辑。
  5. 若是 nextcompleteerror 处理方法抛出异常,你想要调用取消订阅逻辑,以确保不会泄露资源。
  6. nexterrorcomplete 实际上都是可选的。你无需处理每一个值、错误或完成。你可能只是想要处理其中一二。

为了完成列表中的任务,咱们须要将你提供的匿名观察者包装在 “SafeObserver” 中以实施上述保障。由于上面的#2,咱们须要追踪 completeerror 是否被调用过。由于#3,咱们须要使 SafeObserver 知道消费者什么时候要取消订阅。最后,由于#4,SafeObserver 实际上须要了解取消订阅逻辑,这样当 completeerror 被调用时才能够调用它。

若是咱们想要用上面临时实现的 observable 函数来作这些的话,会变得有些粗糙... 这里有个 JSBin 代码片断,你能够看看并感觉下有多粗糙。我并无想要在这个示例中实现很是正宗的 SafeObserver,由于那么占用整篇文章篇幅,下面是咱们的 observable,此次它使用了 SafeObserver:

function myObservable(observer) {
   const safeObserver = new SafeObserver(observer);
   const datasource = new DataSource();
   datasource.ondata = (e) => safeObserver.next(e);
   datasource.onerror = (err) => safeObserver.error(err);
   datasource.oncomplete = () => safeObserver.complete();

   safeObserver.unsub = () => {
       datasource.destroy();
   };

   return safeObserver.unsubscribe.bind(safeObserver);
}复制代码

设计 Observable: 确保观察者安全

将 observables 做为类/对象使咱们可以轻松地将 SafeObserver 应用于传入的匿名观察者(和处理函数,若是你喜欢 RxJS 中的 subscribe(fn, fn, fn) 签名的话) 并为开发人员提供更好的开发体验。经过在 Observable 的 subscribe 实现中处理 SafeObserver 的建立,Observables 能够再次以最简单的方式来定义:

const myObservable = new Observable((observer) => {
    const datasource = new DataSource();
    datasource.ondata = (e) => observer.next(e);
    datasource.onerror = (err) => observer.error(err);
    datasource.oncomplete = () => observer.complete();
    return () => {
        datasource.destroy();
    };
});复制代码

你会注意到上面的代码片断与第一个示例看起来几乎同样。但它更容易阅读,也更容易理解。我扩展了 JSBin 示例来展现 Observable 的最小化实现

操做符: 一样只是函数

RxJS 中的“操做符”只不过是接收源 observable 的函数,并返回一个新的 observable,当你订阅该 observable 时它会订阅源 observable 。咱们能够实现一个基础、独立的操做符,如这个在线 JSBin 示例所示:

function map(source, project) {
  return new Observable((observer) => {
    const mapObserver = {
      next: (x) => observer.next(project(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    };
    return source.subscribe(mapObserver);
  });
}复制代码

最重要的是要注意此操做符在作什么: 当你订阅它返回的 observable 时,它会建立 mapObserver 来完成工做并将 observermapObserver 链接起来。
构建操做符链实际上只是建立一个将观察者与订阅 ( subscription ) 链接起来的模板。

设计 Observable: 更优雅的操做符链

若是咱们全部的操做符都是像上面示例中那样用独立函数实现的话,将操做符连接起来会有些难看:

map(map(myObservable, (x) => x + 1), (x) => x + 2);复制代码

能够想象一下上面的代码,嵌套了5个或6个更复杂的操做符将会产生更多的参数。致使代码彻底不可读。

你可使用简单的 pipe 实现 (正如 Texas Toland 所建议的那样),它会对操做符数组进行累加以生成最终的 observable,但这意味着要编写更复杂的、返回函数的操做符,(点击这里查看 JSBin 示例)。这一样没有使得一切变得完美:

pipe(myObservable, map(x => x + 1), map(x => x + 2));复制代码

理想的是咱们可以将操做符以一种更天然的方式连接起来,好比这样:

myObservable.map(x => x + 1).map(x => x + 2);复制代码

幸运的是,咱们的 Observable 类已经支持这种操做符的链式行为。它不会给操做符的实现代码任何额外的复杂度,但它的代价是违背了我所提倡的“摒弃原型 ( prototype )”,一旦添加了足够你使用的操做符,原型中的方法或许就太多了。点击 (这里的 JSBin 示例) 查看咱们添加到 Observable 实现原型中的 map 操做符:

Observable.prototype.map = function (project) {
    return new Observable((observer) => {
        const mapObserver = {
            next: (x) => observer.next(project(x)),
            error: (err) => observer.error(err),
            complete: () => observer.complete()
        };
        return this.subscribe(mapObserver);
    });
};复制代码

如今咱们拥有了更好的语法。这种方法还有其余好处,也更高级。例如,咱们能够将 Observable 子类化为特定类型的 observables (例如包装 Promise 或一组静态值的 observables),并经过覆盖它们来对咱们的运算符进行优化。

TLDR: Observable 是函数,它接收 observer 并返回函数

记住,阅读以上全部内容后,全部的这一切都是围绕一个简单的函数设计的。Observables 是函数,它接收 observer 并返回函数。仅此而已。若是你编写了一个函数,它接收 observer 并返回函数,那它是异步的,仍是同步的?都不是,它就是个函数。任何函数的行为都彻底取决于它是如何实现的。因此,当处理 Observable 时,就像你所传递的函数引用那样对待它,而不是一些所谓的魔法,有状态的外星类型。当你构建操做符链式,你真正要作的是构成一个函数,该函数会设置连接在一块儿的观察者链,并将值传递给观察者。

注意: 示例中的 Observable 实现仍然返回的是函数,而 RxJS 和 es-observable 规范返回的是 Subscription 对象。Subscription 对象是一种更好的设计,但我又得写一整篇文章来说它。因此我只保留了它的取消订阅功能,以保持本文中全部示例的简单性。

相关文章
相关标签/搜索