DevUI是一支兼具设计视角和工程视角的团队,服务于华为云 DevCloud平台和华为内部数个中后台系统,服务于设计师和前端工程师。
官方网站: devui.design
Ng组件库: ng-devui(欢迎Star)
官方交流群:添加DevUI小助手(微信号:devui-official)
DevUIHelper插件:DevUIHelper-LSP(欢迎Star)
在本文开始以前,先定义一些自定义术语,方便阅读。前端
在上一篇中,我描述了 OuterSubscriber 和 InnerSubscriber 的做用,并将几个 Join Creation Operator 的源码解析了一遍。下面,咱们将进入的是 Transformation Operator 的源码分析。git
在知道了 OuterSubscriber 和 InnerSubscriber 是一种经过委托模式实现管理下游流订阅的方法后,我发现其实这种实现技巧用于不少的 operators。那么本篇及下一篇将会介绍更多这种相似的设计。github
(PS:为了方便描述,我拿了官网的相关的图片,时序图的阅读顺序是从左到右。)typescript
map 是最为基础的映射,他将上游流的值通过运算,转发给下游订阅。这部分源码不是很复杂,实际上就是作了一层转发。segmentfault
protected _next(value: T) { let result: R; try { result = this.project.call(this.thisArg, value, this.count++); } catch (err) { this.destination.error(err); return; } this.destination.next(result); }
Scan 和 的做用跟 reduce 同样,传入一个函数把一组数据打平,可是跟 reduce 不同的点在于,每一次结合完毕都会马上返回结果。缓存
const clicks1 = fromEvent(document, 'click'); const ones1 = clicks.pipe(mapTo(1)); const seed1 = 0; const count1 = ones.pipe( // 输入的是返回任意值的函数 scan((acc, one) => acc + one, seed) ); count.subscribe(x => console.log(x));
这部分的实现一样也不是很复杂,在拿到上游流数据后,使用 accumulator
对数据进行累加操做。微信
protected _next(value: T): void { // 须要判断是否带有初始值。 if (!this.hasSeed) { this.seed = value; this.hasSeed = true; this.destination.next(value); } else { return this._tryNext(value); } } private _tryNext(value: T): void { const index = this.index++; let result: any; try { // 计算结果 result = this.accumulator(<R>this.seed, value, index); } catch (err) { this.destination.error(err); } // 保存,以备下次使用 this.seed = result; this.destination.next(result); }
所谓复合映射,意思就是这些操做符接收的参数是一个带有上游流数据做为参数并返回 Observable 的函数,同时把其中的订阅数据转发给下游订阅。前端工程师
mergeMap,switchMap,exhaustMap,concatMap,mergeScan 是五种复合映射操做符,它使得上游流的数据能够传递给下游流,并交由其处理。 concatMap 和 mergeScan 是 mergeMap 的一种特殊状况,因此咱们只须要关注剩余的三种。数据结构
mergeMap,switchMap,exhaustMap,这三种操做符的源码结构分为这三个部分:并发
其中,前两个部分都拥有很是相似的结构,都是经过这种样板代码来进行编写。
export function someMap<T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, ): OperatorFunction<T, ObservedValueOf<O> | R> { return (source: Observable<T>) => source.lift(new SomeMapOperator(project)); } class SomeMapOperator<T, R> implements Operator<T, R> { constructor(private project: (value: T, index: number) => ObservableInput<R>) { } call(Subscriber: Subscriber<R>, source: any): any { return source.subscribe(new SomeMapSubscriber(Subscriber, this.project)); } }
经过 _innerSub 提供的内部注册方法,在里面建立 InnerSubscriber,并传入当前的 OuterSubscriber 。
private _innerSub(input: ObservableInput<R>, value: T, index: number): void { const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); const innerSubscription = subscribeToResult<T, R>(this, input, undefined, undefined, innerSubscriber); // 由于这里的 input 可能不是 observable, 那么返回的 // 订阅结果也可能跟 innserSubscriber 相等,因此这里要 // 处理一下。 if (innerSubscription !== innerSubscriber) { destination.add(innerSubscription); } }
最终,交由 subscribeToResult 建立一个内部订阅来管理下游流。
mergeMap 提供的是一种合并操做,经过在内部维护了多个下游流的订阅,使得上游流能够将数据下发给多个下游流。它提供了一个并发数限制的参数,主要用于控制下游流并发的数量。
export function mergeMap<T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, concurrent: number = Number.POSITIVE_INFINITY ): OperatorFunction<T, ObservedValueOf<O> | R> { return (source: Observable<T>) => source.lift(new MergeMapOperator(project, concurrent)); }
下面,咱们关注的点将转移到 MergeMapSubscriber 。首先看看它的数据结构。
export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> { // 标记是否已经完成 private hasCompleted: boolean = false; // 上流 observable 的数据缓存 private buffer: T[] = []; // 当前正在开启的流的数量 private active: number = 0; // 数据的索引 protected index: number = 0; constructor( // 外部传入的订阅者 destination: Subscriber<R>, // 须要合并的 Observable 的工厂 private project: (value: T, index: number) => ObservableInput<R>, // 并发数量 private concurrent: number = Number.POSITIVE_INFINITY, ) { super(destination); } ... }
MergeMapSubscriber 的 _next 调用的时候,会比较 active (下游流的数量) 与 concurrent (最大并发数)的大小,active 小于 concurrent 则调用 _tryNext,不然将已经到来的数据放入缓冲区中,可是你知道的, JavaScript 并无真正的并发,这就是一个异步队列。而每一次进行 _tryNext,都会经过 project 来建立一个下游流,同时让更新 active,将下游流传入并触发 _innerSub。
protected _next(value: T): void { if (this.active < this.concurrent) { this._tryNext(value); } else { this.buffer.push(value); } } protected _tryNext(value: T) { let result: ObservableInput<R>; const index = this.index++; try { result = this.project(value, index); } catch (err) { this.destination.error(err); return; } this.active++; // this._innerSub(result, value, index); }
在上游流完成时,会触发 _complete。
protected _complete(): void { this.hasCompleted = true; if (this.active === 0 && this.buffer.length === 0) { this.destination.complete(); } this.unsubscribe(); }
若是全部的下游流都已经完成,且缓冲区中没有数据,则通知下游订阅数据已经输出完毕。
notifyNext 就是单纯的将结果传递给下游订阅,而 notifyComplete 则有意思多了。
经过 notifyComplete ,能够得知哪些流已经完成任务而且关闭。若是 buffer 中存在数据,那么将数据交由 _next 发送出去并建立新的下游流。过这种递归操做,能够将全部 buffer 中的数据都发送出去。最后判断上游流和下游流是否是都已经结束了,若是已经结束了,则通知下游订阅数据已经输出完毕。
notifyNext( outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber<T, R> ): void { this.destination.next(innerValue); } notifyComplete(innerSub: Subscription): void { const buffer = this.buffer; this.remove(innerSub); this.active--; if (buffer.length > 0) { this._next(buffer.shift()); } else if (this.active === 0 && this.hasCompleted) { this.destination.complete(); } }
switchMap 提供的是一个上游流为主的映射操做,当上游流的订阅数据到来的时候,旧的下游流会被取消订阅,而后从新订阅一组新的下游流。
export function switchMap<T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O ): OperatorFunction<T, ObservedValueOf<O>|R> { return (source: Observable<T>) => source.lift(new SwitchMapOperator(project)); }
innerSubscription 保存了当前下游流的订阅,因此这个操做符只会维护一个下游流的订阅。
private index: number = 0; private innerSubscription: Subscription;
当进行 next 操做的时候,会先建立新的下游流,若是旧的下游流存在,那么会被取消订阅。
protected _next(value: T) { let result: ObservableInput<R>; const index = this.index++; try { // 上游流的数据到来了,建立新的下游流。 result = this.project(value, index); } catch (error) { this.destination.error(error); return; } // 旧的下游流取消订阅 const innerSubscription = this.innerSubscription; if (innerSubscription) { innerSubscription.unsubscribe(); } this._innerSub(result, value, index); }
该 Subscriber 重写了_complete 。这里意味着上游流已经输出完毕,那么若是下游订阅
protected _complete(): void { const {innerSubscription} = this; if (!innerSubscription || innerSubscription.closed) { super._complete(); return; } this.unsubscribe(); }
跟以前同样, notifyNext 依旧是将下游流中的数据转发出去。主要关注点仍是在于 notifyComplete。由于 innerSubscription 被置为空了,因此调用 this._complete 无心义,不会触发到其父类函数。
notifyComplete(innerSub: Subscription): void { const destination = this.destination as Subscription; destination.remove(innerSub); this.innerSubscription = null; if (this.isStopped) { super._complete(); } }
若是当前的下游流已经完成了,那么就要将它从下游订阅(destination)中移除,若是上游流已经中止(error 或者 complete 被调用,或者被取消订阅),那么还得调用 super._complete 表示已经完成。
跟 switchMap 相反, exhaustMap 提供了一种如下游流为主的映射操做。若是下游流已经开启,那么上游流以后到来的订阅数据都将会被抛弃,直到该下游流完成订阅。下游流完成订阅后,上游流的数据才会继续跟新的下游流结合,并造成新的订阅。
export function exhaustMap<T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, ): OperatorFunction<T, ObservedValueOf<O>|R> { return (source: Observable<T>) => source.lift(new ExhaustMapOperator(project)); }
exhaustMap 的实现很简单,经过维护 hasSubscription 这样一个内部状态,标记下游流是否被订阅了。 hasCompleted 则是上游流完成状况的标记。
private hasSubscription = false; private hasCompleted = false;
订阅会调用 _next,标记下游流是否已经开启(订阅是否已经存在),若是未开启,则构建新的下游流,并标记 hasSubscription 为 true。
protected _next(value: T): void { if (!this.hasSubscription) { let result: ObservableInput<R>; const index = this.index++; try { result = this.project(value, index); } catch (err) { this.destination.error(err); return; } // 标记为 true this.hasSubscription = true; this._innerSub(result, value, index); } }
上游流和下游流的数据都已经输出完毕了,那么把完成信号传递给下游订阅。
protected _complete(): void { this.hasCompleted = true; if (!this.hasSubscription) { this.destination.complete(); } this.unsubscribe(); }
若是下游流的数据输出完毕,那么就应该要将 hasSubscription 标记为 false。
notifyComplete(innerSub: Subscription): void { const destination = this.destination as Subscription; destination.remove(innerSub); // 标记为 false this.hasSubscription = false; // 此处判断上游流是否已经完成 if (this.hasCompleted) { this.destination.complete(); } }
concatMap 是 mergeMap 的一种特殊形式。
export function concatMap<T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, ): OperatorFunction<T, ObservedValueOf<O>|R> { return mergeMap(project, 1); }
mergeScan 的源码跟 mergeMap 相似。只不过就是把传入的函数替换了一下,而且在内部缓存了上一个结合后的值。
const clicks2 = fromEvent(document, 'click'); const ones2 = click$.pipe(mapTo(1)); const seed2 = 0; const count2 = one$.pipe( // 输入一个 Observable 工厂 mergeScan((acc, one) => of(acc + one), seed), );
上一篇中,关于 concat 和 merge 两个相关的 operators 并无讲到,由于这它们其实最终都是调用 mergeMap。
经过这三个不一样的映射操做符,使得上游流能够经过必定的方式跟下游流结合。那么,结合一张图,能够看看相关操做符的关系。
对这些操做符分一下类。
expand 将传入的 Observable 工厂进行递归操做。与上面的复合映射相似,expand 也是一种复合映射,只不过,他会不断的去复合下游流的数据,也就是相似上图的模式。
为了实现相对应的功能,expand 定义了如下数据结构。
export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> { // 当前索引 private index: number = 0; // 已启动的下游流的数量 private active: number = 0; // 上游流是否已经完成 private hasCompleted: boolean = false; // 对于索引的缓存数据 private buffer: any[]; // 下游流工厂 private project: (value: T, index: number) => ObservableInput<R>, // 并发数 private concurrent: number; }
上游流数据到来的时候,跟 mergeMap 比较相似,也会比较 active 和 concurrent,若是 active 大于 concurrent ,那么便会用buffer缓存上游流的数据,若是 active 小于 concurrent ,那么直接发送数据给到下游订阅,并订阅一个新的下游流。须要注意的一点,为了防止爆栈,expand 在这里加了一个判断条件,在 notify 中,将利用这一条件,来结束递归。
protected _next(value: any): void { const destination = this.destination; if (destination.closed) { this._complete(); return; } const index = this.index++; if (this.active < this.concurrent) { destination.next(value); try { const { project } = this; const result = project(value, index); this.subscribeToProjection(result, value, index); } catch (e) { destination.error(e); } } else { this.buffer.push(value); } } // 订阅新的下游流 private subscribeToProjection(result: any, value: T, index: number): void { this.active++; const destination = this.destination as Subscription; destination.add(subscribeToResult<T, R>(this, result, value, index)); }
当上游流完成时,须要标记 hasComplete 为 true。这一步是结束递归的重要标志。
protected _complete(): void { this.hasCompleted = true; if (this.hasCompleted && this.active === 0) { this.destination.complete(); } this.unsubscribe(); }
那么 expand 是怎么构成递归的呢,当下游流有数据到来的时候,他会直接调用 _next。最终造成了 _next -> subscribeToProjection -> next -> notifyNext -> _next
这样的一条递归链。
notifyNext( outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber<T, R> ): void { this._next(innerValue); }
下游流完成时,须要根据 hasCompleted 和 buffer 的状态来决定是否结束递归。在这里,也造成了一条这样的递归链: _next -> subscribeToProjection -> next -> notifyComplete -> _next
。
notifyComplete(innerSub: Subscription): void { const buffer = this.buffer; const destination = this.destination as Subscription; destination.remove(innerSub); this.active--; if (buffer && buffer.length > 0) { this._next(buffer.shift()); } if (this.hasCompleted && this.active === 0) { this.destination.complete(); } }
exhaust 是一种打平操做,它的源码并无调用 exhaustMap。它的实现思路很简单,经过判断当前是否存在前一个下游流订阅(hasSubscription),来决定当前到来的下游流是否开启。
private hasCompleted: boolean = false; private hasSubscription: boolean = false; protected _next(value: T): void { // 若是存在订阅,那么抛弃这个值 if (!this.hasSubscription) { this.hasSubscription = true; this.add(subscribeToResult(this, value)); } } protected _complete(): void { this.hasCompleted = true; if (!this.hasSubscription) { this.destination.complete(); } } notifyComplete(innerSub: Subscription): void { this.remove(innerSub); this.hasSubscription = false; if (this.hasCompleted) { this.destination.complete(); } }
本篇主要的内容集中在分析操做符是如何进行数据的映射,那么下一篇将讲解的是 buffer 和 window 相关的缓存操做符是如何运行和实现的。
咱们是DevUI团队,欢迎来这里和咱们一块儿打造优雅高效的人机设计/研发体系。招聘邮箱:muyang2@huawei.com。
做者:zcx(公众号:Coder写字的地方)
原文连接:https://mp.weixin.qq.com/s/lrawMOuHNj6GyQJMqK1Now
往期文章推荐