原文连接: medium.com/@benlesh/ho…
本文为 RxJS 中文社区 翻译文章,如需转载,请注明出处,谢谢合做!
若是你也想和咱们一块儿,翻译更多优质的 RxJS 文章以奉献给你们,请点击【这里】javascript
TL;DR: 当不想一遍又一遍地建立生产者( producer )时,你须要热的 Observable 。java
// 冷的
var cold = new Observable((observer) => {
var producer = new Producer();
// observer 会监听 producer
});复制代码
// 热的
var producer = new Producer();
var hot = new Observable((observer) => {
// observer 会监听 producer
});复制代码
我最新的文章经过构建 Observable 来学习 Observable 主要是为了说明 Observable 只是函数。这篇文章的目标是为了揭开 Observable 自身的神秘面纱,但它并无真正深刻到 Observable 让初学者最容易困惑的问题: “热”与“冷”的概念。git
Observables 是将观察者和生产者联系起来的函数。 仅此而已。它们并不必定要创建生产者,它们只需创建观察者来监听生产者,而且一般会返回一个拆卸机制来删除该监听器。github
生产者是 Observable 值的来源。它能够是 Web Socket、DOM 事件、迭代器或在数组中循环的某种东西。基本上,这是你用来获取值的任何东西,并将它们传递给 observe.next(value)
。设计模式
若是底层的生产者是在订阅期间建立并激活的,那么 Observable 就是“冷的”。这意味着,若是 Observables 是函数,而生产者是经过调用该函数建立并激活的。数组
下面的示例 Observable 是“冷的”,由于它在订阅函数(在订阅该 Observable 时调用)中建立并监听了 WebSocket :socket
const source = new Observable((observer) => {
const socket = new WebSocket('ws://someurl');
socket.addEventListener('message', (e) => observer.next(e));
return () => socket.close();
});复制代码
因此任何 source
的订阅都会获得本身的 WebSocket 实例,当取消订阅时,它会关闭 socket 。这意味着 source
是真正的单播,由于生产者只会发送给一个观察者。这是用来阐述概念的基础 JSBin 示例。函数
若是底层的生产者是在 订阅¹ 外部建立或激活的,那么 Observable 就是“热的”。学习
若是咱们沿用上面的示例并将 WebSocket 的建立移至 Observable 的外部,那么 Observable 就会变成“热的”:ui
const socket = new WebSocket('ws://someurl');
const source = new Observable((observer) => {
socket.addEventListener('message', (e) => observer.next(e));
});复制代码
如今任何 source
的订阅都会共享同一个 WebSocket 实例。它实际上会多播给全部订阅者。但还有个小问题: 咱们再也不使用 Observable 来运行拆卸 socket 的逻辑。这意味着像错误和完成这样的通知再也不会为咱们来关闭 socket ,取消订阅也同样。因此咱们真正想要的实际上是使“冷的” Observable 变成“热的”。这是用来展现基础概念的 JSBin 示例。
从上面展现冷的 Observable 的第一个示例中,你能够发现全部冷的 Observables 可能都会些问题。就拿一件事来讲,若是你不止一次订阅了 Observable ,而这个 Observable 自己建立一些稀缺的资源,好比 WebSocket 链接,你不想一遍又一遍地建立这个 WebSocket 链接。实际上真的很容易建立了一个 Observable 的多个订阅而却没有意识到。假如说你想要在 WebSocket 订阅外部过滤全部的“奇数”和“偶数”值。在此场景下最终你会建立两个订阅:
source.filter(x => x % 2 === 0)
.subscribe(x => console.log('even', x));
source.filter(x => x % 2 === 1)
.subscribe(x => console.log('odd', x));复制代码
在将“冷的” Observable 变成“热的”以前,咱们须要介绍一个新的类型: Rx Subject 。它有以下特性:
next
的任何值。subscribe()
传递给它的 Observers 都会被添加到内部的观察者列表。next
值给它,值会从它 observable 那面出来。Rx 中的 Subject 之因此叫作 “Subject” 是由于上面的第3点。在 GoF (译注: 大名鼎鼎的《设计模式》一书) 的观察者模式中,“Subjects” 一般是有 addObserver
的类。在这里,咱们的 addObserver
方法就是 subscribe
。这是用来展现 Rx Subject 的基础行为的 JSBin 示例。
了解了上面的 Rx Subject 后,咱们可使用一些功能性的程序将任何“冷的” Observable 变成“热的”:
function makeHot(cold) {
const subject = new Subject();
cold.subscribe(subject);
return new Observable((observer) => subject.subscribe(observer));
}复制代码
咱们的新方法 makeHot
接收任何冷的 Observable 并经过建立由所获得的 Observable 共享的 Subject 将其变成热的。这是用来演示 JSBin 示例。
还有一点问题,就是没有追踪源的订阅,因此当想要拆卸时该如何作呢?咱们能够添加一些引用计数来解决这个问题:
function makeHotRefCounted(cold) {
const subject = new Subject();
const mainSub = cold.subscribe(subject);
let refs = 0;
return new Observable((observer) => {
refs++;
let sub = subject.subscribe(observer);
return () => {
refs--;
if (refs === 0) mainSub.unsubscribe();
sub.unsubscribe();
};
});
}复制代码
如今咱们有了一个热的 Observable ,当它的全部订阅结束时,咱们用来引用计数的 refs
会变成0,咱们将取消冷的源 Observable 的订阅。这是用来演示的 JSBin 示例。
publish()
或 share()
你可能不该该使用上面说起的任何 makeHot
函数,而是应该使用像 publish()
和 share()
这样的操做符。将冷的 Observable 变成热的有不少种方式和手段,在 Rx 中有高效和简洁的方式来完成此任务。关于 Rx 中能够作此事的各类操做符能够写上一整篇文章,但这不是文本的目标。本文的目标是巩固概念,什么是“热的”和“冷的” Observable 以及它们的真正意义。
在 RxJS 5 中,操做符 share()
会产生一个热的,引用计数的 Observable ,它能够在失败时重试,或在成功时重复执行。由于 Subjects 一旦发生错误、完成或取消订阅,便没法复用,因此 share()
操做符会重复利用已完结的 Subjects,以使结果 Observable 启用从新订阅。
这是 JSBin 示例,演示了在 RxJS 5 中使用 share()
将源 Observable 变热,以及它能够重试。
鉴于上述一切,人们可以看到 Observable 是怎样的,它只是一个函数,实际上能够同时是“热的”和“冷的”。或许它观察了两个生产者?一个是它建立的而另外一个是它复用的?这可能不是个好主意,但有极少数状况下多是必要的。例如,多路复用的 WebSocket 必须共享 socket ,但同时发送本身的订阅并过滤出数据流。
若是在 Observable 中复用了生产者的共享引用,它就是“热的”,若是在 Observable 中建立了新的生产者,它就是“冷的”。若是二者都作了…。那它究竟是什么?我猜是“暖的”。
¹ (注意: 生产者在订阅内部“激活”,直到将来某个时间点才“建立”出来,这种作法是有些奇怪,但使用代理的话,这也是可能的。) 一般“热的” Observables 的生产者是在订阅外部建立和激活的。
² 热的 Observables 一般是多播的,可是它们可能正在监听一个只支持一个监听器的生产者。在这一点上将其称之为“多播”有点勉强。