DevUI是一支兼具设计视角和工程视角的团队,服务于华为云DevCloud平台和华为内部数个中后台系统,服务于设计师和前端工程师。
官方网站:devui.design
Ng组件库:ng-devui(欢迎Star)
官方交流:添加DevUI小助手(devui-official)
DevUIHelper插件:DevUIHelper-LSP(欢迎Star)前端
ReactiveX 是 Reactive Extensions 的缩写,通常简写为 Rx ,最初是 LINQ 的一个扩展,由微软的架构师 Erik Meijer 领导的团队开发,在2012年11月开源。Rx 是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。git
首先,先给出官方对于 Rx 的定义。github
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.编程
翻译起来有点麻烦,简而言之,就是基于观察者队列实现了对异步和基础事件的编程。后端
Rxjs 是 Rx 的 JavaScript 的实现。本篇文章将简单的分析一下 Obersvable 和 Subscription 的源码是怎么进行的。数组
可观察对象是整个 Rx 的核心,主要的做用就是提供了一个观察者模式,使得调用者能够经过响应式的方式获取数据。markdown
Observable 实际上就是一个单向链表,基本的数据结构以下:前端工程师
class Observable<T> {
source: Observable<any>;
}
复制代码
其构造方法与 Promise 相似,经过传入一个函数包裹操做,并让这个函数来决定数据传递,这个函数的参数包含了一个订阅器。数据结构
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.error(Error('error message'));
subscriber.complete();
});
复制代码
订阅器提供了三个主要方法:next
,error
,complete
。订阅器的实现很巧妙,其内部实现是一个链表。架构
跟 Promise 不一样,Observable 不会马上运行这个函数,而是等到它被订阅后,这个函数才会被执行,这种惰性求值的特性使得 Observable 能够在它仅被须要的地方进行计算。
lift
方法提供了一个这样的功能,传入一个映射函数,并返回一个新的 Observable,这个新的 Observable 的 source 会指向建立它的 Observable。实际上,这种作法就是将这个映射函数用一个外覆类包裹起来,这个外覆类,正是 Observable。那么,看看它是如何实现。
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
return observable;
}
复制代码
Rxjs 跟其余语言实现的 ReactiveX 不同的地方就是在于,它的映射方法再也不是放在 Observable 内部,而是经过参数的形式传入到一个管道函数pipe
中,在这个函数中,经过对管道函数的数组进行 reduce
后,就可以获得最终的Observable。这个 reduce
的过程也很巧妙,传入的函数的参数就是上游的 Observable,返回的就是一个给下游接收的 Observable,那么就能够把一个又一个的 Observable 串联起来
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
if (operations.length === 0) {
return this as any;
}
if (operations.length == 1) {
return operation[0];
}
return operations.reduce((prev, fn) => fn(prev), this);
}
复制代码
那么在使用过程当中,pipe
经过重载给传入的函数提供类型信息。
export function pipe<T>(): UnaryFunction<T, T>;
export function pipe<T, A>(fn1: UnaryFunction<T, A>): UnaryFunction<T, A>;
export function pipe<T, A, B>(fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>): UnaryFunction<T, B>;
// ...
复制代码
其中 UnaryFunction
表示一元函数,经过这种链式操做,使得链条上的全部函数均可以拿到上游的类型,并把类型转化传递给下游。
当 Observable 一旦调用 subscribe,那么就意味着其开始执行链条中的全部函数。subscribe 传入的参数是一个包含了 next ,error , complete 三个属性的对象;也能够是三个函数,分别对应 next,error,complete。
observable.subscribe((value) {
console.log(value);
}, (error) {
console.error(error);
}, () {
console.log('complete');
});
observable.subscribe({
next: (value) {
console.log(value);
},
error: (error) {
console.error(error);
},
complete: () {
console.log('complete');
},
});
复制代码
其具体实现是经过将传入的函数(对象)参数转化成 Subscriber 对象,而 Subscriber 继承了 Subscription。最后,返回的就是一个 subscription 给到调用者。
subscribe(
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription
) {
// operator 是一个映射函数
const {operator} = this;
const sink = new Subscriber(observerOrNext, error, complete);
if (operator) {
sink.add(operator.call(sink, this.source));
} else {
sink.add(this.source || !sink.syncErrorThrowable ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
}
// 省略了错误处理
return sink;
}
_subscribe(subscriber: Subscriber<any>): TeardownLogic {
const { source } = this;
return source && source.subscribe(subscriber);
}
_trySubscribe(sink: Subscriber<T>): TeardownLogic {
try {
return this._subscribe(sink);
} catch (err) {
// 此处省略了源码中的一些判断,不影响阅读
sink.error(err);
}
复制代码
Subscriber 的 add 方法下面会讲。总之,Observable 就像一串或者一个爆竹,只有当它被点燃(subscribe)的时候,才会把一个又一个的 Observable 点着,最终迸发出巨大声响,而 subscribe 就是一个找到引线并点燃它们的过程。
Subscription 则是经过一种树结构,它包含了叶节点和一个父节点或者父节点的集合。
class Subscription {
_parentOrParents: Subscription;
_subscriptions: Subscription[];
}
复制代码
add 方法主要的功能是链接不一样的订阅,配合注释,其逻辑就是将函数或者订阅对象包裹后放入成员变量 subscriptions 中,并将这个包裹对象的父订阅对象设置为当前对象。
add(logic: Function | Subscription | void): Subscription {
let subscription = logic;
if (typeof logic === 'object') {
// 若是添加进来订阅已经被取消了,则不进行设置。
// 若是当前的订阅已经被取消,添加进来的订阅也应该要被取消。
if (subscription === this ||
subscription.closed ||
typeof subscription.unsubscribe !== 'function') {
return subscription;
} else if (this.closed) {
subscription.unsubscribe();
return subscription;
} else if (!(subscription instanceof Subscription)) {
const tmp = subscription;
subscription = new Subscription();
subscription._subscriptions = [tmp];
}
} else if (typeof logic === 'function' ) {
subscription = new Subscription(<(() => void)>teardown);
} else {
// 抛出错误。
}
// 设置父对象的过程采用懒加载模式。
let { _parentOrParents } = subscription;
if (_parentOrParents === null) {
// 若是没有设置父对象,则设置当前对象为父对象。
subscription._parentOrParents = this;
} else if (_parentOrParents instanceof Subscription) {
// 若是父对象已是当前的对象,直接返回。
if (_parentOrParents === this) {
return subscription;
}
// 添加进来的订阅的父对象已经存在,那么用一个数组保存。
subscription._parentOrParents = [_parentOrParents, this];
} else if (_parentOrParents.indexOf(this) === -1) {
// 若是已是数组对象了,而且不存在当前订阅对象,则设置当前订阅对象
_parentOrParents.push(this);
} else {
// 已经设置当前订阅对象为父对象
return subscription;
}
// 一样,设置叶子结点的过程也是用懒加载
const subscriptions = this._subscriptions;
if (subscriptions === null) {
this._subscriptions = [subscription];
} else {
subscriptions.push(subscription);
}
return subscriptio
复制代码
取消订阅是订阅对象的主要功能,它为观察者模式提供了终结观察的方法。
unsubscribe(): void {
// 已经取消订阅了。
if (this.closed) {
return;
}
// 拿到当前想要取消订阅的相关的对象。
// 这样作的目的是防止loop
let { _parentOrParents, _unsubscribe, _subscriptions } = (<any> this);
// 设置取消订阅
this.closed = true;
// 设置父对象为空
this._parentOrParents = null;
// 设置订阅为空
this._subscriptions = null;
// 父对象多是数组,也多是订阅对象
if (_parentOrParents instanceof Subscription) {
_parentOrParents.remove(this);
} else if (_parentOrParents !== null) {
for (let index = 0; index < _parentOrParents.length; ++index) {
const parent = _parentOrParents[index];
parent.remove(this);
}
}
// _unsubscribe 是一个外部传入的函数.
if (isFunction(_unsubscribe)) {
try {
_unsubscribe.call(this);
} catch (e) {
errors = e instanceof UnsubscriptionError ? flattenUnsubscriptionErrors(e.errors) : [e];
}
}
// 将全部的子订阅取消订阅
if (isArray(_subscriptions)) {
let len = _subscriptions.length;
for (const sub of _subscriptions) {
if (isObject(sub)) {
try {
sub.unsubscribe();
} catch (e) {
// 省略错误处理
}
}
}
}
//
复制代码
第一篇就先介绍这两个重要的组成类,由这两个类引伸出来的组合才是 Rx 的精华,以后会陆续介绍这些操做函数。
咱们是DevUI团队,欢迎来这里和咱们一块儿打造优雅高效的人机设计/研发体系。招聘邮箱:muyang2@huawei.com。
做者:zcx(公众号:Coder写字的地方)
原文连接:mp.weixin.qq.com/s/6fVoI_JtS…
往期文章推荐