[译] RxJS: multicast 操做符的秘密

原文连接: blog.angularindepth.com/rxjs-multic…git

本文为 RxJS 中文社区 翻译文章,如需转载,请注明出处,谢谢合做!github

若是你也想和咱们一块儿,翻译更多优质的 RxJS 文章以奉献给你们,请点击【这里】函数

multicast 操做符有一个秘密。publish 操做符也是如此,它封装了 multicast 。这个秘密有时候真的挺好用的。单元测试

秘密

multicastpublish 的文档中都提到了 ConnectableObservableConnectableObservable 是一种特殊类型的 observable,只有调用它的 connect 方法后,它才会开始向订阅者发送通知。然而,multicastpublish 操做符并不是永远返回 ConnectableObservable测试

咱们先来看下 publish源码:ui

export function publish<T>(
  this: Observable<T>,
  selector?: (source: Observable<T>) => Observable<T>
): Observable<T> | ConnectableObservable<T> {
  return selector ?
    multicast.call(this, () => new Subject<T>(), selector) :
    multicast.call(this, new Subject<T>());
}
复制代码

能够很清楚地看出,publish 只是对 multicast 进行了一层很薄的封装。它建立了 subject 并传给 multicast,还有一个可选的 selector 函数。最有趣的部分是在 multicast 实现之中,它包含以下代码:this

if (typeof selector === 'function') {
  return this.lift(new MulticastOperator(subjectFactory, selector));
}

const connectable: any = Object.create(this, connectableObservableDescriptor);
connectable.source = this;
connectable.subjectFactory = subjectFactory;
return <ConnectableObservable<T>> connectable;
复制代码

只有在不传入 selector 函数的状况下,multicast 才返回 ConnectableObservable。若是传入 selector 函数的话,会使用 lift 机制来使得源 observable 建立出适当类型的 observable 。不须要在返回的 observable 上调用 connect 方法,而且在 selector 函数的做用域中会共享源 observable 。spa

这意味着 multicast (以及 publish) 操做符能够用来轻松实现源 observable 的本地共享。翻译

使用 publish 进行本地共享

咱们来看看使用 publish 的示例。code

RxJS 引入了 defaultIfEmpty 操做符,它接收一个值,若是源 observable 为空的话,会将这个值发出。有时候,可以指定一个默认 observable 的话要比指定单个值有用得多,那么让咱们来实现一个 defaultObservableIfEmpty 函数,它能够与 let 操做符一块儿使用。

下面的弹珠图展现了源 observable 为空时它的行为:

RxJS 引入了 isEmpty 操做符,当源 observable 完成时,它会发出布尔值以标识源 observable 是否为空。可是,要在 defaultObservableIfEmpty 实现中使用它的话,须要共享源 observable,由于须要发出值的通知,而 isEmpty 没法作到这点。publish 操做符使得源 observable 的共享变得简单,实现以下所示:

function defaultObservableIfEmpty<T>( defaultObservable: Observable<T> ): (source: Observable<T>) => Observable<T> {

  return source => source.publish(shared => shared.merge(
    shared.isEmpty().mergeMap(empty => empty ?
      defaultObservable :
      Observable.empty<T>()
    )
  ));
}
复制代码

传给 publishselector 函数接收共享的源 observable 。selector 返回的 observable 是由共享源 observable 和根据源 observable 是否为空获得的 observable (若是源 observable 为空,则为传入的默认 observable,不然为空 observable) 的组合而成。

源 observable 的共享彻底是由 publish 管理的。使用 selector 函数,就可以根据须要屡次订阅共享 observable,而不会影响源 observable 后面的订阅。

使用 multicast 进行本地共享

咱们来看另外一个示例,此次使用 multicast

RxJS 引入了 takeWhile 操做符,它返回的 observable 会发出源 observable 的值,直到不知足给定条件的值出现,此刻 observable 完成。不知足条件的那个值不会被发出。咱们来实现一个 takeWhileInclusive 函数,它能够与 let 操做符一块儿使用。

下面的弹珠图展现了值不知足条件时的行为:

可使用 takeWhile 操做符做为基础来实现,当不知足条件时,只须要再链接不知足条件的那个值便可。要在 takeWhile 返回的 observable 完成后取得这个值,可使用 ReplaySubject:

function takeWhileInclusive<T>(
  predicate: (value: T) => boolean
): (source: Observable<T>) => Observable<T> {

  return source => source.multicast(
    () => new ReplaySubject<T>(1),
    shared => shared
      .takeWhile(predicate)
      .concat(shared.take(1).filter(t => !predicate(t)))
  );
}
复制代码

这里使用了缓冲区大小为1的 ReplaySubject 来共享源 observable 。当 takeWhile 操做符返回的 observable 完成时,共享的 observable 是串联的,使用 take(1) 能够确保只考虑重放的值,而 filter 能够确保只有当不知足条件时才进行追加。

这种方式可靠吗?

RxJS 5 是至关新的库,它的文档扔在进行中,因此这种方式尚未在文档中出现,只是在内部使用。公开的 TypeScript 签名指出了并不是永远返回的是 ConnectableObservable,也有相对应的单元测试

RxJS 一般比较灵活,所以实现这些函数还有其余方式,但上面的示例说明了当须要本地共享源 observable 时,publishmulticast 简单易用,值得考虑。

相关文章
相关标签/搜索