参考文章:介绍RxJS在Angular中的应用css
HTTP
响应流仍是定时器,对这些值进行监听和中止监听的接口都是同样的。Observable
)的实例,其中定义了一个订阅者(subscriber
)函数。订阅者函数用于定义“如何获取或生成那些要发布的值或消息”。订阅者函数会接收一个 观察者(observer
),并把值发布给观察者的 next()
方法subscribe()
方法时,这个订阅者函数就会执行。做为消费者,要执行所建立的可观察对象,并开始从中接收通知,你就要调用可观察对象的 subscribe()
方法,并传入一个观察者(observer
)。JavaScript
对象,它定义了你收到的这些消息的处理器(handler
)。subscribe()
调用会返回一个 Subscription
对象,该对象具备一个 unsubscribe()
方法。 当调用该方法时,你就会中止接收通知。// 在有消费者订阅它以前,这个订阅者函数并不会实际执行
const locations = new Observable((observer) => {
const {next, error} = observer;
let watchId;
if ('geolocation' in navigator) {
watchId = navigator.geolocation.watchPosition(next, error);
} else {
error('Geolocation not available');
}
return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }};
});
// subscribe() 调用会返回一个 Subscription 对象,该对象具备一个 unsubscribe() 方法。
// subscribe()传入一个观察者对象,定义了你收到的这些消息的处理器
const locationsSubscription = locations.subscribe({
next(position) { console.log('Current Position: ', position); },
error(msg) { console.log('Error Getting Location: ', msg); }
});
// 10 seconds后调用该方法时,你就会中止接收通知。
setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);
复制代码
通知类型 | 说明 |
---|---|
next |
必要。用来处理每一个送达值。在开始执行后可能执行零次或屡次。 |
error |
可选。用来处理错误通知。错误会中断这个可观察对象实例的执行过程。 |
complete |
可选。用来处理执行完毕(complete )通知。当执行完毕后,这些值就会继续传给下一个处理器。 |
只有当有人订阅 Observable
的实例时,订阅者函数才会开始发布值。html
订阅时要先调用该实例的 subscribe()
方法,并把一个观察者对象传给subscribe()
,用来接收通知。node
使用 Observable
上定义的一些静态方法来建立一些经常使用的简单可观察对象:react
of(...items)
—— 返回一个 Observable
实例,它用同步的方式把参数 中提供的这些值发送出来。ajax
from(iterable)
—— 把它的参数转换成一个 Observable
实例。 该方法一般用于把一个数组转换成一个(发送多个值的)可观察对象。编程
下面的例子会建立并订阅一个简单的可观察对象,它的观察者会把接收到的消息记录到控制台中:api
// 建立简单的可观察对象,来发送3个值
const myObservable = of(1, 2, 3);
// 建立观察者对象
const myObserver = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// 订阅
myObservable.subscribe(myObserver);
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification
=>前面指定预约义观察者并订阅它,等同以下写法,省略了next,error,complete
myObservable.subscribe(
// subscribe() 方法能够接收预约义在观察者中同一行的回调函数
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
复制代码
不管哪一种状况,next
处理器都是必要的,而 error
和 complete
处理器是可选的。数组
next()
函数能够接受消息字符串、事件对象、数字值或各类结构。咱们把由可观察对象发布出来的数据统称为流。任何类型的值均可以表示为可观察对象,而这些值会被发布为一个流。of(1, 2, 3)
等价的可观察对象,你能够这样作:// 订阅者函数会接收一个 Observer 对象,并把值发布给观察者的 next() 方法。
function sequenceSubscriber(observer) {
// 同步地 发布 1, 2, and 3, 而后 complete
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
// 同步发布数据,因此取消订阅 不须要作任何事情
return {unsubscribe() {}};
}
// 使用 Observable 构造函数,建立一个新的可观察对象,
// 当执行可观察对象的 subscribe() 方法时,这个构造函数就会把它接收到的参数sequenceSubscriber做为订阅者函数来运行。
const sequence = new Observable(sequenceSubscriber);
sequence.subscribe({
next(num) { console.log(num); },
complete() { console.log('Finished sequence'); }
});
// Logs:
// 1
// 2
// 3
// Finished sequence
复制代码
function fromEvent(target, eventName) {
return new Observable(
// new Observable中传入的订阅者函数是用内联方式定义的
// 订阅者函数会接收一个 观察者对象observer,并把值e发布给观察者的 next() 方法
(observer) => {
const handler = (e) => observer.next(e);
// Add the event handler to the target
target.addEventListener(eventName, handler);
return () => {
// Detach the event handler from the target
target.removeEventListener(eventName, handler);
};
}
);
}
const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;
const subscription = fromEvent(nameInput, 'keydown')//使用fromEvent函数来建立可发布 keydown 事件的可观察对象
.subscribe(
// subscribe() 方法接收预约义在观察者中同一行的next回调函数
(e: KeyboardEvent) => {
if (e.keyCode === ESC_KEY) {
nameInput.value = '';
}
}
);
复制代码
setTimeout
异步生成值,因此用 try/catch
是没法捕获错误的。你应该在观察者中指定一个 error
回调来处理错误。subscribe()
调用 next
回调),也能够调用 complete
或 error
回调来主动结束。myObservable.subscribe({
next: (num) => console.log('Next num: ' + num),
error: (err) => console.log('Received an errror: ' + err)
});
复制代码
RxJS
是一个使用可观察对象进行响应式编程的库。promise
RxJS
提供了一些用来建立可观察对象的函数。这些函数能够简化根据某些东西建立可观察对象的过程,好比承诺、定时器、事件、ajax
等等。缓存
import { fromPromise } from 'rxjs';
// Create an Observable out of a promise
const data = fromPromise(fetch('/api/endpoint'));
// Subscribe to begin listening for async result
data.subscribe({
next(response) { console.log(response); },
error(err) { console.error('Error: ' + err); },
complete() { console.log('Completed'); }
});
复制代码
import { interval } from 'rxjs';
// Create an Observable that will publish a value on an interval
const secondsCounter = interval(1000);
// Subscribe to begin publishing values
secondsCounter.subscribe(n =>
console.log(`It's been ${n} seconds since subscribing!`));
复制代码
import { fromEvent } from 'rxjs';
const el = document.getElementById('my-element');
// Create an Observable that will publish mouse movements
const mouseMoves = fromEvent(el, 'mousemove');
// Subscribe to start listening for mouse-move events
const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
// Log coords of mouse movements
console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);
// When the mouse is over the upper-left of the screen,
// unsubscribe to stop listening for mouse movements
if (evt.clientX < 40 && evt.clientY < 40) {
subscription.unsubscribe();
}
});
复制代码
import { ajax } from 'rxjs/ajax';
// Create an Observable that will create an AJAX request
const apiData = ajax('/api/data');
// Subscribe to create the request
apiData.subscribe(res => console.log(res.status, res.response));
复制代码
操做符会观察来源可观察对象中发出的值,转换它们,并返回由转换后的值组成的新的可观察对象。
Observable
能够链式写法,这意味着咱们能够这样:Observable.fromEvent(node, 'input')
.map((event: any) => event.target.value)
.filter(value => value.length >= 2)
.subscribe(value => { console.log(value); });
复制代码
下面是整个顺序步骤:
假设用户输入:a
Observable
对触发 oninput
事件做出反应,将值以参数的形式传递给observer
的 next()
。(内部实现)
map()
根据 event.target.value
的内容返回一个新的 Observable
,并调用 next()
传递给下一个observer
。
filter()
若是值长度 >=2
的话,则返回一个新的 Observable
,并调用 next()
传递给下一个observer
。
最后,将结果传递给 subscribe
订阅块。
你只要记住每一次 operator
都会返回一个新的 Observable
,无论 operator
有多少个,最终只有最后一个 Observable
会被订阅。
import { filter, map } from 'rxjs/operators';
const squareOdd = of(1, 2, 3, 4, 5) // 可观察对象
.pipe(
filter(n => n % 2 !== 0),
map(n => n * n)
);
// Subscribe to get values
squareOdd.subscribe(x => console.log(x));
复制代码
takeWhile
若是组件有多个订阅者的话,咱们须要将这些订阅者存储在数组中,当组件被销毁时再逐个取消订阅。但,咱们有更好的办法: 使用 takeWhile() operator
,它会在你传递一个布尔值是调用 next()
仍是 complete()
。
private alive: boolean = true;
ngOnInit() {
const node = document.querySelector('input[type=text]');
this.s = Observable.fromEvent(node, 'input')
.takeWhile(() => this.alive)
.map((event: any) => event.target.value)
.filter(value => value.length >= 2)
.subscribe(value => { console.log(value) });
}
ngOnDestroy() {
this.alive = false;
}
复制代码
RxJS
很火很大缘由我认仍是提供了丰富的API
,如下是摘抄:
建立数据流:
of, empty, never
from
.from([1, 2, 3, 4])
interval, timer
fromEvent
Promise
建立:fromPromise
create
转换操做:
map, mapTo, pluck
mapTo
: event$.mapTo(1) // 使event流的值为1
pluck
: event$.pluck('target', 'value') // 从event流中取得其target属性的value属性
filter, skip, first, last, take,distinctUntilChanged
distinctUntilChanged
:保留跟前一个元素不同的元素delay, timeout, throttletime, throttle, debouncetime, debounce, audit, bufferTime
debounce
:若是在900
毫秒内没有新事件产生,那么以前的事件将经过;若是在900
毫秒内有新事件产生,那么以前的事件将被舍弃。throttle
:在必定时间范围内无论产生了多少事件,它只放第一个过去,剩下的都将舍弃butterTime
:缓存参数毫秒内的全部的源Observable的值,而后一次性以数组的形式发出reduce, scan
throw, catch, retry, finally
takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn
switch
组合数据流:
concat
,保持原来的序列顺序链接两个数据流。只有运行完前面的流,才会运行后面的流merge
,将两个流按各自的顺序叠加成一个流race
,预设条件为其中一个数据流完成forkJoin
,预设条件为全部数据流都完成zip
,取各来源数据流最后一个值合并为对象combineLatest
,取各来源数据流最后一个值合并为数组startWith
,先发出做为startWith
参数指定的项,而后再发出由源 Observable
所发出的项窃听:
do、tap
是两个彻底相同的操做符,用于窃听Observable
的生命周期事件,而不会产生打扰。除了能够在订阅时提供 error()
处理器外,RxJS
还提供了 catchError
操做符,它容许你在管道中处理已知错误。 下面是使用 catchError
操做符实现这种效果的例子:
import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('/api/data').pipe(
map(res => {
if (!res.response) {
throw new Error('Value expected!');
}
return res.response;
}),
//若是你捕获这个错误并提供了一个默认值,流就会继续处理这些值,而不会报错。
catchError(err => of([]))
);
apiData.subscribe({
next(x) { console.log('data: ', x); },
error(err) { console.log('errors already caught... will not run'); }
});
复制代码
能够在 catchError
以前使用 retry
操做符。 下列代码为前面的例子加上了捕获错误前重发请求的逻辑:
import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';
const apiData = ajax('/api/data').pipe(
retry(3), // Retry up to 3 times before failing
map(res => {
if (!res.response) {
throw new Error('Value expected!');
}
return res.response;
}),
catchError(err => of([]))
);
apiData.subscribe({
next(x) { console.log('data: ', x); },
error(err) { console.log('errors already caught... will not run'); }
});
复制代码
习惯上的可观察对象的名字以$
符号结尾。
stopwatchValue$: Observable<number>;
复制代码
Angular
使用可观察对象做为处理各类经常使用异步操做的接口。好比:
EventEmitter
类派生自 Observable
。HTTP
模块使用可观察对象来处理 AJAX
请求和响应。Angular
提供了一个 EventEmitter
类,它用来从组件的 @Output()
属性中发布一些值。EventEmitter
扩展了 Observable
,并添加了一个 emit()
方法,这样它就能够发送任意值了。当你调用 emit()
时,就会把所发送的值传给订阅上来的观察者的 next()
方法。
@Output() changed = new EventEmitter<string>();
click() {
this.changed.emit('hi~');
}
复制代码
@Component({
template: `<comp (changed)="subscribe($event)"></comp>`
})
export class HomeComponent {
subscribe(message: string) {
// 接收:hi~
}
}
复制代码
Angular
的 HttpClient
从 HTTP
方法调用中返回了可观察对象。例如,http.get(‘/api’)
就会返回可观察对象。
相对于基于承诺(Promise
)的 HTTP API
,它有一系列优势:
.then()
调用同样)。反之,你可使用一系列操做符来按需转换这些值。HTTP
请求是能够经过 unsubscribe()
方法来取消的。AsyncPipe
会订阅一个可观察对象或承诺,并返回其发出的最后一个值。当发出新值时,该管道就会把这个组件标记为须要进行变动检查的
Router.events
以可观察对象的形式提供了其事件。 你可使用 RxJS
中的 filter()
操做符来找到感兴趣的事件,而且订阅它们,以便根据浏览过程当中产生的事件序列做出决定。 例子以下:import { Router, NavigationStart } from '@angular/router';
import { filter } from 'rxjs/operators';
@Component({
selector: 'app-routable',
templateUrl: './routable.component.html',
styleUrls: ['./routable.component.css']
})
export class Routable1Component implements OnInit {
navStart: Observable<NavigationStart>;
constructor(private router: Router) {
// Create a new Observable the publishes only the NavigationStart event
this.navStart = router.events.pipe(
filter(evt => evt instanceof NavigationStart)
) as Observable<NavigationStart>;
}
ngOnInit() {
this.navStart.subscribe(evt => console.log('Navigation Started!'));
}
}
复制代码
ActivatedRoute
是一个可注入的路由器服务,它使用可观察对象来获取关于路由路径和路由参数的信息。好比,ActivateRoute.url
包含一个用于汇报路由路径的可观察对象。例子以下:import { ActivatedRoute } from '@angular/router';
@Component({
selector: 'app-routable',
templateUrl: './routable.component.html',
styleUrls: ['./routable.component.css']
})
export class Routable2Component implements OnInit {
constructor(private activatedRoute: ActivatedRoute) {}
ngOnInit() {
this.activatedRoute.url
.subscribe(url => console.log('The URL changed to: ' + url));
}
}
复制代码
响应式表单具备一些属性,它们使用可观察对象来监听表单控件的值。 FormControl
的 valueChanges
属性和 statusChanges
属性包含了会发出变动事件的可观察对象。订阅可观察的表单控件属性是在组件类中触发应用逻辑的途径之一。好比:
import { FormGroup } from '@angular/forms';
@Component({
selector: 'my-component',
template: 'MyComponent Template'
})
export class MyComponent implements OnInit {
nameChangeLog: string[] = [];
heroForm: FormGroup;
ngOnInit() {
this.logNameChange();
}
logNameChange() {
const nameControl = this.heroForm.get('name');
nameControl.valueChanges.subscribe(
(value: string) => this.nameChangeLog.push(value)
);
}
}
复制代码
可观察对象 | 承诺 | Observable优点 |
---|---|---|
可观察对象是声明式的,在被订阅以前,它不会开始执行。 | 承诺是在建立时就当即执行的。 | 这让可观察对象可用于定义那些应该按需执行的情景。 |
可观察对象能提供多个值。 | 承诺只提供一个。 | 这让可观察对象可用于随着时间的推移获取多个值。 |
可观察对象会区分串联处理和订阅语句。 | 承诺只有 .then() 语句。 | 这让可观察对象可用于建立供系统的其它部分使用而不但愿当即执行的复杂菜谱。 |
可观察对象的 subscribe() 会负责处理错误。 | 承诺会把错误推送给它的子承诺。 | 这让可观察对象可用于进行集中式、可预测的错误处理。 |
subscribe()
会执行一次定义好的行为,而且能够再次调用它。从新订阅会致使从新计算这些值。content_copy
// declare a publishing operation
new Observable((observer) => { subscriber_fn });
// initiate execution
observable.subscribe(() => {
// observer handles notifications
});
复制代码
then
语句(订阅)都会共享同一次计算。content_copy
// initiate execution
new Promise((resolve, reject) => { executer_fn });
// handle return value
promise.then((value) => {
// handle result here
});
复制代码
content_copy
observable.map((v) => 2*v);
复制代码
.then()
语句(等价于订阅)和中间的 .then()
语句(等价于映射)。content_copy
promise.then((v) => 2*v);
复制代码
content_copy
const sub = obs.subscribe(...);
sub.unsubscribe();
复制代码
content_copy
obs.subscribe(() => {
throw Error('my error');
});
复制代码
content_copy
promise.then(() => {
throw Error('my error');
});
复制代码
咱们在写一个Service
用于数据传递时,老是使用 new Subject
。
@Injectable()
export class MessageService {
private subject = new Subject<any>();
send(message: any) {
this.subject.next(message);
}
get(): Observable<any> {
return this.subject.asObservable();
}
}
复制代码
当F
组件须要向M
组件传递数据时,咱们能够在F
组件中使用 send()
。
constructor(public srv: MessageService) { }
ngOnInit() {
this.srv.send('w s k f m?')
}
复制代码
而M
组件只须要订阅内容就行:
constructor(private srv: MessageService) {}
message: any;
ngOnInit() {
this.srv.get().subscribe((result) => {
this.message = result;
})
}
复制代码