吾辈的博客原文: https://blog.rxliuli.com/p/5e...
吾辈是一只在飞向太阳的萤火虫
JavaScript 中的数组是一个至关泛用性的数据结构,能当数组,元组,队列,栈进行操做,更好的是 JavaScript 提供了不少原生的高阶函数,便于咱们对数组总体操做。
然而,JavaScript 中的高阶函数仍有缺陷 -- 异步!当你把它们放在一块儿使用时,就会感受到这种问题的所在。git
例如如今,有一组 id,咱们要根据 id 获取到远端服务器 id 对应的值,而后将之打印出来。那么,咱们要怎么作呢?github
const wait = ms => new Promise(resolve => setTimeout(resolve, ms)) async function get(id) { // 这里只是为了模拟每一个请求的时间多是不定的 await wait(Math.random() * id * 100) return '内容: ' + id.toString() } const ids = [1, 2, 3, 4]
你或许会下意识地写出下面的代码数组
ids.forEach(async id => console.log(await get(id)))
事实上,控制台输出是无序的,而并不是想象中的 1, 2, 3, 4 依次输出缓存
内容: 2 内容: 3 内容: 1 内容: 4
这是为何呢?缘由即是 JavaScript 中数组的高阶函数并不会等待异步函数的返回!当你在网络上搜索时,会发现不少人会说可使用 for-of
, for-in
解决这个问题。服务器
;(async () => { for (let id of ids) { console.log(await get(id)) } })()
或者,使用 Promise.all
也是一种解决方案网络
;(async () => { ;(await Promise.all(ids.map(get))).forEach(v => console.log(v)) })()
然而,第一种方式至关于丢弃了 Array 的全部高阶函数,再次重返远古 for
循环时代了。第二种则必定会执行全部的异步函数,即使你须要使用的是 find/findIndex/some/every
这些高阶函数。那么,有没有更好的解决方案呢?数据结构
既然原生的 Array 不支持完善的异步操做,那么,为何不禁咱们来实现一个呢?并发
实现思路:dom
AsyncArray
class AsyncArray { constructor(...args) { this._arr = Array.from(args) this._task = [] } async forEach(fn) { const arr = this._arr for (let i = 0, len = arr.length; i < len; i++) { await fn(arr[i], i, this) } } } new AsyncArray(...ids).forEach(async id => console.log(await get(id)))
打印结果确实有顺序了,看似一切很美好?异步
然而,当咱们再实现一个 map
试一下
class AsyncArray { constructor(...args) { this._arr = Array.from(args) } async forEach(fn) { const arr = this._arr for (let i = 0, len = arr.length; i < len; i++) { await fn(arr[i], i, this) } } async map(fn) { const arr = this._arr const res = [] for (let i = 0, len = arr.length; i < len; i++) { res.push(await fn(arr[i], i, this)) } return this } }
调用一下
new AsyncArray(...ids).map(get).forEach(async res => console.log(res)) // 抛出错误 // (intermediate value).map(...).forEach is not a function
然而会有问题,实际上 map
返回的是 Promise
,因此咱们还必须使用 await
进行等待
;(async () => { ;(await new AsyncArray(...ids).map(get)).forEach(async res => console.log(res), ) })()
是否是感受超级蠢?吾辈也是这样认为的!
咱们能够尝试使用链式调用加延迟执行修改这个 AsyncArray
/** * 保存高阶函数传入的异步操做 */ class Action { constructor(type, args) { /** * @field 异步操做的类型 * @type {string} */ this.type = type /** * @field 异步操做的参数数组 * @type {Function} */ this.args = args } } /** * 全部的操做类型 */ Action.Type = { forEach: 'forEach', map: 'map', filter: 'filter', } /** * 真正实现的异步数组 */ class InnerAsyncArray { constructor(arr) { this._arr = arr } async forEach(fn) { const arr = this._arr for (let i = 0, len = arr.length; i < len; i++) { await fn(arr[i], i, this) } this._arr = [] } async map(fn) { const arr = this._arr const res = [] for (let i = 0, len = arr.length; i < len; i++) { res.push(await fn(arr[i], i, this)) } this._arr = res return this } async filter(fn) { const arr = this._arr const res = [] for (let i = 0, len = arr.length; i < len; i++) { if (await fn(arr[i], i, this)) { res.push(arr[i]) } } this._arr = res return this } } class AsyncArray { constructor(...args) { this._arr = Array.from(args) /** * @field 保存异步任务 * @type {Action[]} */ this._task = [] } forEach(fn) { this._task.push(new Action(Action.Type.forEach, [fn])) return this } map(fn) { this._task.push(new Action(Action.Type.map, [fn])) return this } filter(fn) { this._task.push(new Action(Action.Type.filter, [fn])) return this } /** * 终结整个链式操做并返回结果 */ async value() { const arr = new InnerAsyncArray(this._arr) let result for (let task of this._task) { result = await arr[task.type](...task.args) } return result } }
使用一下
new AsyncArray(...ids) .filter(async i => i % 2 === 0) .map(get) .forEach(async res => console.log(res)) .value()
能够看到,确实符合预期了,然而每次都要调用 value()
,终归有些麻烦。
这里使用 then()
替代它以使得可使用 await
自动计算结果
class AsyncArray { // 上面的其余内容... /** * 终结整个链式操做并返回结果 */ async then(resolve) { const arr = new InnerAsyncArray(this._arr) let result for (let task of this._task) { result = await arr[task.type](...task.args) } // 这里使用 resolve(result) 是为了兼容 await 的调用方式 resolve(result) return result } }
如今,可使用 await
结束此次链式调用了
await new AsyncArray(...ids).map(get).forEach(async res => console.log(res))
忽然之间,咱们发现了一个问题,为何会这么慢?一个个去进行异步操做太慢了,难道就不能一次性所有发送出去,而后有序的处理结果就行了嘛?
咱们可使用 Promise.all
并发执行异步操做,而后对它们的结果进行有序地处理。
/** * 并发实现的异步数组 */ class InnerAsyncArrayParallel { constructor(arr) { this._arr = arr } async _all(fn) { return Promise.all(this._arr.map(fn)) } async forEach(fn) { await this._all(fn) this._arr = [] } async map(fn) { this._arr = await this._all(fn) return this } async filter(fn) { const arr = await this._all(fn) this._arr = this._arr.filter((v, i) => arr[i]) return this } }
而后修改 AsyncArray
,使用 _AsyncArrayParallel
便可
class AsyncArray { // 上面的其余内容... /** * 终结整个链式操做并返回结果 */ async then(resolve) { const arr = new InnerAsyncArrayParallel(this._arr) let result = this._arr for (let task of this._task) { result = await arr[task.type](...task.args) } // 这里使用 resolve(result) 是为了兼容 await 的调用方式 if (resolve) { resolve(result) } return result } }
调用方式不变。固然,因为使用 Promise.all
实现,也一样受到它的限制 -- 异步操做实际上所有执行了。
如今咱们的 _AsyncArray
和 _AsyncArrayParallel
两个类只能二选一,因此,咱们须要添加两个函数用于互相转换。
class AsyncArray { constructor(...args) { this._arr = Array.from(args) /** * @field 保存异步任务 * @type {AsyncArrayAction[]} */ this._task = [] /** * 是否并行化 */ this._parallel = false } // 其余内容... parallel() { this._parallel = true return this } serial() { this._parallel = false return this } async then() { const arr = this._parallel ? new InnerAsyncArrayParallel(this._arr) : new InnerAsyncArray(this._arr) let result = this._arr for (let task of this._task) { result = await arr[task.type](...task.args) } if (resolve) { resolve(result) } return result } }
如今,咱们能够在真正执行以前在任意位置对其进行转换了
await new AsyncArray(...ids) .parallel() .filter(async i => i % 2 === 0) .map(get) .forEach(async res => console.log(res))
然而,上面的代码有一些隐藏的问题
await
以后返回值不是一个数组
;(async () => { const asyncArray = new AsyncArray(...ids) console.log(await asyncArray.map(i => i * 2)) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] } })()
上面的 map
, filter
调用在 await
以后仍会影响到下面的调用
;(async () => { const asyncArray = new AsyncArray(...ids) console.log(await asyncArray.map(i => i * 2)) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] } console.log(await asyncArray) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] } })()
并发调用的顺序不能肯定,会影响到内部数组,致使结果不能肯定
;(async () => { const asyncArray = new AsyncArray(...ids) ;(async () => { console.log( await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2), ) // InnerAsyncArray { _arr: [ 2, 6 ] } })() ;(async () => { console.log(await asyncArray) // InnerAsyncArray { _arr: [ 2, 6 ] } })() })()
先解决第一个问题,这里只须要判断一下是否为终结操做(forEach
),是的话就直接返回结果,不然继续下一次循环
class AsyncArray { // 其余内容... async then(resolve, reject) { const arr = this._parallel ? new InnerAsyncArrayParallel(this._arr) : new InnerAsyncArray(this._arr) let result = this._arr for (let task of this._task) { const temp = await arr[task.type](...task.args) if ( temp instanceof InnerAsyncArray || temp instanceof InnerAsyncArrayParallel ) { result = temp._arr } else { // 若是已是终结操做就返回数组的值 if (resolve) { resolve(temp) } return temp } } if (resolve) { resolve(result) } return result } }
如今,第一个问题简单解决
;(async () => { const asyncArray = new AsyncArray(...ids) console.log(await asyncArray.map(i => i * 2)) // [ 2, 4, 6, 8 ] })()
第2、第三个问题看起来彷佛是同一个问题?其实咱们能够按照常规思惟解决第一个问题。既然 await
以后仍然会影响到下面的调用,那就在 then
中把 _task
清空好了,修改 then
函数
class AsyncArray { // 其余内容... async then(resolve, reject) { const arr = this._parallel ? new InnerAsyncArrayParallel(this._arr) : new InnerAsyncArray(this._arr) let result = this._arr for (let task of this._task) { const temp = await arr[task.type](...task.args) if ( temp instanceof InnerAsyncArray || temp instanceof InnerAsyncArrayParallel ) { result = temp._arr } else { // 若是已是终结操做就返回数组的值 if (resolve) { resolve(temp) } this._task = [] return temp } } if (resolve) { resolve(result) } this._task = [] return result } }
如今,第一个问题解决了,但第二个问题不会解决。究其缘由,仍是异步事件队列的问题,虽然 async-await
可以让咱们以同步的方式写异步的代码,但千万不可忘记它们本质上仍是异步的!
;(async () => { await Promise.all([ (async () => { console.log( await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2), ) // [ 2, 6 ] })(), (async () => { console.log(await asyncArray) // [ 2, 6 ] })(), ]) console.log(await asyncArray) // [ 1, 2, 3, 4 ] })()
能够看到,在使用 await
进行等待以后就如同预期的 _task
被清空了。然而,并发执行的没有等待的 await asyncArray
却有奇怪的问题,由于它是在 _task
清空以前执行的。
而且,这带来一个反作用: 没法缓存操做了
;(async () => { const asyncArray = new AsyncArray(...ids).map(i => i * 2) console.log(await asyncArray) // [ 2, 4, 6, 8 ] console.log(await asyncArray) // [ 1, 2, 3, 4 ] })()
为了解决直接修改内部数组形成的问题,咱们可使用不可变数据解决这个问题。试想:若是咱们每次操做都返回一个新的 AsyncArray
,他们之间没有关联,这样又如何呢?
class AsyncArray { constructor(...args) { this._arr = Array.from(args) /** * @field 保存异步任务 * @type {Action[]} */ this._task = [] /** * 是否并行化 */ this._parallel = false } forEach(fn) { return this._addTask(Action.Type.forEach, [fn]) } map(fn) { return this._addTask(Action.Type.map, [fn]) } filter(fn) { return this._addTask(Action.Type.filter, [fn]) } parallel() { this._parallel = true return this } serial() { this._parallel = false return this } _addTask(type, args) { const result = new AsyncArray(...this._arr) result._task = [...this._task, new Action(type, args)] result._parallel = this._parallel return result } /** * 终结整个链式操做并返回结果 */ async then(resolve, reject) { const arr = this._parallel ? new InnerAsyncArrayParallel(this._arr) : new InnerAsyncArray(this._arr) let result = this._arr for (let task of this._task) { const temp = await arr[task.type](...task.args) if ( temp instanceof InnerAsyncArray || temp instanceof InnerAsyncArrayParallel ) { result = temp._arr } else { // 若是已是终结操做就返回数组的值 if (resolve) { resolve(temp) } return temp } } if (resolve) { resolve(result) } return result } }
再次测试上面的那第三个问题,发现已经一切正常了呢
;(async () => { const asyncArray = new AsyncArray(...ids) await Promise.all([ (async () => { console.log( await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2), ) // [ 2, 6 ] })(), (async () => { console.log(await asyncArray) // [ 1, 2, 3, 4 ] })(), ]) console.log(await asyncArray) // [ 1, 2, 3, 4 ] })()
;(async () => { const asyncArray = new AsyncArray(...ids).map(i => i * 2) console.log(await asyncArray) // [ 2, 4, 6, 8 ] console.log(await asyncArray) // [ 2, 4, 6, 8 ] })()
下面吾辈把完整的代码贴出来
/** * 保存高阶函数传入的异步操做 */ class Action { constructor(type, args) { /** * @field 异步操做的类型 * @type {string} */ this.type = type /** * @field 异步操做的参数数组 * @type {Function} */ this.args = args } } /** * 全部的操做类型 */ Action.Type = { forEach: 'forEach', map: 'map', filter: 'filter', } /** * 真正实现的异步数组 */ class InnerAsyncArray { constructor(arr) { this._arr = arr } async forEach(fn) { const arr = this._arr for (let i = 0, len = arr.length; i < len; i++) { await fn(arr[i], i, this) } this._arr = [] } async map(fn) { const arr = this._arr const res = [] for (let i = 0, len = arr.length; i < len; i++) { res.push(await fn(arr[i], i, this)) } this._arr = res return this } async filter(fn) { const arr = this._arr const res = [] for (let i = 0, len = arr.length; i < len; i++) { if (await fn(arr[i], i, this)) { res.push(arr[i]) } } this._arr = res return this } } class InnerAsyncArrayParallel { constructor(arr) { this._arr = arr } async _all(fn) { return Promise.all(this._arr.map(fn)) } async forEach(fn) { await this._all(fn) this._arr = [] } async map(fn) { this._arr = await this._all(fn) return this } async filter(fn) { const arr = await this._all(fn) this._arr = this._arr.filter((v, i) => arr[i]) return this } } class AsyncArray { constructor(...args) { this._arr = Array.from(args) /** * @field 保存异步任务 * @type {Action[]} */ this._task = [] /** * 是否并行化 */ this._parallel = false } forEach(fn) { return this._addTask(Action.Type.forEach, [fn]) } map(fn) { return this._addTask(Action.Type.map, [fn]) } filter(fn) { return this._addTask(Action.Type.filter, [fn]) } parallel() { this._parallel = true return this } serial() { this._parallel = false return this } _addTask(type, args) { const result = new AsyncArray(...this._arr) result._task = [...this._task, new Action(type, args)] result._parallel = this._parallel return result } /** * 终结整个链式操做并返回结果 */ async then(resolve, reject) { const arr = this._parallel ? new InnerAsyncArrayParallel(this._arr) : new InnerAsyncArray(this._arr) let result = this._arr for (let task of this._task) { const temp = await arr[task.type](...task.args) if ( temp instanceof InnerAsyncArray || temp instanceof InnerAsyncArrayParallel ) { result = temp._arr } else { // 若是已是终结操做就返回数组的值 if (resolve) { resolve(temp) } return temp } } if (resolve) { resolve(result) } return result } }
那么,关于 JavaScript 中如何封装一个可使用异步操做高阶函数的数组就先到这里了,完整的 JavaScript 异步数组请参考吾辈的 AsyncArray(使用 TypeScript 编写)。