上一部分: http://www.cnblogs.com/cgzl/p/8641738.htmlcss
Subject比较特殊, 它便是Observable又是Observer.html
做为Observable, Subject是比较特殊的, 它能够对多个Observer进行广播, 而普通的Observable只能单播, 它有点像EventEmitters(事件发射器), 维护着多个注册的Listeners.app
做为Observable, 你能够去订阅它, 提供一个Observer就会正常的收到推送的值. 从Observer的角度是没法分辨出这个Observable是单播的仍是一个Subject.this
从Subject内部来说, subscribe动做并无调用一个新的执行来传递值, 它只是把Observer注册到一个列表里, 就像其余库的AddListener同样.spa
做为Observer, 它是一个拥有next(), error(), complete()方法的对象, 调用next(value)就会为Subject提供一个新的值, 而后就会多播到注册到这个Subject的Observers.3d
例子 subject.ts:code
import { Subject } from "rxjs/Subject"; const subject = new Subject(); const subscriber1 = subject.subscribe({ next: (v) => console.log(`observer1: ${v}`) }); const subscriber2 = subject.subscribe({ next: (v) => console.log(`observer2: ${v}`) }); subject.next(1); subscriber2.unsubscribe(); subject.next(2); const subscriber3 = subject.subscribe({ next: (v) => console.log(`observer3: ${v}`) }); subject.next(3);
订阅者1,2从开始就订阅了subject. 而后subject推送值1的时候, 它们都收到了. component
而后订阅者2, 取消了订阅, 随后subject推送值2, 只有订阅者1收到了.server
后来订阅者3也订阅了subject, 而后subject推送了3, 订阅者1,3都收到了这个值.htm
下面是一个angular 5的例子:
app.component.html:
<h3>从Subject共享Observable到多个Subscribers</h3>
<input type="text" placeholder="start typing" (input)="mySubject.next($event)" (keyup)="mySubject.next($event)">
<br> Subscriber to input events got {{inputValue}}
<br>
<br> Subscriber to keyup events got {{keyValue}}
app.component.ts:
import { Component } from '@angular/core'; import { Subject } from 'rxjs/Subject'; import 'rxjs/add/operator/filter'; import 'rxjs/add/operator/map'; @Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.css'] }) export class AppComponent { title = 'app'; keyValue: string; inputValue: string; mySubject: Subject<Event> = new Subject(); constructor() { // subscriber 1 this.mySubject.filter(({ type }) => type === 'keyup') .map(e => (<KeyboardEvent>e).key) .subscribe(value => this.keyValue = value); // subscriber 2 this.mySubject.filter(({ type }) => type === 'input') .map(e => (<HTMLInputElement>e.target).value) .subscribe(value => this.inputValue = value); } }
input和keyup动做都把event推送到mySubject, 而后mySubject把值推送给订阅者, 订阅者1经过过滤和映射它只处理keyup类型的事件, 而订阅者2只处理input事件.
效果:
BehaviorSubject 是Subject的一个变种, 它有一个当前值的概念, 它会把它上一次发送给订阅者值保存起来, 一旦有新的Observer进行了订阅, 那这个Observer立刻就会从BehaviorSubject收到这个当前值.
也能够这样理解BehaviorSubject的特色:
例子 behavior-subject.ts:
import { BehaviorSubject } from "rxjs/BehaviorSubject"; const subject = new BehaviorSubject(0); subject.subscribe({ next: v => console.log(`Observer1: ${v}`) }); subject.next(1); subject.next(2); subject.subscribe({ next: v => console.log(`Observer2: ${v}`) }); subject.next(3);
效果:
concat: 按顺序合并observables. 只会在前一个observable结束以后才会订阅下一个observable.
它适合用于顺序处理, 例如http请求.
例子:
import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/timer'; import 'rxjs/add/operator/mapTo'; import 'rxjs/add/observable/concat'; let firstReq = Observable.timer(3000).mapTo('First Response'); let secondReq = Observable.timer(1000).mapTo('Second Response'); Observable.concat(firstReq, secondReq) .subscribe(res => console.log(res));
效果:
把多个输入的observable交错的混合成一个observable, 不按顺序.
merge其实是订阅了每一个输入的observable, 它只是把输入的observable的值不带任何转换的发送给输出的Observable. 只有当全部输入的observable都结束了, 输出的observable才会结束. 任何在输入observable传递来的错误都会当即发射到输出的observable, 也就是把整个流都杀死了 .
例子:
import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/timer'; import 'rxjs/add/operator/mapTo'; import 'rxjs/add/observable/merge'; let firstReq = Observable.timer(3000).mapTo('First Response'); let secondReq = Observable.timer(1000).mapTo('Second Response'); Observable.merge(firstReq, secondReq) .subscribe(res => console.log(res));
效果:
mergeMap把每一个输入的Observable的值映射成Observable, 而后把它们混合成一个Observable.
mergeMap能够把嵌套的observables拼合成非嵌套的observable.
它有这些好处:
这个仍是经过例子来理解比较好:
import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/from'; import 'rxjs/add/operator/mergeMap'; function getData() { const students = Observable.from([ { name: 'Dave', age: 17 }, { name: 'Nick', age: 18 }, { name: 'Lee', age: 15 } ]); const teachers = Observable.from([ { name: 'Miss Wan', age: 28 }, { name: 'Mrs Wang', age: 31 }, ]); return Observable.create( observer => { observer.next(students); observer.next(teachers); } ); } getData() .mergeMap(persons => persons) .subscribe( p => console.log(`Subscriber got ${p.name} - ${p.age}`) );
效果:
switchMap把每一个值都映射成Observable, 而后使用switch把这些内部的Observables合并成一个.
switchMap有一部分很想mergeMap, 但也仅仅是一部分像而已.
由于它还具备取消的效果, 每次发射的时候, 前一个内部的observable会被取消, 下一个observable会被订阅. 能够把这个理解为切换到一个新的observable上了.
这个仍是看marble图比较好理解:
例子:
// 当即发出值, 而后每5秒发出值 const source = Rx.Observable.timer(0, 5000); // 当 source 发出值时切换到新的内部 observable,发出新的内部 observable 所发出的值 const example = source.switchMap(() => Rx.Observable.interval(500)); // 输出: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8 const subscribe = example.subscribe(val => console.log(val));
更好的例子是: 网速比较慢的时候, 客户端发送了屡次重复的请求, 若是前一次请求在2秒内没有返回的话, 那么就取消前一次请求, 再也不须要前一次请求的结果了, 这里就应该使用debounceTime配合switchMap.
mergeMap:
import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/interval'; import 'rxjs/add/operator/take'; import 'rxjs/add/operator/map'; import 'rxjs/add/operator/mergeMap'; import 'rxjs/add/operator/switchMap'; const outer = Observable.interval(1000).take(2); const combined = outer.mergeMap(x => { return Observable.interval(400) .take(3) .map(y => `outer ${x}: inner ${y}`); }); combined.subscribe(res => console.log(`result ${res}`));
效果:
switchMap:
import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/interval'; import 'rxjs/add/operator/take'; import 'rxjs/add/operator/map'; import 'rxjs/add/operator/mergeMap'; import 'rxjs/add/operator/switchMap'; const outer = Observable.interval(1000).take(2); const combined = outer.switchMap(x => { return Observable.interval(400) .take(3) .map(y => `outer ${x}: inner ${y}`); }); combined.subscribe(res => console.log(`result ${res}`));
zip操做符也会合并多个输入的observables成为一个observable. 多个输入的observable的值, 按顺序, 按索引进行合并, 若是某一个observable在该索引上的值尚未发射值, 那么会等它, 直到全部的输入observables在该索引位置上的值都发射出来, 输出的observable才会发射该索引的值.
例子:
import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/of'; import 'rxjs/add/observable/zip'; let age$ = Observable.of<number>(27, 25, 29); let name$ = Observable.of<string>('Foo', 'Bar', 'Beer'); let isDev$ = Observable.of<boolean>(true, true, false); Observable .zip(age$, name$, isDev$, (age: number, name: string, isDev: boolean) => ({ age, name, isDev })) .subscribe(x => console.log(x));
效果:
就不往下写了, 其实看文档就行, 最重要的仍是上一部分.