RxJs 核心概念之Observable

Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。下方表格对Observable进行了定位(为解决基于推送的多值问题):javascript

MagicQ 单值 多值
拉取(Pull) 函数 遍历器
推送(Push) Promise Observable

:当observable被订阅后,会当即(同步地)推送123 三个值;1秒以后,继续推送4这个值,最后结束(推送结束通知):html

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

为获得observable推送的值,咱们须要订阅(subscribe)这个Observable:java

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

程序执行后,将在控制台输出以下结果:react

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

拉取(Pull) V.S. 推送(Push)

拉取推送是数据生产者和数据消费者之间通讯的两种不一样机制。es6

何为拉取? 在拉取系统中,老是由消费者决定什么时候从生产者那里得到数据。生产者对数据传递给消费者的时间毫无感知(被动的生产者,主动的消费者)。正则表达式

JavaScript函数是典型的拉取系统:函数是数据的生产者,对函数进行调用的代码(消费者)从函数调用后的返回值中拉取单值进行消费。安全

// 函数是数据的生产者
let getLuckyNumber = function() {
    return 7;
};

/* let代码段是数据的消费者,
 * getLuckyNumber对调用时间毫无感知。 
 */
let luckNumber = getLuckyNumber();

ES2015 引入了的 生成器函数 | 遍历器 (function*)一样是基于拉取的系统: 调用 iterator.next()的代码段是消费者,它能够从生成器函数中拉取多个值。app

function* getLessThanTen() {
  var i = 0;
  while(i < 11) {
    yield i++;
  }
}

// 生产者
let iterator = getLessThanTen();

// 消费者
iterator.next(); // Object {value: 0, done: false}
iterator.next(); // Object {value: 1, done: false}
MagicQ 生产者 消费者
拉取 被动: 在被请求时产生数据 主动: 决定什么时候请求数据
推送 主动: 控制数据的产生逻辑 被动: 得到数据后进行响应

何为推送? 在推送系统中生产者决定什么时候向消费者传递数据,消费者对什么时候收到数据毫无感知(被动的消费者)。异步

现代JavaScript中Promise是典型的推送系统。做为数据生产者的Promise经过resolve()向数据消费者——回调函数传递数据:与函数不一样,Promise决定向回调函数推送值的时间。async

RxJS在 JavaScript 中引入了Observable(可观察对象)这个新的推送系统。Observable是多数据值的生产者,向Observer(被动的消费者)推送数据。

  • 函数 调用后同步计算并返回单一值

  • 生成器函数 | 遍历器 遍历过程当中同步计算并返回0个到无穷多个值

  • Promise 异步执行中返回或者不返回单一值

  • Observable 同步或者异步计算并返回0个到无穷多个值

Observable 是函数概念的拓展

Observable既不像EventEmitter,也不像是Promise。Observable 中的 Subject 进行多路推送时与 EventEmitter 行为上有些相似,可是实际上Observable与EventEmitter并不相同。

Observable 更像是一个不须要传入参数的函数,它拓展了函数的概念使得它能够返回多个值。

看看下面的例子:

function foo() {
  console.log('Hello');
  return 42;
}

var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);

输出结果以下:

"Hello"
42
"Hello"
42

经过Observable能够实现一样的行为:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x) {
  console.log(x);
});
foo.subscribe(function (y) {
  console.log(y);
});

输出结果相同:

"Hello"
42
"Hello"
42

不论Observable仍是函数都是在运行时进行求值计算的。若是不调用函数,console.log('Hello')就不会执行;若是若是不subscribe(订阅)Observable,console.log('Hello')也不会执行。此外,调用或者订阅都是独立的:两次调用产生两个独立的做用域,两次订阅一样会产生两个独立的做用域。EventEmitter老是在同一个做用域中,发射前也不会在乎本身是否已经被订阅;Observable不会被共享而产生反作用,而且老是在被订阅时才执行。

订阅Observable与调用函数相似。

一些人认为Observable老是是异步的,这个观点并不正确,若是在控制台log函数中调用函数:

console.log('before');
console.log(foo.call());
console.log('after');

显然能够看到如下输出:

"before"
"Hello"
42
"after"

Observable的行为彻底同样:

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

输出结果为:

"before"
"Hello"
42
"after"

订阅 foo彻底是同步的,与函数的调用同样。

Observable能够异步或者同步地产生数据。

那Observable 与函数的不一样之处在哪里? Observable能够在一个时间过程当中‘返回’多个值,而函数却不能。在函数中你不能够这么作:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // 这个语句永远不会被执行。
}

虽然函数只能有一个返回值,可是在Observable中你彻底能够这么作:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // 返回另外一个值
  observer.next(200); // 返回另外一个值
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

输出结果以下:

"before"
"Hello"
42
100
200
"after"

你甚至能够异步地返回值:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300); // happens asynchronously
  }, 1000);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

输出结果:

"before"
"Hello"
42
100
200
"after"
300

结论:

  • func.call() 意味着“同步地给我一个值”

  • observable.subscribe() 意味着“不论是同步或者异步,给我一些值”

Observable 剖析

经过使用 Rx.Observable.create 或者是建立操做符建立一个Observable; Observable 被 Observer(观察者) 订阅; 在执行时 向观察者发送next / error / complete 通知;同时执行过程能够被 终止
Observable 类型的实例具有了以上四个方面的特性,与其余类型如:Observer 和 Subscription 紧密相关。

咱们重点关注如下四个方面:

  • 建立

  • 订阅

  • 执行

  • 终止

建立

Rx.Observable.createObservable 构造函数的别名,接受一个参数: subscribe函数。

如下例子会建立一个Observable,每一秒钟向其订阅者发射一个'hi' 字符串。

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next('hi')
  }, 1000);
});

除了使用create建立Observable,咱们一般还使用建立操做符, 如 offrominterval, 等来建立Observable。

上面例子中,subscribe函数是定义Observable最重要的部分。咱们接下来了解订阅的含义。

订阅

上面例子中的observable 能够以以下方式 订阅

observable.subscribe(x => console.log(x));

observable.subscribeObservable.create(function subscribe(observer) {...})中的subscribe 同名并不是巧合。虽然在Rx中它们不是同一个对象,可是在工程中,咱们能够在概念上视二者为等价物。

调用subscribe的观察者并不会共享同一个Observable。观察者调用observable.subscribe 时,Observable.create(function subscribe(observer) {...})中的subscribe会在调用它的观察者做用域中执行。每一次observable.subscribe的调用,都是彼此独立的。

订阅Observable如同调用函数,须要提供相应的回调方法。

订阅机制与处理事件的addEventListener / removeEventListenerAPI彻底不一样。经过observable.subscribe,观察者并不须要在Observable中进行注册,Observable也不须要维护订阅者的列表。

订阅后便进入了Observable的执行阶段,在执行阶段值和事件将会被传递给观察者供其消费。

执行

只有在被订阅以后Observable才会执行,执行的逻辑在Observable.create(function subscribe(observer) {...})中描述,执行后将会在特定时间段内,同步或者异步地成产多个数据值。

Observable在执行过程当中,能够推送三种类型的值:

  • "Next" 通知: 实际产生的数据,包括数字、字符串、对象等

  • "Error" 通知:一个JavaScript错误或者异常

  • "Complete" 通知:一个不带有值的事件

“Next” 通知是最重要和经常使用的类型:表示事件传递给观察者的数据。错误和完成通知仅会在执行阶段推送其一,并不会同时推送错误和完成通知。

经过所谓的“Observable语法”或者“契约”能够最好地表达这个规则,“Observable语法”借助于正则表达式:

next*(error|complete)?

在Observable的执行过程当中,0个或者多个“Next”通知会被推送。在错误或者完成通知被推送后,Observable不会再推送任何其余通知。

下面代码展现了Observable 在执行过程当中推送3个“Next” 通知而后结束:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

Observable 严格遵照 Observable 契约,后面值为4的“Next” 通知永远不会被推送:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4); // 因为违法契约,4不会被推送
});

使用try/catch块包裹 subscribe 代码是一个很赞的想法,若是捕获了异常,能够推送错误通知:

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // 捕获异常后推送错误通知
  }
});

终止

Observable的执行多是无限的,做为观察者须要主动中断执行:咱们须要特定的API去终止执行过程。由于特定的观察者都有特定的执行过程,一旦观察者得到想要的数据后就须要终止执行过程以避免带来计算时对内存资源的浪费。

observable.subscribe被调用时,观察者会与其执行做用域绑定,同时返回一个Subscription类型的对象:

var subscription = observable.subscribe(x => console.log(x));

Subscription对象表示执行过程,经过极简的API,你能够终止执行过程。详情请阅读Subscription 相关文档。经过调用subscription.unsubscribe() 你能够终止执行过程:

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();

在Observable被订阅后,表明执行过程的Subscription 对象将被返回。对其调用unsubscribe()就能够终止执行。

每个Observable都须要在 create()的建立过程当中定义终止的逻辑。在function subscribe()中返回自定义的unsubscribe就能够实现。

下面的例子说明了如何在终止后释放setInterval的句柄:

var observable = Rx.Observable.create(function subscribe(observer) {
  // 得到定时函数的句柄
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);
  
  // 提供终止方法释放定时函数的句柄
  return function unsubscribe() {
    clearInterval(intervalID);
  };
});

相似于observable.subscribeObservable.create(function subscribe() {...})的关系,咱们在subscribe中返回的 unsubscribe 也与subscription.unsubscribe在概念上等价。事实上,若是咱们除去Rx的包装,纯粹的JavaScript代码简单清晰:

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);
  
  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// 一段时间后:
unsubscribe(); // 终止

使用Observable、 Observer 和 Subscription这些概念的缘由是,咱们能够在Observable 契约之下安全、兼容地调用操做符。

相关文章
相关标签/搜索