Rxjs--组合操做符

Observable 组合(combine)、串行(concat)、并行(merge)、一对一合并(zip)、切换(switch)等html

combineLatest

public combineLatest(other: ObservableInput, project: function): Observable

combineLatest 结合传入的多个 Observables。 经过顺序的订阅每一个输入Observable, 在每次任一输入Observables发送的时候收集每一个输入Observables最新的值组成一个数组, 而后要么将这个数组传给可选的投射函数并发送投射函数返回的结果, 或者在没有提供投射函数时仅仅发出该数组。es6

combineAll

public combineAll(project: function): Observable

当高阶 Observable 完成时(即源Observable发出了complete),经过使用 combineLatest 将其打平数组

注意只有完成时才打平执行订阅!!!并发

let clicks = Rx.Observable.fromEvent(document, 'click');

    let higherOrder = clicks.map(ev =>
      Rx.Observable.interval(100).take(3)
    ).take(2);

    let result = higherOrder.combineAll();
    result.subscribe(x => console.log(x));

concat

public concat(other: ObservableInput, scheduler: Scheduler): Observable

顺序的、串行的将全部输入 Observable 的值合并给输出 Observable函数

concatAll

public concatAll(): Observable

串行链接源(高阶 Observable)所发出的每一个 Observable,只有当一个内部 Observable 完成的时候才订阅下 一个内部 Observable,并将它们的全部值合并到返回的 Observable 中spa

经过顺序地链接内部 Observable,将高阶 Observable 转化为一阶 Observablecode

var clicks = Rx.Observable.fromEvent(document, 'click');
    var higherOrder = clicks.map(ev => Rx.Observable.interval(1000).take(4));
    var firstOrder = higherOrder.concatAll();
    firstOrder.subscribe(x => console.log(x));

若是源发送很快则会遇到内存问题,由于传入的 Observables 会在无界缓冲区中收集htm

exhaust

public exhaust(): Observable

exhaust 订阅高阶 Observable 。 每次观察到这些已发出的内部 Observables 中的其中一个时,输出 Observable 开始发出该内部 Observable 要发出的项, 然而若是前一个 Observable 还未完成的话,exhaust 会忽略每一个新的内部 Observable 。一旦完成,它将接受并打平下一个 内部 Observable ,而后重复此过程。(和concatAll相似,可是会抛弃掉在前一个高阶Observable完成前发出的高阶Observable)ip

merge

public static merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable

merge 订阅每一个给定的输入 Observable (做为参数),而后只是将全部输入 Observables 的全部值发送(不进行任何转换)到输出 Observable 。全部的输入 Observable 都完成了,输出 Observable 才 能完成。任何由输入 Observable 发出的错误都会当即在输出 Observalbe 上发出。内存

observables ...ObservableInput  

合并到一块儿的输入Observables。

concurrent number
  • 可选的
  • 默认值: Number.POSITIVE_INFINITY

能够同时订阅的输入 Observables 的最大数量。

scheduler Scheduler
  • 可选的
  • 默认值: null

调度器用来管理并行的输入Observables。

mergeAll

mergeAll 订阅高阶 Observable 。 每当观察到发出的内部 Observable 时,它会订阅并发出输出 Observable 上的这个 内部 Observable 的全部值。全部的内部 Observable 都完成了,输出 Observable 才能完成。任何由内部 Observable 发出的错误都会当即在输出 Observalbe 上发出。

 

race

public race(): Observable

返回 Observable,该 Observable 是源 Observable 和提供的 Observables 的组合中 第一个发出项的 Observable 的镜像

startWith

public startWith(values: ...T, scheduler: Scheduler): Observable

返回的 Observable 会先发出做为参数指定的项,而后再发出由源 Observable 所发出的项

switch

public switch(): Observable<T>

switch 订阅高阶 Observable。老是只会订阅最新的内部 Observable, 并取消订阅以前的。

withLatestFrom

public withLatestFrom(other: ObservableInput, project: Function): Observable

withLatestFrom 结合源 Observablecombines(实例)和其余输入 Observables 的最新值,当且仅当 source 发出数据时, 可选的使用 project 函数以决定输出 Observable 将要发出的值。 在输出 Observable 发出值以前,全部的输入 Observables 都必须发出至少一个值

zip

public static zip(observables: *): Observable<R>

将多个 Observable 组合以建立一个 Observable,该 Observable 的值是由全部输入 Observables 的值按顺序计算而来的。

let age = Rx.Observable.of<number>(27, 25, 29);
    let name = Rx.Observable.of<string>('Foo', 'Bar', 'Beer');
    let isDev = Rx.Observable.of<boolean>(true, true, false);

    Rx.Observable
      .zip(age, name, isDev, (age: number, name: string, isDev: boolean) => ({age, name, isDev}))
      .subscribe(x => console.log(x));

zipAll

public zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T>