Observable 的 map 方法使用上跟阵列的 map 是同样的数组
var source = interval(1000);
var newest = source.map(x => x + 2);
newest.subscribe(console.log);
// 2
// 3
// 4
// 5..
复制代码
mapTo 能够把传进来的值改为一个固定的值,以下异步
var source = interval(1000);
var newest = source.mapTo(2);
newest.subscribe(console.log);
// 2
// 2
// 2
// 2..
复制代码
filter 在使用上也跟阵列的相同,传入一个 callback function,这个 function 会传入每一个被送出的元素,而且回传一个 boolean 值,若是为 true 的话就会保留,若是为 false 就会被滤掉,以下函数
var source = interval(1000);
var newest = source.filter(x => x % 2 === 0);
newest.subscribe(console.log);
// 0
// 2
// 4
// 6..
复制代码
数据累加计算ui
var main = from('hello').pipe(
// 依次打印hello的每一个字母
zip(interval(500), (x, y) => x)
)
const example = main.pipe(
// scan第二个参数为初始值
scan(
(origin,next)=> origin + next
)
)
example.subscribe({
next: value => {
console.log(value);
},
error: err => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// h
// he
// hel
// hell
// hello
// complete
复制代码
不少时候若是Observable没有发生错误,我门也但愿能够重复发起订阅,这个时候就要用到repeat方法了,repeat用法和retry基本同样。spa
var example = from(['a','b','c']).pipe(
zip(interval(500), (x,y) => x),
repeat()
)
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
复制代码
根据一个或多个列对结果集进行分组code
var people = [
{ name: "Anna", score: 100, subject: "English" },
{ name: "Anna", score: 90, subject: "Math" },
{ name: "Anna", score: 96, subject: "Chinese" },
{ name: "Jerry", score: 100, subject: "Math" },
{ name: "Jerry", score: 80, subject: "English" },
{ name: "Jerry", score: 90, subject: "Chinese" }
];
var example = from(people).pipe(
groupBy(item => item.name),
map(group =>
group.pipe(
reduce((acc, cur) => ({
name: cur.name,
score: acc.score + cur.score
}))
)
),
mergeAll()
);
example.subscribe({
next: value => {
console.log(value);
},
error: err => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// {name: "Anna", score: 286}
// {name: "Jerry", score: 270}
// complete
复制代码
take 是一个很简单的 operator,顾名思义就是取前几个元素后就结束,以下对象
var source = interval(1000);
var example = source.take(3);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// complete
复制代码
first 会取 observable 送出的第 1 个元素以后就直接结束,行为跟 take(1) 一致。rxjs
var source = interval(1000);
var example = source.first();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// complete
复制代码
除了能够用 take 取前几个以外,咱们也能够倒过来取最后几个,范例以下:事件
var source = interval(1000).take(6);
var example = source.takeLast(2);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 4
// 5
// complete
复制代码
跟 take(1) 相同,咱们有一个 takeLast(1) 的简化写法,那就是 last() 用来取得最后一个元素。ip
var source = interval(1000).take(6);
var example = source.last();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 5
// complete
复制代码
他能够在某件事情发生时,让一个 observable 直送出 完成(complete)讯息,范例以下
var source = interval(1000);
var click = fromEvent(document.body, 'click');
var example = source.takeUntil(click);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// complete (点击body了
复制代码
这裡咱们一开始先用 interval 创建一个 observable,这个 observable 每隔 1 秒会送出一个从 0 开始递增的数值,接著咱们用 takeUntil,传入另外一个 observable。 当 takeUntil 传入的 observable 发送值时,本来的 observable 就会直接进入完成(complete)的状态,而且发送完成讯息。也就是说上面这段程式码的行为,会先每 1 秒印出一个数字(从 0 递增)直到咱们点击 body 为止,他才会送出 complete 讯息。
一、这裡能够看到 source observable 内部每次发送的值也是 observable,这时咱们用 concatAll 就能够把 source 摊平成 example
var click = fromEvent(document.body, 'click');
var source = click.map(e => of(1,2,3));
var example = source.concatAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
复制代码
二、concatAll 会处理 source 先发出来的 observable,必须等到这个 observable 结束,才会再处理下一个 source 发出来的 observable,范例以下。
var obs1 = interval(1000).take(5);
var obs2 = interval(500).take(2);
var obs3 = interval(2000).take(1);
var source = of(obs1, obs2, obs3);
var example = source.concatAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 0
// 1
// 0
// complete
复制代码
skip用于略过前几个送出元素
var source = interval(1000);
var example = source.skip(3);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 3
// 4
// 5...
复制代码
本来从 0 开始的就会变成从 3 开始,可是记得本来元素的等待时间仍然存在,也就是说此范例第一个取得的元素须要等 4 秒。
concat 能够把多个 observable 实例合并成一个,concat和concatAll效果是同样的,区别在于 concat要传递参数,参数必须是Observable类型。范例以下:
var source = interval(1000).pipe(take(3));
var source2 = of(3)
var source3 = of(4,5,6)
var example = source.pipe(
concat(source2, source3)
);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// complete
复制代码
startWith 能够在 observable 的一开始塞要发送的元素,有点像 concat 但参数不是 observable 而是要发送的元素,使用范例以下
var source = interval(1000);
var example = source.startWith(0);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 3...
复制代码
merge使用方式和concat同样,区别就是merge处理的Observable是异步执行的,在时间序上是同时在跑。
const source1 = interval(1000).pipe(take(3));
const source2 = of(3);
const source3 = of (4,5);
const example = source1.pipe(merge(source2,source3))
example.subscribe({
next: value => {
console.log(value);
},
error: err => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 3
// 4
// 5
// 0
// 1
// 2
// complete
复制代码
delay会将observable第一次发出订阅的时间延迟,以下:
const example = interval(100).pipe(take(5),delay(1000));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
复制代码
delayWhen和delay不一样,他的延迟时间由参数函数决定,而且会将主订阅对象发出的值做为参数:
var example = interval(300).pipe(
take(5),
delayWhen(
x => empty().pipe(delay(100 * x * x)))
);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
复制代码
上边的例子会将第一次source发出的值做为参数传给delayWhen的函数做为参数,只有在参数对象中的Observable发出订阅的值,主订阅对象才会继续发出订阅的值。
debounce 在每次收到元素,他会先把元素 cache 住并等待一段时间,若是这段时间沒有收到任何元素,则把元素送出;若是这段时间又收到新的元素,则会把本来 cache 住的元素释放掉并从新计时,不断反复。
var example = interval(300).pipe(take(5),debounceTime(1000));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 4
// complete
复制代码
跟 debounce 的不一样是 throttle 会先放送出元素,等到有元素被送出就会沉默一段时间,等到时间过了又会继续发送元素,防止某个事件频繁触发,影响效率。
var example = interval(300).pipe(
take(5),
throttleTime(1000)
);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
复制代码
distinct会和已经拿到的数据比较过滤掉 重复的元素以下:
var example = from(['a', 'b', 'c', 'a', 'b']).pipe(
zip(interval(300), (x, y) => x),
distinct()
)
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// complete
复制代码
distinct第一个参数是个函数,函数返回值就是distinct比较的值:
var source = from([{ value: 'a' }, { value: 'b' }, { value: 'c' }, { value: 'a' }, { value: 'c' }]).pipe(
zip(interval(300), (x, y) => x)
)
var example = source.pipe(
distinct((x) => {
return x.value
})
)
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// {value: "a"}
// {value: "b"}
// {value: "c"}
// complete
复制代码
可是distinct底层是建立一个set来辅助去重,若是数据很大,可能致使set过大,这个时候就须要设置distinct第二个参数来刷新set,第二个 参数是个observable到发起订阅的时候就会清空set
var flushes = interval(1300);
var example = from(['a', 'b', 'c', 'a', 'c']).pipe(
zip(interval(300), (x, y) => x),
distinct(
null,flushes
)
)
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// c
// complete
复制代码
distinctUntilChanged与distinct不一样之处就是,distinctUntilChanged只会比较相邻两次输入,例子以下:
var example = from(['a', 'b', 'c', 'c', 'b']).pipe(
.zip(interval(300), (x, y) => x),
distinctUntilChanged()
)
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// b
// complete
复制代码
协调过个observable,参数Observable中有一个发生变化都会发起订阅(前提是每一个observable都有值)。 一、当conbineLatest没有传入第二个参数,返回的订阅值是个数组
// timerOne 在1秒时发出第一个值,而后每4秒发送一次
const timerOne = timer(1000, 4000);
// timerTwo 在2秒时发出第一个值,而后每4秒发送一次
const timerTwo = timer(2000, 4000);
// timerThree 在3秒时发出第一个值,而后每4秒发送一次
const timerThree = timer(3000, 4000);
// 当一个 timer 发出值时,将每一个 timer 的最新值做为一个数组发出
const combined = combineLatest(timerOne, timerTwo, timerThree);
const subscribe = combined.subscribe(latestValues => {
// 从 timerValOne、timerValTwo 和 timerValThree 中获取最新发出的值
const [timerValOne, timerValTwo, timerValThree] = latestValues;
/* 示例: timerOne first tick: 'Timer One Latest: 1, Timer Two Latest:0, Timer Three Latest: 0 timerTwo first tick: 'Timer One Latest: 1, Timer Two Latest:1, Timer Three Latest: 0 timerThree first tick: 'Timer One Latest: 1, Timer Two Latest:1, Timer Three Latest: 1 */
console.log(
`Timer One Latest: ${timerValOne}, Timer Two Latest: ${timerValTwo}, Timer Three Latest: ${timerValThree}`
);
}
);
复制代码
二、conbineLatest能够传入第二个参数,在发给Observabler进行数据处理。
var source = interval(500).take(3);
var newest = interval(300).take(6);
var example = source.combineLatest(newest, (x, y) => x + y);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete
复制代码
对于上面的例子来说,由于咱们这裡合并了两个 observable,因此后面的 callback function 就接收 x, y 两个参数,x 会接收从 source 发送出来的值,y 会接收从 newest 发送出来的值。 最后一个重点就是不论是 source 仍是 newest 送出值来,只要另外一方曾有送出过值(有最后的值),就会执行 callback 并送出新的值,因此这段程式是这样运行的: newest 送出了 0,但此时 source 并无送出过任何值,因此不会执行 callback。 source 送出了 0,此时 newest 最后一次送出的值为 0,把这两个数传入 callback 获得 0。 newest 送出了 1,此时 source 最后一次送出的值为 0,把这两个数传入 callback 获得 1。 newest 送出了 2,此时 source 最后一次送出的值为 0,把这两个数传入 callback 获得 2。 source 送出了 1,此时 newest 最后一次送出的值为 2,把这两个数传入 callback 获得 3。 newest 送出了 3,此时 source 最后一次送出的值为 1,把这两个数传入 callback 获得 4。 source 送出了 2,此时 newest 最后一次送出的值为 3,把这两个数传入 callback 获得 5。 source 结束,但 newest 还没结束,因此 example 还不会结束。 newest 送出了 4,此时 source 最后一次送出的值为 2,把这两个数传入 callback 获得 6。 newest 送出了 5,此时 source 最后一次送出的值为 2,把这两个数传入 callback 获得 7。 newest 结束,由于 source 也结束了,因此 example 结束。
每一个 observable 的相同 index 元素会一块儿被传入 callback
var source = interval(500).take(3);
var newest = interval(300).take(6);
var example = source.zip(newest, (x, y) => x + y);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete
复制代码
对于上面的例子来说,zip 会等到 source 跟 newest 都送出了第一个元素,再传入 callback,下次则等到 source 跟 newest 都送出了第二个元素再一块儿传入 callback,因此运行的步骤以下: newest 送出了第一个值 0,但此时 source 并无送出第一个值,因此不会执行 callback。 source 送出了第一个值 0,newest 以前送出的第一个值为 0,把这两个数传入 callback 获得 0。 newest 送出了第二个值 1,但此时 source 并无送出第二个值,因此不会执行 callback。 newest 送出了第三个值 2,但此时 source 并无送出第三个值,因此不会执行 callback。 source 送出了第二个值 1,newest 以前送出的第二个值为 1,把这两个数传入 callback 获得 2。 newest 送出了第四个值 3,但此时 source 并无送出第四个值,因此不会执行 callback。 source 送出了第三个值 2,newest 以前送出的第三个值为 2,把这两个数传入 callback 获得 4。 source 结束 example 就直接结束,由于 source 跟 newest 不会再有对应顺位的值。
withLatestFrom和combineLatest用法很相似,withLatestFrom主要特色是只有在,主Observable发起值的时候才会发动订阅,不过若是副Observable没有发送过值,也不会发起订阅,例子以下:
var main = from('hello').pipe(
zip(interval(500), (x, y) => x)
)
var some = from([0,1,0,0,0,1]).pipe(
zip(interval(300), (x, y) => x)
)
var example = main.pipe(
withLatestFrom(some, (x, y) => {
return y === 1 ? x.toUpperCase() : x;
})
)
example.subscribe({
next: value => {
console.log(value);
},
error: err => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// h
// e
// l
// L
// O
// complete
复制代码
withLatestFrom 会在 main 送出值的时候执行 callback,但请注意若是 main 送出值时 some 以前没有送出过任何值 callback 仍然不会执行! 这裡咱们在 main 送出值时,去判断 some 最后一次送的值是否是 1 来决定是否要切换大小写,执行步骤以下: main 送出了 h,此时 some 上一次送出的值为 0,把这两个参数传入 callback 获得 h。 main 送出了 e,此时 some 上一次送出的值为 0,把这两个参数传入 callback 获得 e。 main 送出了 l,此时 some 上一次送出的值为 0,把这两个参数传入 callback 获得 l。 main 送出了 l,此时 some 上一次送出的值为 1,把这两个参数传入 callback 获得 L。 main 送出了 o,此时 some 上一次送出的值为 1,把这两个参数传入 callback 获得 O。
concatMap就是map加上concatAll
var source = fromEvent(document.body, 'click');
var example = source.pipe(
map(e => interval(1000).pipe(take(3))),
concatAll()
)
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
复制代码
转化成concatMap就是以下这样:
var source = fromEvent(document.body, 'click');
var example = source.pipe(
concatMap(
e => interval(100).pipe(take(3))
)
)
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
复制代码
mergeMap一样是mergeAll加上map
var source = fromEvent(document.body, 'click');
var example = source.pipe(
mergeMap(
e => interval(100).take(3)
)
)
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
复制代码
switch在rxjs6中只有switchMap switch对比merge和concat有个特色就是附属observable发起订阅后会马上解绑主observable。
var source = fromEvent(document.body, 'click');
var example = source.pipe(
.switchMap(
e => interval(100).pipe(take(3))
)
)
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
复制代码
var obs1 = interval(1000).pipe(take(5));
var obs2 = interval(500).pipe(take(2));
var obs3 = interval(2000).pipe(take(1));
var source = of(obs1, obs2, obs3);
var example = source.pipe(concatAll());
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 0
// 1
// 0
// complete
复制代码
上边的例子中会一个个按照顺序执行obs一、obs二、obs3
mergeAll和concatAll用法基本一致,区别在于mergeAll是并行处理Observable,实例以下:
var click = fromEvent(document.body, 'click');
var source = click.pipe(
map(_ => interval(1000)),
mergeAll()
);
source.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
复制代码
mergeAll使用特殊的一点就是mergeAll能够传递一个参数,这个参数表示最大并行处理数量,当处理的observable数量大于这个数字的时候,就须要等待在处理的observable有完成的才会分配资源处理。mergeAll(1)的效果就和concatAll效果同样。