DevUI是一支兼具设计视角和工程视角的团队,服务于华为云 DevCloud平台和华为内部数个中后台系统,服务于设计师和前端工程师。
官方网站: devui.design
Ng组件库: ng-devui(欢迎Star)
官方交流群:添加DevUI小助手(微信号:devui-official)进群
DevUIHelper插件:DevUIHelper-LSP(欢迎Star)
本篇是 RxJS 源码解析的第四篇文章,使用源码的版本是 6.6.0 。本篇文章的内容仍然可能会比较多,请耐心阅读。为了方便阅读,文中的相关代码均通过裁剪和处理。若有不妥,还请指正。前端
在本文开始以前,先定义一些自定义术语,方便阅读。git
我并不打算像上一篇那样,抓着几个操做符一顿输出。从这篇开始,不管是 Join Operator、仍是 Transformation Operator,都有很大的规律性。因此我想先总结出来它们的规律,再来对 operator 进行分析。github
为了让操做符能够控制下游流,RxJS 经过委托模式,让操做符生成的了一个特定的 Subscriber,它在内部就能拿到全部传入的下游流的订阅。所以,在这里先介绍两个 Subscriber: OuterSubscriber 和 InnerSubscriber 。typescript
其内部实现实际上就是把 InnerSubscriber 的 next,error,complete 转发给 OuterSubscriber 。segmentfault
export class InnerSubscriber<T, R> extends Subscriber<R> { private index = 0; constructor(private parent: OuterSubscriber<T, R>, public outerValue: T, public outerIndex: number) { super(); } protected _next(value: R): void { this.parent.notifyNext(this.outerValue, value, this.outerIndex, this.index++, this); } protected _error(error: any): void { this.parent.notifyError(error, this); this.unsubscribe(); } protected _complete(): void { this.parent.notifyComplete(this); this.unsubscribe(); } }
而 OuterSubscriber 的默认实现则是将数据交由终结订阅转发出去。数组
export class OuterSubscriber<T, R> extends Subscriber<T> { notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber<T, R>): void { this.destination.next(innerValue); } notifyError(error: any, innerSub: InnerSubscriber<T, R>): void { this.destination.error(error); } notifyComplete(innerSub: InnerSubscriber<T, R>): void { this.destination.complete(); } }
不一样的操做符可能会要生成不一样的 Subscriber,而生成这些 Subscriber 都会调用 subscribeToResult。这个函数会根据传入的 ObservableInput ,进行类型判断,并返回一个正确处理后的订阅。这里为了能够复用,就调用了以前 from 也使用过的 subscribeTo ,在这个函数中,会处理列表、Promise、以及生成器等数据并返回一个订阅。缓存
export function subscribeToResult<T, R>( outerSubscriber: OuterSubscriber<T, R>, result: any, outerValue?: T, outerIndex: number = 0, innerSubscriber: Subscriber<R> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex) ): Subscription | undefined { if (innerSubscriber.closed) { return undefined; } if (result instanceof Observable) { return result.subscribe(innerSubscriber); } return subscribeTo(result)(innerSubscriber) as Subscription; }
经过这种设计,使得生成的 Subscriber 拥有控制下游流状态的能力。这种能力可使得数据装箱和拆箱都放在同一个 Subscriber 中,同时这样作也把反作用集中在一个订阅器中处理,使得操做符在表现上像纯函数同样。微信
下面以及下一篇的内容中,会出现大量 subscribeToResult ,咱们只须要知道,这个函数将订阅的数据或信息转发到了 OuterSubscriber 的相关接口中,它的功能再也不赘述。前端工程师
最后,咱们仍是要回归到 operators 的源码分析上。由于总体规律和设计已经了解完毕,那么分析每个 operator 的时候,也能经过这些规律来理解某一部分的 operators 为何要这样设计。并发
在这里,咱们继续沿着上一篇的内容,先分析 Join Creation Operators。
所谓 race,意味着全部的流都在进行一场赛跑,跑赢的流能够留下并继续发送数据,没跑赢的只能取消订阅。
const first = interval(1000).pipe(take(1), mapTo('first')); const second = interval(2000).pipe(take(1), mapTo('second')); const race$ = race(first, second); race$.subscribe((v) => console.log(v)); // 打印结果 // first
race 经过 fromArray 的方式,将输入的 Observable 交由内部订阅器来处理。
export function race<T>(...observables: ObservableInput<any>[]): Observable<T> { return fromArray(observables).lift(new RaceOperator<T>()); }
RaceSubscriber 保存了这么几个状态。
private hasFirst: boolean = false; private observables: Observable<any>[] = []; private subscriptions: Subscription[] = [];
订阅后上游流输出 Observable 会由 observables 缓存起来,然后在上游流输出完成时,对他们进行订阅,并保存订阅对象。
protected _complete() { const observables = this.observables; const len = observables.length; if (len === 0) { this.destination.complete(); } else { for (let i = 0; i < len && !this.hasFirst; i++) { let observable = observables[i]; let subscription = subscribeToResult(this, observable, observable as any, i); if (this.subscriptions) { this.subscriptions.push(subscription); } this.add(subscription); } this.observables = null; } }
在 notifyNext 中,RaceSubscriber 能够获取下游流的订阅数据。并对 hasFirst 进行判断。若是该数据是第一个到达,更新 hasFirst 状态,并将其他下游流的订阅取消,这样作的目的是为了只让这个下游流的数据发送给终结订阅。
notifyNext( outerValue: T, innerValue: T, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber<T, T> ): void { if (!this.hasFirst) { // 更新状态 this.hasFirst = true; // for (let i = 0; i < this.subscriptions.length; i++) { if (i !== outerIndex) { let subscription = this.subscriptions[i]; subscription.unsubscribe(); this.remove(subscription); } } this.subscriptions = null; } this.destination.next(innerValue); }
zip 是这样的一种操做符,它如下游流中数据量最少的流为基准,按照前后顺序与其他的下游流结合成新的流。
let age$ = of<number>(27, 25, 29, 30, 35, 40); let name$ = of<string>('Foo', 'Bar', 'Beer'); let isDev$ = of<boolean>(true, true); zip(age$, name$, isDev$).pipe( map(([age, name, isDev]) => ({ age, name, isDev })), ) .subscribe(x => console.log(x)); // outputs // { age: 27, name: 'Foo', isDev: true } // { age: 25, name: 'Bar', isDev: true }
zip 也同样,经过 fromArray 的方式,将输入内容交由内部订阅器处理。
export function zip<O extends ObservableInput<any>, R>( ...observables: O[] ): Observable<ObservedValueOf<O>[]|R> { // 经过 fromArray 将传入的参数以流的形式进入到订阅中 return fromArray(observables, undefined).lift(new ZipOperator()); }
订阅开始,生成 ZipSubscriber,调用 _next。根据输入流的类型,将其传入到不一样的迭代器中,输入的流的数据类型包含了如下几种:
protected _next(value: any) { const iterators = this.iterators; if (isArray(value)) { iterators.push(new StaticArrayIterator(value)); } else if (typeof value[Symbol_iterator] === 'function') { iterators.push(new StaticIterator(value[Symbol_iterator]())); } else { iterators.push(new ZipBufferIterator(this.destination, this, value)); } }
相较于 静态数据而言,Observable 才是咱们关注的重点。在前面已经讲过 OuterSubscriber 的做用,我在这里再也不赘述。 ZipBufferIterator
经过继承 OuterSubscriber,并实现了相应的操做,而后维护了这些 Observable 并进行订阅。
在 zip 中,上游流为 fromArray 生成的 Observable。当它完成时,会把 next 中存储的迭代器进行循环调用。在 next 的时候咱们能够看到,会生成与 ObservableInput 相对应的内容 ,的内部若是实现了订阅功能,那么就订阅这些迭代器,不然,直接按照静态处理。
protected _complete() { const iterators = this.iterators; const len = iterators.length; this.unsubscribe(); if (len === 0) { this.destination.complete(); return; } this.active = len; for (let i = 0; i < len; i++) { let iterator: ZipBufferIterator<any, any> = <any>iterators[i]; if (iterator.stillUnsubscribed) { const destination = this.destination as Subscription; // 持有并管理该迭代器的订阅 destination.add(iterator.subscribe(iterator, i)); } else { // 不是 Observable this.active--; } } }
ZipBufferIterator
继承了 OuterSubscriber ,那么它确定也是经过内部维护一个 InnerSubscriber 来将下游流中的数据转发出去。
class ZipBufferIterator<T, R> extends OuterSubscriber<T, R> implements LookAheadIterator<T> { ... subscribe(value: any, index: number) { const subscriber = new InnerSubscriber(this, index as any, undefined); return subscribeToResult<any, any>(this, this.observable, undefined, undefined, subscriber); } ... }
ZipBufferIterator
其内部维护了 InnerSubscriber ,那么意味着数据会由发送到 notifyNext 中,这里使用了一个数组将数据缓存起来。
notifyNext(outerValue: T, innerValue: any, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber<T, R>): void { this.buffer.push(innerValue); this.parent.checkIterators(); }
然后,会调用 ZipSubscriber.checkIterators, 这个方法决定了终结订阅的数据来源,同时也给出了终结订阅完成所须要的条件。
checkIterators() { const iterators = this.iterators; const len = iterators.length; const destination = this.destination; // 是否是全部的迭代器都存在数据。 for (let i = 0; i < len; i++) { let iterator = iterators[i]; if (typeof iterator.hasValue === 'function' && !iterator.hasValue()) { return; } } let shouldComplete = false; // 终结订阅最终拿到的数据 const args: any[] = []; for (let i = 0; i < len; i++) { let iterator = iterators[i]; let result = iterator.next(); // 判断迭代器是否已经完成数据输出 if (iterator.hasCompleted()) { shouldComplete = true; } // 若是结果已经到了末尾,意味着最短的数据已经输出完毕。 // 有可能数据没到末尾,可是该迭代器已经结束。 if (result.done) { destination.complete(); return; } // 收集全部迭代器中的数据。 args.push(result.value); } // 发送给终结订阅 destination.next(args); // if (shouldComplete) { destination.complete(); } }
当某一个下游流完成的时候,缓冲区的存在与否会决定终结订阅的是否完成。
notifyComplete() { if (this.buffer.length > 0) { this.isComplete = true; this.parent.notifyInactive(); } else { this.destination.complete(); } }
若是缓冲区存在数据,那么还得去调用 ZipSubscriber.notifyInactive ,将信息返回给 ZipSubscriber。到了这一步,意味着某一个下游流已经彻底发送完数据了,那么还得更新 active 的记录。若是 active 最终为 0 ,那么将通知终结订阅这个流已经完成了。
notifyInactive() { this.active--; if (this.active === 0) { this.destination.complete(); } }
跟 zip 不同,在 CombineLatest 中,每个下游流的新数据都会和其他下游流的当前的数据相结合,从而造成新的数据并重新的流中转发出去。
export function combineLatest<O extends ObservableInput<any>, R>( ...observables: O[] ): Observable<R> { return fromArray(observables).lift(new CombineLatestOperator<ObservedValueOf<O>, R>()); } export class CombineLatestOperator<T, R> implements Operator<T, R> { constructor() {} call(subscriber: Subscriber<R>, source: any): any { return source.subscribe(new CombineLatestSubscriber()); } }
起始状态跟 zip 同样,也是经过 fromArray 将 ObservableInput 做为上游流的数据输入到 CombineLatestSubscriber 中。把目光锁定这个 Subscriber,深刻了解一下它的心路历程。
当数据到来的时候,CombineLatestSubscriber 把下游流集体缓存到一个 observables 数组中。
protected _next(observable: any) { this.values.push(NONE); this.observables.push(observable); }
当下游流缓存完毕的时候,上游流也输出完毕,那么便会调用 complete。 在这里,complete 作的事情仅仅是将全部的下游流进行订阅,并记录这些流的订阅状态。
protected _complete() { const observables = this.observables; const len = observables.length; if (len === 0) { this.destination.complete(); } else { this.active = len; this.toRespond = len; for (let i = 0; i < len; i++) { const observable = observables[i]; const innerSub = new InnerSubscriber(this, observable, i); this.add(subscribeToResult(this, observable, undefined, undefined, innerSub)); } } }
在订阅完毕全部的下游流后,它们的数据全都会流到 notify 中。
CombineLatestSubscriber 每接收到一个下游流的数据,都会触发 notifyNext。toRespond 记录的是剩余未收到数据的下游流的数量, 当全部下游流都有数据的时候,那么才会开始结合。
values 经过初始化的索引缓存了每个下游流当前的数据,当任意一个下游流的数据到来后,都将会更新 values 中对应索引中的缓存数据。
notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber<T, R>): void { const values = this.values; const oldVal = values[outerIndex]; let toRespond = 0; if (this.toRespond) { // 若是这个数据为NONE,那么则表明当前的 // 下游流是首次发送数据,则 toRespond // 要减一。 if (oldVal === NONE) { this.toRespond -= 1; } toRespond = this.toRespond; } values[outerIndex] = innerValue; if (toRespond === 0) { this.destination.next(values.slice()); } }
以上即是 combineLastest 的核心设计。
至于 notifyComplete ,则是处理了当前正在运行的下游流和终结订阅的关系。当 active 减小到零的时候,意味着须要通知终结订阅全部数据已经输出完毕了。
notifyComplete(unused: Subscriber<R>): void { this.active -= 1; if (this.active === 0) { this.destination.complete(); } }
相较于 combineLatest ,forkJoin 是一种更为激进的实现。为何说它激进,由于它判断合并的条件,从下游流有数据输出变成了下游流完成数据输出。它的实现很简单,只须要计算每一个结束输出数据的下游流的数量 completed,经过比较 completed 和下游流总数,就能判断何时结束。须要注意的一点,若是全部流都输出了数据,那么 forkJoin 才能把数据发送。
function forkJoinInternal(sources: ObservableInput<any>[], keys: string[] | null): Observable<any> { return new Observable(subscriber => { const len = sources.length; if (len === 0) { subscriber.complete(); return; } const values = new Array(len); let completed = 0; let emitted = 0; // 循环订阅全部的下游流 for (let i = 0; i < len; i++) { // 将输入转换成 Observable const source = from(sources[i]); let hasValue = false; subscriber.add(source.subscribe({ next: value => { if (!hasValue) { hasValue = true; emitted++; } // 记录当前订阅的值 values[i] = value; }, error: err => subscriber.error(err), // 处理完成时所须要作的工做 complete: () => { // 更新下游流订阅完成数 completed++; // 判断是否全部的下游流订阅都已经完成 if (completed === len || !hasValue) { if (emitted === len) { // 若是所有的下游流都发送了数据, // 那么终结订阅将收到全部的下游流 // 的数据。 subscriber.next(values); } subscriber.complete(); } } })); } }); }
merge 经过调用 mergeMap 来建立合并流,concat 也是经过 mergeMap 来建立相同的合并流。这一部分会在下一章讲到。它们两个惟一不一样的点就是在于并发的数量上。merge能够并发订阅多个下游流,而 concat 同一时间只能订阅一个下游流。
type Any = ObservableInput<any>; export function merge<T, R>(...observables: Array<ObservableInput<any> | number>): Observable<R> { let concurrent = Number.POSITIVE_INFINITY; let last: any = observables[observables.length - 1]; if (typeof last === 'number') { concurrent = <number>observables.pop(); } return mergeMap<Any, Any>(x => x, concurrent)(fromArray<any>(observables)); }
export function concat1<O extends ObservableInput<any>, R>(...observables: Array<O>): Observable<R> { return mergeMap<O, O>(x => x, 1)(of(...observables)); }
partion 是一种分割操做,经过传入一个判断函数,使得输出的流一分为二。它经过 filter 来实现,将两个不一样的流分离。其中须要注意的是,第二个 filter 中,传入的是一个求反操做。
export function partition<T>( predicate: (value: T, index: number) => boolean, thisArg?: any ): UnaryFunction<Observable<T>, [Observable<T>, Observable<T>]> { return (source: Observable<T>) => [ filter(predicate, thisArg)(source), // 此处传入的是一个 not,他把整个 predicate 封装。 filter(not(predicate, thisArg) as any)(source) ] as [Observable<T>, Observable<T>]; }
总结一下,本章首先给出一些 operators 的综合规律,而后再对 Join Creation Operators 进行分析,下面分别用一句话将它们总结一下,就结束本篇的内容。
咱们是DevUI团队,欢迎来这里和咱们一块儿打造优雅高效的人机设计/研发体系。招聘邮箱:muyang2@huawei.com。
做者:zcx(公众号:Coder写字的地方)
原文连接:https://mp.weixin.qq.com/s/1b141waT_tAxZR-PZC79kg
往期文章推荐