angular2 学习笔记 ( rxjs 流 )

更新: 2019-11-24html

startWith 和 pairwise jquery

s.pipe(
  startWith('c'),
  map(v => v + 1),
  tap(v => console.log(v)), // c1
  startWith(null),
  pairwise(),
  tap(v => console.log(v)), // [null, c1]
).subscribe(([before, after]) => {
  // console.log(before, after);
});

2 个点要留意git

第一,pairwise 须要 2次值才会触发. 因此 startWith('c') 到 pairwise 就被吃掉了. subscribe 不会触发github

因此须要 startWith(null) 来喂它一次. ajax

第二, startWith 的次序. 最后一个 tap 的值是 [null, c1] 而不是 [c1, null] json

startWith('c1'),
startWith('c2'),
tap(v => console.log(v)), // [c2, c1]

要记住哦。api

 

 

 

 

 

更新: 2019-07-18promise

unsubscribe vs complete 缓存

在用 ng 的时候, 咱们会纠结何时要调用 unsubscribe, 由于听说 angular 会帮咱们处理...性能优化

其实最好是每一次都调用 unsubscribe. 好比下面这个例子, setimeout 表明 component destroy 

当 destroy 时,即便我 subject.complete(), 我也没法阻止 tap 的触发. 因此仍是 unsubscribe 稳当一些.

按逻辑讲, 当 component destroy 咱们是取消咱们对外部的监听, 意思是咱们再也不处理了, 而 complete 则是它再也不发送了. 

它再也不发送, 和我再也不处理是 2 个概念. 它不发, 可是我手上的工做仍是得作完, 我不处理是我立马停掉手上工做. 

const s = new Subject();
const o = s.asObservable();
const sub = o.pipe(delay(3000), tap(() => console.log('tap'))).subscribe(() => {
  console.log('done');
});
s.next('dada');
setTimeout(() => {
  s.complete();
  // sub.unsubscribe();
}, 1000);

 

 

更新 : 2019-06-2

defer 用于延后一个 function 执行. 当 subscribe 后才被执行, 一般用于作 promise retry 

好比我有一个 promise 方法

async function getValueAsync(ok: boolean): Promise<string> {
    return new Promise((resolve, reject) => {
        console.log('run');
        setTimeout(() => {
            if (ok) {
                resolve('dada');
            }
            else {
                reject('fail');
            }
        }, 3000);
    });
}
from(getValueAsync(false)).pipe(
    retry(1)
).subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

若是我直接这样跑是不能 retry 的. refer : https://stackoverflow.com/questions/33072512/rx-frompromise-and-retry 这里有解释 

用 defer

defer(() =>  getValueAsync(false)).pipe(
    retry(2)
).subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

这样就能够了

 

 

 

更新 : 2018-03-12 

学 rxjs 最好的就是看官网的文档,解释的很清楚. 

http://cn.rx.js.org/manual/overview.html#h39

https://rxjs-cn.github.io/learn-rxjs-operators/

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}
var unsubscribe = subscribe({next: (x) => console.log(x)});
var unsubscribe = subscribe({next: (x) => console.log(x)});
// 稍后:
unsubscribe(); // 清理资源

上面这一段代码让我明白了几个重点

1. 惰性 (当 observable 被建立时,没有 subscribe 是不会开始运行的, 由于 observable 就像函数调用 )

2. 每个 subscribe 是独立的 

3. observable 和经常使用的 addEventListener 不一样,它不会吧全部的 observer 保存在列表里头. 

 

而 subject 则是道道地地的 addEventListener, 它会保存全部 observer 在列表里. 

而把它们结合的方式就是 observable.subscribe(subject); 

由于 subject 就是一个 observer 拥有 next 方法. 

这就是 rxjs 其中 2 大核心, observable and subject 

至于其它的 subject 还有一堆的 operator, create observable 等,都是基于这 2 个核心的扩展而已. 

 

 

更新 : 2017-11-14 

最近重新看了 30 天 rxjs, 这里补上一些笔记.

顺便提一下, ng 5.x 开始 rxjs 的写法换掉了 

参考 : https://github.com/ReactiveX/rxjs/blob/master/doc/lettable-operators.md

import { combineLatest } from 'rxjs/observable/combineLatest';
import { catchError, take } from 'rxjs/operators';

combineLatest(pendingEmitters).pipe(
    take(1),
    catchError(() => {
        reject();
        return '';
    })
).subscribe(() => {
    resolve();
});

不像 jquery 那样串连了. 

 

1.concat 

https://ithelp.ithome.com.tw/articles/10187520

concat 是把多个 observeable 合并起来, 其特点是只有前面一个 observeable complete 了后边的 observeable 才开始起做用.

concat(o1,o2,o3), o1 没有 complete 的话, o2 怎么叫都不会触发.

另外一个说法就是 concat 先 subcribe o1, 而后等到 o1 completed 后再去 subscribe o2, 一直到完, 那它本身也就 completed 了.

concat 是能够调用的 import { concat } from 'rxjs/observable/concat';

 

2. merge

https://ithelp.ithome.com.tw/articles/10187520

它和 concat 都是用来合并的. 区别是它不须要等 complete, 任何一个 observable 触发都会有效果. 

换句话说就是 merge 会直接 subcribe 全部的 obserable 不像 concat 那样会等 completed.

merge(o1,o2) o1 没有 complete, o2 叫同样有效.

 

3.concatAll

concatAll 属于 operators 

o1.pipe(map(_ => o2),concatAll()) 

每一次 o1 叫,都会产生多一个 o2 

好比 o1 叫了 3 次 , 那么就有 3 个 o2 

concatAll 就是把这 3 个组合起来( 每一次 o1 叫, 都会 push 新的 o2 去这个 array ) 而后 concat(o2,o2,o2),后续的步骤就和 concat 一摸同样了. 因此它起到了打平和 concat 的做用.

 

4. mergeAll

mergeAll 和 concat 是同一个原理只是, 最后不是用 concat 而是用 merge(o2,o2,o2);

另外, mergeAll(2) 能够传入一个变量去控制容许并发的数量 

好比你输入 2,  那么 a 叫了 3次, 你有 merge(o2,o2,o2), 第3个 o2 叫的时候原本是会有效果的,可是因为限制了 2, 那么只能等第一或第二个 o2 complete, 第 3 o2 叫才有效果了。确保同一时期只有 2 个.

note mergeAll(1) === concatAll()

 

5.switchAll

没有 switch 只有 switchAll 

它和 concatAll, mergeAll 的区别是, o1 每一次叫, 永远只保留最新的 o2, 以前的 o2 通通丢掉. 

 

concatAll, switchAll, mergeAll 

https://ithelp.ithome.com.tw/articles/10188325

这 3 个都是用来打平 obs 

concatAll 上面讲了重点是会等上一个 complete 才去下一个 

switchAll 则是一旦有新的一个来,旧的就忽略掉. 

mergeAll 则是并发处理.

 

6 concatMap, switchMap, mergeMap 

https://ithelp.ithome.com.tw/articles/10188387

就是 map() + switchAll(), map() + concatAll(), map() + mergeAll() 的缩写而已. 

还有个好处是它能够传入一个方法 (a,b,c,d) => next, 能够获取到 o1,o2 的值而后返回下一个值. 

 

6.5 exhaustMap 

这个和 concatMap 很像. 惟一的区别是, concat 等待第一个 complete 了之后会去 subscribe 下一个(第二个)

而 exhasust 呢, 它会去 subscribe 下下下下一个 (最后一个), 

 

7. combineLatest 

https://ithelp.ithome.com.tw/articles/10187638

它也是用来作合拼处理的,

它须要等 "每个" observable 至少有一个开始值以后才开始工做. 这和 merge 不一样, merge 不须要等

它每一次触发均可以获取到全部 observable 当前的值, 这和 merge 不一样, merge 每一次触发只能获取一个 observable 的值.

 

8.withLatestFrom

https://ithelp.ithome.com.tw/articles/10187638

它和 combineLatest 同样,惟一的区别是, 它只有 main observable next 值时才会触发 callback , 其它 observable next 只是记入值而已. 

 2019-06-14 补上一个例子, form value update 可是只有 button click 的时候才 emit

this.formGroup = this.formBuilder.group({
  date: [null, Validators.required]
});

const f = this.formBuilder.group({
  name: [''],
  age: [11]
});
const button = new Subject();

button.pipe(withLatestFrom(f.valueChanges)).subscribe(v => {
  console.log(v);
});


f.setValue({ name: 'dada', age: 15 });
f.setValue({ name: 'dada', age: 18 });
button.next('dada');
button.next('dada2');

 

 

9. zip 

https://ithelp.ithome.com.tw/articles/10187638

它也是合并. 

它的关键是顺位, 好比 2 个 observables, 2 个都 next 1次 的时候就会 callback 并获得 2 个的第一个值, 若是 2 个不平均, 好比一个 next 了 10 次, 另外一个 next 2 次, 那么 callback 就只有 2 次. 

等第 2 个 next 第 3 次时, callback 就会获得 第1和第2个的第3次 next 的值. 

 

10. scan 

https://ithelp.ithome.com.tw/articles/10187882

就是 js 的 reduce, 区别在于它老是返回 observable. 每一次 next 触发, callback 均可以获取上一次的值 + 如今的值作一些处理, 返回下一个值. 

 

11. buffer 

https://ithelp.ithome.com.tw/articles/10187882

buffer, bufferTime, bufferCount 

它用于累积 next 等待另外一个 obserable next 的时候才触发 callback 

o1.pipe(buffer(o2))...   o2 next 的时候 o1 的 callback 才触发, 而且返回期间全部的 o1 next 值. 

 

12. delay & delayWhen

https://ithelp.ithome.com.tw/articles/10187999

delay(1000) 就是延迟 1 秒咯, 若是咱们要每一次都不用的话. 

就用 delayWhen(v => empty().delay(v + 1000)); 必须返回一个 observable

 

13.debounce & debounceTime 

https://ithelp.ithome.com.tw/articles/10188121

它和 buffer 有点像, 都是会累积值, 可是区别在于, 当一个新值被 next 进来, 它会把以前的值释放掉, 而且时间重新开始算. 

 

14. throttle & throttleTime

https://ithelp.ithome.com.tw/articles/10188121

它用于限制一个时间内, 最高触发的频率. 好比 throttleTime(1000) 就限制了一秒内无论 next 几回, 只有第一次会 callback 日后的都不会, 直到下一秒开始. 

 

14.1 auditTime

它和 debounceTime 很像,只是 debounceTime 会 clear 掉上一次,而这个不会

3个分别的使用 : 

用户 keydown 时我想监听 

1. debounceTime 1000, 用户连续 keydown 我等 1 秒, 一秒中内用户又 keydown 了,我从新记时,再等 1 秒... 一直到用户 1 秒内再也没有 keydown 我才触发

2. auditTime 1000 用户连续 keydown, 我等 1 秒,一秒中内用户又 keydown 了, 但我 "不" 从新记时了, 1 秒后我就触发, 也就是说, 用户 1 秒内按多少次,我都当成 1 次 而且在 1 秒后才触发. 

3. debounceTime 1000, 用户连续 keydown, 我直接触发, 一秒中内用户又 keydown 了, 我不理,直到 1 秒后 

特点

debounceTime 从新几时, 一直不触发

auditTime 等..触发

debounceTime 触发...等

 

 

15.distinct, distinctUntilChanged

https://ithelp.ithome.com.tw/articles/10188194

它就是咱们熟悉的 distinct, 若是值相同就忽略 callback 

distinct((x) => { return x.value }); 能够提供一个取值的方法,对比估计是用 === 

distinct 会把全部的值都存起来作对比, distinctUntilChanged 的区别是它只会存最后一次的值作对比.

 

16.catchError, retry, retryWhen, repeat

https://ithelp.ithome.com.tw/articles/10188263

catchError 是捕获错误, 

catchError((error, obs) => obs); 返回第 2 个参数 obs 能够实现从跑. 

retry 就是作了上述的事情, 而 retry 多了一个能够指定次数, retry(3) 从试 3 次 

retryWhen(errorObs => errorObs.delay(1000)) retryWhen 可让咱们操做更多,好比间隔多久才 retry 下一次等等. 

repeat 和 retry 是同样的,区别在于 retry 必须在 error 时才会有, 而 repeat 则是无论有没有 error 都会执行. 

 

 

17.  ReplaySubject, BehaviorSubject

behavior 表明一个保持值的 subject, 一旦订阅立刻会触发 callback 并获取到最新的值. 即使不订阅也能够调用 .value 来获取当前值.

replay 会缓存以前的 next 一旦新的订阅加入,就会 playback 以前全部的 next 值. 

 

18. race 

race(s1, s2, s3) 的意思是, 哪个先触发,那么以后我就 watch 这一个罢了,另外 2 个 subject 就不理会了。

 

19. mapTo

对比 map, mapTo 的参数是一个值,而不是获取值的方法, 因此值就只有一个,要留意哦。

 

 

  

rxjs 在处理 dom 事件时是很是好用的. 

步骤通常上是 获取全部 element, 创建全部的 event 

而后就是各作 rxjs operator 对 event 和 element 的处理. 

参考 : https://ithelp.ithome.com.tw/articles/10187756

 

 

 

 

  

更新 : 2017-10-14 

import 'rxjs/add/observable/combineLatest'; //每个至少一次后才开始触发(无需 completed), 一个一次能够获取你们最新的 value
import 'rxjs/add/observable/forkJoin'; // 相似 promise.all 全部 observable 须要 最少next 一次 and completed 才会触发,因此只触发一次获取全部 value
import 'rxjs/add/observable/merge'; // 用来监听多个 click event 一个一次, merge(a,b) 若是 a,b 都有初始值, 那么会立刻触发 2 次.
import 'rxjs/add/observable/concat'; // 和 merge 同样, 惟一不一样的是它须要第一个 completed 才会触发第 2 个, 第一还没 completed 第 2 next 都不会叫哦

 

更新 2017-05-17 

今天被 toPromise 给骗了. 

我一直觉得, 全部的 "流" 均可以轻松的转成 await stream.toPromise();

后来我发现有个流一直没有反应 

let subject = new BehaviorSubject('a');
let o = subject.asObservable();
o.toPromise().then(() => console.log('pro')); //不会跑
setTimeout(()=> {
  subject.next('haha');
  //subject.complete();
}, 1000);

上网找了一下才发现,原来 toPromise().then 必须是 completed 才会跑

因此上面的 subject.complete() 必需要打开才行. 这也意味着 toPromise 只能用在一次的 async 中, 若是是要持续 subscribe 的状况下请使用 .subscribe() 

 

 

 

更新 2017-04-02 

Subject 的主要功能就是观察者模式. 

咱们能够随时写入值,彻底本身操控. 

可是有时候咱们但愿它依赖于其它 stream 那么咱们使用 connect 

let s1 = new Subject();
let o1 = s1.asObservable(); //咱们想以来的 stream 

let s2 = new Subject();
s2.subscribe(v => console.log(v));

o1.subscribe(v => s2.next(v),v => s2.error(v)); //第1种写法,超麻烦
o1.subscribe(s2); //第2种,但是我没有要立刻 subscribe 的话呢 ?  
let connector = o1.multicast(s2); //第3种
connector.connect();

s1.next("value");

 

 

更新 2017-03-31

好文,力推 : http://ithelp.ithome.com.tw/articles/10189028?sc=iThomeR
再谈谈 cold & hot 

observeable 是 default cold 的. 

code 的意思是说, 当有多个 subscribe 时,每个都是一条独立的链.

好比 http 多个 subscribe 的话,你会发现你的 request 会发了好几个.

hot 的意思则是每一个 subscribe 共享一个链, 无论你什么以后插入subscribe 你都不会重新开始. 

把一个 cold 变成 hot 的方法是使用 Subject 充当中间人. 

具体看这 3 篇就明白了 

http://ithelp.ithome.com.tw/articles/10188633

http://ithelp.ithome.com.tw/articles/10188677

http://ithelp.ithome.com.tw/articles/10188750

这里介绍一下 ReplaySubject

Subject.subscirbe() , 不会立刻执行, 由于要等待下一个 Subject.next

BehaviorSubject.subscribe() , 立刻执行, 由于里面必定会有值. 

ReplaySubject.subscribe(), 不必定立刻执行,若是曾经 .next 过才会执行 

multicast, refCount,publish,share 的目的就是把 cold 转换成 hot .

其原理就是使用了 Subject 系列. 

multicast 后来被 publish 取代了. publish 对应 subject 因此有 publishBehavior, pulishReplay 

refCount 是 connect 的意思,就是把 observer 连接上 subject 的动做. 

因为 publish().refCount() 太常常用到,因此发明了 share 写的更快了嘻嘻。

在使用 ng 的 http 时要注意. observable 是 cold 的。但不少状况下咱们更但愿它是 hot 的. 

每一次的 subscribe 应该只返回同一个结果, 而这个 http 只发一次请求. 

这时咱们须要这样写 : http.get().publishReplay(1).refCount() 

publishReplay(1) 以后的每个 subscribe 都会获得同一个资料了.

 

 

更新 : 2017-03-27 

何时须要 unsubscribe ? 

http 不须要, router param 也不须要. 

下面说说几个状况 

1. Subject.complete 以后, 全部的 subscribe 都不会再触发, 新的 subscribe 也加不进来了. 

因此若是咱们知道订阅的 Subject 以后会被 complete 那么咱们能够无需担忧 unsubscribe 的问题 

2. 使用 async/await toPromise 能够避开 unsubscribe 的问题. 

3. 可使用 .first(判断) 表示何时开始拿而后停 (好比一个值须要等待 ajax)

4. 使用 takeWhile(判断) 来决定何时取消订阅. 

5. 使用 OnDestroy 

 

 

更新 : 2017-03-18

Observable, 
Subject,
BehaviorSubject
的区别和用法 
 
当咱们手中有一个 Observable, 咱们能够去监听它,那么日后的事情发生咱们都会知道。
可是, 以前发生的事情咱们都没办法知道. 咱们也没办法用它广播一个新的事件。
Observable 能作的事情, Subject 均可以作到. 而 Subject 多了一个能力,就是能够用它广播一个新事件出去. 
因此若是你站在一个监听者的角度, Subject 和 Observable 没啥区别. 你依然没办法知道过往发生的事情. 也只有在下一次事件广播时才会被通知. 
Subject 能作的事情 BehaviorSubject 均可以作,能监听也能广播. 最大的特色在于它能知道过往的事情。
当你手中有一个 BehaviorSubject 你能够立刻调用 .value 获取当前的值, 你订阅它的话,你也会立马收到一个事件,而不像 Subject 或 Observable 一直傻傻等下一次广播才能获得值. 
 
本身选择用吧.
 
ng 的 http.get 返回的是 Observable, 你一监听它你会立刻得到响应, 从这里咱们就能够推断出沿着这条链往上走最终就是一个 BehaviorSubject. 由于只有 BehaviorSubject 被监听的时候才会立刻获得响应。

 

2016-09-23

RxJS 博大精深,看了好几篇文章都没有明白. 

范围牵扯到了函数响应式开发去了... 我对函数式只知其一;不知其二, 响应式更是第一次听到... 

唉...不过日子仍是得过...混着过先呗

我目前所理解的很浅, 大体上是这样的概念.

1.某些场景下它比 promise 好用, 它善于过滤掉不关心的东西. 

2.它是观察者模式 + 迭代器模式组成的 

3.跟时间,事件, 变量有密切关系

4.世界上有一种东西叫 "流" stream, 一个流能表示了一段时间里,同样东西发生的变化. 

  好比有一个值, 它在某段时间里从 "我" 变成 "你" 再变成 "他". 

  而咱们能够对这个流进行观察,因此只要它发生变化,咱们就会发现而后作任何事情。

5.站在游览器的角度, 服务器推送数据过来, 用户操做界面, timer 都是咱们关心的流.

好,来看例子. 

咱们经过 new Subject 来建立流. 也可使用 new EventEmitter 或者 BehaviorSubject. 这些都继承了 Subject

EventEmitter 是 ng2 提供的

BehaviorSubject 能够填入初始值

import { Subject } from "rxjs/Subject";
private textEmitter: Subject<string> = new Subject(); 

要改变流中的值,咱们使用 .next(value), 这个是迭代器的概念咯

keyup(value : string)
{
    this.textEmitter.next(value);
}

那么订阅是这样的 

ngOnInit() {
    this.text$ = this.textEmitter
        .debounceTime(500)
        .distinctUntilChanged()
        .switchMap(v => this.getDataAsync(v));

    this.text$.subscribe((value) => {
        console.log(value);
    });            
}

input keyup 性能优化, 咱们一般会写一个 timeout + cleartimeout 的方式, 这个 debounceTime 就是干这个的 

流更新结束后 500ms 才会通知观察者 

distinctUntilChanged 是说只有当值和上一次通知时的值不同的时候才通知观察者 

.map 和 .switchMap 都是用来对值进行处理的, 这个和 array.map 概念是同样的

而 .map 和 .switchMap 的区别是 .swichMap 处理那些返回 Observeable 的值 

getDataAsync(value : string): Observable<string>
{        
    let subject = new Subject();
    setTimeout(() => {
        console.log("after 2second");
        subject.next(value + "final");
    }, 2000);
    return subject;
}

若是咱们使用 map 的话,它会直接返回 "subject" 这个对象, 而若是用 switchMap 它会返回这个 subject 对象的响应值.

<input type="text" #input (keyup)="keyup(input.value)" />
<p>{{ text$ | async }}</p>

ng2 提供了一个 async Pipe, 它会监听左边这个 text$ stream. 后面加一个 $ 符号一般用来代表这是一个 stream.

还有一个经常使用的功能是 combineLatest

就是能够同时监听多个流,只要其中一个有变更,那么全部的最新值都会发布出去, 能够用来实现依赖属性.

这里须要注意一点 combineLatest 的全部流都必须有值, 不能够是一个历来都没有 next 过的 Observable 否则它就不会运行了.

最简单的方法是使用 observable.startWith(null) 让它有一个值. 

@Component({
    selector: "compute-property",
    template: ` 
        <input type="text" #input1 (keyup)="text1.next(input1.value)" />
        <input type="text" #input2 (keyup)="text2.next(input2.value)" />  
        {{ result$ | async }}                 
    `
})
export class ComputePropertyComponent implements OnInit {

    text1: BehaviorSubject<string> = new BehaviorSubject<string>("a");
    text2: BehaviorSubject<string> = new BehaviorSubject<string>("b");
    result$: Observable<string>;
    constructor() {}
    
    ngOnInit() {
        this.result$ = Observable.combineLatest(this.text1, this.text2).map(values => {          
            return values[0] + " " + values[1];
        }); 
    }     
}

还有 bufferCount, bufferTime 也是经常使用到

text: Subject<number> = new Subject<number>();
    
ngOnInit() {
    this.text.bufferCount(2)
        .subscribe(v => console.log(v)); //[v1,v2] 存够 count 了就发布

    this.text.bufferTime(2000)
        .subscribe(v => console.log(v)); //[v1,v2,...]把 2 秒内的全部 next value 放进来
}

Observable.of 能够简单的返回一个默认值 

Observable.of<string>("").subscribe(v => console.log(v));

rxjs 整个文档很是大,要按需加载.

一般作法是为项目开一个 rxjs-operators.ts 

import 'rxjs/add/observable/throw'; 
import 'rxjs/add/observable/combineLatest'; 
import 'rxjs/add/observable/from'; 
import 'rxjs/add/observable/of'; 
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/debounceTime';
import 'rxjs/add/operator/distinctUntilChanged';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/switchMap';
import 'rxjs/add/operator/toPromise';
import 'rxjs/add/operator/startWith'; 
import 'rxjs/add/operator/bufferCount';
import 'rxjs/add/operator/bufferTime';

放入经常使用的方法 

而后再 app.module.ts 里面导入它 

import './rxjs-operators'; 

 

Hot or cold , share or not

refer : 

http://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html

http://blog.csdn.net/tianjun2012/article/details/51351823

1. by default, observable is not share.

let sub = new Subject();
let obs = sub.map(v => {
    console.log("ajax call"); 
});
obs.subscribe(v => console.log("subscribe 1"));
obs.subscribe(v => console.log("subscribe 2"));         
sub.next("value");

ajax 发了 2 次. angular2 的 Http 也是 not share 哦. 

因此当咱们有多个 subscribe 的时候要想想是否咱们须要 share 

let obs = sub.map(v => {
    console.log("ajax call"); 
}).share();

调用一个 share 方法就能够了,或者是 

let obs = sub.map(v => {
    console.log("ajax call"); 
}).publish().refCount();

效果是同样的. 

 

by default, observable is cold.

意思是说只有在 subscribe 出现了之后才会启动. ( 当第一个 subscribe 出现时, observable 就会马上启动了哦 ) 

let sub = new Subject();
let obs = sub.map(v => {
    console.log("ajax call");
});
sub.next("aaa");
//obs.subscribe(v => console.log("subscribe 1"));
//obs.subscribe(v => console.log("subscribe 2")); 

ajax 不会触发. 

若是咱们但愿它在没有 subscribe 的状况下触发的话, 能够这样写. 

let sub = new Subject();
let obs = sub.map(v => {
    console.log("ajax call");
}).publish();
obs.connect();
sub.next("aaa");

至于什么状况下使用哪种,我尚未实战,之后再说.

多一个例子解释: 

let obs = Observable.create(observer => {
    console.log("observer run");
    observer.next(Date.now());
});
obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));     
//observer run
//1st subscriber: 1474649902498
//observer run
//2nd subscriber: 1474649902501 

no share. 因此 observer run 了 2 次. 

let obs = Observable.create(observer => {
    console.log("observer run");
    observer.next(Date.now());
}).share();
obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));     
//observer run
//1st subscriber: 1474650049833

share 了, 因此 observer only run 1 次.

cold, 因此当第一个 subcribe 出现后 observer 马上运行 -> .next 更新了 value -> 第一个 subcribe callback 被调用 -> 整个过程结束 -> 而后第2个 subcribe 注册 .. 因为是 share 因此 observer 没有载被触发. 第2个 subscribe callback 没有被调用. 

延后触发的作法 : 

let obs = Observable.create(observer => {
    console.log("observer run");
    observer.next(Date.now());
}).publish();
obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));
obs.connect();
//observer run 
//1st subscriber: 1474650370505 
//2nd subscriber: 1474650370505

能够看到 .publish() 以后, subscribe 再也不能激活 observer 了,而必须手动调用 .connect() 才能激活 observer. 

这几个例子只是为了让你了解它们的玩法.

小结:

observer default is cold and not share.

cold 表示只有 subscribe 出现 observer 才会被激活.

not share 表示每个 subscribe 都会激活 observer 链. 

 

经常使用 : 

1. finally 的使用

import 'rxjs/add/operator/finally';

this
.http.get( "http://localhost:58186/api/products", { headers: new Headers({ "Accept": "application/json" })} ).finally(() => { console.log("finally"); //无论 success or error 最后都会跑这个 }).subscribe(response => { console.log("success"); }, response => { console.log("fail"); }, () => { console.log("success final"); }); //result : //success -> success final -> finally //fail -> finally

 

2. 错误处理 throw catch  

 
 

import 'rxjs/add/operator/catch';
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/throw';


this
.http.get( "http://localhost:58186/api/products", { headers: new Headers({ "Accept": "application/json" }) }) .map(r => r.json()) .catch((r) => { if ("1" == "1") { //do something ... return null; //catch 了在返回真确 } else { return Observable.throw("error"); //catch 了继续返回错误 } }) .subscribe( r => console.log(r), r => { console.log("fail") } );

 

3. previous / current value 

用 .pairwise()

let userSubject = new BehaviorSubject<string>("default value");
let user$ = userSubject.asObservable().pairwise();
user$.subscribe(([before, after]) => { console.log(before), console.log(after); });  
userSubject.next("super");
userSubject.next("ttc");
//result : 
//["default value","super"]
//["super","ttc"] 
相关文章
相关标签/搜索