RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions,是一个基于可观测数据流 Stream 结合观察者模式和迭代器模式的一种异步编程的应用库。RxJS 是 Reactive Extensions 在 JavaScript 上的实现。javascript
Reactive Extensions(Rx)是对 LINQ 的一种扩展,他的目标是对异步的集合进行操做,也就是说,集合中的元素是异步填充的,好比说从 Web
或者云端获取数据而后对集合进行填充。LINQ(Language Integrated Query)语言集成查询是一组用于 C# 和
Visual Basic 语言的扩展。它容许编写 C# 或者 Visual Basic 代码以操做内存数据的方式,查询数据库。
RxJS
的主要功能是利用响应式编程的模式来实现 JavaScript 的异步式编程(现前端主流框架 Vue React Angular 都是响应式的开发框架)。前端
RxJS
是基于观察者模式和迭代器模式以函数式编程思惟来实现的。学习 RxJS
以前咱们须要先了解观察者模式和迭代器模式,还要对 Stream
流的概念有所认识。下面咱们将对其逐一进行介绍,准备好了吗?让咱们如今就开始吧。java
观察者模式又叫发布订阅模式(Publish/Subscribe
),它是一种一对多的关系,让多个观察者(Obesver
)同时监听一个主题(Subject
),这个主题也就是被观察者(Observable
),被观察者的状态发生变化时就会通知全部的观察者,使得它们可以接收到更新的内容。git
观察者模式主题和观察者是分离的,不是主动触发而是被动监听。github
举个常见的例子,例如微信公众号关注者和微信公众号之间的信息订阅。当微信用户关注微信公众号 webinfoq
就是一个订阅过程,webinfoq
负责发布内容和信息,webinfoq
有内容推送时,webinfoq
的关注者就能收到最新发布的内容。这里,关注公众号的朋友就是观察者的角色,公众号webinfoq
就是被观察者的角色。web
示例代码:ajax
// 定义一个主题类(被观察者/发布者) class Subject { constructor() { this.observers = []; // 记录订阅者(观察者)的集合 this.state = 0; // 发布的初始状态 } getState() { return this.state; } setState(state) { this.state = state; // 推送新信息 this.notify(); // 通知订阅者有更新了 } attach(observer) { this.observers.push(observer); // 对观察者进行登记 } notify() { // 遍历观察者集合,一一进行通知 this.observers.forEach(observer = { observer.update(); }) } }
// 定义一个观察者(订阅)类 class Observer { constructor(name, subject) { this.name = name; // name 表示观察者的标识 this.subject = subject; // 观察者订阅主题 this.subject.attach(this); // 向登记处传入观察者实体 } update() { console.log(`${this.name} update, state: ${this.subject.getState()}`); } } // 建立一个主题 let subject = new Subject(); // 建立三个观察者: observer$1 observer$2 observer$3 let observer$1 = new Observer("observer$1", subject); let observer$2 = new Observer("observer$2", subject); let observer$3 = new Observer("observer$3", subject); // 主题有更新 subject.setState(1); subject.setState(2); subject.setState(3); // 输出结果 // observer$1 update, state: 1 // observer$1 update, state: 1 // observer$1 update, state: 1 // observer$2 update, state: 2 // observer$2 update, state: 2 // observer$2 update, state: 2 // observer$3 update, state: 3 // observer$3 update, state: 3 // observer$3 update, state: 3
迭代器(Iterator
)模式又叫游标(Sursor
)模式,迭代器具备 next
方法,能够顺序访问一个聚合对象中的各个元素,而不须要暴露该对象的内部表现。数据库
迭代器模式能够把迭代的过程从从业务逻辑中分离出来,迭代器将使用者和目标对象隔离开来,即便不了解对象的内部构造,也能够经过迭代器提供的方法顺序访问其每一个元素。npm
了解更多可迭代对象:「JS篇」你不知道的 JS 知识点总结(一)编程
使用 ES5 建立一个迭代器
//建立一个迭代类,传入目标对象 function Iterator(container) { this.list = container.list; this.index = 0; //定义私有的next方法,执行迭代 this.next = function() { if(this.hasNext()) { //判断是否迭代完毕 return { value: this.list[this.index++], done: false } } return {value: null, done: true} } this.hasNext = function() { if(this.index >= this.list.length) { return false; } return true; } } //定义目标对象 function Container(list) { this.list = list; this.getIterator = function() { return new Iterator(this); //用户返回一个迭代器 } } //调用 var container = new Container([1, 2, 3, 4, 5]); var iterator = container.getIterator(); iterator.next(); // {value: 1, done: false} iterator.next(); // {value: 2, done: false} iterator.next(); // {value: 3, done: false} iterator.next(); // {value: 4, done: false} iterator.next(); // {value: 5, done: false} iterator.next(); // {value: null, done: true}
使用 ES6 构造一个迭代器
class Iterator { constructor(container) { this.list = container.list; this.index = 0; } next() { if(this.hasNext()) { return { value: this.list[this.index++], done: false } } return {value: null, done: true} } hasNext() { if(this.index >= this.list.length) { return false; } return true; } } class Container { constructor(list) { this.list = list; } getIterator() { return new Iterator(this); } } let container = new Container([1, 2, 3, 4, 5]); let iterator = container.getIterator(); iterator.next(); // {value: 1, done: false} iterator.next(); // {value: 2, done: false} iterator.next(); // {value: 3, done: false} iterator.next(); // {value: 4, done: false} iterator.next(); // {value: 5, done: false} iterator.next(); // {value: null, done: true}
使用 ES6 的 Symbol.iterator
建立一个迭代器
var list = [1, 2, 3, 4, 5]; var iterator = list[Symbol.iterator](); iterator.next(); // {value: 1, done: false} iterator.next(); // {value: 2, done: false} iterator.next(); // {value: 3, done: false} iterator.next(); // {value: 4, done: false} iterator.next(); // {value: 5, done: false} iterator.next(); // {value: null, done: true}
经过上边的示例代码咱们能够得知,咱们不了解对象的内部构造,可是能够经过调用迭代器提供的 next() 方法就能顺序访问其每一个元素。
在这里能够将一系列的鼠标点击、键盘点击产生的事件和将要处理的元素集合看做一种流, 流的特色是数据源的自己是无限的,流在管道中传输, 而且能够在管道的节点上进行处理, 好比筛选, 排序,聚合等。
流数据源(source
)通过数据转换等中间操做的处理,最后由最终操做获得前面处理的结果,每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(能够有屡次转换),这就容许对其操做能够像链条同样排列,变成一个管道。
为了对 stream 有一个更感性的认识,咱们说点击事件能够看做一种 stream,在介绍 RxJS 以前,咱们不妨先看一个 RxJS 官网上的例子(列举官方的例子更能充分体现 RxJS 是基于可观测数据流 Stream 的)。
一般,注册一个事件侦听器是这样的。
document.addEventListener('click', () => console.log('Clicked!'));
使用 RxJS 能够建立一个 observable
。
import { fromEvent } from 'rxjs'; fromEvent(document, 'click').subscribe(() => console.log('Clicked!'));
结合上边讲到流和设计模式,为了方便咱们对 RxJS 有进一步的认识,咱们就用代码本身来实现一个 Obserable
。
其实 Observable 实际上就是一个函数,它接收一个Observer 对象做为参数,返回一个函数用来取消订阅。Observer 对象能够声明 next、err、complete
方法来处理流的不一样状态。
首先咱们定义数据源 Source
// 建立数据源 class Source { constructor() { this.state = 0; this.data = setInterval(() => this.emit(this.state++), 200); } emit(state) { const limit = 10; // 定义数据上限 if (this.onData) { this.onData(state); // 产生数据 } if (state === limit) { if (this.onComplete) { this.onComplete(); // 数据终止 } this.destroy(); } } destroy() { //中止定时器,清除数据 clearInterval(this.data); } }
建立一个 Observable
// 建立 Observable class Observable { constructor() {} getStream() { return new Source(); } subscribe(observer) { // 获取流数据源 this.stream = this.getStream(); // 转换 this.stream.onData = (e) => observer.next(e); // 处理流数据 this.stream.onError = (err) => observer.error(err); //处理异常 this.stream.onComplete = () => observer.complete(); //处理流数据终止 // 返回一个函数 return () => { this.stream.destroy(); } } }
调用 subscribe 进行订阅
const observable = new Observable(); //订阅 let observer = { next(data) { console.log(data); }, error(err) { console.error(err); }, complete() { console.log('done')} } const unsubscribe = observable.subscribe(observer);
输出结果
咱们能够调用 unsubscribe
取消订阅
//0.5后取消订阅 setTimeout(unsubscribe, 500);
咱们能够看到 Observable 做为生产者与观察者之间的桥梁,并返回一种方法来解除生产者与观察者之间的联系,其中观察者用于处理时间序列上数据流。
介绍完 RxJS 的一些前置知识点,下面就让咱们一块儿来认识下什么是 RxJS 吧。
RxJS 中含有两个基本概念:Observables
与 Observer
。Observables 做为被观察者,是一个值或事件的流集合;而 Observer 则做为观察者,根据 Observables 进行处理。
Observables 与 Observer 之间的订阅发布关系(观察者模式) 以下:
Observable 属于全新的 push
体系,让咱们先了解下什么是 pull
体系和 push
体系吧。
Pull
和 Push
是两种不一样的协议,描述了数据生产者如何与数据消费者进行通讯。
生产者 | 消费者 | |
---|---|---|
pull | 被请求的时候产生数据 | 决定什么时候请求数据 |
push | 按本身的节奏生产数据 | 对接收的数据进行处理 |
在 Pull
体系中,数据的消费者决定什么时候从数据生产者那里获取数据,而生产者自身并不会意识到何时数据将会被发送给消费者。
每个 JavaScript函数
都是一个 Pull
体系,函数是数据的生产者,调用函数的代码经过 '拉出' 一个单一的返回值来消费该数据。
function add(x, y) { console.log('Hello'); return x + y; } const x = add(4, 5);
ES6介绍了 Iterator
迭代器 和 Generator
生成器,另外一种 Pull
体系,调用 iterator.next()
的代码是消费者,可从中拉取多个值。
在 Push
体系中,数据的生产者决定什么时候发送数据给消费者,消费者不会在接收数据以前意识到它将要接收这个数据。
Promise
是当今 JS 中最多见的 Push
体系,一个 Promise
(数据的生产者)发送一个 resolved value
(成功状态的值)来执行一个回调(数据消费者)。可是不一样于函数的地方的是:Promise
决定着什么时候数据才被推送至这个回调函数。
RxJS 引入了 Observable (可观察对象),一个全新的 Push 体系。一个可观察对象是一个产生多值的生产者,当产生新数据的时候,会主动 "推送给" Observer (观察者)。
Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。
MagicQ | 单值 | 多值 |
---|---|---|
拉取(Pull) | Function | Iterator |
推送(Push) | Promise | Observable |
Observable
于 Promise
之间的差别:
使用 RxJS 咱们可使用 npm 进行安装(更多使用方法请参考 github):
npm install rxjs
须要注意的是,不少人认为 RxJS 中的全部操做都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是能够用来处理同步的行为。具体示例以下:
import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); }); console.log('start'); observable.subscribe({ next(x) { console.log(x); }, error(err) { console.error(err); }, complete() { console.log('done'); } }); console.log('end');
以上代码运行后,控制台的输出结果:
start 1 2 3 done end
固然咱们也能够用它处理异步行为:
import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); subscriber.next(3); setTimeout(() => { subscriber.next(4); subscriber.complete(); }, 1000); }); console.log('start'); observable.subscribe({ next(x) { console.log(x); }, error(err) { console.error(err); }, complete() { console.log('done'); } }); console.log('end');
代码运行后的输出结果为:
start 1 2 3 end 4 done
RxJS 中提供了不少操做符 Operators
,下篇文章咱们将对 Operators
进行介绍,建立类操做符(Creation Operator
)用于建立 Observable 对象。
官网列举的一些建立类操做符以下:
最后咱们简单的来看一下,如何使用 from
建立一个 Observable(关于操做符的介绍咱们将在下篇文章进行详细介绍,保持关注哦)。
from
:能够把数组、Promise、以及 Iterable 转化为 Observable。
//将数组转换为 Observable: import { from } from 'rxjs'; const array = [10, 20, 30]; const result = from(array); result.subscribe(x => console.log(x)); // Logs: // 10 // 20 // 30
因为 RxJS 涉及到的概念和知识点比较宽泛和复杂,咱们须要一步一步的去理解和掌握它,最终能够作到知其然亦知其因此然。接下来文章中会继续介绍 RxJS 中涉及到知识点,关注此公众号webinfoq
不要错过哦。