原文连接: blog.angularindepth.com/rxjs-how-to…html
本文为 RxJS 中文社区 翻译文章,如需转载,请注明出处,谢谢合做!git
若是你也想和咱们一块儿,翻译更多优质的 RxJS 文章以奉献给你们,请点击【这里】es6
在个人上篇文章 理解 publish 和 share 操做符中,只是简单介绍了 refCount 方法。在这篇文章中咱们将深刻介绍。github
简单回顾一下, RxJS 多播的基本心智模型包括: 一个源 observable,一个订阅源 observable 的 subject 和多个订阅 subject 的观察者。multicast
操做符封装了基于 subject 的基础结构并返回拥有 connect
和 refCount
方法的 ConnectableObservable
。bash
顾名思义,refCount
返回的 observable 维护订阅者的引用计数。函数
当观察者订阅有引用计数的 observable 时,引用计数会增长,若是上一个引用计数为零的话,负责多播基础结构的 subject 会订阅源 observable 。而当观察者取消订阅时,引用计数则会减小,若是引用计数归零的话,subject 会取消源 observable 的订阅。工具
这种引用计数的行为有两种用途:post
咱们来详细介绍每一种状况,而后创建一些使用 refCount
的通用指南。ui
publish
操做符返回 ConnectableObservable
。调用 ConnectableObservable
的 connect
方法时,负责多播基础结构的 subject 会订阅源 observable 并返回 subscription (订阅)。subject 会保持对源 observable 的订阅直到调用 subscription 的 unsubscribe
方法。spa
咱们来看下面的示例,观察者会接收一个值,而后(隐式地)取消对调用过 publish
的 observable 的订阅:
const source = instrument(Observable.interval(100));
const published = source.publish();
const a = published.take(1).subscribe(observer("a"));
const b = published.take(1).subscribe(observer("b"));
const subscription = published.connect();
复制代码
本文中的示例都将使用下面的工具函数来让源 observable 具有日志功能,以及建立有名称的观察者:
function instrument<T>(source: Observable<T>) {
return Observable.create((observer: Observer<T>) => {
console.log("source: subscribing");
const subscription = source
.do(value => console.log(`source: ${value}`))
.subscribe(observer);
return () => {
subscription.unsubscribe();
console.log("source: unsubscribed");
};
}) as Observable<T>;
}
function observer<T>(name: string) {
return {
next: (value: T) => console.log(`observer ${name}: ${value}`),
complete: () => console.log(`observer ${name}: complete`)
};
}
复制代码
示例的输出以下所示:
source: subscribing
source: 0
observer a: 0
observer a: complete
observer b: 0
observer b: complete
source: 1
source: 2
source: 3
...
复制代码
两个观察者都只接收一个值而后完成,完成的同时取消对调用过 publish
的 observable 的订阅。可是,多播基础结构仍然保持着对源 observable 的订阅。
若是不想显示地执行取消订阅操做的话,可使用 refCount
:
const source = instrument(Observable.interval(100));
const counted = source.publish().refCount();
const a = counted.take(1).subscribe(observer("a"));
const b = counted.take(1).subscribe(observer("b"));
复制代码
观察者订阅使用引用计数的 observable 的话,当引用计数归零时,负责多播的基础结构的 subject 会取消源 observable 的订阅,示例的输出以下所示:
source: subscribing
source: 0
observer a: 0
observer a: complete
observer b: 0
observer b: complete
source: unsubscribed
复制代码
当引用计数归零后,多播的基础结构除了取消源 observable 的订阅,当负责引用计数的 observable 再次发生订阅时,它还会从新订阅源 observable 。
咱们使用下面的示例来看看当使用已完成的源 observable 时会发生什么:
const source = instrument(Observable.timer(100));
const counted = source.publish().refCount();
const a = counted.subscribe(observer("a"));
setTimeout(() => a.unsubscribe(), 110);
setTimeout(() => counted.subscribe(observer("b")), 120);
复制代码
示例中使用 timer
observable 做为源。它会等待指定的毫秒数后发出 next
和 complete
通知。还有两个观察者: a
在源 observable 完成后订阅,在源 observable 完成后取消订阅;b
在 a
取消订阅后订阅。
示例的输出以下:
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: complete
复制代码
当 b
订阅时,引用计数为零,因此多播的基础结构会指望 subject 从新订阅源 observable 。可是,因为 subject 已经收到了源 observable 的 complete
通知,而且 subject 是没法复用的,因此实际上并无进行从新订阅,b
只能收到 complete
通知。
若是使用 publishBehavior(-1)
来代替 publish()
的话,输出相似,但会包含 BehaviorSubject
的初始值:
observer a: -1
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: complete
复制代码
一样的,b
仍是只能收到 complete
通知。
若是使用 publishReplay(1)
来代替 publish()
的话,状况会有些变化,输出以下:
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: 0
observer b: complete
复制代码
一样的,此次也没有从新订阅源 observable,由于 subject 已经完成了。可是,已完成的 ReplaySubject
将通知重放给后来的订阅者,因此 b
能收到重放的 next
通知和 complete
通知。
若是使用 publishLast()
来代替 publish()
的话,状况又会有些不一样,输出以下:
source: subscribing
source: 0
source: unsubscribed
observer a: 0
observer a: complete
observer b: 0
observer b: complete
复制代码
一样的,依然没有从新订阅源 observable,由于 subject 已经完成了。可是,AsyncSubject
会将最后收到的 next
通知发给它的订阅者,因此 a
和 b
都收到的是 next
和 complete
通知。
综上所述,根据示例咱们能够发现 publish
以及它的变种:
publish
和 publishBehavior
与 refCount
一块儿使用时,后来的订阅者只会收到 complete
通知,这彷佛并非咱们想要的效果。publishReplay
和 publishLast
与 refCount
一块儿使用时,后来的订阅者会收到预期的通知。咱们已经看过了从新订阅已完成的源 observable 时会发生什么,如今咱们再来看看从新订阅未完成的源 observable 是怎样一个状况。
这个示例中将使用 interval
observable 来替代 timer
observable,它会根据指定的时间间隔重复地发出包含自增数字的 next
通知:
const source = instrument(Observable.interval(100));
const counted = source.publish().refCount();
const a = counted.subscribe(observer("a"));
setTimeout(() => a.unsubscribe(), 110);
setTimeout(() => counted.subscribe(observer("b")), 120);
复制代码
示例的输出以下所示:
source: subscribing
source: 0
observer a: 0
source: unsubscribed
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
复制代码
与使用已完成的源 observable 的示例不一样的是,负责多播基础结构的 subject 可以被从新订阅,因此源 observable 能够产生新的订阅。b
所收到的 next
通知即是从新订阅的证据: 该通知包含数值0,由于从新订阅已经开启了全新的 interval
序列。
若是使用 publishBehavior(-1)
来代替 publish()
的话,状况会有所不一样,输出以下所示:
observer a: -1
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer b: 0
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
复制代码
输出是相似的,能够清楚地看到从新订阅开启了全新的 interval
序列。可是,在收到 interval
的 next
通知前,a
还收到了包含 BehaviorSubject
初始值-1的 next
通知,b
会收到包含 BehaviorSubject
当前值0的 next
通知。
若是使用 publishReplay(1)
来代替 publish()
的话,状况又会有所不一样,输出以下所示:
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer b: 0
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
复制代码
输出也是相似的,能够清楚地看到从新订阅开启了全新的 interval
序列。可是,b
在收到源 observable 的第一个 next
通知以前会收到重放的 next
通知。
综上所述,根据示例咱们能够发现,当对未完成的源 observable 使用 refCount
时,publish
、publishBehavior
和 publishReplay
的行为都如预期通常,没有让人出乎意料之处。
在 RxJS 5.4.0 版本中引入了 shareReplay 操做符。它与 publishReplay().refCount()
十分类似,只是有一个细微的差异。
与 share
相似, shareReplay
传给 multicast
操做符的也是 subject 的工厂函数。这意味着当从新订阅源 observable 时,会使用工厂函数来建立出一个新的 subject 。可是,只有当前一个被订阅 subject 未完成的状况下,工厂函数才会返回新的 subject 。
publishReplay
传给 multicast
操做符的是 ReplaySubject
实例,而不是工厂函数,这是影响行为不一样的缘由。
对调用了 publishReplay().refCount()
的 observable 进行从新订阅,subject 会一直重放它的可重放通知。可是,对调用了 shareReplay()
的 observable 进行从新订阅,行为未必如前者同样,若是 subject 还未完成,会建立一个新的 subject 。因此区别在于,使用调用了 shareReplay()
的 observable 的话,当引用计数归零时,若是 subject 还未完成的话,可重放的通知会被冲洗掉。
根据咱们看过的这些示例,能够概括出以下使用准则:
refCount
能够与 publish
及其变种一块儿使用,从而自动地取消源 observable 的订阅。refCount
来自动取消已完成的源 observable 的订阅时,publishReplay
和 publishLast
的行为会如预期同样,可是,对于后来的订阅,publish
和 publishBehavior
的行为并没太大帮助,因此你应该只使用 publish
和 publishBehavior
来自动取消订阅。refCount
来自动取消未完成的源 observable 的订阅时,publish
、publishBehavior
和 publishRelay
的行为都会如预期同样。shareReplay()
的行为相似于 publishReplay().refCount()
,在对二者进行选择时,应该根据在对源 observable 进行从新订阅时,你是否想要冲洗掉可重放的通知。上面所描述的 shareReplay
的行为只适用于 RxJS 5.5 以前的版本。在 5.5.0 beta 中,shareReplay
作出了变动: 当引用计数归零时,操做符再也不取消源 observable 的订阅。
这项变化当即使得引用计数变得多余,由于只有当源 observable 完成或报错时,源 observable 的订阅才会取消订阅。这项变化也意味着只有在处理错误时,shareReplay
和 publishReplay().refCount()
才有所不一样:
publishReplay().refCount()
返回的 observable 的任何后来订阅者都将收到错误。shareReplay
返回的 observable 的任何后来订阅者都将产生一个源 observable 的新订阅。===================================结尾分界线=================================
这是多播三连的最后一篇,也应该是年前更新的最后一篇,在这里提早祝你们春节快乐,阖家欢乐,18年开开心心学 Rx 。
顺便预告下,年后回来咱们会来两篇实战型的文章。