RxJS 简介:可观察对象、观察者与操做符

RxJS 简介:可观察对象、观察者与操做符

对于响应式编程来讲,RxJS 是一个难以想象的工具。今天咱们将深刻探讨什么是 Observable(可观察对象)和 observer(观察者),而后了解如何建立本身的 operator(操做符)。javascript

若是你以前用过 RxJS,想了解它的内部工做原理,以及 Observable、operator 是如何运做的,这篇文章将很适合你阅读。html

什么是 Observable(可观察对象)?

可观察对象其实就是一个比较特别的函数,它接受一个“观察者”(observer)对象做为参数(在这个观察者对象中有 “next”、“error”、“complete”等方法),以及它会返回一种解除与观察者关系的逻辑。例如咱们本身实现的时候会使用一个简单的 “unsubscribe” 函数来实现退订功能(即解除与观察者绑定关系的逻辑)。而在 RxJS 中, 它是一个包含 unsubsribe 方法的订阅对象(Subscription)。前端

可观察对象会建立观察者对象(稍后咱们将详细介绍它),并将它和咱们但愿获取数据值的“东西”链接起来。这个“东西”就是生产者(producer),它可能来自于 click 或者 input 之类的 DOM 事件,是数据值的来源。固然,它也能够是一些更复杂的状况,好比经过 HTTP 与服务器交流的事件。java

咱们稍后将要本身写一个可观察对象,以便更好地理解它!在此以前,让咱们先看看一个订阅对象的例子:node

const node = document.querySelector('input[type=text]');

const input$ = Rx.Observable.fromEvent(node, 'input');

input$.subscribe({
  next: (event) => console.log(`你刚刚输入了 ${event.target.value}!`),
  error: (err) => console.log(`Oops... ${err}`),
  complete: () => console.log(`完成!`)
});复制代码

这个例子使用了一个 <input type="text"> 节点,并将其传入 Rx.Observable.fromEvent() 中。当咱们触发指定的事件名时,它将会返回一个输入的 Event 的可观察对象。(所以咱们在 console.log 中用 ${event.target.value} 能够获取输入值)react

当输入事件被触发的时候,可观察对象会将它的值传给观察者。android

什么是 Observer(观察者)?

观察者至关容易理解。在前面的例子中,咱们传入 .subscribe() 中的对象字面量就是观察者(订阅对象将会调用咱们的可观察对象)。ios

.subscribe(next, error, complete) 也是一种合法的语法,可是咱们如今研究的是对象字面量的状况。git

当一个可观察对象产生数据值的时候,它会通知观察者,当新的值被成功捕获的时候调用 .next(),发生错误的时候调用 .error()github

当咱们订阅一个可观察对象的时候,它会持续不断地将值传递给观察者,直到发生如下两件事:一种是生产者告知没有更多的值须要传递了,这种状况它会调用观察者的 .complete() ;一种是咱们(“消费者”)对以后的值再也不感兴趣,决定取消订阅(unsubsribe)。

若是咱们想要对可观察对象传来的值进行组成构建(compose),那么在值传达最终的 .subscribe() 代码块以前,须要通过一连串的可观察对象(也就是操做符)处理。这个一连串的“链”也就是咱们所说的可观察对象序列。链中的每一个操做符都会返回一个新的可观察对象,让咱们的序列可以持续进行下去——这也就是咱们所熟知的“流”。

什么是 Operator(操做符)?

咱们前面提到,可观察对象可以进行链式调用,也就是说咱们能够像这样写代码:

const input$ = Rx.Observable.fromEvent(node, 'input')
  .map(event => event.target.value)
  .filter(value => value.length >= 2)
  .subscribe(value => {
    // use the `value`
  });复制代码

这段代码作了下面一系列事情:

  • 咱们先假定用户输入了一个“a”
  • 可观察对象将会对这个输入事件做出反应,将值传给下一个观察者
  • “a”被传给了订阅了咱们初始可观察对象的 .map()
  • .map() 会返回一个 event.target.value 的新可观察对象,而后调用它观察者对象中的 .next()
  • .next() 将会调用订阅了 .map().filter(),并将 .map() 处理后的值传递给它
  • .filter() 将会返回另外一个可观察对象,.filter() 过滤后留下 .length 大于等于 2 的值,并将其传给 .next()
  • 咱们经过 .subscribe() 得到了最终的数据值

这短短的几行代码作了这么多的事!若是你还以为弄不清,只须要记住:

每当返回一个新的可观察对象,都会有一个新的观察者挂载到前一个可观察对象上,这样就能经过观察者的“流”进行传值,对观察者生产的值进行处理,而后调用 .next() 方法将处理后的值传递给下一个观察者。

简单来讲,操做符将会不断地依次返回新的可观察对象,让咱们的流可以持续进行。做为用户而言,咱们不须要关心何时、什么状况下须要建立与使用可观察对象与观察者,咱们只须要用咱们的订阅对象进行链式调用就好了。

建立咱们本身的 Observable(可观察对象)

如今,让咱们开始写本身的可观察对象的实现吧。尽管它不会像 Rx 的实现那么高级,但咱们仍是对完善它充满信心。

Observable 构造器

首先,咱们须要建立一个 Observable 构造函数,此构造函数接受且仅接受 subscribe 函数做为其惟一的参数。每一个 Observable 实例都存储 subscribe 属性,稍后能够由观察者对象调用它:

function Observable(subscribe) {
  this.subscribe = subscribe;
}复制代码

每一个分配给 this.subscribesubscribe 回调都将会被咱们或者其它的可观察对象调用。这样咱们下面作的事情就有意义了。

Observer 示例

在深刻探讨实际状况以前,咱们先看一看基础的例子。

如今咱们已经配好了可观察对象函数,能够调用咱们的观察者,将 1 这个值传给它并订阅它:

const one$ = new Observable((observer) => {
  observer.next(1);
  observer.complete();
});

one$.subscribe({
  next: (value) => console.log(value) // 1
});复制代码

咱们订阅了 Observable 实例,将咱们的 observer(对象字面量)传入构造器中(以后它会被分配给 this.subscribe)。

Observable.fromEvent

如今咱们已经完成了建立本身的 Observable 的基础步骤。下一步是为 Observable 添加 static 方法:

Observable.fromEvent = (element, name) => {

};复制代码

咱们将像使用 RxJS 同样使用咱们的 Observable:

const node = document.querySelector('input');

const input$ = Observable.fromEvent(node, 'input');复制代码

这意味着咱们须要返回一个新的 Observable,而后将函数做为参数传递给它:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {

  });
};复制代码

这段代码将咱们的函数传入了构造器中的 this.subscribe。接下来,咱们须要将事件监听设置好:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    element.addEventListener(name, (event) => {}, false);
  });
};复制代码

那么这个 observer 参数是什么呢?它又是从哪里来的呢?

这个 observer 其实就是携带 nexterrorcomplete 的对象字面量。

这块其实颇有意思。observer.subscribe() 被调用以前都不会被传递,所以 addEventListener 在 Observable 被“订阅”以前都不会被执行。

一旦调用 subscribe,也就会调用 Observable 构造器内的 this.subscribe 。它将会调用咱们传入 new Observable(callback) 的 callback,同时也会依次将值传给咱们的观察者。这样,当 Observable 作完一件事的时候,它就会用更新过的值调用咱们观察者中的 .next() 方法。

那么以后呢?咱们已经获得了初始化好的事件监听器,可是尚未调用 .next()。下面完成它:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    element.addEventListener(name, (event) => {
      observer.next(event);
    }, false);
  });
};复制代码

咱们都知道,可观察对象在被销毁前须要一个“处理后事”的函数,在咱们这个例子中,咱们须要移除事件监听:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    const callback = (event) => observer.next(event);
    element.addEventListener(name, callback, false);
    return () => element.removeEventListener(name, callback, false);
  });
};复制代码

由于这个 Observable 还在处理 DOM API 和事件,所以咱们还不会去调用 .complete()。这样在技术上就有无限的可用性。

试一试吧!下面是咱们已经写好的完整代码:

const node = document.querySelector('input');
const p = document.querySelector('p');

function Observable(subscribe) {
  this.subscribe = subscribe;
}

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    const callback = (event) => observer.next(event);
    element.addEventListener(name, callback, false);
    return () => element.removeEventListener(name, callback, false);
  });
};

const input$ = Observable.fromEvent(node, 'input');

const unsubscribe = input$.subscribe({
  next: (event) => {
    p.innerHTML = event.target.value;
  }
});

// 5 秒以后自动取消订阅
setTimeout(unsubscribe, 5000);复制代码

在线示例:

创造咱们本身的 Operator(操做符)

在咱们理解了可观察对象与观察者对象的概念以后,咱们能够更轻松地去创造咱们本身的操做符了。咱们在 Observable 对象原型中加上一个新的方法:

Observable.prototype.map=function(mapFn){

};复制代码

这个方法将会像 JavaScript 中的 Array.prototype.map 同样使用,不过它能够对任何值用:

const input$ = Observable.fromEvent(node, 'input')
    .map(event => event.target.value);复制代码

因此咱们要取得回调函数,并调用它,返回咱们指望获得的数据。在这以前,咱们须要拿到流中最新的数据值。

下面该作什么就比较明了了,咱们要获得调用了这个 .map() 操做符的 Observable 实例的引用入口。咱们是在原型链上编程,所以能够直接这么作:

Observable.prototype.map = function (mapFn) {
  const input = this;
};复制代码

找找乐子吧!如今咱们能够在返回的 Obeservable 中调用 subscribe:

Observable.prototype.map = function (mapFn) {
  const input = this;
  return new Observable((observer) => {
      return input.subscribe();
  });
};复制代码

咱们要返回 input.subscribe() ,由于在咱们退订的时候,非订阅对象将会顺着链一直转下去,解除每一个 Observable 的订阅。

这个订阅对象将容许咱们把以前 Observable.fromEvent 传来的值传递下去,由于它返回了构造器中含有 subscribe 原型的新的 Observable 对象。咱们能够轻松地订阅它对数据值作出的任何更新!最后,完成经过 map 调用咱们的 mapFn() 的功能:

Observable.prototype.map = function (mapFn) {
  const input = this;
  return new Observable((observer) => {
    return input.subscribe({
      next: (value) => observer.next(mapFn(value)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
};复制代码

如今咱们能够进行链式调用了!

const input$ = Observable.fromEvent(node, 'input')
  .map(event => event.target.value);

input$.subscribe({
  next: (value) => {
    p.innerHTML = value;
  }
});复制代码

注意到最后一个 .subscribe() 再也不和以前同样传入 Event 对象,而是传入了一个 value 了吗?这说明你成功地建立了一个可观察对象流。

再试试:

但愿这篇文章对你来讲还算有趣~:)


掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为 掘金 上的英文分享文章。内容覆盖 AndroidiOSReact前端后端产品设计 等领域,想要查看更多优质译文请持续关注 掘金翻译计划

相关文章
相关标签/搜索