RxJS v6 学习指南

为何要使用 RxJS

RxJS 是一套处理异步编程的 API,那么我将从异步讲起。css

前端编程中的异步有:事件(event)、AJAX、动画(animation)、定时器(timer)。html

异步常见的问题

  • 回调地狱(Callback Hell)
  • 竞态条件(Race Condition)
  • 内存泄漏(Memory Leak)
  • 管理复杂状态(Manage Complex States)
  • 错误处理(Exception Handling)

回调地狱就是指层层嵌套的回调函数,形成代码难以理解,而且难以协调组织复杂的操做。前端

竞态条件出现的缘由是没法保证异步操做的完成会和他们开始时的顺序同样,所以最终结果不可控。好比常见的 AutoComplete 效果,每次输入后向后端发送请求获取结果展现在搜索框下面,因为网络、后端数据查询等缘由有可能出现最后发送的请求比以前的请求更快地完成了,这时最终展示的并非最后那个请求的结果,而这并非咱们所但愿的。vue

这里说的内存泄漏指的是单页应用切换页面时因为忘记在合适的时机移除监听事件形成的内存泄漏。react

异步带来了状态的改变,可能会使状态管理变得很是复杂,尤为是某个状态有多个来源时,好比有些应用,一开始有一个默认值,再经过 AJAX 获取初始状态,存储在 localStorage,以后经过 WebSocket 获取更新。这时查询状态多是同步或者异步的,状态的变动多是主动获取也多是被动推送的,若是还有各类排序、筛选,状态管理将会更加复杂。ios

JavaScript 中的 try/catch 只能捕获同步的错误,异步的错误不易处理。git

Promise

使用 Promise 能够减轻一些异步问题,如将回调函数变为串行的链式调用,统一同步和异步代码等,async/await 中也可使用 try/catch 来捕获错误。可是对于复杂的场景,仍然难于处理。并且 Promise 还有其余的问题,一是只有一个结果,二是不能够取消。github

异步 API:

异步编程时不只要面对这些问题,还有下面这些使用方式各异的 API:web

  • DOM Events
  • XMLHttpRequest
  • fetch
  • WebSocket
  • Service Worker
  • setTimeout
  • setInterval
  • requestAnimationFrame

而若是使用 RxJS,能够用统一的 API 来进行处理,并且借助 RxJS 各类强大的操做符,咱们能够更简单地实现咱们的需求。ajax

认识 RxJS

什么是 RxJS

咱们都知道 JS 是什么,那么什么是 Rx 呢?Rx 是 Reactive Extension(也叫 ReactiveX)的简称,指的是实践响应式编程的一套工具,Rx 官网首页的介绍是一套经过可监听流来作异步编程的 API(An API for asynchronous programming with observable streams)。

Rx 最先是由微软开发的 LinQ 扩展出来的开源项目,以后由开源社区维护,有多种语言的实现,如 Java 的 RxJava,Python 的 RxPY 等,而 RxJS 就是 Rx 的 JavaScript 语言实现。

RxJS 的两种编程思想

RxJS 引入了两种重要的编程思想:函数式编程和响应式编程。

函数式编程(Functional Programming,简称 FP)是一种编程范式,强调使用函数来思考问题、编写代码。

In computer science, functional programming is a programming paradigm—a style of building the structure and elements of computer programs—that treats computation as the evaluation of mathematical functions and avoids changing-state and mutable data.

函数式编程的主要设计点在于避免使用状态和可变的数据,即 stateless and immutable。

函数式编程对函数的使用有一些特殊要求:

  • 声明式(Declarative)
  • 纯函数(Pure Function)
  • 数据不可变性(Immutability)

声明式的函数,让开发者只须要表达”想要作什么”,而不须要表达“怎么去作”。

纯函数指的是执行结果由输入参数决定,参数相同时结果相同,不受其余数据影响,而且不会带来反作用(Side Effect)的函数。反作用指的是函数作了和自己运算返回值没有关系的事情,如修改外部变量或传入的参数对象,甚至是执行 console.log 都算是 Side Effect。前端中常见的反作用有发送 http 请求、操做 DOM、调用 alert 或者 confirm 函数等。知足纯函数的特性也叫作引用透明度(Referential Transparency)。

数据不可变就是指这个数据一旦产生,它的值就永远不会变。JavaScript 中字符串类型和数字类型就是不可改变的,而对象基本都是可变的,可能会带来各类反作用。如今有各类库能够实现 Immutable 特性,如 immutable.jsimmer.js

中文维基上说响应式编程(Reactive Programming)是一种面向数据流(stream)和变化传播的编程范式。我的的理解是对数据流进行编程的一种编程范式,使用各类函数建立、组合、过滤数据流,而后经过监听这个数据流来响应它的变化。响应式编程抽象出了流这个概念,提升了代码的抽象级别,咱们不用去关心大量的实现细节,而专一于对数据流的操做。

响应式流能够认为是随着时间发出的一系列元素。响应式和观察者模式有点类似,订阅者订阅后,发布者吐出数据时,订阅者会响应式进行处理。实际上Rx 组合了观察者模式(Observer pattern )、迭代器模式(Iterator pattern)和函数式编程。

RxJS 是上面两种编程思想的结合,可是对于它是否是函数响应式编程(FRP)有比较大的争议,由于它虽然既是函数式又是响应式可是不符合早期 FRP 的定义。

RxJS 的特色

  • 数据流抽象了不少现实问题
  • 擅长处理异步问题
  • 把复杂问题分解为简单问题的组合

前端中的 DOM 事件、WebSocket 推送消息、AJAX 请求资源、动画均可以看做是数据流。

RxJS 对数据采用“推”的方式,当一个数据产生时,会将其推送给对应的处理函数,这个处理函数不用关心数据时同步产生仍是异步产生的,所以处理异步将会变得很是简单。

RxJS 中不少操做符,每一个操做符都提供了一个小功能,学习 RxJS 最重要的就是学习如何组合操做符来解决复杂问题。

RxJS 入门

RxJS 使用

RxJS 仓库如今移到了 ReactiveX 组织下,最新的大版本为 6,与以前的版本相比有许多破坏性变动,请注意。

RxJS 的 import 路径有如下 5 种:

  1. 建立 Observable 的方法、types、schedulers 和一些工具方法

    import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent, SubscriptionLike, PartialObserver } from 'rxjs';

  2. 操做符 operators

    import { map, filter, scan } from 'rxjs/operators';

  3. webSocket

    import { webSocket } from 'rxjs/webSocket';

  4. ajax

    import { ajax } from 'rxjs/ajax';

  5. 测试

    import { TestScheduler } from 'rxjs/testing';

本文全部 demo 均在 v6.2.1 中测试过

一个简单的例子

import { fromEvent } from 'rxjs';
import { take } from 'rxjs/operators';

const eleBtn = document.querySelector('#btn')
const click$ = fromEvent(eleBtn, 'click')

click$.pipe(take(1))
  .subscribe(e => {
    console.log('只可点击一次')
    eleBtn.setAttribute('disabled', '')
  })

这里演示了 RxJS 的大概用法,经过 fromEvent 将点击事件转换为 RxJS 的 Observable (响应式数据流),take(1) 表示只操做一次,观察者经过订阅(subscribe)来响应变化。具体 API 的使用会在后面讲到。

演示地址

表明流的变量用 $ 符号结尾,是 RxJS 中的一种惯例。

RxJS 要点

RxJS 有一个核心和三个重点,一个核心是 Observable 再加上相关的 Operators,三个重点分别是 Observer、Subject、Schedulers。

什么是 Observable

我的认为在文档中说的 Observable 更确切的说法是 Observable Stream,也就是 Rx 的响应式数据流。

在 RxJS 中 Observable 是可被观察者,观察者则是 Observer,它们经过 Observable 的 subscribe 方法进行关联。

前面提到了 RxJS 结合了观察者模式和迭代器模式。

对于观察者模式,咱们其实比较熟悉了,好比各类 DOM 事件的监听,也是观察者模式的一种实践。核心就是发布者发布事件,观察者选择时机去订阅(subscribe)事件。

在 ES6 中,Array、String 等可遍历的数据结构原生部署了迭代器(Iterator )接口。

const numbers = [1, 2, 3]
const iterator = numbers[Symbol.iterator]()
iterator.next() // {value: 1, done: false}
iterator.next() // {value: 2, done: false}
iterator.next() // {value: 3, done: false}
iterator.next() // {value: undefined, done: true}

观察者模式和迭代器模式的相同之处是二者都是渐进式使用数据的,只不过从数据使用者的角度来讲,观察者模式数据是推送(push)过来的,而迭代器模式是本身去拉取(pull)的。Rx 中的数据是 Observable 推送的,观察者不须要主动去拉取。

Observable 与 Array 至关相似,均可以看做是 Collection,只不过 Observable 是 a collection of items over time,是随时间发出的一序列元素,因此下面咱们会看到 Observable 的一些操做符与 Array 的方法极其类似。

建立 Observable

要建立一个 Observable,只要给 new Observable 传递一个接收 observer 参数的回调函数,在这个函数中去定义如何发送数据。

import { Observable } from 'rxjs';

const source$ = new Observable(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
})

const observer = {
  next : item => console.log(item)
}

console.log('start')
source$.subscribe(observer)
console.log('end')

上面的代码经过 new Observable 建立了一个 Observable,调用它的 subscribe 方法进行订阅,执行结果为依次输出 'start',1,2,3,'end'。

下面咱们再看一个异步的例子:

import { Observable } from 'rxjs';
    
const source$ = new Observable(observer => {
  let number = 1
  setInterval(() => {
    observer.next(number++)
  }, 1000)
})

const observer = {
  next : item => console.log(item)
}

console.log('start')
source$.subscribe(observer)
console.log('end')

先输出 ’start' 、'end',而后每隔 1000 ms 输出一个递增的数字。

经过这两个小例子,咱们知道 RxJS 既能处理同步的行为,也能处理异步的。

观察者 Observer

观察者 Observer 是一个有三个方法的对象:

  • next: 当 Observable 发出新的值时被调用,接收这个值做为参数
  • complete:当 Observable 完结,没有更多数据时被调用。complete 以后,next 方法无效
  • error:当 Observable 内部发生错误时被调用,以后不会调用 complete,next 方法无效

    const source$ = new Observable(observer => {
      observer.next(1)
      observer.next(2)
      observer.complete()
      observer.next(3)
    })
    
    const observer = {
      next: item => console.log(item),
      complete: () => console.log('complete')
    }
    
    source$.subscribe(observer)

上面的代码会输出 1,2,'complete',而不会输出 3。

const source$ = new Observable(observer => {
  try {
    observer.next(1)
    observer.next(2)
    throw new Error('there is an exception')
    observer.complete()
  } catch (e) {
    observer.error(e)
  }
})

const observer = {
  next: item => console.log(item),
  error: e => console.log(e),
  complete: () => console.log('complete')
}

source$.subscribe(observer)

注意 error 以后不会再调用 complete。

Observer 还有简单形式,即不用构建一个对象,而是直接把函数做为 subscribe 方法的参数。

source$.subscribe(
  item => console.log(item),
  e => console.log(e),
  () => console.log('complete')
)

参数依次为 next 、error、complete,后面两个参数能够省略。

延迟执行(lazy evaluation)

咱们传给 new Observable 的回调函数若是没有订阅是不会执行的,订阅一个 Observable 就像是执行一个函数,和下面的函数相似。这和咱们常见的那种内部保存有观察者列表的观察者模式是不一样的,Observable 内部没有这个观察者列表。

function subscribe (observer) {
  let number = 1
  setInterval(() => {
    observer.next(number++)
  }, 1000)
}

subscribe({
    next: item => console.log(item),
    error: e => console.log(e),
    complete: () => console.log('complete')
})

退订(unsubscribe)

观察者想退订,只要调用订阅返回的对象的 unsubscribe 方法,这样观察者就不再会接受到 Observable 的信息了。

const source$ = new Observable(observer => {
  let number = 1
  setInterval(() => {
    observer.next(number++)
  }, 1000)
})

const observer = {
  next : item => console.log(item)
}

const subscription = source$.subscribe(observer)

setTimeout(() => {
  subscription.unsubscribe()
}, 5000)

操做符

在 RxJS 中,操做符是用来处理数据流的。咱们每每须要对数据流作一系列处理,才交给 Observer,这时一个操做符就像一个管道同样,数据进入管道,完成处理,流出管道。

import { interval } from 'rxjs';
import { map } from 'rxjs/operators'
    
const source$ = interval(1000).pipe(
  map(x => x * x)
)

source$.subscribe(x => console.log(x))

interval 操做符创造了一个数据流,interval(1000) 会产生一个每隔 1000 ms 就发出一个从 0 开始递增的数据。map 操做符和数组的 map 方法相似,能够对数据流进行处理。具体见演示地址

这个 map 和数组的 map 方法会产生新的数组相似,它会产生新的 Observable。每个操做符都会产生一个新的 Observable,不会对上游的 Observable 作任何修改,这彻底符合函数式编程“数据不可变”的要求。

上面的 pipe 方法就是数据管道,会对数据流进行处理,上面的例子只有一个 map 操做符进行处理,能够添加更多的操做符做为参数。

弹珠图

弹珠图(Marble diagrams)就是用图例形象地表示 Observable 和各类操做符的一种方法。

用 - 表示一小段时间,X 表明有错误发生, | 表示结束,() 表示同步发生。

上面的例子能够以下表示:

source: -----0-----1-----2-----3--...
        map(x => x * x)
newest: -----0-----1-----4-----9--...

具体关于弹珠图的使用能够查看这个网站http://rxmarbles.com/

建立 Observable

建立 Observable 的这些方法就是用来建立 Observable 数据流的,注意和操做符不一样,它们是从 rxjs 中导入的,而不是 rxjs/operators

of 方法

以前咱们写的这种形式:

const source$ = new Observable(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
  observer.complete()
})

使用 of 方法将会很是简洁:

import {of} from 'rxjs'
const source$ = of(1, 2, 3)

from 方法

上面的代码用 from 则是这样:

import {from} from 'rxjs'
const source$ = from([1, 2, 3])

from 能够将可遍历的对象(iterable)转化为一个 Observable,字符串也部署有 iterator 接口,因此也支持。

from 还能够根据 promise 建立一个 Observable。咱们用 fetch 或者 axios 等类库发送的请求都是一个 promise 对象,咱们可使用 from 将其处理为一个 Observable 对象。

fromEvent 方法

用 DOM 事件建立 Observable,第一个参数为 DOM 对象,第二个参数为事件名称。具体示例见前面 RxJS 入门章节的一个简单例子。

fromEventPattern 方法

将添加事件处理器、删除事件处理器的 API 转化为 Observable。

function addClickHandler (handler) {
  document.addEventListener('click', handler)
}
 
function removeClickHandler (handler) {
  document.removeEventListener('click', handler)
}
 
fromEventPattern(
  addClickHandler,
  removeClickHandler
).subscribe(x => console.log(x))

也能够是咱们本身实现的和事件相似,拥有注册监听和移除监听的 API。

import { fromEventPattern } from 'rxjs'

class EventEmitter {
  constructor () {
    this.handlers = {}
  }
  on (eventName, handler) {
    if (!this.handlers[eventName]) {
      this.handlers[eventName] = []
    }
    if(typeof handler === 'function') {
        this.handlers[eventName].push(handler)
    } else {
        throw new Error('handler 不是函数!!!')
    }
  }
  off (eventName, handler) {
    this.handlers[eventName].splice(this.handlers[eventName].indexOf(handler), 1)
  }
  emit (eventName, ...args) {
    this.handlers[eventName].forEach(handler => {
      handler(...args)
    })
  }
}

const event = new EventEmitter()

const subscription = fromEventPattern(
  event.on.bind(event, 'say'), 
  event.off.bind(event, 'say')
).subscribe(x => console.log(x))

let timer = (() => {
  let number = 1
  return setInterval(() => {
    if (number === 5) {
      clearInterval(timer)
      timer = null
    }
    event.emit('say', number++)
  }, 1000)
})()

setTimeout(() => {
  subscription.unsubscribe()
}, 3000)

演示地址

interval、timer

interval 和 JS 中的 setInterval 相似,参数为间隔时间,下面的代码每隔 1000 ms 会发出一个递增的整数。

interval(1000).subscribe(console.log)
// 0
// 1
// 2
// ...

timer 则能够接收两个参数,第一个参数为发出第一个值须要等待的时间,第二个参数为以后的间隔时间。第一个参数能够是数字,也能够是一个 Date 对象,第二个参数可省。

range

操做符 of 产生较少的数据时能够直接写如 of(1, 2, 3),可是若是是 100 个呢?这时咱们可使用 range 操做符。

range(1, 100) // 产生 1 到 100 的正整数

empty、throwError、never

empty 是建立一个当即完结的 Observable,throwError 是建立一个抛出错误的 Observable,never 则是建立一个什么也不作的 Observable(不完结、不吐出数据、不抛出错误)。这三个操做符单独用时没有什么意义,主要用来与其余操做符进行组合。目前官方不推荐使用 empty 和 never 方法,而是推荐使用常量 EMPTY 和 NEVER(注意不是方法,已是一个 Observable 对象了)。

defer

defer 建立的 Observable 只有在订阅时才会去建立咱们真正想要操做的 Observable。defer 延迟了建立 Observable,而又有一个 Observable 方便咱们去订阅,这样也就推迟了占用资源。

defer(() => ajax(ajaxUrl))

只有订阅了才会去发送 ajax 请求。

操做符

操做符其实看做是处理数据流的管道,每一个操做符实现了针对某个小的具体应用问题的功能,RxJS 编程最大的难点其实就是如何去组合这些操做符从而解决咱们的问题。

在 RxJS 中,有各类各样的操做符,有转化类、过滤类、合并类、多播类、错误处理类、辅助工具类等等。通常不须要本身去实现操做符,可是咱们须要知道操做符是一个函数,实现的时候必须考虑如下功能:

  1. 返回一个全新的 Observable 对象
  2. 对上游和下游的订阅和退订处理
  3. 处理异常状况
  4. 及时释放资源

pipeable 操做符

以前版本的 RxJS 各类操做符都挂载到了全局 Observable 对象上,能够这样链式调用:

source$.filter(x => x % 2 === 0).map(x => x * 2)

如今须要这样使用:

import {filter, map} from 'rxjs/operators'

source$.pipe(
  filter(x => x % 2 === 0),
  map(x => x * 2)
)

其实也很好理解,pipe 就是管道的意思,数据流经过操做符处理,流出而后交给下一个操做符。

几个相似数组方法的基础操做符

map、filter 和数组的 map、filter 方法相似,scan 则是和 reduce 方法相似,mapTo 是将全部发出的数据映射到一个给定的值。

import {mapTo} from 'rxjs/operators'

fromEvent(document, 'click').pipe(
  mapTo('Hi')
).subscribe(x => console.log(x))

每次点击页面时都会输出 Hi。

一些过滤的操做符

  • take 是从数据流中选取最早发出的若干数据
  • takeLast 是从数据流中选取最后发出的若干数据
  • takeUntil 是从数据流中选取直到发生某种状况前发出的若干数据
  • first 是得到知足判断条件的第一个数据
  • last 是得到知足判断条件的最后一个数据
  • skip 是从数据流中忽略最早发出的若干数据
  • skipLast 是从数据流中忽略最后发出的若干数据

    import { interval } from 'rxjs';
    import { take } from 'rxjs/operators';
    
    interval(1000).pipe(
      take(3)
    ).subscribe(
      x => console.log(x),
      null,
      () => console.log('complete')
    )
    // 0
    // 1
    // 2
    // 'complete'

使用了 take(3),表示只取 3 个数据,Observable 就进入完结状态。

import { interval, fromEvent } from 'rxjs'
import { takeUntil } from 'rxjs/operators'

interval(1000).pipe(
  takeUntil(fromEvent(document.querySelector('#btn'), 'click'))
).subscribe(
  x => { document.querySelector('#time').textContent = x + 1 },
  null,
  () => console.log('complete')
)

这里有一个 interval 建立的数据流一直在发出数据,直到当用户点击按钮时中止计时,见演示

合并类操做符

合并类操做符用来将多个数据流合并。

1)concat、merge

concat、merge 都是用来把多个 Observable 合并成一个,可是 concat 要等上一个 Observable 对象 complete 以后才会去订阅第二个 Observable 对象获取数据并把数据传给下游,而 merge 时同时处理多个 Observable。使用方式以下:

import { interval } from 'rxjs'
import { merge, take } from 'rxjs/operators'

interval(500).pipe(
  take(3),
  merge(interval(300).pipe(take(6)))
).subscribe(x => console.log(x))

能够点此去比对效果,concat 的结果应该比较好理解,merge 借助弹珠图也比较好理解,它是在时间上对数据进行了合并。

source : ----0----1----2|
source2: --0--1--2--3--4--5|
            merge()
example: --0-01--21-3--(24)--5|

merge 的逻辑相似 OR,常常用来多个按钮有部分相同行为时的处理。

注意最新的官方文档RxJS v5.x 到 6 的更新指南中指出不推荐使用 merge、concat、combineLatest、race、zip 这些操做符方法,而是推荐使用对应的静态方法。

将上面的 merge 改为从 rxjs 中导入,使用方式变成了合并多个 Observable,而不是一个 Observable 与其余 Observable 合并。

import { interval,merge } from 'rxjs'
import { take } from 'rxjs/operators'

merge(
  interval(500).pipe(take(3)),
  interval(300).pipe(take(6))
).subscribe(x => console.log(x))

2)concatAll、mergeAll、switchAll

用来将高阶的 Observable 对象压平成一阶的 Observable,和 loadash 中压平数组的 flatten 方法相似。concatAll 会对内部的 Observable 对象作 concat 操做,和 concat 操做符相似,若是前一个内部 Observable 没有完结,那么 concatAll 不会订阅下一个内部 Observable,mergeAll 则是同时处理。switchAll 比较特殊一些,它老是切换到最新的内部 Observable 对象获取数据。上游高阶 Observable 产生一个新的内部 Observable 时,switchAll 就会当即订阅最新的内部 Observable,退订以前的,这也就是 ‘switch’ 的含义。

import { interval } from 'rxjs';
import { map, switchAll, take } from 'rxjs/operators';

interval(1500).pipe(
  take(2),
  map(x => interval(1000).pipe(
    map(y => x + ':' + y), 
    take(2))
  ),
  switchAll()
).subscribe(console.log)

// 0:0
// 1:0
// 1:1

内部第一个 Observable 对象的第二个数据还没来得及发出,第二个 Observable 对象就产生了。

3)concatMap、mergeMap、switchMap

从上面的例子咱们也能够看到高阶 Observable 经常是由 map 操做符将每一个数据映射为 Observable 产生的,而咱们订阅的时候须要将其压平为一阶 Observable,而就是要先使用 map 操做符再使用 concatAll 或 mergeAll 或 switchAll 这些操做符中的一个。RxJS 中提供了对应的更简洁的 API。使用的效果能够用下面的公式表示:

concatMap = map + concatAll
mergeMap = map + mergeAll
switchMap = map + switchAll

4)zip、combineLatest、withLatestFrom

zip 有拉链的意思,这个操做符和拉链的类似之处在于数据必定是一一对应的。

import { interval } from 'rxjs';
import { zip, take } from 'rxjs/operators';
const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

source$.pipe(
  zip(newest$, (x, y) => x + y)
).subscribe(x => console.log(x))
// 0
// 2
// 4

zip 是内部的 Observable 都发出相同顺序的数据后才交给下游处理,最后一个参数是可选的 resultSelector 参数,这个函数用来处理操做符的结果。上面的示例运行过程以下:

  1. newest 发出第一个值 0,但这时 source 尚未发出第一个值,因此不执行 resultSelector 函数也不会像下游发出数据
  2. source 发出第一个值 0,此时 newest 以前已发出了第一个值 0,执行 resultSelector 函数获得结果 0,发出这个结果
  3. newest 发出第二个值 1,但这时 source 尚未发出第二个值,因此不执行 resultSelector 函数也不会像下游发出数据
  4. newest 发出第三个值 2,但这时 source 尚未发出第三个值,因此不执行 resultSelector 函数也不会像下游发出数据
  5. source 发出第二个值 1,此时 newest 以前已发出了第一个值 1,执行 resultSelector 函数获得结果 2,发出这个结果
  6. newest 发出第四个值 3,但这时 source 尚未发出第四个值,因此不执行 resultSelector 函数也不会像下游发出数据
  7. source 发出第三个值 2,此时 newest 以前已发出了第一个值 2,执行 resultSelector 函数获得结果 4,发出这个结果
  8. source 完结,不可能再有对应的数据了,整个 Observable 完结

上面若是没有传递最后一个参数 resultSelector 函数,将会依次输出数组 [0, 0]、[1, 1]、[2, 2]。在更新指南中,官方指出不推荐使用 resultSelector 参数,将会在 v7 中移除。加上以前提到的推荐使用静态方法,这个示例应该改为这样:

import { interval, zip } from 'rxjs';
import { take, map } from 'rxjs/operators';

const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

const add = (x, y) => x + y

zip(source$, newest$).pipe(
  map(x => add(...x))
).subscribe(x => console.log(x))

使用 zip 当有数据流吐出数据很快,而有数据流发出值很慢时,要当心数据积压的问题。这时快的数据流已经发出了不少数据,因为对应的数据还没发出,RxJS 只能保存数据,快的数据流不断地发出数据,积压的数据愈来愈多,消耗的内存也会愈来愈大。

combineLatest 与 zip 不一样,只要其余的 Observable 已经发出过值就行,顾名思义,就是与其余 Observable 最近发出的值结合。

import { interval, combineLatest } from 'rxjs';
import { take } from 'rxjs/operators';

const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

combineLatest(source$, newest$).subscribe(x => console.log(x))
// [0, 0]
// [0, 1]
// [0, 2]
// [1, 2]
// [1, 3]
// [2, 3]
// [2, 4]
// [2, 5]

withLatestFrom 没有静态方法,只有操做符方法,前面的方法全部 Observable 地位是平等的,而这个方法是使用这个操做符的 Observable 起到了主导做用,即只有它发出值才会进行合并产生数据发出给下游。

import { interval } from 'rxjs';
import { take, withLatestFrom } from 'rxjs/operators';

const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

source$.pipe(
  withLatestFrom(newest$)
).subscribe(x => console.log(x))
// [0, 0]
// [1, 2]
// [2, 4]
  1. source 发出 0 时,newest 最新发出的值为 0,结合为 [0, 0] 发出
  2. source 发出 1,此时 newest 最新发出的值为 2,结合为 [1, 2] 发出
  3. source 发出 2,此时 newest 最新发出的值为 4,结合为 [2, 4] 发出
  4. source 完结,整个 Observable 完结

5)startWith、forkJoin、race

startWith 是在 Observable 的一开始加入初始数据,同步当即发送,经常使用来提供初始状态。

import { fromEvent, from } from 'rxjs';
import { startWith, switchMap } from 'rxjs/operators';

const source$ = fromEvent(document.querySelector('#btn'), 'click')

let number = 0
const fakeRequest = x => {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(number++)
    }, 1000)
  })
}

source$.pipe(
  startWith('initData'),
  switchMap(x => from(fakeRequest(x)))
).subscribe(x => document.querySelector('#number').textContent = x)

这里经过 startWith 操做符获取了页面的初始数据,以后经过点击按钮获取更新数据。

forkJoin 只有静态方法形式,相似 Promise.all ,它会等内部全部 Observable 都完结以后,将全部 Observable 对象最后发出来的最后一个数据合并成 Observable。

race 操做符产生的 Observable 会彻底镜像最早吐出数据的 Observable。

const obs1 = interval(1000).pipe(mapTo('fast one'));
const obs2 = interval(3000).pipe(mapTo('medium one'));
const obs3 = interval(5000).pipe(mapTo('slow one'));

race(obs3, obs1, obs2)
.subscribe(
  winner => console.log(winner)
);

// result:
// a series of 'fast one'

一个小的练习

本文中的例子基原本自30 天精通 RxJS,使用 RxJS v6 版本进行重写。

页面上有一个 p 标签存放一个状态,初始为 0,有两个按钮,一个按钮点击后这个状态增长 1,另外一个按钮点击后这个状态减小 1。

<button id="addButton">Add</button>
<button id="minusButton">Minus</button>
<p id="state"></p>

这两个按钮的点击事件咱们均可以创建响应式数据流,可使用 mapTo(1) 和 mapTo(-1) 分别表示点击后增长 1 和减小 1。咱们可使用 EMPTY 建立一个空的数据流来表示这个状态,用 startWith 设定初始值。而后 merge 这两个点击的数据流,可是这还有一个问题,点击事件的数据流须要与表示状态的数据流进行逻辑计算,发出最终的状态,咱们才能去订阅这个最终的数据流来更改页面的显示。而这种累计计算的方法,能够用 scan 操做符来实现。最终实现以下:

import { fromEvent, EMPTY, merge } from 'rxjs'
import { mapTo, startWith, scan } from 'rxjs/operators'

const addButton = document.getElementById('addButton')
const minusButton = document.getElementById('minusButton')
const state = document.getElementById('state')

const addClick$ = fromEvent(addButton, 'click').pipe(mapTo(1))
const minusClick$ = fromEvent(minusButton, 'click').pipe(mapTo(-1))

merge(
  EMPTY.pipe(startWith(0)),
  addClick$, 
  minusClick$)
.pipe(
  scan((origin, next) => origin + next)
).subscribe(item => {
  state.textContent = item
})

查看演示

简单拖拽

页面上有一个 id 为 drag 的 div:

<div id="drag"></div>

页面 css:

html, body {
  height: 100%;
  background-color: tomato;
  position: relative;
}

#drag {
  position: absolute;
  width: 100px;
  height: 100px;
  background-color: #fff;
  cursor: all-scroll;
}

要实现的功能以下:

  1. 当在这个 div 上按下鼠标左键(mousedown)时,开始监听鼠标移动(mousemove)位置
  2. 当鼠标松开(mouseup)时,结束监听鼠标移动
  3. 当鼠标移动被监听时,更新 div 样式来实现拖拽效果

实现思路:

  1. 咱们可使用 fromEvent 去转化 DOM 事件

    const mouseDown$ = fromEvent(eleDrag, 'mousedown')
    const mouseMove$ = fromEvent(eleBody, 'mousemove')
    const mouseUp$ = fromEvent(eleBody, 'mouseup')
  2. 对于鼠标按下这个数据流,每次鼠标按下事件发生时都转成鼠标移动的数据流

    mouseDown$.pipe(
      map(mouseDownEvent => mouseMove$)
    )
  3. 鼠标松开时,结束监听鼠标移动,咱们能够用 takeUntil 表示这个逻辑

    mouseDown$.pipe(
      map(mouseDownEvent => mouseMove$.pipe(
        takeUntil(mouseUp$)
      ))
    )
  4. 上面的 map 操做符内将每次 mousedown 映射为一个 Observable,造成了高阶 Observable,咱们须要用 concatlAll 压平,map 和 concatAll 连用,能够用更简洁的 concatMap

    mouseDown$.pipe(
      concatMap(mouseDownEvent => mouseMove$.pipe(
        takeUntil(mouseUp$)
      ))
    )
  5. 订阅这个 mousemove 数据流更新 div 位置。咱们能够获取 mousemove event 中的 clientX 和 clientY,减去初始鼠标按下时鼠标相对 div 元素的值来获得最终 div 的绝对位置的 left 和 top。也可使用 withLatestFrom 操做符,见 demo

    mouseDown$.pipe(
      concatMap(mouseDownEvent => mouseMove$.pipe(
        map(mouseMoveEvent => ({
          left: mouseMoveEvent.clientX - mouseDownEvent.offsetX,
          top: mouseMoveEvent.clientY - mouseDownEvent.offsetY
        })),
        takeUntil(mouseUp$)
      ))
    ).subscribe(position => {
      eleDrag.style.left = position.left + 'px'
      eleDrag.style.top = position.top + 'px'
    })

这里是一个更复杂一些的例子,当页面滑动到视频出页面时视频 fixed 定位,这是能够拖拽移动视频位置。经过 getValidValue 对视频拖拽的位置进行了一个限制。

缓存

把上游的多个数据缓存起来,当时机合适时再把汇聚的数据传给下游。

1)buffer、bufferTime、bufferCount、bufferWhen、bufferToggle

对于 buffer 这一组操做符,数据汇聚的形式就是数组。

buffer 接收一个 Observable 做为 notifier,当 notifier 发出数据时,将 缓存的数据传给下游。

interval(300).pipe(
  take(30),
  buffer(interval(1000))
).subscribe(
  x => console.log(x)
)
// [0, 1, 2]
// [3, 4, 5]
// [6, 7, 8]
// [9, 10, 11, 12]

bufferTime 是用时间来控制时机,上面能够改为 bufferTime(1000)

bufferCount 是用数量来控制时机,如 3 个一组,bufferCount(3)

bufferWhen 接收一个叫作 closeSelector 的参数,它应该返回一个 Observable。经过这个 Observable 来控制缓存。这个函数没有参数。下面的方法等价于前面的 buffer:

interval(300).pipe(
  take(30),
  bufferWhen(() => {
    return interval(1000)
  })
).subscribe(
  x => console.log(x)
)

bufferToggle 和 buffer 的不一样是能够不断地控制缓存窗口的开和关,一个参数是一个 Observable,称为 opening,第二个参数是称为 closeSelector 的一个函数。这个函数的参数是 opening 产生的数据。前一个参数用来控制缓存的开始时间,后一个控制缓存的结束。与 bufferWhen 相比,它的 closeSelector 能够接收参数,控制性更强。

咱们可使用 buffer 来作事件的过滤,下面的代码只有 500ms 内连续点击两次以上才会输出 ‘success’ 。

fromEvent(document.querySelector('#btn'), 'click').pipe(
  bufferTime(500),
  filter(arr => arr.length >= 2)
).subscribe(
  x => console.log('success')
)

2)window、windowTime、windowCount、windowWhen、windowToggle

与前面的 buffer 相似,不过 window 缓存数据汇聚的形式是 Observable,所以造成了高阶 Observable。

debounceTime、throttleTime

相似 lodash 的 debounce 和 throttle,用来下降事件的触发频率。

咱们作搜索时,经常要对输入进行 debounce 来减小请求频率。

fromEvent(document.querySelector('#searchInput'), 'input').pipe(
  debounceTime(300),
  map(e => e.target.value)
).subscribe(
  input => document.querySelector('#text').textContent = input
  // 发送请求
)

distinct、distinctUntilChanged

distinct 操做符能够用来去重,将上游重复的数据过滤掉。

of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1).pipe(
  zip(interval(1000)),
  map(arr => arr[0]),
  distinct()
).subscribe(x => console.log(x))

上面的代码只会输出 1, 2, 3, 4

distinct 操做符还能够接收一个 keySelector 的函数做为参数,这是官网的一个 typescript 的例子:

interface Person {
  age: number,
  name: string
}

of<Person>(
  { age: 4, name: 'Foo' },
  { age: 7, name: 'Bar' },
  { age: 5, name: 'Foo' },
).pipe(
  distinct((p: Person) => p.name),
).subscribe(x => console.log(x))
 
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }

distinctUntilChanged 也是过滤重复数据,可是只会与上一次发出的元素比较。这个操做符比 distinct 更经常使用。distinct 要与以前发出的不重复的值进行比较,所以要在内部存储这些值,要当心内存泄漏,而 distinctUntilChanged 只用保存上一个的值。

dalay、delayWhen

用来延迟上游 Observable 数据的发出。

delay 能够接受一个数字(单位默认为 ms)或者 date 对象做为延迟控制。

const clicks = fromEvent(document, 'click')
const delayedClicks = clicks.pipe(delay(1000)) // 全部点击事件延迟 1 秒
delayedClicks.subscribe(x => console.log(x))

咱们前面介绍过 bufferWhen,dalayWhen 也带有 when,在 RxJS 中,这种操做符它接收的参数都是 Observable Factory,即一个返回 Observable 对象的回调函数,用这个 Observable 来进行控制。

每一个 click 都延迟 0 至 5 秒之间的任意一个时间:

const clicks = fromEvent(document, 'click')
const delayedClicks = clicks.pipe(
  delayWhen(event => interval(Math.random() * 5000)),
)
delayedClicks.subscribe(x => console.log(x))

异常错误处理

异常处理的难点:

  1. try/catch 只支持同步
  2. 回调函数容易造成回调地狱,并且每一个回调函数的最开始都要判断是否存在错误
  3. Promise 不能重试,并且不强制异常被捕获

对错误处理的处理能够分为两类,即恢复(recover)和重试(retry)。

恢复是虽然发生了错误可是让程序继续运行下去。重试,是认为这个错误是临时的,重试尝试发生错误的操做。实际中每每配合使用,由于通常重试是由次数限制的,当尝试超过这个限制时,咱们应该使用恢复的方法让程序继续下去。

1)catchError

catchError 用来在管道中捕获上游传递过来的错误。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  catchError(err => of(8))
).subscribe(x => console.log(x))
// 0
// 1
// 2
// 3
// 8

catchError 中的回调函数返回了一个 Observable,当捕获到上游的错误时,调用这个函数,返回的 Observable 中发出的数据会传递给下游。所以上面当 x 为4 时发生了错误,会用 8 来替换。

catchError 中的回调函数除了接收错误对象为参数外,还有第二个参数 caught$ 表示上游的 Observable 对象。若是回调函数返回这个 Observable 对象,就会进行重试。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  catchError((err, caught$) => caught$),
  take(20)
).subscribe(x => console.log(x))

这个代码会依次输出 5 次 0, 1, 2, 3。

2)retry

retry 能够接收一个整数做为参数,表示重试次数,若是是负数或者没有传参,会无限次重试。重试实际上就是退订再从新订阅。

interval(1000).pipe(
      take(6),
      map(x => {
        if (x === 4) {
          throw new Error('unlucky number 4')
        } else {
          return x
        }
      }),
      retry(5) // 重试 5 次
    ).subscribe(x => console.log(x))

在实际开发中,若是是代码缘由形成的错误,重试没有意义,若是是由于外部资源致使的异常错误适合重试,如用户网络或者服务器偶尔不稳定的时候。

3)retryWhen

和前面带 when 的操做符同样,retryWhen 操做符接收一个返回 Observable 的回调函数,用这个 Observable 来控制重试的节奏。当这个 Observable 发出一个数据时就会进行一次重试,它完结时 retryWhen 返回的 Observable 也当即完结。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  retryWhen(err$ => err$.pipe(
    delay(1000),
    take(5))
  ) // 延迟 1 秒后重试,重试 5 次
).subscribe(x => console.log(x))

retryWhen 的可定制性很是高,不只能够实现延迟定制,还能够实现 retry 的控制重试次数。在实践中,这种重试频率固定的方法还不够好,若是以前的重试失败,以后重试成功的概率也不高。Angular 官网介绍了一个 Exponential backoff 的方法。将每次重试的延迟时间控制为指数级增加。

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
 
function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));
 
function handleData(data) {
  // ...
}

4)finalize

返回上游数据流的镜像 Observable,当上游的 Observable 完结或出错时调用传给它的函数,不影响数据流。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  finalize(() => console.log('finally'))
).subscribe(x => console.log('a'))

tap 操做符

咱们可使用 tap 操做符来进行调试。

拦截源 Observable 的每一次发送,执行一个函数,返回源 Observable 的镜像 Observable。

这个 API 有助于咱们对 Observable 的值进行验证(debug)和执行一个会带来反作用的函数,而不会影响源 Observable。如咱们用鼠标进行 canvas 绘图,鼠标按下是开始画图,鼠标松开即中止。咱们须要在 mousedown 的时候进行 moveTo,不然此次画的会和上次画的连在一块儿。咱们应该把这个会带来反作用过程放在 tap 操做符的函数中,这样才不会影响原来的数据流。

tap 操做符和订阅并不相同,tap 返回的 Observable 若是没有被订阅,tap 中产生反作用的函数并不会执行。

其余一些操做符

1) repeat

repeat 用来重复上游 Observable

2)pluck 相似 lodash 的方法 pluck,提取对象的嵌套属性的值。

const click$ = fromEvent(document, 'click')
const tagName$ = click$.pipe(pluck('target', 'tagName'))
tagName$.subscribe(x => console.log(x))

等价于:

click$.pipe(map(e => e.target.tagName))

3)toArray

将发出的数据汇聚为数组

interval(1000).pipe(
  take(3),
  toArray()
).subscribe(x => console.log(x))
// [0, 1, 2]

4)partition

将上游的 Observable 分为两个,一个 Observable 的数据是符合断定的数据,另外一个时不符合断定的数据。

const part$ = interval(1000).pipe(
  take(6),
  partition(x => x % 2 === 0)
)

part$[0].subscribe(x => console.log(x)) // 0, 2, 4
part$[1].subscribe(x => console.log(x)) // 1, 3, 5

5) 更多操做符

RxJS 中的操做符很是多,这里只介绍了一部分,更多请查看官网 API

RxJS 最经典的例子——AutoComplete

有一个用于搜索的 input,当输入时自动发送 ajax,并在下方显示结果列表,而后能够选择结果,这就是咱们常见的 AutoComplete 效果。要实现这个效果有不少细节要考虑,如防止 race condition 和优化请求次数。

<div class="autocomplete">
    <input class="input" type="search" id="search" autocomplete="off">
    <ul id="suggest-list" class="suggest"></ul>
</div>

先获取两个 DOM 元素:

const input = document.querySelector('#search');
const suggestList = document.querySelector('#suggest-list');

咱们先将输入框的 input 的事件转化为 Observable。

const input$ = fromEvent(input, 'input');

而后咱们根据输入的值去发送 ajax 请求,因为咱们是要获取最新的值而丢弃以前 ajax 返回的值,咱们应该使用 switchMap 操做符。经过使用这个操做符,咱们解决了 race condition 问题。

input$.pipe(
  switchMap(e => from(getSuggestList(e.target.value)))
)

getSuggestList 是一个发送 ajax 请求的方法,返回 promise,咱们使用 from 来将其转化为 Observable。

为了优化请求,首先 e.target.value 是空字符串时不该该发送请求,而后可使用 debounceTime 减小触发频率,也可使用 distinctUntilChanged 操做符来表示只有与上次不一样时才去发送请求。咱们还能够在 API 失败时重试 3 次。

input$.pipe(
  filter(e => e.target.value.length > 1),
  debounceTime(300),
  distinctUntilChanged(),
    switchMap(
      e => from(getSuggestList(e.target.value)).pipe(retry(3))
    )
  )

而后咱们去订阅渲染就能够了。

对于结果列表上的点击事件,比较简单,具体见demo

操做符和数组方法

Observable 的操做符和数组的方法有类似之处,可是也有很大的不一样,体如今如下两点:

  1. 延迟运算
  2. 渐进式取值

延迟运算,咱们以前有讲到过,就是只有订阅后才会开始对元素进行运算。

由于 Observable 是时间上的集合,操做符不是像数组方法那样运算完全部元素再返回交给下一个方法,而是一个元素一直运算到底,就像管道中的水流同样,先发出的数据先通过操做符的运算。

多播

前面的例子都是只有一个订阅者的状况,实际上固然能够有多个订阅者,这就是多播(multicast),即一个数据流的内容被多个 Observable 订阅。

Hot Observable 和 Cold Observable

先思考一下下面的例子结果是什么?

const source$ = interval(1000).pipe(
  take(3)
)

source$.subscribe(x => console.log('Observer 1: ' + x))

setTimeout(() => {
  source$.subscribe(x => console.log('Observer 2: ' + x))
}, 1000)

你可能会觉得 Observer 2 一秒后才订阅,错过了数据 0,所以只会输出 1 和 2,但实际上会先输出 0。为何如此呢?这就涉及到对已错过数据的两种处理策略。

  1. 错过的就让它过去,只要订阅以后生产的数据就好
  2. 不能错过,订阅以前生产的数据也要

第一种策略相似于直播,第二种和点播类似。使用第一种策略的 Observable 叫作 Cold Observable,由于每次都要从新生产数据,是 “冷”的,须要从新发动。第二种,由于一直在生产数据,只要使用后面的数据就能够了,因此叫 Hot Observable。

RxJS 中如 interval、range 这些方法产生的 Observable 都是 Cold Observable,产生 Hot Observable 的是由 Promise、Event 这些转化而来的 Observable,它们的数据源都在外部,和 Observer 无关。

前面咱们提到 Observable 都是 lazy evaluation 的,数据管道内的逻辑只有订阅后才会执行,可是 Cold Observable 相对更 lazy 一些。Cold Observable 若是没有订阅者连数据都不会产生,对于 Hot Observable,数据仍会产生,可是不会进入管道处理。

Hot Observable 是多播,对于 Cold Observable,每次订阅都从新生产了一份数据流,因此不是多播。下面的例子更加明显,两个订阅者有很大的几率会接收到不一样的数据。

const source$ = interval(1000).pipe(
  map(x => Math.floor(Math.random() * 10)),
  take(3)
)

source$.subscribe(x => console.log('Observer 1: ' + x))

setTimeout(() => {
  source$.subscribe(x => console.log('Observer 2: ' + x))
}, 1000)

若是想要实现多播,就要使用 RxJS 中 Subject。

Subject

为了防止每次订阅都从新生产一份数据流,咱们可使用中间人,让这个中间人去订阅源数据流,观察者都去订阅这个中间人。这个中间人能去订阅数据流,因此是个 Observer,又能被观察者订阅,因此也是 Observable。咱们能够本身实现一个这样的中间人:

const subject = {
  observers: [],
  subscribe: function (observer) {
    this.observers.push(observer)
  },
  next: function (value) {
    this.observers.forEach(o => o.next(value))
  },
  error: function (error) {
    this.observers.forEach(o => o.error(error))
  },
  complete: function () {
    this.observers.forEach(o => o.complete())
  }
}

这个 subject 拥有 Observer 的 next、error、complete 方法,每次被观察者订阅时都会在内部保存这个观察者。当接收到源数据流的数据时,会把数据发送给每个观察者。

const source$ = interval(1000).pipe(
  map(x => Math.floor(Math.random() * 10)),
  take(3)
)

const observerA = {
  next: x => console.log('Observer A: ' + x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: ' + x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source$.subscribe(subject)
subject.subscribe(observerA)
setTimeout(() => {
  subject.subscribe(observerB)
}, 1000)

这时咱们发现两个观察者接收到的是同一份数据,ObserverB 因为延迟一秒订阅,因此少接收到一个数据。将咱们本身实现的 subject 换成 RxJS 中的 Subject,效果相同:

import { Subject } from 'rxjs'
const subject = new Subject()

从上面能够看到,Subject 和 Observable 有一个很大的不一样:它内部保存有一个观察者列表。

前面的 subject 是在源数据流发出值时调用 next 方法,向订阅的观察者发送这个值,咱们也能够手动调用 subject 的next 方法送出值:

const observerA = {
  next: x => console.log('Observer A: ' + x)
}
const observerB = {
  next: x => console.log('Observer B: ' + x)
}

const subject = new Subject()

subject.subscribe(observerA)
setTimeout(() => {
  subject.subscribe(observerB)
}, 500)

subject.next(1)
setTimeout(() => {
  subject.next(2)
}, 1000)

总结一下,Subject 既是 Observable 又是 Observer,它会对内部的 observers 清单进行组播(multicast)。

Subject 的错误处理

在 RxJS 5 中,若是 Subject 的某个下游数据流产生了错误异常,而又没有被 Observer 处理,那这个 Subject 的其余 Observer 都会失败。可是在 RxJS 6 中不会如此。

在 v6 的这个例子 中,ObserverA 没有对错误进行处理,可是并不影响 ObserverB,而在 v5 这个demo中由于 ObserverA 没有对错误进行处理,使得 ObserverB 终止了。很明显 v6 的这种处理更符合直觉。

BehaviorSubject、ReplaySubject、AsyncSubject

1)BehaviorSubject

BehaviorSubject 须要在实例化时给定一个初始值,若是没有默认是 undefined,每次订阅时都会发出最新的状态,即便已经错过数据的发送时间。

const observerA = {
  next: x => console.log('Observer A: ' + x)
}
const observerB = {
  next: x => console.log('Observer B: ' + x)
}

const subject = new BehaviorSubject(0)

subject.subscribe(observerA) // Observer A: 0

subject.next(1) // Observer A: 1
subject.next(2) // Observer A: 2
subject.next(3) // Observer A: 3

setTimeout(() => {
  subject.subscribe(observerB) // Observer B: 3
}, 500)

observerB 已经错过流数据的发送时间,可是订阅时也能获取到最新数据 3。

BehaviorSubject 有点相似于状态,一开始能够提供初始状态,以后订阅均可以获取最新的状态。

2)ReplaySubject

ReplaySubject 表示重放,在新的观察者订阅时从新发送原来的数据,能够经过参数指定重放最后几个数据。

const observerA = {
  next: x => console.log('Observer A: ' + x)
}
const observerB = {
  next: x => console.log('Observer B: ' + x)
}

const subject = new ReplaySubject(2) // 重放最后两个

subject.subscribe(observerA)

subject.next(1) // Observer A: 1
subject.next(2) // Observer A: 2
subject.next(3) // Observer A: 3
subject.complete()

setTimeout(() => {
  subject.subscribe(observerB)
  // Observer B: 2
  // Observer B: 3
}, 500)

这里咱们能够看到,即便 subject 完结后再去订阅依然能够重放最后两个数据。

ReplaySubject(1) 和前面的 BehaviorSubject 是不同的,首前后者能够提供默认数据,而前者不行,其次前者在 subject 终结后再去订阅依然能够获得最近发出的数据然后者不行。

3)AsyncSubject

AsyncSubject 有点相似 operator last,会在 subject 完结后送出最后一个值。

const subject = new AsyncSubject()

subject.subscribe(observerA)

subject.next(1)
subject.next(2)
subject.next(3)
subject.complete()
// Observer A: 3
setTimeout(() => {
  subject.subscribe(observerB)
  // Observer B: 3
}, 500)

observerA 即便早就订阅了,可是并不会响应前面的 next,完结后才接收到最后一个值 3。

多播操做符

前面咱们写的 Subject 须要去订阅源数据流和被观察者订阅,写起来比较繁琐,咱们能够借助操做符来实现。

1)multicast

使用方式以下,接收一个 subject 或者 subject factory。这个操做符返回了一个 connectable 的 Observable。等到执行 connect() 才会用真的 subject 订阅 source,并开始发送数据,若是没有 connect,Observable 是不会执行的。

const source = interval(1000).pipe(
  map(x => Math.floor(Math.random() * 10)),
  take(3),
  multicast(new Subject)
)

const observerA = {
  next: x => console.log('Observer A: ' + x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: ' + x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source.subscribe(observerA) // subject.subscribe(observerA)

source.connect() // source.subscribe(subject)

setTimeout(() => {
  source.subscribe(observerB) // subject.subscribe(observerB)
}, 1000)

2)refCount

上面使用了 multicast,可是仍是有些麻烦,还须要去手动 connect。这时咱们能够再搭配 refCount 操做符建立只要有订阅就会自动 connect 的 Observable。只须要去掉 connect 方法调用,在 multicast 后面再加一个 refCount 操做符。

multicast(new Subject),
refCount()

refCount 其实就是自动计数的意思,当 Observer 数量大于 1 时,subject 订阅上游数据流,减小为 0 时退订上游数据流。

3)multicast selector 参数

multicast 第一个参数除了是一个 subject,还能够是一个 subject factory,即返回 subject 的函数。这时使用了不一样的中间人,每一个观察者订阅时都从新生产数据,适用于退订了上游以后再次订阅的场景。

multicast 还能够接收可选的第二个参数,称为 selector 参数。它可使用上游数据流任意屡次,而不会重复订阅上游的数据。当使用了这个参数时,multicast 不会返回 connectable Observable,而是这个参数(回调函数)返回的 Observable。selecetor 回调函数有一个参数,一般叫作 shared,即 multicast 第一个参数所表明的 subject 对象。

const selector = shared => {
  return shared.pipe(concat(of('done')))
}
const source = interval(1000).pipe(
  take(3),
  multicast(new Subject, selector)
)

const observerA = {
  next: x => console.log('Observer A: ' + x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: ' + x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source.subscribe(observerA)
setTimeout(() => {
  source.subscribe(observerB)
}, 5000)
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: done
// Observer A completed
// Observer B: done
// Observer B: completed

observerB 订阅时会调用 selector 函数,subject 即shared 已经完结,可是 concat 依然会在这个 Observable 后面加上 'done'。

能够利用 selector 处理 “三角关系”的数据流,若有一个 tick$ 数据流,对其进行 delay(500) 操做后的下游 delayTick$, 一个由它们合并获得的 mergeTick$,这时就造成了三角关系。delayTick$ 和 mergeTick$ 都订阅了 tick$。

const tick$ = interval(1000).pipe(
  take(1),
  tap(x => console.log('source: ' + x))
)

const delayTick$ = tick$.pipe(
  delay(500)
)

const mergeTick$ = merge(tick$, delayTick$).subscribe(x => console.log('observer: ' + x))
// source: 0
// observer: 0
// source: 0
// observer: 0

从上面的结果咱们能够验证,tick$ 被订阅了两次。

咱们可使用 selector 函数来使其只订阅一次,将上面的过程移到 selector 函数内便可。

const source$ = interval(1000).pipe(
  take(1),
  tap(x => console.log('source: ' + x))
)

const result$ = source$.pipe(
  multicast(new Subject(), shared => {
    const tick$ = shared
    const delayTick$ = tick$.pipe(delay(500))
    const mergeTick$ = merge(tick$, delayTick$)
    return mergeTick$
  })
)

result$.subscribe(x => console.log('observer: ' + x))

这时只会输出一次 'source: 0'。

4)publish

publish 是 multicast 的一种简写方式,效果等同于以下:

function publish (selector) {
  if (selector) {
    return multicast(() => new Subject(), selector)
  } else {
    return multicast(new Subject())
  }
}

有上一节说到的 selector 函数时,等价于:

multicast(() => new Subject(), selector)

没有时,等价于:

multicast(new Subject())

5)share

share 是 multicast 和 refCount 的简写,share() 等同于在 pipe 中先调用了 multicast(() => new Subject()),再调用了 refCount()。

const source = interval(1000).pipe(
  take(3),
  share()
)

const observerA = {
  next: x => console.log('Observer A: ' + x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: ' + x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source.subscribe(observerA)
setTimeout(() => {
  source.subscribe(observerB)
}, 5000)
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A completed
// Observer B: 0
// Observer B: 1
// Observer B: 2
// Observer B completed

因为 share 是调用了 subject 工厂函数,而不是一个 subject 对象,所以 observerB 订阅时能够从新获取数据。

6)publishLast、publishBehavior、publishReplay

同前面的 publish,只不过使用的不是普通 Subject,而是对应的 AsyncSubject、BehaviorSubject、ReplaySubject。

Scheduler

Scheduler(调度器)用于控制数据流中数据的推送节奏。

import { range, asapScheduler } from 'rxjs'

const source$ = range(1, 3, asapScheduler)

console.log('before subscribe')
source$.subscribe(x => console.log(x))
console.log('subscribed')

上面的代码,若是去掉 asapScheduler 参数,由于 range 是同步的,会先输出 1, 2, 3,再输出 'subscribed',可是加了之后就变成 先输出 'subscribed',改变了原来数据产生的方式。asap 是 as soon as possible 的缩写,同步任务完成后就会立刻执行。

Scheduler 拥有一个虚拟时钟,如 interval 建立的数据流每隔一段时间要发出数据,由 Scheduler 提供时间来判断是否到了发送数据的时间。

Scheduler 实例

  • undefined/null:不指定 Scheduler,表明同步执行的 Scheduler
  • asap:尽快执行的 Scheduler
  • async:利用 setInterval 实现的 Scheduler
  • queue:利用队列实现的 Scheduler,用于迭代一个的大的集合的场景。
  • animationFrame:用于动画的 Scheduler

asap 会尽可能使用 micro task,而 async 会使用 macro task。

相关操做符

一些建立数据流的方法能够提供 Scheduler 参数,合并类操做符如 merge 也能够,在建立数据流后咱们也可使用操做符,使得产生的下游 Observable 推送数据的节奏由指定的 Scheduler 来控制。这个操做符就是 observeOn。

const tick$ = interval(10) // Intervals are scheduled with async scheduler by default...
tick$.pipe(
  observeOn(animationFrameScheduler)  // but we will observe on animationFrame scheduler to ensure smooth animation.
)
.subscribe(val => {
  someDiv.style.height = val + 'px'
})

原本每 10 ms 就会发送一个数据,修改 Scheduler 为 animationFrame 后只有浏览器重绘才会发送数据更新样式。

咱们还能够经过操做符 subscribeOn 控制订阅的时机。

const source$ = new Observable(observer => {
  console.log('on subscribe')
  observer.next(1)
  observer.next(2)
  observer.next(3)
  return () => {
    console.log('on unsubscribe')
  }
})

const tweaked$ = source$.pipe(subscribeOn(asapScheduler))

console.log('before subscribe')
tweaked$.subscribe(x => console.log(x))
console.log('subscribed')
// before subscribe
// subscribed
// on subscribe
// 1
// 2
// 3

经过 subscribeOn(asapScheduler),咱们把订阅时间推迟到尽快执行。

TestScheduler

RxJS 中有一个 用于测试的 TestScheduler,RxJS 的测试你们能够查看程墨的《深刻浅出 RxJS》或者其余资料。

import { TestScheduler } from 'rxjs/testing'

RxJS 的一些实践

RxJS 与前端框架结合

Angular 自身引用了 RxJS,如 http 和 animation 都使用了 Observable,状态管理可使用 ngrx

Vue 官方有与 RxJS 集成的 vue-rx

React 能够经过 Subject 创建桥梁,Redux 也有与 RxJS 结合的中间件 Redux-Observable。

轮询中的错误处理

interval(10000).pipe(
  switchMap(() => from(axios.get(url))),
  catchError(err => EMPTY)
).subscribe(data => render(data))

上面的代码,每隔 10s 去发送一个请求,当某个请求返回出错时,返回空的 Observable 而不渲染数据。这样处理貌似正确,可是实际上某个请求出错时,整个 Observable 终结了,所以轮询就结束了。为了保持轮询,咱们须要进行隔离,把错误处理移到 switchMap 内部进行处理。

interval(10000).pipe(
  switchMap(() => from(axios.get(url)).pipe(
    catchError(err => EMPTY)
  ))
).subscribe(data => render(data))

订阅管理

若是没有及时退订可能会引起内存泄露,咱们须要经过退订去释放资源。

1)命令式管理

const subscription = source$.subscribe(observer)
// later...
subscription.unsubscribe()

上面的管理方式,数量不多时还好,若是数量较多,将会显得十分笨拙。

2) 声明式管理

const kill1 = fromEvent(button, 'click')
const kill2 = getStreamOfRouteChanges()
const kill3 = new Subject()

const merged$ = mege(
    source1.pipe(takeUntil(kill1)),
    source2.pipe(takeUntil(kill2)),
    source3.pipe(takeUntil(kill3))
)

const sub = merged$.subscribe(observer)
// later...
sub.unsubscribe()

// 或者发出任意结束的事件
kill3.next(true)

经过 takeUntil、map 或者其余操做符组合进行管理。这样更不容易漏掉某个退订,订阅也减小了。

3)让框架或者某些类库去处理

好比 Angular 中的 async pipe,当 unmount 时会自动退订,也不用写订阅。

不要 Rx 一切

不要过分使用 Rx,它比较适合如下场景:

  • 组合事件时
  • 增长延迟和控制频率
  • 组合异步任务
  • 须要取消时

简单的应用并不须要 RxJS。

RxJS 的业务实践

能够看看徐飞的相关思考:流动的数据——使用 RxJS 构造复杂单页应用的数据逻辑

RxJS 与 Async Iterator

Async Iterator 提案已经进入了 ES2018,能够认为是 iterator 的异步版本。在 Symbol 上部署了 asyncIterator 的接口,不过它的 next 方法返回的是 { value, done } 对象的 Promise 版本。可使用 for-await-of 进行迭代:

for await (const line of readLines(filePath)) {
  console.log(line)
}

使用 Async Iterator 咱们能够很容易实现相似 RxJS 操做符的功能:

const map = async function*(fn) {
  for await(const value of this) yield fn(value)
}

其余如 fromEvent 等也比较容易实现。Async Iterator 扩展库 axax 的一个例子:

import { fromEvent } from "axax/es5/fromEvent";

const clicks = fromEvent(document, 'click');

for await (const click of clicks) {
    console.log('a button was clicked');
}

下面是 Benjamin Gruenbaum 用 Async Iterator 实现 AutoComplete 的一个例子:

let tooSoon = false, last;
for await (const {target: {value}} of fromEvent(el, "keyup")) {
  if(!value || tooSoon) continue;
  if(value === last) continue;
  last = value;
  yield await fetch("/autocomplete/" + value); // misses `last` 
  tooSoon = true;
  delay(500).then(() => tooSoon = false);
}

Async Iterator 相比 RxJS,没有那么多概念,上手快,也比较容易扩展实现那些操做符。

从数据消费者的角度上看,RxJS 是 push stream,由生产者把数据推送过来,Async Iterator 是 pull stream,是本身去拉取数据。

参考连接

博客:30 天精通 RxJS

书:深刻浅出RxJS

视频:RxJS 5 Thinking Reactively | Ben Lesh

相关文章
相关标签/搜索