forkJoin
, zip
, combineLatest
是rxjs
中的合并操做符,用于对多个流进行合并。不少人第一次接触rxjs
时每每分不清它们之间的区别,其实这很正常,由于当你准备用来合并的流是那种只会发射一次数据就关闭的流时(好比http
请求),就结果而言这三个操做符没有任何区别。数组
const ob1 = Rx.Observable.of(1).delay(1000); const ob2 = Rx.Observable.of(2).delay(2000); const ob3 = Rx.Observable.of(3).delay(3000); Rx.Observable.forkJoin(ob1, ob2, ob3).subscribe((data) => console.log(data)); Rx.Observable.zip(ob1, ob2, ob3).subscribe((data) => console.log(data)); Rx.Observable.combineLatest(ob1, ob2, ob3).subscribe((data) => console.log(data)); // [1, 2, 3] // [1, 2, 3] // [1, 2, 3] // 都是在3秒的时候打印
rxjs
中不少操做符功能相近,只有当其操做的流会屡次发射数据时才会体现出它们之间的区别,下面咱们来详细解释forkJoin
, zip
, 和combineLatest
。并发
首先咱们要知道,一个流(或者说Observable
序列)的生命周期中,每次发射数据会发出next
信号(Notification
),结束发射时会发出complete
信号,发生错误时发出error
信号,三个信号分别对应observer
的三个方法。next
信号会因为发射源的不一样发射0到屡次;而complete
和error
仅会发射其中一个,且只发射一次,标志着流的结束。subscribe
接收一个observer
对象用来处理上述三种信号,只传入一个函数会被认为是next
方法,所以传入subscribe
的next
方法会执行0到N次,N为序列正常发射信号的次数。函数
forkJoin
用forkJoin
合并的流,会在每一个被合并的流都发出结束信号时发射一次也是惟一一次数据。假设咱们有两个流:code
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3); const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2); Rx.Observable.forkJoin(ob1, ob2).subscribe((data) => console.log(data)); // ["ob1:2", "ob2:1"]
ob1
会在发射完第三个数据时中止发射,ob2
会在发射完第二个数据时中止,而forkJoin
合并后的流会等到ob1
和ob2
都结束时,发射一次数据,也就是触发一次subscribe
里的回调,接收到的数据为ob1
和ob2
发射的最后一次数据的数组。server
zip
zip
工做原理以下,当每一个传入zip
的流都发射完毕第一次数据时,zip
将这些数据合并为数组并发射出去;当这些流都发射完第二次数据时,zip
再次将它们合并为数组并发射。以此类推直到其中某个流发出结束信号,整个被合并后的流结束,再也不发射数据。对象
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3); const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2); Rx.Observable.zip(ob1, ob2).subscribe({ next: (data) => console.log(data), complete: () => console.log('complete') }); // ["ob1:0", "ob2:0"] ob1等待ob2发射数据,以后合并 // ["ob1:1", "ob2:1"] 此时ob2结束,整个合并的流也结束 // "complete"
zip
和forkJoin
的区别在于,forkJoin
仅会合并各个子流最后发射的一次数据,触发一次回调;zip
会等待每一个子流都发射完一次数据而后合并发射,以后继续等待,直到其中某个流结束(由于此时不能使合并的数据包含每一个子流的数据)。rxjs
combineLatest
combineLatest
与zip
很类似,combineLatest
一开始也会等待每一个子流都发射完一次数据,可是在合并时,若是子流1在等待其余流发射数据期间又发射了新数据,则使用子流最新发射的数据进行合并,以后每当有某个流发射新数据,再也不等待其余流同步发射数据,而是使用其余流以前的最近一次数据进行合并。生命周期
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3); const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2); Rx.Observable.combineLatest(ob1, ob2).subscribe({ next: (data) => console.log(data), complete: () => console.log('complete') }); // ["ob1:1", "ob2:0"] ob1等待ob2发射,当ob2发射时ob1已经发射了第二次数据,使用ob1的第二次数据 // ["ob1:2", "ob2:0"] ob1继续发射第三次也是最后一次数据,ob2虽然还未发射,可是能够使用它上一次的数据 // ["ob1:2", "ob2:1"] ob2发射第二次也是最后一次数据,使ob1上一次的数据。 // "complete"
本期内容结束,下一期会继续带来rxjs
的其余操做符或者概念详解。ip