RxJS

一.简介html

RxJS 是一个库,它经过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操做符 (map、filter、reduce、every, 等等),这些数组操做符能够把异步事件做为集合来处理。react

能够把 RxJS 当作是用来处理事件的 Lodash 。es6

二.基本概念npm

Observable (可观察对象): 表示一个概念,这个概念是一个可调用的将来值或事件的集合。编程

Observer (观察者): 观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每一个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete 。下面的示例是一个典型的观察者对象:json

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
复制代码

要使用观察者,须要把它提供给 Observable 的 subscribe 方法:数组

observable.subscribe(observer);异步

观察者只是有三个回调函数的对象,每一个回调函数对应一种 Observable 发送的通知类型async

Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。 Subscription 是表示可清理资源的对象,一般是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe,它不须要任何参数,只是用来清理由 Subscription 占用的资源。在上一个版本的 RxJS 中,Subscription 叫作 "Disposable" (可清理对象)。函数式编程

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
复制代码

// Observable 执行是经过使用观察者调用 subscribe 方法启动的

subscription.unsubscribe();
复制代码

Subscription 基本上只有一个 unsubscribe() 函数,这个函数用来释放资源或去取消 Observable 执行

Operators (操做符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操做符来处理集合。

操做符是 Observable 类型上的方法,好比 .map(...)、.filter(...)、.merge(...),等等。当操做符被调用时,它们不会改变已经存在的 Observable 实例。相反,它们返回一个新的 Observable ,它的 subscription 逻辑基于第一个 Observable 。

操做符是函数,它基于当前的 Observable 建立一个新的 Observable。这是一个无反作用的操做:前面的 Observable 保持不变。

操做符本质上是一个纯函数 (pure function),它接收一个 Observable 做为输入,并生成一个新的 Observable 做为输出。订阅输出 Observable 一样会订阅输入 Observable 。在下面的示例中,咱们建立一个自定义操做符函数,它将从输入 Observable 接收的每一个值都乘以10:

function multiplyByTen(input) {

  var output = Rx.Observable.create(function subscribe(observer) {
  
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
    
  });
  
  return output;
  
}

var input = Rx.Observable.from([1, 2, 3, 4]);

var output = multiplyByTen(input);

output.subscribe(x => console.log(x));

输出:

10
20
30
40
复制代码

Subject (主体): 至关于 EventEmitter,而且是将值或事件多路推送给多个 Observer 的惟一方式。

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); 
复制代码

// 你能够提供一个 Subject 进行订阅 执行结果:

observerA: 1

observerB: 1

observerA: 2

observerB: 2

observerA: 3

observerB: 3

使用上面的方法,咱们基本上只是经过 Subject 将单播的 Observable 执行转换为多播的。这也说明了 Subjects 是将任意 Observable 执行共享给多个观察者的惟一方式。

Schedulers (调度器): 调度器控制着什么时候启动 subscription 和什么时候发送通知。 让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者

var observable = Rx.Observable.create(function (observer) {

        observer.next(1);
  
        observer.next(2);
  
        observer.next(3);
  
        observer.complete();
  
})
.observeOn(Rx.Scheduler.async);

console.log('just before subscribe');

observable.subscribe({

      next: x => console.log('got value ' + x),
      
      error: err => console.error('something wrong occurred: ' + err),
      
      complete: () => console.log('done'),
  
});
console.log('just after subscribe');
输出结果:

just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
复制代码

三. 安装

1.经过 npm 安装 ES6 版本 npm install rxjs 导入整个核心功能集:

import Rx from 'rxjs/Rx';

Rx.Observable.of(1,2,3)
经过打补丁的方式只导入所须要的(这对于减小 bundling 的体积是十分有用的):

import { Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/map';

Observable.of(1,2,3).map(x => x + '!!!'); // 等等
只导入须要的而且使用被提议的绑定操做符:

注意:这个额外的预发须要编译器支持而且此语法可能会在没有任何通知的状况下彻底从 TC39 撤回!要使用的话须要你本身来承担风险。

import { Observable } from 'rxjs/Observable';
import { of } from 'rxjs/observable/of';
import { map } from 'rxjs/operator/map';

Observable::of(1,2,3)::map(x => x + '!!!'); // 等等
复制代码

2.经过 npm 安装 CommonJS 版本 npm install rxjs 导入全部核心功能:

var Rx = require('rxjs/Rx');

Rx.Observable.of(1,2,3); // 等等
经过打补丁的方式只导入所须要的(这对于减小 bundling 的体积是十分有用的):

var Observable = require('rxjs/Observable').Observable;
// 使用适合的方法在 Observable 上打补丁
require('rxjs/add/observable/of');
require('rxjs/add/operator/map');

Observable.of(1,2,3).map(function (x) { return x + '!!!'; }); // 等等
导入操做符并手动地使用它们(这对于减小 bundling 的体积也十分有用):

var of = require('rxjs/observable/of').of;
var map = require('rxjs/operator/map').map;

map.call(of(1,2,3), function (x) { return x + '!!!'; });
还可使用上面的方法来构建你本身的 Observable 并将其从你本身的模块中导出。
复制代码

3.使用 TypeScript 的 CommonJS 模式 当使用 RxJS 时收到了像 error TS2304: Cannot find name 'Promise' 或 error TS2304: Cannot find name 'Iterable' 这样的报错信息,那么你可能须要安装额外的 typings 。

对于使用 typings 的用户:

typings install es6-shim --ambient

若是没有使用 typings 的话,能够从 /es6-shim/es6-shim.d.ts 拷贝定义好的接口。

在 tsconfig.json 或 CLI 参数中添加类型定义文件。
复制代码

4.经过 npm 全部全模块类型 (CJS/ES6/AMD/TypeScript) 要安装这个库须要 npm 3及以上版本,使用下面的命令行:

npm install @reactivex/rxjs
若是你使用的仍是 npm 2的话,那么在这个库升级至稳定版以前,须要明确地指定库的版本号:

npm install @reactivex/rxjs@5.0.0-beta.1
复制代码

5.CDN 对于 CDN,可使用 unpkg 。只须要用当前的版本号来替换下面连接中的 version:

对于 RxJS 5.0.0-beta.1 到 beta.11: https://unpkg.com/@reactivex/rxjs@version/dist/global/Rx.umd.js

对于 RxJS 5.0.0-beta.12 及以上版本: https://unpkg.com/@reactivex/rxjs@version/dist/global/Rx.js
复制代码

四.第一个示例 注册事件监听器的常规写法。

var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

**使用 RxJS 的话,建立一个 observable 来代替。**

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .subscribe(() => console.log('Clicked!'));  
复制代码

更多用法:cn.rx.js.org/manual/tuto…

手册:cn.rx.js.org/manual/inde…

相关文章
相关标签/搜索