RxJS进阶——关于流的理解和应用

RxJS是微软公司推出的响应式编程的JavaScript库。 对于它的学习,最开始个人理解是把它当成是 能优雅地解决异步问题的lodash。 随着学习的深刻,发现它采用了订阅者模式,其中也带有纯函数的思想。 直到在使用了RxJS 6以后才了解其少有人意识到的另外一面——流。html

什么是流?这里咱们不用专业术语来解释,用生活中你们熟悉的的例子来类比,好比“河流”。前端

河流有什么特色? 至少有两个特色:git

有朝向。 水往低处流,河流虽然可能会蜿蜒盘旋,可是朝向是固定的,好比我国的长江和黄河就都是由西往东流。 在RxJS中数据的流向也是固定的,就是从发送者到订阅者。基本都以下面这种形式:github

from(Promise.resolve(1)) // 流的源头
......
.subscribe(x => console.log(x)); // 流的终点
复制代码

有分支。编程

大的河流通常有干流和支流,大大小小的支流汇入干流。bash

RxJS中的数据则能够经过操做符将数据流进行聚合或拆分。前端工程师

// 流的聚合
mergeMap(from(Promise.resolve(1)), from(Promise.resolve(2)))
......
.subscribe(x => console.log(x))

// 流的拆分
const obs$ = from(Promise.resolve(1)
obs$.subscribe(x => console.log(x))
obs$.subscribe(x => {
  // do sth
})/
复制代码

RxJS 6 相对于 RxJS 5(这里指5.5如下的版本,由于pipe函数在RxJS 5.5中做为新特性已被引入。) 来讲不只修改了一部分操做符的名称,同时作了一个较大的改动,引入了管道(pipe)。这个改动到底有多大?异步

首先是写法上的变化。 RxJS 5的这种操做符的调用方式有没有一种似曾相识的感受? 是的,它看上去很像JQuery那种意大利面条式的链式调用 而RxJS 6和Gulp的写法有些像,想一想Gulp是什么?基于流的构建工具!函数

// RxJS 5 伪代码
myObservable
  .map(data => data * 2)
  .switchMap(...)
  .throttle(...))
  .subscribe(...);

// RxJS 6 伪代码
myObservable
  .pipe(map(data => data * 2), switchMap(...), throttle(...))
  .subscribe(...);
复制代码

这种写法上的变化就带来了用法上的变化,之前的固定“河流”能够经过“管道”(pipe)来控制造成灵活的“水流”。工具

下面举个例子来更加形象地阐述加入管道以后流的灵活性。

如今有一个这样的业务场景: 点击按钮以后发送一个请求,让服务端开始执行任务,而后轮询发送请求查询任务执行状态,根据不一样状态进行不一样操做。有3种状态"controlling"——继续轮询, "stop"——中止轮询,"finish"——中止轮询,并进行后续操做。

不考虑判断条件,伪代码是下面这样子:

// 开始任务
start$().pipe(
  switchMap(() => interval(1000)), // 开始轮询
  switchMap(() => getStatus$()), // 查询状态
  )
.subscribe(x => {
  // 后续操做
})
复制代码

这段代码有一个问题没有解决,根据状态进行相应操做。 先来看看这3种状态对应的操做。

  • controlling。继续轮询很好处理,不进行任何操做便可。
  • stop。中止轮询的操做符有3个:take,须要固定次数,这个次数无法预先肯定。takeUntil,须要建立一个额外的subject来进行中止,应该能够实现,不过代码量比较大。takeWhile,只需简单的逻辑判断便可,比较合适。
  • finish。问题来了,如过咱们在管道操做符中判断状态的并中止流的话,那么订阅者将没法收到消息,意味着后续操做没法执行。

解决方法就是把后续操做放到管道中。代码以下:

// 开始任务
start$().pipe(
  switchMap(() => interval(1000)), // 开始轮询
  switchMap(() => getStatus$()), // 查询状态
  filter(x => x==='stop' || x==='finish') // 'controlling'状态下继续轮询,其它状态进行对应操做
  takeWhile(x => x!=='stop') // 当为'stop'时结束轮询
  tap(() => {
    // 后续操做
  })
  takeWhile(() => false) // 操做完成结束轮询
  )
.subscribe();
复制代码

如今需求变化了,在另外一段代码中,咱们也要经过查询状态并根据状态进行,可是再也不须要开始任务和轮询了。 那么上面的代码查询和操做部分能够利用pipe方法抽取出来。

const handle = pipe(
  switchMap(() => getStatus$()), // 查询状态
  filter(x => x==='stop' || x==='finish') // 'controlling'状态下继续轮询,其它状态进行对应操做
  takeWhile(x => x!=='stop') // 当为'stop'时结束轮询
  tap(() => {
    // 后续操做
  })
  takeWhile(() => false) // 操做完成结束轮询
)
start$().pipe(
  switchMap(() => interval(1000)), // 开始轮询
  handle
).subscribe();
// 直接复用查询状态代码和后续操做部分代码
other.pipe(
  handle
).subscribe()
复制代码

总结一下。RxJS比较完整的理解应该是基于流的订阅者模式,而流的灵活性体如今可拆分和聚合,有了pipe管道的加入,流的可复用性加强,所以更容易对代码逻辑进行抽象。

原文连接:tech.gtxlab.com/rxjs-stream…


做者信息:朱德龙,人和将来高级前端工程师。

相关文章
相关标签/搜索