[译] RxJS: 如何使用 refCount

原文连接: blog.angularindepth.com/rxjs-how-to…html

本文为 RxJS 中文社区 翻译文章,如需转载,请注明出处,谢谢合做!git

若是你也想和咱们一块儿,翻译更多优质的 RxJS 文章以奉献给你们,请点击【这里】es6

在个人上篇文章 理解 publish 和 share 操做符中,只是简单介绍了 refCount 方法。在这篇文章中咱们将深刻介绍。github

refCount 的做用是什么?

简单回顾一下, RxJS 多播的基本心智模型包括: 一个源 observable,一个订阅源 observable 的 subject 和多个订阅 subject 的观察者。multicast 操做符封装了基于 subject 的基础结构并返回拥有 connectrefCount 方法的 ConnectableObservablebash

顾名思义,refCount 返回的 observable 维护订阅者的引用计数。函数

当观察者订阅有引用计数的 observable 时,引用计数会增长,若是上一个引用计数为零的话,负责多播基础结构的 subject 会订阅源 observable 。而当观察者取消订阅时,引用计数则会减小,若是引用计数归零的话,subject 会取消源 observable 的订阅。工具

这种引用计数的行为有两种用途:post

  • 当全部观察者都取消订阅后,自动取消 subject 对源 observable 的订阅
  • 当全部观察者都取消订阅后,自动取消 subject 对源 observable 的订阅,而后当再有观察者订阅该引用计数的 observable 时,subject 从新订阅源 observable

咱们来详细介绍每一种状况,而后创建一些使用 refCount 的通用指南。ui

使用 refCount 自动取消订阅

publish 操做符返回 ConnectableObservable 。调用 ConnectableObservableconnect 方法时,负责多播基础结构的 subject 会订阅源 observable 并返回 subscription (订阅)。subject 会保持对源 observable 的订阅直到调用 subscription 的 unsubscribe 方法。spa

咱们来看下面的示例,观察者会接收一个值,而后(隐式地)取消对调用过 publish 的 observable 的订阅:

const source = instrument(Observable.interval(100));
const published = source.publish();
const a = published.take(1).subscribe(observer("a"));
const b = published.take(1).subscribe(observer("b"));
const subscription = published.connect();
复制代码

本文中的示例都将使用下面的工具函数来让源 observable 具有日志功能,以及建立有名称的观察者:

function instrument<T>(source: Observable<T>) {
  return Observable.create((observer: Observer<T>) => {
    console.log("source: subscribing");
    const subscription = source
      .do(value => console.log(`source: ${value}`))
      .subscribe(observer);
    return () => {
      subscription.unsubscribe();
      console.log("source: unsubscribed");
    };
  }) as Observable<T>;
}

function observer<T>(name: string) {
  return {
    next: (value: T) => console.log(`observer ${name}: ${value}`),
    complete: () => console.log(`observer ${name}: complete`)
  };
}
复制代码

示例的输出以下所示:

source: subscribing
source: 0
observer a: 0
observer a: complete
observer b: 0
observer b: complete
source: 1
source: 2
source: 3
...
复制代码

两个观察者都只接收一个值而后完成,完成的同时取消对调用过 publish 的 observable 的订阅。可是,多播基础结构仍然保持着对源 observable 的订阅。

若是不想显示地执行取消订阅操做的话,可使用 refCount:

const source = instrument(Observable.interval(100));
const counted = source.publish().refCount();
const a = counted.take(1).subscribe(observer("a"));
const b = counted.take(1).subscribe(observer("b"));
复制代码

观察者订阅使用引用计数的 observable 的话,当引用计数归零时,负责多播的基础结构的 subject 会取消源 observable 的订阅,示例的输出以下所示:

source: subscribing
source: 0
observer a: 0
observer a: complete
observer b: 0
observer b: complete
source: unsubscribed
复制代码

从新订阅已完成的 observables

当引用计数归零后,多播的基础结构除了取消源 observable 的订阅,当负责引用计数的 observable 再次发生订阅时,它还会从新订阅源 observable 。

咱们使用下面的示例来看看当使用已完成的源 observable 时会发生什么:

const source = instrument(Observable.timer(100));
const counted = source.publish().refCount();
const a = counted.subscribe(observer("a"));
setTimeout(() => a.unsubscribe(), 110);
setTimeout(() => counted.subscribe(observer("b")), 120);
复制代码

示例中使用 timer observable 做为源。它会等待指定的毫秒数后发出 nextcomplete 通知。还有两个观察者: a 在源 observable 完成后订阅,在源 observable 完成后取消订阅;ba 取消订阅后订阅。

示例的输出以下:

source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: complete
复制代码

b 订阅时,引用计数为零,因此多播的基础结构会指望 subject 从新订阅源 observable 。可是,因为 subject 已经收到了源 observable 的 complete 通知,而且 subject 是没法复用的,因此实际上并无进行从新订阅,b 只能收到 complete 通知。

若是使用 publishBehavior(-1) 来代替 publish() 的话,输出相似,但会包含 BehaviorSubject 的初始值:

observer a: -1
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: complete
复制代码

一样的,b 仍是只能收到 complete 通知。

若是使用 publishReplay(1) 来代替 publish() 的话,状况会有些变化,输出以下:

source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: 0
observer b: complete
复制代码

一样的,此次也没有从新订阅源 observable,由于 subject 已经完成了。可是,已完成的 ReplaySubject 将通知重放给后来的订阅者,因此 b 能收到重放的 next 通知和 complete 通知。

若是使用 publishLast() 来代替 publish() 的话,状况又会有些不一样,输出以下:

source: subscribing
source: 0
source: unsubscribed
observer a: 0
observer a: complete
observer b: 0
observer b: complete
复制代码

一样的,依然没有从新订阅源 observable,由于 subject 已经完成了。可是,AsyncSubject 会将最后收到的 next 通知发给它的订阅者,因此 ab 都收到的是 nextcomplete 通知。

综上所述,根据示例咱们能够发现 publish 以及它的变种:

  • 当源 observable 完成时,负责多播基础结构的 subject 也会完成,并且这会阻止对源 observable 的从新订阅。
  • publishpublishBehaviorrefCount 一块儿使用时,后来的订阅者只会收到 complete 通知,这彷佛并非咱们想要的效果。
  • publishReplaypublishLastrefCount 一块儿使用时,后来的订阅者会收到预期的通知。

从新订阅未完成的 observables

咱们已经看过了从新订阅已完成的源 observable 时会发生什么,如今咱们再来看看从新订阅未完成的源 observable 是怎样一个状况。

这个示例中将使用 interval observable 来替代 timer observable,它会根据指定的时间间隔重复地发出包含自增数字的 next 通知:

const source = instrument(Observable.interval(100));
const counted = source.publish().refCount();
const a = counted.subscribe(observer("a"));
setTimeout(() => a.unsubscribe(), 110);
setTimeout(() => counted.subscribe(observer("b")), 120);
复制代码

示例的输出以下所示:

source: subscribing
source: 0
observer a: 0
source: unsubscribed
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
复制代码

与使用已完成的源 observable 的示例不一样的是,负责多播基础结构的 subject 可以被从新订阅,因此源 observable 能够产生新的订阅。b 所收到的 next 通知即是从新订阅的证据: 该通知包含数值0,由于从新订阅已经开启了全新的 interval 序列。

若是使用 publishBehavior(-1) 来代替 publish() 的话,状况会有所不一样,输出以下所示:

observer a: -1
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer b: 0
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
复制代码

输出是相似的,能够清楚地看到从新订阅开启了全新的 interval 序列。可是,在收到 intervalnext 通知前,a 还收到了包含 BehaviorSubject 初始值-1的 next 通知,b 会收到包含 BehaviorSubject 当前值0的 next 通知。

若是使用 publishReplay(1) 来代替 publish() 的话,状况又会有所不一样,输出以下所示:

source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer b: 0
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
复制代码

输出也是相似的,能够清楚地看到从新订阅开启了全新的 interval 序列。可是,b 在收到源 observable 的第一个 next 通知以前会收到重放的 next 通知。

综上所述,根据示例咱们能够发现,当对未完成的源 observable 使用 refCount 时,publishpublishBehaviorpublishReplay 的行为都如预期通常,没有让人出乎意料之处。

shareReplay 的做用是什么?

在 RxJS 5.4.0 版本中引入了 shareReplay 操做符。它与 publishReplay().refCount() 十分类似,只是有一个细微的差异。

share 相似, shareReplay 传给 multicast 操做符的也是 subject 的工厂函数。这意味着当从新订阅源 observable 时,会使用工厂函数来建立出一个新的 subject 。可是,只有当前一个被订阅 subject 未完成的状况下,工厂函数才会返回新的 subject 。

publishReplay 传给 multicast 操做符的是 ReplaySubject 实例,而不是工厂函数,这是影响行为不一样的缘由。

对调用了 publishReplay().refCount() 的 observable 进行从新订阅,subject 会一直重放它的可重放通知。可是,对调用了 shareReplay() 的 observable 进行从新订阅,行为未必如前者同样,若是 subject 还未完成,会建立一个新的 subject 。因此区别在于,使用调用了 shareReplay() 的 observable 的话,当引用计数归零时,若是 subject 还未完成的话,可重放的通知会被冲洗掉。

不彻底使用准则

根据咱们看过的这些示例,能够概括出以下使用准则:

  • refCount 能够与 publish 及其变种一块儿使用,从而自动地取消源 observable 的订阅。
  • 当使用 refCount 来自动取消已完成的源 observable 的订阅时,publishReplaypublishLast 的行为会如预期同样,可是,对于后来的订阅,publishpublishBehavior 的行为并没太大帮助,因此你应该只使用 publishpublishBehavior 来自动取消订阅。
  • 当使用 refCount 来自动取消未完成的源 observable 的订阅时,publishpublishBehaviorpublishRelay 的行为都会如预期同样。
  • shareReplay() 的行为相似于 publishReplay().refCount(),在对二者进行选择时,应该根据在对源 observable 进行从新订阅时,你是否想要冲洗掉可重放的通知。

上面所描述的 shareReplay 的行为只适用于 RxJS 5.5 以前的版本。在 5.5.0 beta 中,shareReplay 作出了变动: 当引用计数归零时,操做符再也不取消源 observable 的订阅。

这项变化当即使得引用计数变得多余,由于只有当源 observable 完成或报错时,源 observable 的订阅才会取消订阅。这项变化也意味着只有在处理错误时,shareReplaypublishReplay().refCount() 才有所不一样:

  • 若是源 observable 报错,publishReplay().refCount() 返回的 observable 的任何后来订阅者都将收到错误。
  • 可是,shareReplay 返回的 observable 的任何后来订阅者都将产生一个源 observable 的新订阅。

===================================结尾分界线=================================

这是多播三连的最后一篇,也应该是年前更新的最后一篇,在这里提早祝你们春节快乐,阖家欢乐,18年开开心心学 Rx 。

顺便预告下,年后回来咱们会来两篇实战型的文章。

相关文章
相关标签/搜索