RxJS 核心概念之Subject

什么是Subject? 在RxJS中,Subject是一类特殊的Observable,它能够向多个Observer多路推送数值。普通的Observable并不具有多路推送的能力(每个Observer都有本身独立的执行环境),而Subject能够共享一个执行环境。html

Subject是一种能够多路推送的可观察对象。与EventEmitter相似,Subject维护着本身的Observer。react

每个Subject都是一个Observable(可观察对象) 对于一个Subject,你能够订阅(subscribe)它,Observer会和往常同样接收到数据。从Observer的视角看,它并不能区分本身的执行环境是普通Observable的单路推送仍是基于Subject的多路推送。es6

Subject的内部实现中,并不会在被订阅(subscribe)后建立新的执行环境。它仅仅会把新的Observer注册在由它自己维护的Observer列表中,这和其余语言、库中的addListener机制相似。code

每个Subject也能够做为Observer(观察者) Subject一样也是一个由next(v)error(e),和 complete()这些方法组成的对象。调用next(theValue)方法后,Subject会向全部已经在其上注册的Observer多路推送theValueserver

下面的例子中,咱们在Subject上注册了两个Observer,而且多路推送了一些数值:htm

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

控制台输出结果以下:对象

observerA: 1
observerB: 1
observerA: 2
observerB: 2

既然Subject是一个Observer,你能够把它做为subscribe(订阅)普通Observable时的参数,以下面例子所示:rxjs

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你能够传递Subject来订阅observable

执行后结果以下:ip

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

经过上面的实现:咱们发现能够经过Subject将普通的Observable单路推送转换为多路推送。这说明了Subject的做用——做为单路Observable转变为多路Observable的桥梁。get

还有几种特殊的Subject 类型,分别是BehaviorSubjectReplaySubject,和 AsyncSubject

多路推送的Observable

在之后的语境中,每当提到“多路推送的Observable”,咱们特指经过Subject构建的Observable执行环境。不然“普通的Observable”只是一个不会共享执行环境而且被订阅后才生效的一系列值。

经过使用Subject能够建立拥有相同执行环境的多路的Observable。

下面展现了多路的运做方式:Subject从普通的Observable订阅了数据,而后其余Observer又订阅了这个Subject,示例以下:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 经过`subject.subscribe({...})`订阅Subject的Observer:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// 让Subject从数据源订阅开始生效:
multicasted.connect();

multicast方法返回一个相似于Observable的可观察对象,可是在其被订阅后,它会表现Subject的特性。 multicast 返回的对象同时是ConnectableObservable类型的,拥有connect() 方法。

connect()方法很是的重要,它决定Observable什么时候开始执行。因为调用connect()后,Observable开始执行,所以,connect()会返回一个Subscription供调用者来终止执行。

引用计数

经过手动调用connect()返回的Subscription控制执行十分繁杂。一般,咱们但愿在有第一个Observer订阅Subject后自动connnect,当全部Observer都取消订阅后终止这个Subject。

咱们来分析一下下面例子中subscription的过程:

  1. 第一个Observer 订阅了多路推送的 Observable

  2. 多路Observable被链接

  3. 向第一个Observer发送 值为0next通知

  4. 第二个Observer订阅了多路推送的 Observable

  5. 向第一个Observer发送 值为1next通知

  6. 向第二个Observer发送 值为1next通知

  7. 第一个Observer取消了对多路推送的Observable的订阅

  8. 向第二个Observer发送 值为2next通知

  9. 第二个Observer取消了对多路推送的Observable的订阅

  10. 取消对多路推送的Observable的链接

经过显式地调用connect(),代码以下:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); 
}, 2000);

若是你不想显式地调用connect()方法,能够在ConnectableObservable类型的Observable上调用refCount()方法。方法会进行引用计数:记录Observable被订阅的行为。当订阅数从 01refCount() 会调用connect() 方法。到订阅数从10,他会终止整个执行过程。

refCount 使得多路推送的Observable在被订阅后自动执行,在全部观察者取消订阅后,中止执行。

下面是示例:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

执行输出结果以下:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

只有ConnectableObservables拥有refCount()方法,调用后会返回一个Observable而不是新的ConnectableObservable。

BehaviorSubject

BehaviorSubject是Subject的一个衍生类,具备“最新的值”的概念。它老是保存最近向数据消费者发送的值,当一个Observer订阅后,它会即刻从BehaviorSubject收到“最新的值”。

BehaviorSubjects很是适于表示“随时间推移的值”。举一个形象的例子,Subject表示一我的的生日,而Behavior则表示一我的的岁数。(生日只是一天,一我的的岁数会保持到下一次生日以前。)

下面例子中,展现了如何用 0初始化BehaviorSubject,当Observer订阅它时,0是第一个被推送的值。紧接着,在第二个Observer订阅BehaviorSubject以前,它推送了2,虽然订阅在推送2以后,可是第二个Observer仍然能接受到2

var subject = new Rx.BehaviorSubject(0 /* 初始值 */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

输出结果以下:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject 如同于BehaviorSubjectSubject 的子类。经过 ReplaySubject能够向新的订阅者推送旧数值,就像一个录像机ReplaySubject能够记录Observable的一部分状态(过去时间内推送的值)。

.一个ReplaySubject能够记录Observable执行过程当中推送的多个值,并向新的订阅者回放它们。

你能够指定回放值的数量:

var subject = new Rx.ReplaySubject(3 /* 回放数量 */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

输出以下:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

除了回放数量,你也能够以毫秒为单位去指定“窗口时间”,决定ReplaySubject记录多久之前Observable推送的数值。下面的例子中,咱们把回放数量设置为100,把窗口时间设置为500毫秒:

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

第二个Observer接受到3(600ms), 4(800ms) 和 5(1000ms),这些值均在订阅以前的500毫秒内推送(窗口长度 1000ms - 600ms = 400ms < 500ms):

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject

AsyncSubject是Subject的另一个衍生类,Observable仅会在执行完成后,推送执行环境中的最后一个值。

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

输出结果以下:

observerA: 5
observerB: 5

AsyncSubject 与 last() 操做符类似,等待完成通知后推送执行过程的最后一个值。

相关文章
相关标签/搜索