原文连接: netbasal.com/rxjs-subjec…javascript
本文为 RxJS 中文社区 翻译文章,如需转载,请注明出处,谢谢合做!java
若是你也想和咱们一块儿,翻译更多优质的 RxJS 文章以奉献给你们,请点击【这里】git
我已经发表过一篇关于 Subject 的文章 (中文),但此次我想尝试一种不一样的方式。github
要理解 Subject
是什么的最简单的方式就是从新建立一个。咱们来建立一个简易版的 Subject
。api
注意: 下面的示例只是为了阐述概念,还不足以应用于实际开发之中,还有它们并非 Rx 中 Subjects 的真正完整实现。数组
咱们来看看真相。ide
Subject 既是 Observable,又是 Observer 。ui
这表示它拥有全部的操做符 (map
、filter
,等等) 而且你能够订阅它。this
class MySubject extends Rx.Observable {
constructor() {
super();
}
}
复制代码
这是第一部分所需的一切了。它能够经过扩展 Observable
类成为 Observable
。spa
这表示它必须实现 next()
,error()
和 complete()
方法。
class MySubject extends Rx.Observable {
constructor() {
super();
}
next() {}
error() {}
complete() {}
}
复制代码
好了,咱们来看下一个真相。
Subject 能够扮演源 observable 和 众多观察者之间的桥梁或代理,使得多个观察者能够共享同一个 observable 执行。
class MySubject extends Rx.Observable {
constructor() {
super();
this.observers = [];
}
subscribe(observer) {
this.observers.push(observer);
}
next(value) {
this.observers.forEach(observer => observer.next(value));
}
error(error) {
this.observers.forEach(observer => observer.error(error));
}
complete() {
this.observers.forEach(observer => observer.complete());
}
}
复制代码
当你调用 subscribe()
方法时,仅仅是将 observer
添加到一个数组中。next()
、error()
和 completed()
方法会调用数组中每一个 observer
的对应方法。
来使用咱们的 Subject 。
const interval$ = Rx.Observable.interval(1000).take(7);
const subject = new MySubject();
subject.map(value => `Observer one ${value}`).subscribe(value => {
console.log(value);
});
interval$.subscribe(subject);
setTimeout(() => {
subject.map(value => `Observer two ${value}`).subscribe(value => {
console.log(value);
});
}, 2000);
复制代码
当使用 Subject
时,不管你什么时候 subscribe
, 你永远都会获得相同的执行,这点不一样于典型的 observable,每次 subscribe
都会开启有个新的执行。(在咱们的案例中,这表示你会有两个不相关的 intervals)
Subject 让你同享相同的 observable 执行
咱们来总结一下这里发生了什么。
当对 subject 调用 subscribe
时,只是将 observer
添加到数组中。
当 subject
扮演 observer
时,每当源 observable (在咱们的案例中是指 interval
) 发出值时,它会调用数组中每一个 observer
的 next()
方法。
如今让咱们来尝试实现 BehaviorSubject
的简易版。
咱们来看看真相。
BehaviorSubject
须要一个初始值,由于它必须始终返回一个订阅值,即便它还没接收到 next()
调用。getValue()
方法来获取 subject 的最新值。class MyBehaviorSubject extends Rx.Observable {
constructor(initialValue) {
super();
this.observers = [];
if (typeof initialValue === 'undefined') {
throw new Error('You need to provide initial value');
}
this.lastValue = initialValue;
}
subscribe(observer) {
this.observers.push(observer);
observer.next(this.lastValue);
}
next(value) {
this.lastValue = value;
this.observers.forEach(observer => observer.next(value));
}
getValue() {
return this.lastValue;
}
}
复制代码
来使用咱们的 BehaviorSubject
。
const subject = new MyBehaviorSubject('initialValue');
subject.map(value => `Observer one ${value}`).subscribe(function(value) {
console.log(value);
});
subject.next('New value');
setTimeout(() => {
subject.map(value => `Observer two ${value}`).subscribe(function(value) {
console.log(value);
});
}, 2000);
复制代码
如今让咱们来尝试实现 ReplaySubject
的简易版。
咱们来看看真相.
ReplaySubject
表示一个对象既是 observable 序列,又是 observer 。class MyReplaySubject extends Rx.Observable {
constructor(bufferSize) {
super();
this.observers = [];
this.bufferSize = bufferSize;
this.lastValues = [];
}
subscribe(observer) {
this.lastValues.forEach(val => observer.next(val));
this.observers.push(observer);
}
next(value) {
if (this.lastValues.length === this.bufferSize) {
this.lastValues.shift();
}
this.lastValues.push(value);
this.observers.forEach(observer => observer.next(value));
}
}
复制代码
来使用咱们的 ReplaySubject
。
const subject = new MyReplaySubject(3);
subject.next('One');
subject.next('Two');
subject.next('Three');
subject.next('Four');
setTimeout(() => {
subject.map(value => `Later Observer ${value}`).subscribe(function(value) {
console.log(value);
});
}, 2000);
复制代码
ReplaySubject
、BehaviorSubject
?next()
、error()
和 completed()
方法。