RxJS - Subject(转)

Observer Pattern

观察者模式定义

观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知全部的观察者对象,使得它们可以自动更新本身。javascript

咱们可使用平常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系以下:css

  • 期刊出版方 - 负责期刊的出版和发行工做html

  • 订阅者 - 只需执行订阅操做,新版的期刊发布后,就会主动收到通知,若是取消订阅,之后就不会再收到通知java

在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来咱们来看张图,从而加深对上面概念的理解。react

 

观察者模式结构

 

观察者模式实战

Subject 类定义

class Subject { constructor() { this.observerCollection = []; } addObserver(observer) { // 添加观察者 this.observerCollection.push(observer); } deleteObserver(observer) { // 移除观察者 let index = this.observerCollection.indexOf(observer); if(index >= 0) this.observerCollection.splice(index, 1); } notifyObservers() { // 通知观察者 this.observerCollection.forEach((observer)=>observer.notify()); } }

Observer 类定义

class Observer { constructor(name) { this.name = name; } notify() { console.log(`${this.name} has been notified.`); } }

使用示例

let subject = new Subject(); // 建立主题对象 let observer1 = new Observer('semlinker'); // 建立观察者A - 'semlinker' let observer2 = new Observer('lolo'); // 建立观察者B - 'lolo' subject.addObserver(observer1); // 注册观察者A subject.addObserver(observer2); // 注册观察者B subject.notifyObservers(); // 通知观察者 subject.deleteObserver(observer1); // 移除观察者A subject.notifyObservers(); // 验证是否成功移除

以上代码成功运行后控制台的输出结果:es6

semlinker has been notified. lolo has been notified. lolo has been notified.

Observable subscribe

在介绍 RxJS - Subject 以前,咱们先来看个示例:typescript

const interval$ = Rx.Observable.interval(1000).take(3); interval$.subscribe({ next: value => console.log('Observer A get value: ' + value); }); setTimeout(() => { interval$.subscribe({ next: value => console.log('Observer B get value: ' + value); }); }, 1000);

以上代码运行后,控制台的输出结果:shell

Observer A get value: 0 Observer A get value: 1 Observer B get value: 0 Observer A get value: 2 Observer B get value: 1 Observer B get value: 2

经过以上示例,咱们能够得出如下结论:segmentfault

  • Observable 对象能够被重复订阅浏览器

  • Observable 对象每次被订阅后,都会从新执行

上面的示例,咱们能够简单地认为两次调用普通的函数,具体参考如下代码:

function interval() { setInterval(() => console.log('..'), 1000); } interval(); setTimeout(() => { interval(); }, 1000);

Observable 对象的默认行为,适用于大部分场景。但有些时候,咱们会但愿在第二次订阅的时候,不会从头开始接收 Observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,咱们把这种处理方式成为组播 (multicast),那咱们要怎么实现呢 ?回想一下咱们刚才介绍过观察者模式,你脑海中是否是已经想到方案了。没错,咱们能够经过自定义 Subject 来实现上述功能。

自定义 Subject

Subject 类定义

class Subject { constructor() { this.observers = []; } addObserver(observer) { this.observers.push(observer); } next(value) { this.observers.forEach(o => o.next(value)); } error(error){ this.observers.forEach(o => o.error(error)); } complete() { this.observers.forEach(o => o.complete()); } }

使用示例

const interval$ = Rx.Observable.interval(1000).take(3); let subject = new Subject(); let observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.addObserver(observerA); // 添加观察者A interval$.subscribe(subject); // 订阅interval$对象 setTimeout(() => { subject.addObserver(observerB); // 添加观察者B }, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0 Observer A get value: 1 Observer B get value: 1 Observer A get value: 2 Observer B get value: 2 Observer A complete! Observer B complete!

经过自定义 Subject,咱们实现了前面提到的功能。接下来咱们进入正题 - RxJS Subject。

RxJS Subject

首先咱们经过 RxJS Subject 来重写一下上面的示例:

const interval$ = Rx.Observable.interval(1000).take(3); let subject = new Rx.Subject(); let observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); // 添加观察者A interval$.subscribe(subject); // 订阅interval$对象 setTimeout(() => { subject.subscribe(observerB); // 添加观察者B }, 1000);

RxJS Subject 源码片断

/** * Suject继承于Observable */ export class Subject extends Observable { constructor() { super(); this.observers = []; // 观察者列表 this.closed = false; this.isStopped = false; this.hasError = false; this.thrownError = null; } next(value) { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者 copy[i].next(value); } } } error(err) { if (this.closed) { throw new ObjectUnsubscribedError(); } this.hasError = true; this.thrownError = err; this.isStopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者error方法 copy[i].error(err); } this.observers.length = 0; } complete() { if (this.closed) { throw new ObjectUnsubscribedError(); } this.isStopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者complete方法 copy[i].complete(); } this.observers.length = 0; // 清空内部观察者列表 } }

经过 RxJS Subject 示例和源码片断,对于 Subject 咱们能够得出如下结论:

  • Subject 既是 Observable 对象,又是 Observer 对象

  • 当有新消息时,Subject 会对内部的 observers 列表进行组播 (multicast)

Angular 2 RxJS Subject 应用

在 Angular 2 中,咱们能够利用 RxJS Subject 来实现组件通讯,具体示例以下:

message.service.ts

import { Injectable } from '@angular/core'; import {Observable} from 'rxjs/Observable'; import { Subject } from 'rxjs/Subject'; @Injectable() export class MessageService { private subject = new Subject<any>(); sendMessage(message: string) { this.subject.next({ text: message }); } clearMessage() { this.subject.next(); } getMessage(): Observable<any> { return this.subject.asObservable(); } }

home.component.ts

import { Component } from '@angular/core'; import { MessageService } from '../_services/index'; @Component({ moduleId: module.id, templateUrl: 'home.component.html' }) export class HomeComponent { constructor(private messageService: MessageService) {} sendMessage(): void { // 发送消息 this.messageService.sendMessage('Message from Home Component to App Component!'); } clearMessage(): void { // 清除消息 this.messageService.clearMessage(); } }

app.component.ts

import { Component, OnDestroy } from '@angular/core'; import { Subscription } from 'rxjs/Subscription'; import { MessageService } from './_services/index'; @Component({ moduleId: module.id, selector: 'app', templateUrl: 'app.component.html' }) export class AppComponent implements OnDestroy { message: any; subscription: Subscription; constructor(private messageService: MessageService) { this.subscription = this.messageService.getMessage() .subscribe(message => { this.message = message; }); } ngOnDestroy() { this.subscription.unsubscribe(); } }

以上示例实现的功能是组件之间消息通讯,即 HomeComponent 子组件,向 AppComponent 父组件发送消息。代码运行后,浏览器的显示结果以下:

 

Plunker 示例

Subject 存在的问题

由于 Subject 在订阅时,是把 observer 存放到观察者列表中,并在接收到新值的时候,遍历观察者列表并调用观察者上的 next 方法,具体以下:

next(value) { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者 copy[i].next(value); } } }

这样会有一个大问题,若是某个 observer 在执行时出现异常,却没进行异常处理,就会影响到其它的订阅者,具体示例以下:

const source = Rx.Observable.interval(1000); const subject = new Rx.Subject(); const example = subject.map(x => { if (x === 1) { throw new Error('oops'); } return x; }); subject.subscribe(x => console.log('A', x)); example.subscribe(x => console.log('B', x)); subject.subscribe(x => console.log('C', x)); source.subscribe(subject);

以上代码运行后,控制台的输出结果:

A 0 B 0 C 0 A 1 Rx.min.js:74 Uncaught Error: oops

JSBin - Subject Problem Demo

在代码运行前,你们会认为观察者B 会在接收到 1 值时抛出异常,观察者 A 和 C 仍会正常运行。但实际上,在当前的 RxJS 版本中若观察者 B 报错,观察者 A 和 C 也会中止运行。那么应该如何解决这个问题呢?目前最简单的方式就是为全部的观察者添加异常处理,更新后的代码以下:

const source = Rx.Observable.interval(1000); const subject = new Rx.Subject(); const example = subject.map(x => { if (x === 1) { throw new Error('oops'); } return x; }); subject.subscribe( x => console.log('A', x), error => console.log('A Error:' + error) ); example.subscribe( x => console.log('B', x), error => console.log('B Error:' + error) ); subject.subscribe( x => console.log('C', x), error => console.log('C Error:' + error) ); source.subscribe(subject);

JSBin - RxJS Subject Problem Solved Demo

RxJS Subject & Observable

Subject 实际上是观察者模式的实现,因此当观察者订阅 Subject 对象时,Subject 对象会把订阅者添加到观察者列表中,每当有 subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next() 方法,把值一一送出。

Subject 之因此具备 Observable 中的全部方法,是由于 Subject 类继承了 Observable 类,在 Subject 类中有五个重要的方法:

  • next - 每当 Subject 对象接收到新值的时候,next 方法会被调用

  • error - 运行中出现异常,error 方法会被调用

  • complete - Subject 订阅的 Observable 对象结束后,complete 方法会被调用

  • subscribe - 添加观察者

  • unsubscribe - 取消订阅 (设置终止标识符、清空观察者列表)

BehaviorSubject

BehaviorSubject 定义

BehaviorSubject 源码片断

export class BehaviorSubject extends Subject { constructor(_value) { // 设置初始值 super(); this._value = _value; } get value() { // 获取当前值 return this.getValue(); } _subscribe(subscriber) { const subscription = super._subscribe(subscriber); if (subscription && !subscription.closed) { subscriber.next(this._value); // 为新的订阅者发送当前最新的值 } return subscription; } getValue() { if (this.hasError) { throw this.thrownError; } else if (this.closed) { throw new ObjectUnsubscribedError(); } else { return this._value; } } next(value) { // 调用父类Subject的next方法,同时更新当前值 super.next(this._value = value); } }

BehaviorSubject 应用

有些时候咱们会但愿 Subject 能保存当前的最新状态,而不是单纯的进行事件发送,也就是说每当新增一个观察者的时候,咱们但愿 Subject 可以当即发出当前最新的值,而不是没有任何响应。具体咱们先看一下示例:

var subject = new Rx.Subject(); var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒后订阅 }, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 1 Observer A get value: 2 Observer A get value: 3

经过输出结果,咱们发如今 observerB 订阅 Subject 对象后,它再也没有收到任何值了。由于 Subject 对象没有再调用 next() 方法。但不少时候咱们会但愿 Subject 对象可以保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。要实现这个功能,咱们就须要使用 BehaviorSubject。

BehaviorSubject 跟 Subject 最大的不一样就是 BehaviorSubject 是用来保存当前最新的值,而不是单纯的发送事件。BehaviorSubject 会记住最近一次发送的值,并把该值做为当前值保存在内部的属性中。接下来咱们来使用 BehaviorSubject 从新一下上面的示例:

var subject = new Rx.BehaviorSubject(0); // 设定初始值 var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒后订阅 }, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0 Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 3

JSBin - BehaviorSubject

ReplaySubject

ReplaySubject 定义

ReplaySubject 源码片断

export class ReplaySubject extends Subject { constructor(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) { super(); this.scheduler = scheduler; this._events = []; // ReplayEvent对象列表 this._bufferSize = bufferSize < 1 ? 1 : bufferSize; // 设置缓冲区大小 this._windowTime = windowTime < 1 ? 1 : windowTime; } next(value) { const now = this._getNow(); this._events.push(new ReplayEvent(now, value)); this._trimBufferThenGetEvents(); super.next(value); } _subscribe(subscriber) { const _events = this._trimBufferThenGetEvents(); // 过滤ReplayEvent对象列表 let subscription; if (this.closed) { throw new ObjectUnsubscribedError(); } ... else { this.observers.push(subscriber); subscription = new SubjectSubscription(this, subscriber); } ... const len = _events.length; // 从新发送设定的最后bufferSize个值 for (let i = 0; i < len && !subscriber.closed; i++) { subscriber.next(_events[i].value); } ... return subscription; } } class ReplayEvent { constructor(time, value) { this.time = time; this.value = value; } }

ReplaySubject 应用

有些时候咱们但愿在 Subject 新增订阅者后,能向新增的订阅者从新发送最后几个值,这时咱们就可使用 ReplaySubject ,具体示例以下:

var subject = new Rx.ReplaySubject(2); // 从新发送最后2个值 var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒后订阅 }, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 2 Observer B get value: 3

可能会有人认为 ReplaySubject(1) 是否是等同于 BehaviorSubject,其实它们是不同的。在建立BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放。

JSBin - ReplaySubject

AsyncSubject

AsyncSubject 定义

AsyncSubject 源码片断

export class AsyncSubject extends Subject { constructor() { super(...arguments); this.value = null; this.hasNext = false; this.hasCompleted = false; // 标识是否已完成 } _subscribe(subscriber) { if (this.hasError) { subscriber.error(this.thrownError); return Subscription.EMPTY; } else if (this.hasCompleted && this.hasNext) { // 等到完成后,才发出最后的值 subscriber.next(this.value); subscriber.complete(); return Subscription.EMPTY; } return super._subscribe(subscriber); } next(value) { if (!this.hasCompleted) { // 若未完成,保存当前的值 this.value = value; this.hasNext = true; } } }

AsyncSubject 应用

AsyncSubject 相似于 last 操做符,它会在 Subject 结束后发出最后一个值,具体示例以下:

var subject = new Rx.AsyncSubject(); var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); subject.complete(); setTimeout(() => { subject.subscribe(observerB); // 1秒后订阅 }, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 3 Observer A complete! Observer B get value: 3 Observer B complete!

JSBin - AsyncSubject

 

原文连接:http://www.javashuo.com/article/p-kqylacgc-bm.html

参考资源

相关文章
相关标签/搜索