原文连接: blog.angularindepth.com/rxjs-unders…html
本文为 RxJS 中文社区 翻译文章,如需转载,请注明出处,谢谢合做!git
若是你也想和咱们一块儿,翻译更多优质的 RxJS 文章以奉献给你们,请点击【这里】es6
照片取自 Unsplash,做者 Kimberly Farmer 。github
我常常会被问及 publish
操做符的相关问题:bash
publish 和 share 之间的区别是什么?dom
如何导入 refCount 操做符?函数
什么时候使用 AsyncSubject?post
咱们来解答这些问题,并让你了解到更多内容,首先从基础入手。ui
多播是一个术语,它用来描述由单个 observable 发出的每一个通知会被多个观察者所接收的状况。一个 observable 是否具有多播的能力取决于它是热的仍是冷的。spa
热的和冷的 observable 的特征在于 observable 通知的生产者是在哪建立的。在 Ben Lesh 的 热的 Vs 冷的 Observables 一文中,他详细讨论了二者间的差别,这些差别能够概括以下:
timer
observable 就是冷的,每次订阅时都会建立一个新的定时器。fromEvent
建立的 observable 就是热的,产生事件的元素存在于 DOM 之中,它不是观察者订阅时所建立的。冷的 observables 是单播的,每一个观察者所接收到的通知都是来自不一样的生产者,生产者是观察者订阅时所建立的。
热的 observables 是多播的,每一个观察者所接收到的通知都是来自同一个生产者。
有些时候,须要冷的 observable 具备多播的行为,RxJS 引入了 Subject
类使之成为可能。
Subject 便是 observable,又是 observer (观察者)。经过使用观察者来订阅 subject,而后 subject 再订阅冷的 observable,可让冷的 observable 变成热的。这是 RxJS 引入 subjects 的主要用途,在 Ben Lesh 的 关于 RxJS 中的 Subject 一文中,他指出:
多播是 RxJS 中 Subjects 的主要用法。
咱们来看下面的示例:
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
const source = Observable.defer(() => Observable.of(
Math.floor(Math.random() * 100)
));
function observer(name: string) {
return {
next: (value: number) => console.log(`observer ${name}: ${value}`),
complete: () => console.log(`observer ${name}: complete`)
};
}
const subject = new Subject<number>();
subject.subscribe(observer("a"));
subject.subscribe(observer("b"));
source.subscribe(subject);
复制代码
示例中的 source
是冷的。每次观察者订阅 source
时,传给 defer
的工厂函数会建立一个发出随机数后完成的 observable 。
要让 source
变成多播的,须要观察者订阅 subject,而后 subject 再订阅 source
。source
只会看到一个订阅 ( subscription ),它也只生成一个包含随机数的 next
通知和一个 complete
通知。Subject 会将这些通知发送给它的观察者,输出以下所示:
observer a: 42
observer b: 42
observer a: complete
observer b: complete
复制代码
此示例能够做为 RxJS 多播的基本心智模型: 一个源 observable,一个订阅源 observable 的 subject 和多个订阅 subject 的观察者。
RxJS 引入了 multicast
操做符,它能够应用于 observable ,使其变成热的。此操做符封装了 subject 用于多播 observable 时所涉及的基础结构。
在看 multicast
操做符以前,咱们使用一个简单实现的 multicast
函数来替代上面示例中的 subject :
function multicast<T>(source: Observable<T>) {
const subject = new Subject<T>();
source.subscribe(subject);
return subject;
}
const m = multicast(source);
m.subscribe(observer("a"));
m.subscribe(observer("b"));
复制代码
代码改变后,示例的输出以下:
observer a: complete
observer b: complete
复制代码
这并非咱们想要的结果。在函数内部订阅 subject 使得 subject 在被观察者订阅以前就已经收到了 next
和 complete
通知,因此观察者只能收到 complete
通知。
这是可避免的,任何链接多播基础结构的函数的调用者须要可以在 subject 订阅源 observable 时进行控制。RxJS 的 multicast
操做符经过返回一个特殊的 observable 类型 ConnectableObservable 来实现的。
ConnectableObservable 封装了多播的基础结构,但它不会当即订阅源 observable ,只有当它的 connect
方法调用时,它才会订阅源 observable 。
咱们来使用 multicast
操做符:
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/multicast";
const source = Observable.defer(() => Observable.of(
Math.floor(Math.random() * 100)
));
function observer(name: string) {
return {
next: (value: number) => console.log(`observer ${name}: ${value}`),
complete: () => console.log(`observer ${name}: complete`)
};
}
const m = source.multicast(new Subject<number>());
m.subscribe(observer("a"));
m.subscribe(observer("b"));
m.connect();
复制代码
代码改变后,如今观察者能够收到 next
通知了:
observer a: 54
observer b: 54
observer a: complete
observer b: complete
复制代码
调用 connect
时,传入 multicast
操做符的 subject 会订阅源 observable,而 subject 的观察者会收到多播通知,这正符合 RxJS 多播的基本心智模型。
ConnectableObservable 还有另一个方法 refCount
,它能够用来肯定源 observable 什么时候产生了订阅。
refCount
看上去就像是操做符,也就是说,它是在 observable 上调用的方法而且返回另外一个 observable,可是它只是 ConnectableObservable
的方法并且不须要导入。顾名思义,refCount
返回 observable, 它负责维护已产生的订阅的引用计数。
当观察者订阅负责引用计数的 observable 时,引用计数会增长,若是前一个引用计数为0的话,负责多播基础结构的 subject 会订阅源 observable 。当观察者取消订阅时,引用计数会减小,若是引用计数归零的话,subject 会取消对源 observable 的订阅。
咱们来使用 refCount
:
const m = source.multicast(new Subject<number>()).refCount();
m.subscribe(observer("a"));
m.subscribe(observer("b"));
复制代码
代码改变后,输出以下所示:
observer a: 42
observer a: complete
observer b: complete
复制代码
只有第一个观察者收到了 next
通知。咱们来看看缘由。
示例中的源 observable 会当即发出通知。也就是说,一旦订阅了,源 observable 就会发出 next
和 complete
通知,complete
通知致使在第二个观察者订阅以前第一个就取消了订阅。当第一个取消订阅时,引用计数会归零,因此负责多播基础结构的 subject 也会取消源 observable 的订阅。
当第二个观察者订阅时,subject 会再次订阅源 observable,但因为 subject 已经收到了 complete
通知,因此它没法被重用。
向 multicast
传入 subject 的工厂函数能够解决此问题:
const m = source.multicast(() => new Subject<number>()).refCount();
m.subscribe(observer("a"));
m.subscribe(observer("b"));
复制代码
代码改变后,每次源 observable 被订阅时,都会建立一个新的 subject,输出以下所示:
observer a: 42
observer a: complete
observer b: 54
observer b: complete
复制代码
由于源 observable 会当即发出通知,因此观察者收到的通知是分开的。将 source
进行修改,以便延迟通知:
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/delay";
import "rxjs/add/operator/multicast";
const source = Observable.defer(() => Observable.of(
Math.floor(Math.random() * 100)
)).delay(0);
复制代码
观察者依然会收到多播通知,输出以下所示:
observer a: 42
observer b: 42
observer a: complete
observer b: complete
复制代码
总结一下,上述示例展现了 multicast
操做符的如下特色:
connect
方法以用于肯定源 observable 什么时候产生了订阅;refCount
方法以用于自动管理源 observable 的订阅;refCount
,必须传入 Subject
的工厂函数,而不是 Subject
实例;接下来咱们来看 publish
和 share
操做符,以及 publish
的变种,看看它们是如何在 multicast
操做符所提供的基础之上创建的。
咱们经过下面的示例来看看 publish
操做符:
import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/delay";
import "rxjs/add/operator/publish";
function random() {
return Math.floor(Math.random() * 100);
}
const source = Observable.concat(
Observable.defer(() => Observable.of(random())),
Observable.defer(() => Observable.of(random())).delay(1)
);
function observer(name: string) {
return {
next: (value: number) => console.log(`observer ${name}: ${value}`),
complete: () => console.log(`observer ${name}: complete`)
};
}
const p = source.publish();
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
复制代码
示例中的源 observable 会当即发出一个随机数,通过短暂的延迟后发出另外一个随机数,而后完成。这个示例可让咱们看到订阅者在 connect
调用前、connect
调用后以及调用过 publish 的 observable 完成后订阅分别会发生什么。
publish
操做符是对 multicast
操做符进行了一层薄薄的封装。它会调用 multicast
并传入 Subject
。
示例的输出以下所示:
observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete
复制代码
观察者收到的通知可概括以下:
a
是在 connect
调用前订阅的,因此它能收到两个 next
通知和 complete
通知。b
是在 connect
调用后订阅的,此时第一个当即发送的 next
通知已经发出过了,因此它只能收到第二个 next
通知和 complete
通知。c
是在源 observable 完成后订阅的,因此它只能收到 complete
通知。使用 refCount
来代替 connect
:
const p = source.publish().refCount();
p.subscribe(observer("a"));
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
复制代码
示例的输出以下所示:
observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete
复制代码
输出跟使用 connect
时的相似。这是为何?
b
没有收到第一个 next
通知是由于源 observable 的第一个 next
通知是当即发出的,因此只有 a
能收到。
c
是在调用过 publish 的 observable 完成后订阅的,因此订阅的引用计数已是0,此时将会再生成一个订阅。可是,publish
传给 multicast
的是 subject,而不是工厂函数,由于 subjects 没法被复用,因此 c
只能收到 complete
通知。
publish
和 multicast
操做符都接受一个可选的 selector
函数,若是指定了此函数,操做符的行为将会有很大的不一样。这将在另外一篇文章 multicast 操做符的秘密中详细介绍。
publish
操做符有几个变种,它们都以一种相似的方式对 multicast
进行了包装,传入的是 subjects,而不是工厂函数。可是,它们传入的是不一样类型的 subjects 。
publish
变种使用的特殊类型的 subjects 包括:
BehaviorSubject
ReplaySubject
AsyncSubject
关于如何使用这些特殊类型的 subjects 的答案是: 每一个变种都与一个特殊类型的 subject 相关联,当你须要的行为相似于某个 publish
变种时,就使用相对应的 subject 。咱们来看看这些变种的行为是怎样的。
publishBehavior
传给 multicast
的是 BehaviorSubject
,而不是 Subject
。BehaviorSubject
相似于 Subject
,但若是 subject 的订阅发生在源 observable 发出 next
通知以前,那么 subject 会发出包含初始值的 next
通知。
咱们更改下示例,给生成随机数的源 observable 加上短暂的延迟,这样它就不会当即发出随机数:
const delayed = Observable.timer(1).switchMapTo(source);
const p = delayed.publishBehavior(-1);
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
复制代码
示例的输出以下所示:
observer a: -1
observer b: -1
observer a: 42
observer b: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete
复制代码
观察者收到的通知可概括以下:
a
是在 connect
调用前订阅的,因此它能收到带有 subject 的初始值的 next
通知、源 observable 的两个 next
通知和 complete
通知。b
是在 connect
调用后但在 subject 收到源 observable 的第一个 next
通知前订阅的,因此它能收到带有 subject 的初始值的 next
通知、源 observable 的两个 next
通知和 complete
通知。c
是在源 observable 完成后订阅的,因此它只能收到 complete
通知。publishReplay
传给 multicast
的是 ReplaySubject
,而不是 Subject
。顾名思义,每当观察者订阅时,ReplaySubject
会重放指定数量的 next
通知。
const p = source.publishReplay(1);
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
复制代码
使用了 publishReplay
,示例的输出以下所示:
observer a: 42
observer b: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 54
observer c: complete
复制代码
观察者收到的通知可概括以下:
a
是在 connect
调用前订阅的,此时 subject 尚未收到 next
通知,因此 a
能收到源 observable 的两个 next
通知和 complete
通知。b
是在 connect
调用后订阅的,此时 subject 已经收到了源 observable 的第一个 next
通知,因此 b
能收到重放的 next
通知、源 observable 的第二个 next
通知和 complete
通知。c
是在源 observable 完成后订阅的,因此它能收到重放的 next
通知和 complete
通知。来看看 c
的行为,很明显,不一样于 publish
操做符,publishReplay
操做符适合使用 refCount
方法,由于观察者在源 observable 完成后订阅依然能收到任意数量的重放的 next
通知。
publishLast
传给 multicast
的是 AsyncSubject
,而不是 Subject
。AsyncSubject
是最特别的特殊类型 subjects 。只有当它完成时,才会发出 next
通知 (若是有 next
通知的话) 和 complete
通知,这个 next
通知是源 observable 中的最后一个 next
通知。
const p = source.publishLast();
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
复制代码
使用了 publishLast
,示例的输出以下所示:
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 54
observer c: complete
复制代码
观察者收到的通知可概括以下:
a
和 b
都是在源 observable 完成前订阅的,但直到源 observable 完成它们才能收到通知,它们能收到带有第二个随机数的 next
通知和 complete
通知。c
是在源 observable 完成后订阅的,它能收到带有第二个随机数的 next
通知和 complete
通知。与 publishReplay
相似,publishLast
操做符适合使用 refCount
方法,由于观察者在源 observable 完成后订阅依然能收到任意数量的重放的 next
通知。
share
操做符相似于使用 publish().refCount()
。可是,share
传给 multicast
的是工厂函数,这意味着在引用计数为0以后发生订阅的话,会建立一个新的 Subject
来订阅源 observable 。
const s = source.share();
s.subscribe(observer("a"));
s.subscribe(observer("b"));
setTimeout(() => s.subscribe(observer("c")), 10);
复制代码
使用了 share
,示例的输出以下所示:
observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 6
observer c: 9
observer c: complete
复制代码
观察者收到的通知可概括以下:
a
订阅后当即收到第一个 next
通知,随后是第二个 next
通知和 complete
通知。b
只能收到第二个 next
通知和 complete
通知。c
是在源 observable 完成后订阅的,会建立一个新的 subject 来订阅源 observable,它会当即收到第一个 next
通知,随后是第二个 next
通知和 complete
通知。在上面这些示例中,咱们介绍了 publish
和 share
操做符,当源 observable 完成时,a
和 b
会自动取消订阅。若是源 observable 报错,它们也一样会自动取消订阅。publish
和 share
操做符还有另一个不一样点:
publish
返回的 observable 的任何未来的订阅者都将收到 error
通知。share
返回的 observable 的任何未来的订阅者会生成源 observable 的一个新订阅,由于错误会自动取消任何订阅者的订阅,将其引用计数归零。就这样了,本文到此结束。咱们介绍了六个操做符,但它们全是经过一种相似的方式来实现的,它们全都符合同一个基本的心智模型: 一个源 observable、一个订阅源 observable 的 subject 和多个订阅 subject 的观察者。
本文只是简略地介绍了 refCount
方法。想要深刻了解,请参见 RxJS: 如何使用 refCount。