[译]RxJS06——Subject

原文:http://reactivex.io/rxjs/manu...html

Subject是什么? RxJS的Subject是Observable的一个特殊类型,他能够将流中的值广播给众多观察者(Observer)。
通常的Observalbe流是单一广播制(每个订阅流的Observer拥有一个独立的执行过程)。react

一个Subject相似一道Observable数据流,可是能够对多个Observer进行多点广播。这就像事件触发器(EventEmitter):维护了一个侦听器的列表。this

每个Subject就是一个Observable流。 对于给定的Subject,你能够订阅它(subscribe),提供一个Observer,以后将会正常的接收传递来的数据。从Observer的角度来讲,它是没法分辨一个流中的值是来源于单一广播机制的Observable流仍是一个Subject流。rest

在Subject内部,订阅(subscribe)不会引发一个新的接收数据的过程。相似于其余库或语言中的注册事件侦听器(addListener),它会直接把给定的Observer放入到一个注册列表中。code

每个Subject也是一个观察者(Observer)。 拥有next(v)error(e)complete()方法。往Subject中填充数据,只须要调用next(theValue)便可,它将会把数据广播给全部已注册的Observer。 server

如下的例子中,咱们设定了2个订阅Subject流的Observer,而后咱们填充一些数据到Subject:htm

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

获得了以下输出:rxjs

observerA: 1
observerB: 1
observerA: 2
observerB: 2

由于Subject是一个Observer,所以你也能够将它做为任何Observable的subscribe()的参数,订阅这个Observable流,就像下面这样:事件

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject

运行的结果:ip

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

在上面的方法中,咱们使用Subject将一个单点广播的Observable流转换为多点广播。这也佐证了,Subject是能够将任何Observable流共享给多个Observer的惟一途径。

除了Subject,还有一些衍生出的专门的Subject:BehaviorSubject,ReplaySubjectAsyncSubject

多路传播的Observable流 Multicasted Observables

相比于只能推送消息给单个的Observer的“单路Observable流”,利用具备多个订阅者的Subject,“多路传播的Observable流”能够有多个通知通道。

多路传播的Observable在后台经过使用Subject让多个Observers可以从同一个Observable流中获取数据。

在后台,multicast操做符是这样工做的:Obersver订阅潜在的Subject,而Subject又订阅了源Observable流。下面的例子和以前使用observable.subscribe(subject)的状况相似:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

multicast流返回了一个看似普通的Observable流,可是当订阅的时候他表现的与Subject相似。这个流被称做ConnectableObservable流,本质是一个Observable流,但拥有connect()方法。

connect()在内部执行了source.subscribe(subject),而且返回了一个你能够取消Observable流执行的Subscription。所以,当可被共享的Observable流开始时,connect()方法对于精确的断定执行过程很重要。

引用计数 Reference counting

手动的调用connect()和执行Subscription每每是很累人的。咱们固然但愿能够在第一个Observer订阅的时候就自动的执行connect(),而且最好在最后一个Observer取消订阅(unsubscribe)的时候能自动取消流的执行。

考虑一下,处于下列操做顺序时的表现状况:

  1. 第一个Observer订阅了多路传播的Observable流

  2. 多路传播的Observable流呈被链接状态

  3. 调用next()传0给第一个Observer

  4. 第二个Observer订阅多路传播Observable流

  5. 调用next()传1给第一个Observer

  6. 调用next()传1给第二个Observer

  7. 第一个Observer取消订阅

  8. 调用next()传2给第二个Observer

  9. 第二个Observer取消订阅

  10. 多路传播Observable流的链接状况是未被订阅状态

为了显式的调用connect()实现这个过程,咱们编写以下代码:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

若是咱们想避免显式的调用connect(),咱们可使用ConnectableObservable的refCount()方法(引用计数),他返回了一个存有众多订阅者的Observable流。当订阅者的数量从0增长到1时,将会自动调用connect(),开始共享流。
当订阅者的数量从1变为0,即将处于未订阅状态时,将会自动中止下一步的执行。

refCount使多路传播Observable流在第一个订阅者出现时自动启动,在最后一个订阅者离开时自动中止。

请看下面的例子:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

执行事后的输出是:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

refCount()方法只存在于ConnectableObservable中,他返回一个Observable流,而不是另外一个ConnectableObservable流。

BehaviorSubject

BehaviorSubject是一类特异的Subject。具备返回“当前值”的特性。它存储了流中最新的值并把它推送给本身的用户,不论它的新旧与否,都可以当即收到推送的这个“当前值”。

BehaviorSubject 很是有利于表示“变化中的值”。举例来讲,每一年都有生日是一道Subject数据流,可是一我的的年龄倒是一个BehaviorSubject流。

来看下面的例子,BehaviorSubject以0为值进行初始化,第一个订阅的Observer将会直接收到这个值。当2被填充入流以后,第二个Observer订阅流时,尽管时间较晚,也会收到最新值2。

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

输出以下:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject 很像BehaviorSubject,他会把时间线中较老的值推送给新的订阅者们,并且他还能够记录Observable流中一段时间的值。

ReplaySubject可以记录Observable流中的多个值,并将它们推送给新的订阅者。

建立ReplaySubject时,你能够指定须要回放多少个值,像这样:

var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

输出以下:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

在设定数据量大小以外,你还能够指定一个以毫秒为单位的窗口时间,用来肯定记录的数据所在的时间区间(数据有多老)。
在下面的例子中,咱们使用了一个较大的数据量设定,同时还设定了500毫秒的窗口时间。

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

运行结果显示,第二个Observer在订阅以后,得到了数据流中最后500毫秒事件内产生的3,4和5三个值。

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
/************/
observerB: 3
observerB: 4
observerB: 5
/************/
observerA: 6
observerB: 6
...

AsyncSubject

AsyncSubject是Subject的另外一个变化,他会在流发出complete通知时,将数据流中的最后一个值推送给全部订阅流的Observer。

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

输出为:
With output:

observerA: 5
observerB: 5

AsyncSubject很是相似last()操做符,它会等待complete通知,并在那时推送流中的数据值。

相关文章
相关标签/搜索