JavaScript 异步数组

JavaScript 异步数组

吾辈的博客原文: https://blog.rxliuli.com/p/5e...

场景

吾辈是一只在飞向太阳的萤火虫

JavaScript 中的数组是一个至关泛用性的数据结构,能当数组,元组,队列,栈进行操做,更好的是 JavaScript 提供了不少原生的高阶函数,便于咱们对数组总体操做。
然而,JavaScript 中的高阶函数仍有缺陷 -- 异步!当你把它们放在一块儿使用时,就会感受到这种问题的所在。git

例如如今,有一组 id,咱们要根据 id 获取到远端服务器 id 对应的值,而后将之打印出来。那么,咱们要怎么作呢?github

const wait = ms => new Promise(resolve => setTimeout(resolve, ms))

async function get(id) {
  // 这里只是为了模拟每一个请求的时间多是不定的
  await wait(Math.random() * id * 100)
  return '内容: ' + id.toString()
}

const ids = [1, 2, 3, 4]

你或许会下意识地写出下面的代码数组

ids.forEach(async id => console.log(await get(id)))

事实上,控制台输出是无序的,而并不是想象中的 1, 2, 3, 4 依次输出缓存

内容: 2 ​​​​​
内容: 3 ​​​​​
内容: 1 ​​​​​
内容: 4

这是为何呢?缘由即是 JavaScript 中数组的高阶函数并不会等待异步函数的返回!当你在网络上搜索时,会发现不少人会说可使用 for-of, for-in 解决这个问题。服务器

;(async () => {
  for (let id of ids) {
    console.log(await get(id))
  }
})()

或者,使用 Promise.all 也是一种解决方案网络

;(async () => {
  ;(await Promise.all(ids.map(get))).forEach(v => console.log(v))
})()

然而,第一种方式至关于丢弃了 Array 的全部高阶函数,再次重返远古 for 循环时代了。第二种则必定会执行全部的异步函数,即使你须要使用的是 find/findIndex/some/every 这些高阶函数。那么,有没有更好的解决方案呢?数据结构

思考

既然原生的 Array 不支持完善的异步操做,那么,为何不禁咱们来实现一个呢?并发

实现思路:dom

  1. 建立异步数组类型 AsyncArray
  2. 内置一个数组保存当前异步操做数组的值
  3. 实现数组的高阶函数并实现支持异步函数顺序执行
  4. 获取到内置的数组
class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
    this._task = []
  }
  async forEach(fn) {
    const arr = this._arr
    for (let i = 0, len = arr.length; i < len; i++) {
      await fn(arr[i], i, this)
    }
  }
}

new AsyncArray(...ids).forEach(async id => console.log(await get(id)))

打印结果确实有顺序了,看似一切很美好?异步

然而,当咱们再实现一个 map 试一下

class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
  }
  async forEach(fn) {
    const arr = this._arr
    for (let i = 0, len = arr.length; i < len; i++) {
      await fn(arr[i], i, this)
    }
  }
  async map(fn) {
    const arr = this._arr
    const res = []
    for (let i = 0, len = arr.length; i < len; i++) {
      res.push(await fn(arr[i], i, this))
    }
    return this
  }
}

调用一下

new AsyncArray(...ids).map(get).forEach(async res => console.log(res))
// 抛出错误
// (intermediate value).map(...).forEach is not a function

然而会有问题,实际上 map 返回的是 Promise,因此咱们还必须使用 await 进行等待

;(async () => {
  ;(await new AsyncArray(...ids).map(get)).forEach(async res =>
    console.log(res),
  )
})()

是否是感受超级蠢?吾辈也是这样认为的!

链式调用加延迟执行

咱们能够尝试使用链式调用加延迟执行修改这个 AsyncArray

/**
 * 保存高阶函数传入的异步操做
 */
class Action {
  constructor(type, args) {
    /**
     * @field 异步操做的类型
     * @type {string}
     */
    this.type = type
    /**
     * @field 异步操做的参数数组
     * @type {Function}
     */
    this.args = args
  }
}

/**
 * 全部的操做类型
 */
Action.Type = {
  forEach: 'forEach',
  map: 'map',
  filter: 'filter',
}

/**
 * 真正实现的异步数组
 */
class InnerAsyncArray {
  constructor(arr) {
    this._arr = arr
  }
  async forEach(fn) {
    const arr = this._arr
    for (let i = 0, len = arr.length; i < len; i++) {
      await fn(arr[i], i, this)
    }
    this._arr = []
  }
  async map(fn) {
    const arr = this._arr
    const res = []
    for (let i = 0, len = arr.length; i < len; i++) {
      res.push(await fn(arr[i], i, this))
    }
    this._arr = res
    return this
  }
  async filter(fn) {
    const arr = this._arr
    const res = []
    for (let i = 0, len = arr.length; i < len; i++) {
      if (await fn(arr[i], i, this)) {
        res.push(arr[i])
      }
    }
    this._arr = res
    return this
  }
}

class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
    /**
     * @field 保存异步任务
     * @type {Action[]}
     */
    this._task = []
  }
  forEach(fn) {
    this._task.push(new Action(Action.Type.forEach, [fn]))
    return this
  }
  map(fn) {
    this._task.push(new Action(Action.Type.map, [fn]))
    return this
  }
  filter(fn) {
    this._task.push(new Action(Action.Type.filter, [fn]))
    return this
  }
  /**
   * 终结整个链式操做并返回结果
   */
  async value() {
    const arr = new InnerAsyncArray(this._arr)
    let result
    for (let task of this._task) {
      result = await arr[task.type](...task.args)
    }
    return result
  }
}

使用一下

new AsyncArray(...ids)
  .filter(async i => i % 2 === 0)
  .map(get)
  .forEach(async res => console.log(res))
  .value()

能够看到,确实符合预期了,然而每次都要调用 value(),终归有些麻烦。

使用 then 以支持 await 自动结束

这里使用 then() 替代它以使得可使用 await 自动计算结果

class AsyncArray {
  // 上面的其余内容...
  /**
   * 终结整个链式操做并返回结果
   */
  async then(resolve) {
    const arr = new InnerAsyncArray(this._arr)
    let result
    for (let task of this._task) {
      result = await arr[task.type](...task.args)
    }
    // 这里使用 resolve(result) 是为了兼容 await 的调用方式
    resolve(result)
    return result
  }
}

如今,可使用 await 结束此次链式调用了

await new AsyncArray(...ids).map(get).forEach(async res => console.log(res))

忽然之间,咱们发现了一个问题,为何会这么慢?一个个去进行异步操做太慢了,难道就不能一次性所有发送出去,而后有序的处理结果就行了嘛?

并发异步操做

咱们可使用 Promise.all 并发执行异步操做,而后对它们的结果进行有序地处理。

/**
 * 并发实现的异步数组
 */
class InnerAsyncArrayParallel {
  constructor(arr) {
    this._arr = arr
  }
  async _all(fn) {
    return Promise.all(this._arr.map(fn))
  }
  async forEach(fn) {
    await this._all(fn)
    this._arr = []
  }
  async map(fn) {
    this._arr = await this._all(fn)
    return this
  }
  async filter(fn) {
    const arr = await this._all(fn)
    this._arr = this._arr.filter((v, i) => arr[i])
    return this
  }
}

而后修改 AsyncArray,使用 _AsyncArrayParallel 便可

class AsyncArray {
  // 上面的其余内容...
  /**
   * 终结整个链式操做并返回结果
   */
  async then(resolve) {
    const arr = new InnerAsyncArrayParallel(this._arr)
    let result = this._arr
    for (let task of this._task) {
      result = await arr[task.type](...task.args)
    }
    // 这里使用 resolve(result) 是为了兼容 await 的调用方式
    if (resolve) {
      resolve(result)
    }
    return result
  }
}

调用方式不变。固然,因为使用 Promise.all 实现,也一样受到它的限制 -- 异步操做实际上所有执行了。

串行/并行相互转换

如今咱们的 _AsyncArray_AsyncArrayParallel 两个类只能二选一,因此,咱们须要添加两个函数用于互相转换。

class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
    /**
     * @field 保存异步任务
     * @type {AsyncArrayAction[]}
     */
    this._task = []
    /**
     * 是否并行化
     */
    this._parallel = false
  }
  // 其余内容...

  parallel() {
    this._parallel = true
    return this
  }
  serial() {
    this._parallel = false
    return this
  }
  async then() {
    const arr = this._parallel
      ? new InnerAsyncArrayParallel(this._arr)
      : new InnerAsyncArray(this._arr)
    let result = this._arr
    for (let task of this._task) {
      result = await arr[task.type](...task.args)
    }
    if (resolve) {
      resolve(result)
    }
    return result
  }
}

如今,咱们能够在真正执行以前在任意位置对其进行转换了

await new AsyncArray(...ids)
  .parallel()
  .filter(async i => i % 2 === 0)
  .map(get)
  .forEach(async res => console.log(res))

并发执行多个异步操做

然而,上面的代码有一些隐藏的问题

  1. await 以后返回值不是一个数组

    ;(async () => {
      const asyncArray = new AsyncArray(...ids)
      console.log(await asyncArray.map(i => i * 2)) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] }
    })()
  2. 上面的 map, filter 调用在 await 以后仍会影响到下面的调用

    ;(async () => {
      const asyncArray = new AsyncArray(...ids)
      console.log(await asyncArray.map(i => i * 2)) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] }
      console.log(await asyncArray) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] }
    })()
  3. 并发调用的顺序不能肯定,会影响到内部数组,致使结果不能肯定

    ;(async () => {
      const asyncArray = new AsyncArray(...ids)
      ;(async () => {
        console.log(
          await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2),
        ) // InnerAsyncArray { _arr: [ 2, 6 ] }
      })()
      ;(async () => {
        console.log(await asyncArray) // InnerAsyncArray { _arr: [ 2, 6 ] }
      })()
    })()

先解决第一个问题,这里只须要判断一下是否为终结操做(forEach),是的话就直接返回结果,不然继续下一次循环

class AsyncArray {
  // 其余内容...

  async then(resolve, reject) {
    const arr = this._parallel
      ? new InnerAsyncArrayParallel(this._arr)
      : new InnerAsyncArray(this._arr)
    let result = this._arr
    for (let task of this._task) {
      const temp = await arr[task.type](...task.args)
      if (
        temp instanceof InnerAsyncArray ||
        temp instanceof InnerAsyncArrayParallel
      ) {
        result = temp._arr
      } else {
        // 若是已是终结操做就返回数组的值
        if (resolve) {
          resolve(temp)
        }
        return temp
      }
    }
    if (resolve) {
      resolve(result)
    }
    return result
  }
}

如今,第一个问题简单解决

;(async () => {
  const asyncArray = new AsyncArray(...ids)
  console.log(await asyncArray.map(i => i * 2)) // [ 2, 4, 6, 8 ]
})()

第2、第三个问题看起来彷佛是同一个问题?其实咱们能够按照常规思惟解决第一个问题。既然 await 以后仍然会影响到下面的调用,那就在 then 中把 _task 清空好了,修改 then 函数

class AsyncArray {
  // 其余内容...

  async then(resolve, reject) {
    const arr = this._parallel
      ? new InnerAsyncArrayParallel(this._arr)
      : new InnerAsyncArray(this._arr)
    let result = this._arr
    for (let task of this._task) {
      const temp = await arr[task.type](...task.args)
      if (
        temp instanceof InnerAsyncArray ||
        temp instanceof InnerAsyncArrayParallel
      ) {
        result = temp._arr
      } else {
        // 若是已是终结操做就返回数组的值
        if (resolve) {
          resolve(temp)
        }
        this._task = []
        return temp
      }
    }
    if (resolve) {
      resolve(result)
    }
    this._task = []
    return result
  }
}

如今,第一个问题解决了,但第二个问题不会解决。究其缘由,仍是异步事件队列的问题,虽然 async-await 可以让咱们以同步的方式写异步的代码,但千万不可忘记它们本质上仍是异步的!

;(async () => {
  await Promise.all([
    (async () => {
      console.log(
        await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2),
      ) // [ 2, 6 ]
    })(),
    (async () => {
      console.log(await asyncArray) // [ 2, 6 ]
    })(),
  ])
  console.log(await asyncArray) // [ 1, 2, 3, 4 ]
})()

能够看到,在使用 await 进行等待以后就如同预期的 _task 被清空了。然而,并发执行的没有等待的 await asyncArray 却有奇怪的问题,由于它是在 _task 清空以前执行的。

而且,这带来一个反作用: 没法缓存操做了

;(async () => {
  const asyncArray = new AsyncArray(...ids).map(i => i * 2)
  console.log(await asyncArray) // [ 2, 4, 6, 8 ]
  console.log(await asyncArray) // [ 1, 2, 3, 4 ]
})()

使用不可变数据

为了解决直接修改内部数组形成的问题,咱们可使用不可变数据解决这个问题。试想:若是咱们每次操做都返回一个新的 AsyncArray,他们之间没有关联,这样又如何呢?

class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
    /**
     * @field 保存异步任务
     * @type {Action[]}
     */
    this._task = []
    /**
     * 是否并行化
     */
    this._parallel = false
  }
  forEach(fn) {
    return this._addTask(Action.Type.forEach, [fn])
  }
  map(fn) {
    return this._addTask(Action.Type.map, [fn])
  }
  filter(fn) {
    return this._addTask(Action.Type.filter, [fn])
  }
  parallel() {
    this._parallel = true
    return this
  }
  serial() {
    this._parallel = false
    return this
  }
  _addTask(type, args) {
    const result = new AsyncArray(...this._arr)
    result._task = [...this._task, new Action(type, args)]
    result._parallel = this._parallel
    return result
  }
  /**
   * 终结整个链式操做并返回结果
   */
  async then(resolve, reject) {
    const arr = this._parallel
      ? new InnerAsyncArrayParallel(this._arr)
      : new InnerAsyncArray(this._arr)
    let result = this._arr
    for (let task of this._task) {
      const temp = await arr[task.type](...task.args)
      if (
        temp instanceof InnerAsyncArray ||
        temp instanceof InnerAsyncArrayParallel
      ) {
        result = temp._arr
      } else {
        // 若是已是终结操做就返回数组的值
        if (resolve) {
          resolve(temp)
        }
        return temp
      }
    }
    if (resolve) {
      resolve(result)
    }
    return result
  }
}

再次测试上面的那第三个问题,发现已经一切正常了呢

  • 并发调用的顺序不能肯定,但不会影响内部数组了,结果是肯定的
;(async () => {
  const asyncArray = new AsyncArray(...ids)
  await Promise.all([
    (async () => {
      console.log(
        await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2),
      ) // [ 2, 6 ]
    })(),
    (async () => {
      console.log(await asyncArray) // [ 1, 2, 3, 4 ]
    })(),
  ])
  console.log(await asyncArray) // [ 1, 2, 3, 4 ]
})()
  • 操做能够被缓存
;(async () => {
  const asyncArray = new AsyncArray(...ids).map(i => i * 2)
  console.log(await asyncArray) // [ 2, 4, 6, 8 ]
  console.log(await asyncArray) // [ 2, 4, 6, 8 ]
})()

完整代码

下面吾辈把完整的代码贴出来

/**
 * 保存高阶函数传入的异步操做
 */
class Action {
  constructor(type, args) {
    /**
     * @field 异步操做的类型
     * @type {string}
     */
    this.type = type
    /**
     * @field 异步操做的参数数组
     * @type {Function}
     */
    this.args = args
  }
}

/**
 * 全部的操做类型
 */
Action.Type = {
  forEach: 'forEach',
  map: 'map',
  filter: 'filter',
}

/**
 * 真正实现的异步数组
 */
class InnerAsyncArray {
  constructor(arr) {
    this._arr = arr
  }
  async forEach(fn) {
    const arr = this._arr
    for (let i = 0, len = arr.length; i < len; i++) {
      await fn(arr[i], i, this)
    }
    this._arr = []
  }
  async map(fn) {
    const arr = this._arr
    const res = []
    for (let i = 0, len = arr.length; i < len; i++) {
      res.push(await fn(arr[i], i, this))
    }
    this._arr = res
    return this
  }
  async filter(fn) {
    const arr = this._arr
    const res = []
    for (let i = 0, len = arr.length; i < len; i++) {
      if (await fn(arr[i], i, this)) {
        res.push(arr[i])
      }
    }
    this._arr = res
    return this
  }
}

class InnerAsyncArrayParallel {
  constructor(arr) {
    this._arr = arr
  }
  async _all(fn) {
    return Promise.all(this._arr.map(fn))
  }
  async forEach(fn) {
    await this._all(fn)
    this._arr = []
  }
  async map(fn) {
    this._arr = await this._all(fn)
    return this
  }
  async filter(fn) {
    const arr = await this._all(fn)
    this._arr = this._arr.filter((v, i) => arr[i])
    return this
  }
}

class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
    /**
     * @field 保存异步任务
     * @type {Action[]}
     */
    this._task = []
    /**
     * 是否并行化
     */
    this._parallel = false
  }
  forEach(fn) {
    return this._addTask(Action.Type.forEach, [fn])
  }
  map(fn) {
    return this._addTask(Action.Type.map, [fn])
  }
  filter(fn) {
    return this._addTask(Action.Type.filter, [fn])
  }
  parallel() {
    this._parallel = true
    return this
  }
  serial() {
    this._parallel = false
    return this
  }
  _addTask(type, args) {
    const result = new AsyncArray(...this._arr)
    result._task = [...this._task, new Action(type, args)]
    result._parallel = this._parallel
    return result
  }
  /**
   * 终结整个链式操做并返回结果
   */
  async then(resolve, reject) {
    const arr = this._parallel
      ? new InnerAsyncArrayParallel(this._arr)
      : new InnerAsyncArray(this._arr)
    let result = this._arr
    for (let task of this._task) {
      const temp = await arr[task.type](...task.args)
      if (
        temp instanceof InnerAsyncArray ||
        temp instanceof InnerAsyncArrayParallel
      ) {
        result = temp._arr
      } else {
        // 若是已是终结操做就返回数组的值
        if (resolve) {
          resolve(temp)
        }
        return temp
      }
    }
    if (resolve) {
      resolve(result)
    }
    return result
  }
}

总结

那么,关于 JavaScript 中如何封装一个可使用异步操做高阶函数的数组就先到这里了,完整的 JavaScript 异步数组请参考吾辈的 AsyncArray(使用 TypeScript 编写)。

相关文章
相关标签/搜索