咱们已经快把全部基本的转换(Transformation)、过滤(Filter)和合并(Combination)的 operators 讲完了。今天要讲错误处理(Error Handling)的 operators,错误处理是非同步行为中的一大难题,尤为有多个交错的非同步行为时,更容易凸显错误处理的困难。javascript
就让咱们一块儿来看看在 RxJS 中能如何处理错误吧!java
catch 是很常见的非同步错误处理方法,在 RxJS 中也可以直接用 catch 来处理错误,在 RxJS 中的 catch 能够回传一个 observable 来送出新的值,让咱们直接来看示例:web
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch(error => Rx.Observable.of('h'));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});复制代码
这个示例咱们每隔 500 毫秒会送出一个字串(String),并用字串的方法 toUpperCase()
来把字串的英文字母改为大写,过程当中可能未知的缘由送出了一个数值(Number) 2
致使发生例外(数值没有 toUpperCase 的方法),这时咱们在后面接的 catch 就能抓到错误。socket
catch 能够回传一个新的 Observable、Promise、Array 或任何 Iterable 的事件,来传送以后的元素。fetch
以咱们的例子来讲最后就会在送出 X
就结束,画成 Marble Diagram 以下ui
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
catch(error => Rx.Observable.of('h'))
example: ----a----b----c----d----h|复制代码
这里能够看到,当错误发生后就会进到 catch 并从新处理一个新的 observable,咱们能够利用这个新的 observable 来送出咱们想送的值。spa
也能够在遇到错误后,让 observable 结束,以下.net
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch(error => Rx.Observable.empty());
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});复制代码
回传一个 empty 的 observable 来直接结束(complete)。
另外 catch 的 callback 能接收第二个参数,这个参数会接收当前的 observalbe,咱们能够回传当前的 observable 来作到从新执行,示例以下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch((error, obs) => obs);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});复制代码
这里能够看到咱们直接回传了当前的 obserable(其实就是 example)来从新执行,画成 Marble Diagram 以下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
catch((error, obs) => obs)
example: ----a----b----c----d--------a----b----c----d--..复制代码
由于是咱们只是简单的示范,因此这里会一直无限循环,实务上一般会用在断线重连的情境。
另上面的处理方式有一个简化的写法,叫作 retry()
。
若是咱们想要一个 observable 发生错误时,从新尝试就能够用 retry 这个方法,跟咱们前一个讲示例的行为是一致
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retry();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});复制代码
一般这种无限的 retry
会放在即时同步的从新链接,让咱们在连线断掉后,不断的尝试。另外咱们也能够设定只尝试几回,以下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retry(1);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// d
// a
// b
// c
// d
// Error: TypeError: x.toUpperCase is not a function复制代码
这里咱们对 retry 传入一个数值 1
,可以让咱们只重复尝试 1 次后送出错误,画成 Marble Diagram 以下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
retry(1)
example: ----a----b----c----d--------a----b----c----d----X|复制代码
这种处理方式很适合用在 HTTP request 失败的场景中,咱们能够设定从新发送几回后,再秀出错误讯息。
RxJS 还提供了另外一种方法 retryWhen
,他能够把例外发生的元素放到一个 observable 中,让咱们能够直接操做这个 observable,并等到这个 observable 操做完后再从新订阅一次本来的 observable。
这里咱们直接来看代码
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retryWhen(errorObs => errorObs.delay(1000));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});复制代码
这里 retryWhen 咱们传入一个 callback,这个 callback 有一个参数会传入一个 observable,这个 observable 不是本来的 observable(example) 而是例外事件送出的错误所组成的一个 observable,咱们能够对这个由错误所组成的 observable 作操做,等到此次的处理完成后就会从新订阅咱们本来的 observable。
这个示例咱们是把错误的 observable 送出错误延迟 1 秒,这会使后面从新订阅的动做延迟 1 秒才执行,画成 Marble Diagram 以下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
retryWhen(errorObs => errorObs.delay(1000))
example: ----a----b----c----d-------------------a----b----c----d----...复制代码
从上图能够看到后续从新订阅的行为就被延后了,但实务上咱们不太会用 retryWhen 来作从新订阅的延迟,一般是直接用 catch 作到这件事。这里只是为了示范 retryWhen 的行为,实务上咱们一般会把 retryWhen 拿来作错误通知或是例外收集,以下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retryWhen(
errorObs => errorObs.map(err => fetch('...')));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});复制代码
这里的 errorObs.map(err => fetch('...'))
能够把 errorObs 里的每一个错误变成 API 的发送,一般这里个 API 会像是送讯息到公司的通信频道(Slack 等等),这样可让工程师立刻知道可能哪一个 API 挂了,这样咱们就能即时地处理。
retryWhen 其实是在背地里创建一个 Subject 并把错误放入,会在对这个 Subject 进行内部的订阅,由于咱们尚未讲到 Subject 的观念,你们能够先把它看成 Observable 就行了,另外记得这个 observalbe 预设是无限的,若是咱们把它结束,本来的 observable 也会跟着结束。
咱们有时候可能会想要 retry 一直重复订阅的效果,但没有错误发生,这时就能够用 repeat 来作到这件事,示例以下
var source = Rx.Observable.from(['a','b','c'])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source.repeat(1);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// a
// b
// c
// complete复制代码
这里 repeat 的行为跟 retry 基本一致,只是 retry 只有在例外发生时才触发,画成 Marble Diagram 以下
source : ----a----b----c|
repeat(1)
example: ----a----b----c----a----b----c|复制代码
一样的咱们能够不给参数让他无限循环,以下
var source = Rx.Observable.from(['a','b','c'])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source.repeat();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});复制代码
这样咱们就能够作动不断重复的行为,这个能够在创建轮询时使用,让咱们不断地发 request 来更新画面。
最后咱们来看一个错误处理在实际应用中的小示例
const title = document.getElementById('title');
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x)
.map(x => x.toUpperCase());
// 一般 source 会是创建即时同步的连线,像是 web socket
var example = source.catch(
(error, obs) => Rx.Observable.empty()
.startWith('连线发生错误: 5秒后重连')
.concat(obs.delay(5000))
);
example.subscribe({
next: (value) => { title.innerText = value },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});复制代码
这个示例其实就是模仿在即时同步断线时,利用 catch 返回一个新的 observable,这个 observable 会先送出错误讯息而且把本来的 observable 延迟 5 秒再作合并,虽然这只是一个模仿,但它清楚的展现了 RxJS 在作错误处理时的灵活性。
今天咱们讲了三个错误处理的方法还有一个 repeat operator,这几个方法都颇有机会在实际上用到,不知道今天你们有没有收获呢? 若是有任何问题,欢迎在下方留言给我,谢谢!