DevUI是一支兼具设计视角和工程视角的团队,服务于华为云 DevCloud平台和华为内部数个中后台系统,服务于设计师和前端工程师。
官方网站:[devui.design
]( https://devui.design/)Ng组件库: ng-devui(欢迎Star)
官方交流群:添加DevUI小助手(微信号:devui-official)
DevUIHelper插件:DevUIHelper-LSP(欢迎Star)
在这以前,我一直都没有讲过 Scheduler 的做用,那么本章就开始讲解 Scheduler 的设计思路和基本结构。RxJS 的存在是为了处理异步 IO,而异步 IO 所包含的一系列 API 确定也是要通过进一步的封装才能让 RxJS 中的异步操做使用。前端
能够看到,它主要仍是根据 JS 的所可以提供的异步能力来设计这些基本结构。git
setInterval
实现。AsyncScheduler
,可是 QueueAction
是一种链式结构,使得调度以迭代器的形式进行。reqeustAnimationFrame
实现了帧调度器。Promise.resolve().then()
实现的微任务调度器。首先,SchedulerLike 提供了如下两个接口。github
export interface SchedulerLike { // 标记当前时间 now(): number; // 开启调度的基础接口 schedule<T>( work: (this: SchedulerAction<T>, state?: T) => void, delay?: number, state?: T ): Subscription; }
Scheduler 则实现了这些接口。算法
export class Scheduler implements SchedulerLike { // 获取当前时间戳 public static now: () => number = () => Date.now(); constructor( private SchedulerAction: typeof Action, now: () => number = Scheduler.now ) { this.now = now; } public now: () => number; // 直接调用 action 的 schedule public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription { return new this.SchedulerAction<T>(this, work).schedule(state, delay); } }
Scheduler 为后续的继承它的调度器定义了建立方式,经过传入一个 Action 工厂,使得内部能够构造特定的 Action 。而 Action 继承了 Subscription,意味着 Action 其实是一种的订阅器。segmentfault
export class Action<T> extends Subscription { constructor(scheduler: Scheduler, work: (this: SchedulerAction<T>, state?: T) => void) { super(); } // Action 开始调度 public schedule(state?: T, delay: number = 0): Subscription { return this; } }
上面的设计是一种名为 Template Method 的设计模式,这种方法有效地约束了后续的不一样的 Scheduler 的实现。设计模式
定义一个操做中的算法的骨架,而将一些步骤延迟到子类中。它使得子类能够不改变一个算法的结构便可重定义该算法的某些特定步骤。
先来了解一下 Scheduler 的子类 AsyncScheduler,余下全部的 Scheduler 都会继承它。在这里,先不急着进行源码分析,咱们须要先为了弄清楚调度器的运行原理,了解调度器是如何对异步 API 进行封装的。微信
首先,调度器自己也是基于观察者模式来进行设计,可是它又独立于 Rxjs 的 Observable。通常来讲, AsyncScheduler 是这样调用的。前端工程师
const scheduler = AsyncScheduler(AsyncAction); const subscription = async.schedule(function (counter) { console.log(counter); // this 绑定了 AsyncAction this.schedule(counter + 1, 1000); }, 1000, 1); // subscription.unsubscribe();
它的调用栈是这样的。less
AsyncScheduler.schedule AsyncAction.schedule AsyncAction.requestAsyncId listOnTimeout // 原生事件 processTimers // 原生事件 AsyncScheduler.flush AsyncAction.execute AsyncAction.\_execute AsyncAction.work
跟着调用栈分析源码来溯源,在 AsyncScheduler 的 schedule
方法中,它先构造了 AsyncAction ,而后调用它的 schedule
。在这个方法中,其实是对 Action 的内部状态进行更新,因此此处关注的地方就是在于 schedule
如何触发异步 API。异步
class AsyncAction<T> extends Action<T> { constructor( protected scheduler: AsyncScheduler, protected work: (this: SchedulerAction<T>, state?: T) => void ) { super(scheduler, work); } public schedule(state?: T, delay: number = 0): Subscription { if (this.closed) { return this; } this.state = state; const id = this.id; const scheduler = this.scheduler; // 须要对相应的异步 API 进行取消操做 if (id != null) { this.id = this.recycleAsyncId(scheduler, id, delay); } this.pending = true; this.delay = delay; // 从新配置异步 API this.id = this.id || this.requestAsyncId(scheduler, this.id, delay); return this; } }
能够看到,从 scheduler 传入的回调函数最终会被 Action 持有,因此调用栈最终执行的 work
实际上就是回调函数。
requestAsyncId
是调用异步 API 的方法,这个方法在 AsyncAction 最终触发了 setInterval
这一异步 API。那么实际上,根据 Template Method 的设计,全部继承 AsyncAction 的 Action 都会经过这个方法实现相对应的异步 API 。
至于 AsyncAction 为何会使用 setInterval
而不是 setTimeout
,源代码里是这样说明的。
Actions only execute once by default, unless rescheduled from within the scheduled callback. This allows us to implement single and repeat actions via the same code path, without adding API surface area, as well as mimic traditional recursion but across asynchronous boundaries. However, JS runtimes and timers distinguish between intervals achieved by serialsetTimeout
calls vs. a singlesetInterval
call. An interval of serialsetTimeout
calls can be individufenally delayed, which delays scheduling the nextsetTimeout
, and so on.setInterval
attempts to guarantee the interval callback will be invoked more precisely to the interval period, regardless of load. Therefore, we usesetInterval
to schedule single and repeat actions. If the action reschedules itself with the same delay, the interval is not canceled. If the action doesn't reschedule, or reschedules with a different delay, the interval will be canceled after scheduled callback execution.对于某一个 Action 来讲,除非它在调度的回调中被从新调度,那么它默认只会执行一次。这样的方式可使得咱们经过统一的代码实现调度单一或重复的 Actions,而无需添加 API,而且能够模仿传统递归来扩展异步。然而, JS 的运行时或者计时器分别经过串行的
setTimout
或者是单个setInterval
来获取调用的定时器。串行的setTimout
定时器能够单独延迟,这样作会延迟c下一个setTimout
的调度,以此类推。而setInterval
则无论程序运行的负载如何,它老是尝试去确保每一次定时器的回调更加精准的安排到合适的间隔时间。所以,咱们使用setInterval
来安排单一或重复的 Actions,若是 action 以相同的时延调度自己,那么当前定时器不会被取消。若是 action 只没有从新调度或者以不一样的时延从新调度,则安排的回调执行后,改定时器会被取消。
class AsyncAction<T> extends Action<T> { protected requestAsyncId( scheduler: AsyncScheduler, id?: any, delay: number = 0 ): any { // 绑定 scheduler,而且把当前的 AsyncAction 看成参数传入。 return setInterval(scheduler.flush.bind(scheduler, this), delay); } }
因此,在 AsyncScheduler 中,新增的 flush
方法其实是为 setInterval 服务的,它做为异步 API 的回调函数,主要步骤以下。
export class AsyncScheduler extends Scheduler { public flush(action: AsyncAction<any>): void { const {actions} = this; if (this.active) { // 使用了一个队列保存全部输入的 Actions actions.push(action); return; } let error: any; this.active = true; // 默认 action 也是队列中的一员 // 将全部队列中的 Action 进行调用。 do { if (error = action.execute(action.state, action.delay)) { break; } } while (action = actions.shift()); this.active = false; // 出现错误时,取消全部未运行 action 的订阅 if (error) { // 注意,此处不会重复取消订阅,由于执行错误的Action会先退出队列,再执行循环。 while (action = actions.shift()) { action.unsubscribe(); } throw error; } } }
上述的 flush 调用了 action 的 execute 方法。该方法也是经过处理 action 的内部状态来得到执行结果,其中会调用 _execute 这一内部方法,这个内部方法主要做用是调用 AsyncAction.work ,并处理它出现的异常。
class AsyncAction<T> extends Action<T> { public execute(state: T, delay: number): any { if (this.closed) { return new Error('executing a cancelled action'); } this.pending = false; // 获取异常错误 const error = this.\_execute(state, delay); if (error) { return error; } else if (this.pending === false && this.id != null) { this.id = this.recycleAsyncId(this.scheduler, this.id, null); } } protected \_execute(state: T, delay: number): any { let errored: boolean = false; let errorValue: any = undefined; try { // work this.work(state); } catch (e) { errored = true; errorValue = !!e && e || new Error(e); } if (errored) { this.unsubscribe(); return errorValue; } } }
在分析到 Action.schedule 的时候,引用了源码内部的注释,其中有一句话很重要,那就是 “若是 action 以相同的时延调度自己,那么当前定时器不会被取消”,因此 recycleAsyncId
这个方法是须要处理这种状况。
class AsyncAction<T> extends Action<T> { protected recycleAsyncId(scheduler: AsyncScheduler, id: any, delay: number = 0): any { // this.delay === delay 处理了这种状况。 if (delay !== null && this.delay === delay && this.pending === false) { return id; } // 取消当前的定时器 clearInterval(id); return undefined; } }
AsyncScheduler 能够说已经把全部的地基都打好了,它能够直接拿来用,也能够继承并重写一些相关的接口把相应的异步 API 进行替换。
队列调度器根据调用者传入的时延来决定使用同步方式的调度仍是 setInterval
方式的调度。
QueueScheduler 单纯继承了 AsyncScheduler,其主要实如今 QueueAction 中,经过重写 schedule
、 execute
以及 requestAsyncId
等方法来实现这种功能。
export class QueueAction<T> extends AsyncAction<T> { public schedule(state?: T, delay: number = 0): Subscription { // delay > 0 ,执行异步调度 if (delay > 0) { return super.schedule(state, delay); } this.delay = delay; this.state = state; // 不然直接执行同步调度 this.scheduler.flush(this); return this; } public execute(state: T, delay: number): any { // 根据传入的 delay 判断是否直接执行 work (同步执行) return (delay > 0 || this.closed) ? super.execute(state, delay) : this.\_execute(state, delay) ; } protected requestAsyncId(scheduler: QueueScheduler, id?: any, delay: number = 0): any { // 根据传入的 delay 以及自己的 delay 来决定是否使用异步 if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) { return super.requestAsyncId(scheduler, id, delay); } // delay 为 0,直接同步调度 return scheduler.flush(this); } }
帧调度器根据调用者传入的时延来决定使用 requestAnimationFrame
仍是 setInterval
,微任务调度器则是根据时延来决定使用 Promise.reslove().then()
仍是 setInterval
。
二者的调用相似,以致于能够结合起来分析。
它们的 action 方法均重写了requestAsyncId
和 recycleAsyncId
, 主要仍是为了处理不一样异步 API 。
protected requestAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any { if (delay !== null && delay > 0) { return super.requestAsyncId(scheduler, id, delay); } // 把当前action 加入到 actions 队列末端 scheduler.actions.push(this); if (!scheduler.scheduled) { // AsapAction 的状况 const scheduled = Immediate.setImmediate(scheduler.flush.bind(scheduler, null)); // AnimationFrameAction 的状况 const scheduled = requestAnimationFrame(scheduler.flush.bind(scheduler, null)); scheduler.scheduled = scheduled; } return scheduler.scheduled; } protected recycleAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any { if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) { return super.recycleAsyncId(scheduler, id, delay); } if (scheduler.actions.length === 0) { // AsapAction Immediate.clearImmediate(id); // AnimationFrameAction cancelAnimationFrame(id); scheduler.scheduled = undefined; } return undefined; }
它们的 flush,跟 AsyncScheduler 的 flush 实现思路差很少,依旧是轮询 actions 队列调用 action.execute ,只是它们的 flush 须要去处理额外的如下细节。
// export class AnimationFrameScheduler extends AsyncScheduler { export class AsapScheduler extends AsyncScheduler { public flush(action?: AsyncAction<any>): void { this.active = true; this.scheduled = undefined; const {actions} = this; let error: any; let index: number = -1; // 此处顺序不能打乱,由于这样 action = action || actions.shift()!; let count: number = actions.length; do { if (error = action.execute(action.state, action.delay)) { break; } } while (++index < count && (action = actions.shift())); this.active = false; if (error) { while (++index < count && (action = actions.shift())) { action.unsubscribe(); } throw error; } } }
这里颇有意思的一点, AsapScheduler 并无直接经过 Promise.reslove().then()
来实现。而是把它封装成 Immediate
,造成 setImmediate
和 clearImmediate
两个 API ,这样就使得微任务的调用其余的定时 API 无异。
内部实现是经过一个 Map 保存标记当前的是第几个微任务,这里并不直接保存 Promise,由于 Promise 执行完毕后就自行释放了,因此它须要的只是一个标记。
let nextHandle = 1; const RESOLVED = (() => Promise.resolve())(); const activeHandles: { \[key: number\]: any } = {}; function findAndClearHandle(handle: number): boolean { if (handle in activeHandles) { delete activeHandles\[handle\]; return true; } return false; } export const Immediate = { setImmediate(cb: () => void): number { const handle = nextHandle++; activeHandles\[handle\] = true; RESOLVED.then(() => findAndClearHandle(handle) && cb()); return handle; }, clearImmediate(handle: number): void { findAndClearHandle(handle); }, };
本篇分析了 RxJS 的调度器相关的一系列内容,经过封装 JS 异步 API ,调度器实现相对应的异步功能,加强了 RxJS 对异步 IO 的掌控。
咱们是DevUI团队,欢迎来这里和咱们一块儿打造优雅高效的人机设计/研发体系。招聘邮箱:muyang2@huawei.com。
做者:zcx(公众号:Coder写字的地方)
原文连接:https://mp.weixin.qq.com/s/vG0aaQmDy7Cqfv0CwJ_d0Q
往期文章推荐