RxJS: 详解forkJoin, zip, combineLatest之间的区别

前言

forkJoin, zip, combineLatestrxjs中的合并操做符,用于对多个流进行合并。不少人第一次接触rxjs时每每分不清它们之间的区别,其实这很正常,由于当你准备用来合并的流是那种只会发射一次数据就关闭的流时(好比http请求),就结果而言这三个操做符没有任何区别。数组

const ob1 = Rx.Observable.of(1).delay(1000);
const ob2 = Rx.Observable.of(2).delay(2000);
const ob3 = Rx.Observable.of(3).delay(3000);

Rx.Observable.forkJoin(ob1, ob2, ob3).subscribe((data) => console.log(data));
Rx.Observable.zip(ob1, ob2, ob3).subscribe((data) => console.log(data));
Rx.Observable.combineLatest(ob1, ob2, ob3).subscribe((data) => console.log(data));

// [1, 2, 3]
// [1, 2, 3]
// [1, 2, 3]
// 都是在3秒的时候打印

rxjs中不少操做符功能相近,只有当其操做的流会屡次发射数据时才会体现出它们之间的区别,下面咱们来详细解释forkJoin, zip, 和combineLatest并发

一个基本概念

首先咱们要知道,一个流(或者说Observable序列)的生命周期中,每次发射数据会发出next信号(Notification),结束发射时会发出complete信号,发生错误时发出error信号,三个信号分别对应observer的三个方法。next信号会因为发射源的不一样发射0到屡次;而completeerror仅会发射其中一个,且只发射一次,标志着流的结束。
subscribe接收一个observer对象用来处理上述三种信号,只传入一个函数会被认为是next方法,所以传入subscribenext方法会执行0到N次,N为序列正常发射信号的次数。函数

forkJoin

forkJoin合并的流,会在每一个被合并的流都发出结束信号时发射一次也是惟一一次数据。假设咱们有两个流:code

const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.forkJoin(ob1, ob2).subscribe((data) => console.log(data));
// ["ob1:2", "ob2:1"]

ob1会在发射完第三个数据时中止发射,ob2会在发射完第二个数据时中止,而forkJoin合并后的流会等到ob1ob2都结束时,发射一次数据,也就是触发一次subscribe里的回调,接收到的数据为ob1ob2发射的最后一次数据的数组。server

zip

zip工做原理以下,当每一个传入zip的流都发射完毕第一次数据时,zip将这些数据合并为数组并发射出去;当这些流都发射完第二次数据时,zip再次将它们合并为数组并发射。以此类推直到其中某个流发出结束信号,整个被合并后的流结束,再也不发射数据。对象

const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.zip(ob1, ob2).subscribe({
  next: (data) => console.log(data),
  complete: () => console.log('complete')
});
// ["ob1:0", "ob2:0"] ob1等待ob2发射数据,以后合并
// ["ob1:1", "ob2:1"] 此时ob2结束,整个合并的流也结束
// "complete"

zipforkJoin的区别在于,forkJoin仅会合并各个子流最后发射的一次数据,触发一次回调;zip会等待每一个子流都发射完一次数据而后合并发射,以后继续等待,直到其中某个流结束(由于此时不能使合并的数据包含每一个子流的数据)。rxjs

combineLatest

combineLatestzip很类似,combineLatest一开始也会等待每一个子流都发射完一次数据,可是在合并时,若是子流1在等待其余流发射数据期间又发射了新数据,则使用子流最新发射的数据进行合并,以后每当有某个流发射新数据,再也不等待其余流同步发射数据,而是使用其余流以前的最近一次数据进行合并。生命周期

const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.combineLatest(ob1, ob2).subscribe({
  next: (data) => console.log(data),
  complete: () => console.log('complete')
});
// ["ob1:1", "ob2:0"] ob1等待ob2发射,当ob2发射时ob1已经发射了第二次数据,使用ob1的第二次数据
// ["ob1:2", "ob2:0"] ob1继续发射第三次也是最后一次数据,ob2虽然还未发射,可是能够使用它上一次的数据
// ["ob1:2", "ob2:1"] ob2发射第二次也是最后一次数据,使ob1上一次的数据。
// "complete"

本期内容结束,下一期会继续带来rxjs的其余操做符或者概念详解。ip

相关文章
相关标签/搜索