原文连接:RxJS: Managing Operator State
原文做者:Nicholas Jamieson;发表于2019年2月12日
译者:yk;如需转载,请注明出处,谢谢合做!git
摄影:Victoire Joncheray,来自 Unsplashgithub
在 RxJS 5.5 引入了管道操做符(pipeable operators)以后,编写用户级(userland)操做符变得更为简单了。typescript
管道操做符属于高阶函数(higher-order function):即返回值为函数的函数。所返回的函数接受一个 observable(可观察对象)做为参数,并返回一个 observable。因此,要建立一个操做符,你没必要划分 Operator
和 Subscriber
,只要写一个函数就好了。shell
这听起来很简单。api
然而在某些状况下你须要格外当心,尤为是当你的操做符在存储内部状态时更应如此。app
让咱们来看这样一个例子:一个将接收到的数据及其索引显示到终端的 debug
操做符。函数
咱们的操做符须要维护一些内部状态:索引——每收到一次 next
通知时就会递增。有个很戆的办法是直接将状态存储在操做符的内部,就像这样:ui
import { MonoTypeOperatorFunction } from "rxjs";
import { tap } from "rxjs/operators";
export function debug<T>(): MonoTypeOperatorFunction<T> {
let index = -1;
// 让咱们假设不存在 map 操做符,因而咱们只能用 tap 来维护内部存储中的索引
// 该操做符的目的是为了代表:运行结果取决于状态存储的位置
return tap(t => console.log(`[${++index}]: ${t}`));
}
复制代码
该办法会存在许多问题,并致使一些意料以外的行为和难以定位的 bug。spa
第一个问题是:咱们的操做符不具备引用透明(referentially transparent)性。当一个函数的返回值能够替代该函数而不影响程序运行,那么咱们称这个函数是引用透明的。debug
让咱们来看看当这个操做符的返回值与多个 observables 进行组合时会发生什么:
import { range } from "rxjs";
import { debug } from "./debug";
const op = debug();
console.log("first use:");
range(1, 2).pipe(op).subscribe();
console.log("second use:");
range(1, 2).pipe(op).subscribe();
复制代码
运行结果为:
first use:
[0] 1
[1] 2
second use:
[2] 1
[3] 2
复制代码
好吧,我知道这很使人意外。在第二个 observable 中,索引并无从 0 开始计数。
第二个问题是:只有在首次订阅该操做符返回的 observable 时,其行为才会是合理的。
如今,让咱们屡次订阅由 debug
操做符组成的 observable,看看会发生什么:
import { range } from "rxjs";
import { debug } from "./debug";
const source = range(1, 2).pipe(debug());
console.log("first use:");
source.subscribe();
console.log("second use:");
source.subscribe();
复制代码
运行结果为:
first use:
[0] 1
[1] 2
second use:
[2] 1
[3] 2
复制代码
仍是一样使人意外的结果:在第二次订阅中,索引依旧没有从 0 开始计数。
因此该如何解决这些问题呢?
这两个问题均可以经过基于每一个订阅的状态存储(storing the state on a per-subscription basis)来解决。如下是几种实现方法:
第一种方法是使用 Observable
的构造函数来建立操做符返回值(observable)。若是将 index
变量放入传给 constructor
的函数中,那么每次订阅的状态都会被独立存储。写法以下:
import { MonoTypeOperatorFunction, Observable } from "rxjs";
import { tap } from "rxjs/operators";
export function debug<T>(): MonoTypeOperatorFunction<T> {
return source => new Observable<T>(subscriber => {
let index = -1;
return source.pipe(
tap(t => console.log(`[${++index}]: ${t}`))
).subscribe(subscriber);
});
}
复制代码
第二种方法,也是我比较喜欢的,就是使用 defer
来实现基于每一个订阅的状态存储。若是将 index
变量放入传给 defer
的工厂函数中,它就能够按每一个订阅独立存储状态。写法以下:
译者注:
defer()
的参数为一个返回值为 observable 的工厂函数observableFactory
,详见文档。
import { defer, MonoTypeOperatorFunction } from "rxjs";
import { tap } from "rxjs/operators";
export function debug<T>(): MonoTypeOperatorFunction<T> {
return source => defer(() => {
let index = -1;
return source.pipe(
tap(t => console.log(`[${++index}]: ${t}`))
);
});
}
复制代码
还有个较为复杂的方法,就是使用 scan
操做符。scan
会维护每一个订阅的状态,该状态由 seed
参数进行初始化,而后经过 accumulator
(累加器)函数计算并返回结果。在本例中,index
能够像这样存储在 scan
中:
译者注:
accumulator
和seed
为scan()
的两个参数,详见文档。
import { MonoTypeOperatorFunction } from "rxjs";
import { map, scan } from "rxjs/operators";
export function debug<T>(): MonoTypeOperatorFunction<T> {
return source => source.pipe(
scan<T, [T, number]>(([, index], t) => [t, index + 1], [undefined!, -1]), map(([t, index]) => (console.log(`[${index}]: ${t}`), t)) ); } 复制代码
若是用以上任意一种方法来代替一开始那个很戆的办法,输出将会是下面这样:
first use:
[0] 1
[1] 2
second use:
[0] 1
[1] 2
复制代码
如你所愿:一切都在乎料之中。