原文连接: blog.angularindepth.com/rxjs-multic…git
本文为 RxJS 中文社区 翻译文章,如需转载,请注明出处,谢谢合做!github
若是你也想和咱们一块儿,翻译更多优质的 RxJS 文章以奉献给你们,请点击【这里】函数
multicast
操做符有一个秘密。publish
操做符也是如此,它封装了 multicast
。这个秘密有时候真的挺好用的。单元测试
multicast
和 publish
的文档中都提到了 ConnectableObservable
。ConnectableObservable
是一种特殊类型的 observable,只有调用它的 connect
方法后,它才会开始向订阅者发送通知。然而,multicast
和 publish
操做符并不是永远返回 ConnectableObservable
。测试
咱们先来看下 publish
的源码:ui
export function publish<T>(
this: Observable<T>,
selector?: (source: Observable<T>) => Observable<T>
): Observable<T> | ConnectableObservable<T> {
return selector ?
multicast.call(this, () => new Subject<T>(), selector) :
multicast.call(this, new Subject<T>());
}
复制代码
能够很清楚地看出,publish
只是对 multicast
进行了一层很薄的封装。它建立了 subject 并传给 multicast
,还有一个可选的 selector
函数。最有趣的部分是在 multicast
实现之中,它包含以下代码:this
if (typeof selector === 'function') {
return this.lift(new MulticastOperator(subjectFactory, selector));
}
const connectable: any = Object.create(this, connectableObservableDescriptor);
connectable.source = this;
connectable.subjectFactory = subjectFactory;
return <ConnectableObservable<T>> connectable;
复制代码
只有在不传入 selector
函数的状况下,multicast
才返回 ConnectableObservable
。若是传入 selector
函数的话,会使用 lift 机制来使得源 observable 建立出适当类型的 observable 。不须要在返回的 observable 上调用 connect
方法,而且在 selector
函数的做用域中会共享源 observable 。spa
这意味着 multicast
(以及 publish
) 操做符能够用来轻松实现源 observable 的本地共享。翻译
咱们来看看使用 publish
的示例。code
RxJS 引入了 defaultIfEmpty
操做符,它接收一个值,若是源 observable 为空的话,会将这个值发出。有时候,可以指定一个默认 observable 的话要比指定单个值有用得多,那么让咱们来实现一个 defaultObservableIfEmpty
函数,它能够与 let
操做符一块儿使用。
下面的弹珠图展现了源 observable 为空时它的行为:
RxJS 引入了 isEmpty
操做符,当源 observable 完成时,它会发出布尔值以标识源 observable 是否为空。可是,要在 defaultObservableIfEmpty
实现中使用它的话,须要共享源 observable,由于须要发出值的通知,而 isEmpty
没法作到这点。publish
操做符使得源 observable 的共享变得简单,实现以下所示:
function defaultObservableIfEmpty<T>( defaultObservable: Observable<T> ): (source: Observable<T>) => Observable<T> {
return source => source.publish(shared => shared.merge(
shared.isEmpty().mergeMap(empty => empty ?
defaultObservable :
Observable.empty<T>()
)
));
}
复制代码
传给 publish
的 selector
函数接收共享的源 observable 。selector
返回的 observable 是由共享源 observable 和根据源 observable 是否为空获得的 observable (若是源 observable 为空,则为传入的默认 observable,不然为空 observable) 的组合而成。
源 observable 的共享彻底是由 publish
管理的。使用 selector
函数,就可以根据须要屡次订阅共享 observable,而不会影响源 observable 后面的订阅。
咱们来看另外一个示例,此次使用 multicast
。
RxJS 引入了 takeWhile
操做符,它返回的 observable 会发出源 observable 的值,直到不知足给定条件的值出现,此刻 observable 完成。不知足条件的那个值不会被发出。咱们来实现一个 takeWhileInclusive
函数,它能够与 let
操做符一块儿使用。
下面的弹珠图展现了值不知足条件时的行为:
可使用 takeWhile
操做符做为基础来实现,当不知足条件时,只须要再链接不知足条件的那个值便可。要在 takeWhile
返回的 observable 完成后取得这个值,可使用 ReplaySubject
:
function takeWhileInclusive<T>(
predicate: (value: T) => boolean
): (source: Observable<T>) => Observable<T> {
return source => source.multicast(
() => new ReplaySubject<T>(1),
shared => shared
.takeWhile(predicate)
.concat(shared.take(1).filter(t => !predicate(t)))
);
}
复制代码
这里使用了缓冲区大小为1的 ReplaySubject
来共享源 observable 。当 takeWhile
操做符返回的 observable 完成时,共享的 observable 是串联的,使用 take(1)
能够确保只考虑重放的值,而 filter
能够确保只有当不知足条件时才进行追加。
RxJS 5 是至关新的库,它的文档扔在进行中,因此这种方式尚未在文档中出现,只是在内部使用。公开的 TypeScript 签名指出了并不是永远返回的是 ConnectableObservable
,也有相对应的单元测试。
RxJS 一般比较灵活,所以实现这些函数还有其余方式,但上面的示例说明了当须要本地共享源 observable 时,publish
和 multicast
简单易用,值得考虑。