以上操做符在版本6中已经只存在静态方法,不能在pipe中使用。数组
import {concat,merge,zip,combineLatest}
依次将多个observable首尾合并,必须在第一个obs1的数据所有完成,才能进行第二个obs2,若是第一个为interval(1000),那么obs2 和obs3 也就永远没有机会获得输出。异步
concat(of(1,2,3),interval).subscribe(console.log); // 1 2 3 0 1 2 3 ...
merge会第⼀时间订阅全部的上游Observable,而后对上游的数据采起“先到先得”的策略,任何⼀个Observable只要有数据推下来,就⽴刻转给下游Observable对象。函数
merge(interval(1000),of(1,2,3)).subscribe(console.log); merge(of(1,2,3),interval(1000)).subscribe(console.log); //两种状况的输出结果同样,都是先一次性输出1 2 3 再间隔一秒依次输出0 1 2 ...
const source1$ = Observable.timer(0, 1000).map(x => x+'A'); const source2$ = Observable.timer(500, 1000).map(x => x+'B'); merge(source1$, source2$).subscribe( console.log, null, () => console.log('complete') ); //0A //0B //1A //1B //2A //2B
merge 的应用场景:咱们知道fromEvent能够从⽹页中获取事件,只惋惜,fromEvent⼀次只能从⼀个DOM元素获取⼀种类型的事件。⽐如,咱们关⼼某个元素的click事件,同时也关⼼这个元素上的touchend事件,由于在移动设备上touchend事件出现得⽐click更早,这两个事件的处理是⼀模⼀样的,可是fromEvent不能同时得到两个事件的数据流,这时候就要借助merge的⼒量了,代码以下:code
const click$ = Rx.Observable.fromEvent(element, 'click'); const touchend$ = Rx.Observable.fromEvent(element, 'touchend'); merge(click$, touchend$).subscribe(eventHandler)
一对一的合并server
const source1$ = Observable.of(1, 2, 3); const source2$ = Observable.of('a', 'b', 'c'); zip(source1$, source2$).subscribe( console.log, null, () => console.log('complete') ); //[ 1, 'a' ] //[ 2, 'b' ] //[ 3, 'c' ] //complete
const source1$ = Observable.timer(500, 1000); const source2$ = Observable.timer(1000, 1000); combineLatest(source1$,source2$).subscribe( console.log, null, () => console.log('complete') ); //[ 0, 0 ] //[ 1, 0 ] //[ 1, 1 ] //[ 2, 1 ] //[ 2, 2 ] //[ 3, 2 ]
const source1$ = Observable.timer(0, 2000).map(x => 100 * x); const source2$ = Observable.timer(500, 1000); source1$.pipe( withLatestFrom(source2$, (a,b)=> a+b); ).subscribe( console.log, null, () => console.log('complete') );
source1$产⽣第⼀个数据0时,withLatestFrom的另⼀个输⼊Observable对象source2$尚未产⽣数据,因此这个0也被忽略了。对象
解决glitch
例1:blog
const original$ = Observable.timer(0, 1000); const source1$ = original$.map(x => x+'a'); const source2$ = original$.map(x => x+'b'); const result$ = source1$.pipe(withLatestFrom(source2$);) result$.subscribe( console.log, null, () => console.log('complete') );
例2:事件
const event$ = Rx.Observable.fromEvent(document.body, 'click'); const x$ = event$.map(e => e.x); const y$ = event$.map(e => e.y); const result$ = x$.pipe(combineLatest(y$, (x, y) => `x: ${x}, y: ${y}`)).subscribe( (location) => { console.log('#render', location); document.querySelector('#text').innerText = location; } );
race就是“竞争”,多个Observable对象在⼀起,看谁最早产⽣数据,不过这种竞争是⼗分残酷的,胜者通吃,败者则失去全部机会。
简而言之,经过race合并多个observable时,最早吐出数据那个observable会成为数据源,其它的observable会被淘汰。ip
startWith只有实例操做符的形式,其功能是让⼀个Observable对象在被订阅的时候,老是先吐出指定的若⼲个数据。下⾯是使⽤startWith的⽰例代码element
of(0,1,2).pipe(startWith('a','b')).subscribe(console.log); //先依次吐出 a b 0 1 2
js forkJoin(interval(1000).pipe(take(3)),of(1,2,3),timer(2000,1000).pipe(take(3))).subscribe(console.log); // [2,3,2]
js简言之:⾼阶函数就是产⽣函数的函数;相似,所谓⾼阶Observable,指的是产⽣的数据依然是Observable的Observable
concatAll只有⼀个上游Observable对象,这个Observable对象预期是⼀个⾼阶Observable对象,concatAll会对其中的内部Observable对象作concat的操做.
interval(1000).pipe( take(2), map(x=>interval(1500).pipe(take(2),map(x=> `${x}:x,y:${y}`))), concatAll() ).subscribe(console.log); // 0:a,b:0 // 0:a,b:1 // 1:a,b:0 // 1:a,b:1
concat 实际运用
fromEvent(document.body,'mousedown').pipe( map( e=>fromEvent(document.body,'mousemove').pipe(map(e=>{return {x:e.clientX,y:e.clientY}}), takeUntil(fromEvent(document.body,'mouseup'))) ), concatAll() ).subscribe(console.log);
mergeAll就是处理⾼阶Observable的merge,只是全部的输⼊Observable来⾃于上游产⽣的内部Observable对象.
interval(1000).pipe( take(2), map(x => Observable.interval(1500).map(y => x+':'+y).take(2)), mergeAll() )
mergeAll只要
发现上游产⽣⼀个内部Observable就会⽴刻订阅,并从中抽取收据,因此在上图中,第⼆个内部Observable产⽣的数据1:0会出如今第⼀个内部Observable产⽣的数据0:1以前.
interval(1000).pipe( take(2), map(x => Observable.interval(1500).map(y => x+':'+y).take(2)), zipAll() ) //[ '0:0', '1:0' ] //[ '0:1', '1:1' ] //complete
combineAll就是处理⾼阶Observable的combineLatest,多是由于combine-LatestAll太长了,因此RxJS选择了combineAll这个名字。
interval(1000).pipe( take(2), map(x => Observable.interval(1500).map(y => x+':'+y).take(2)), combeneAll() ) //[ '0:0', '1:0' ] //[ '0:1', '1:0' ] //[ '0:1', '1:1' ] //complete
interval(1000).pipe( take(3), map(x => Observable.interval(1500).map(y => x+':'+y).take(2)), switchAll() ) //1:0 //1:1 //complete
第⼀个Observable对象有机会产⽣数据0:0,可是在第⼆个数据0:1产⽣以前,第⼆个内部Observable对象产⽣,这时发⽣切换,第⼀个内部Observable就退场了。一样,第⼆个内部Observable只有机会产⽣⼀个数据1:0,而后第三个内部Observable对象产⽣,以后没有新的内部Observable对象产⽣,因此第三个Observable对象的两个数据2:0和2:1都进⼊了下游。
interval(1000).pipe( take(3), map(x => Observable.interval(700).map(y => x+':'+y).take(2)), exhaust() )