《Node.js设计模式》基于ES2015+的回调控制流

本系列文章为《Node.js Design Patterns Second Edition》的原文翻译和读书笔记,在GitHub连载更新,同步翻译版连接javascript

欢迎关注个人专栏,以后的博文将在专栏同步:html

Asynchronous Control Flow Patterns with ES2015 and Beyond

在上一章中,咱们学习了如何使用回调处理异步代码,以及如何解决如回调地狱代码等异步问题。回调是JavaScriptNode.js中的异步编程的基础,可是如今,其余替代方案已经出现。这些替代方案更复杂,以便可以以更方便的方式处理异步代码。前端

在本章中,咱们将探讨一些表明性的替代方案,PromiseGenerator。以及async await,这是一种创新的语法,可在高版本的JavaScript中提供,其也做为ECMAScript 2017发行版的一部分。java

咱们将看到这些替代方案如何简化处理异步控制流的方式。最后,咱们将比较全部这些方法,以了解全部这些方法的全部优势和缺点,并可以明智地选择最适合咱们下一个Node.js项目要求的方法。node

Promise

咱们在前面的章节中提到,CPS风格不是编写异步代码的惟一方法。事实上,JavaScript生态系统为传统的回调模式提供了有趣的替代方案。最着名的选择之一是Promise,特别是如今它是ECMAScript 2015的一部分,而且如今能够在Node.js中可用。mysql

什么是Promise?

Promise是一种抽象的对象,咱们一般容许函数返回一个名为Promise的对象,它表示异步操做的最终结果。一般状况下,咱们说当异步操做还没有完成时,咱们说Promise对象处于pending状态,当操做成功完成时,咱们说Promise对象处于resolve状态,当操做错误终止时,咱们说Promise对象处于reject状态。一旦Promise处于resolvereject,咱们认为当前异步操做结束。git

为了接收到异步操做的正确结果或错误捕获,咱们可使用Promisethen方法:github

promise.then([onFulfilled], [onRejected])
复制代码

在前面的代码中,onFulfilled()是一个函数,最终会收到Promise的正确结果,而onRejected()是另外一个函数,它将接收产生异常的缘由(若是有的话)。两个参数都是可选的。redis

要了解Promise如何转换咱们的代码,让咱们考虑如下几点:算法

asyncOperation(arg, (err, result) => {
  if (err) {
    // 错误处理
  }
  // 正常结果处理
});
复制代码

Promise容许咱们将这个典型的CPS代码转换成更好的结构化和更优雅的代码,以下所示:

asyncOperation(arg)
  .then(result => {
    // 错误处理
  }, err => {
    // 正常结果处理
  });
复制代码

then()方法的一个关键特征是它同步地返回另外一个Promise对象。若是onFulfilled()onRejected()函数中的任何一个函数返回x,则then()方法返回的Promise对象将以下所示:

  • 若是x是一个值,则这个Promise对象会正确处理(resolve)x
  • 若是x是一个Promise对象或thenable,则会正确处理(resolve)x
  • 若是x是一个异常,则会捕获异常(reject)x

注:thenable是一个具备then方法的相似于Promise的对象(Promise-like)。

这个特色使咱们可以链式构建Promise,容许轻松排列组合咱们的异步操做。另外,若是咱们没有指定一个onFulfilled()onRejected()处理程序,则正确结果或异常捕获将自动转发到Promise链的下一个Promise。例如,这容许咱们在整个链中自动传播错误,直到被onRejected()处理程序捕获。随着Promise链,任务的顺序执行忽然变成简单多了:

asyncOperation(arg)
  .then(result1 => {
    // 返回另外一个Promise
    return asyncOperation(arg2);
  })
  .then(result2 => {
    // 返回一个值
    return 'done';
  })
  .then(undefined, err => {
    // 捕获Promise链中的异常
  });
复制代码

下图展现了链式Promise如何工做:

Promise的另外一个重要特性是onFulfilled()onRejected()函数是异步调用的,如同上述的例子,在最后那个then函数resolve一个同步的Promise,它也是同步的。这种模式避免了Zalgo(参见Chapter2-Node.js Essential Patterns),使咱们的异步代码更加一致和稳健。

若是在onFulfilled()onRejected()处理程序中抛出异常(使用throw语句),则then()方法返回的Promise将被自动地reject,抛出异常做为reject的缘由。这相对于CPS来讲是一个巨大的优点,由于它意味着有了Promise,异常将在整个链中自动传播,而且throw语句终于可使用。

在之前,许多不一样的库实现了Promise,大多数时候它们之间不兼容,这意味着不可能在使用不一样Promise库的thenable链式传播错误。

JavaScript社区很是努力地解决了这个限制,这些努力致使了Promises / A +规范的建立。该规范详细描述了then方法的行为,提供了一个可互兼容的基础,这使得来自不一样库的Promise对象可以彼此兼容,开箱即用。

有关Promises / A +规范的详细说明,能够参考Promises / A + 官方网站

Promise / A + 的实施

JavaScript中以及Node.js中,有几个实现Promises / A +规范的库。如下是最受欢迎的:

真正区别他们的是在Promises / A +标准之上提供的额外功能。正如咱们上述所说的那样,该标准定义了then()方法和Promise解析过程的行为,但它没有指定其余功能,例如,如何从基于回调的异步函数建立Promise

在咱们的示例中,咱们将使用由ES2015Promise,由于Promise对象自Node.js 4后便可使用,而不须要任何库来实现。

做为参考,如下是ES2015Promise提供的API:

constructor(new Promise(function(resolve, reject){})):建立了一个新的Promise,它基于做为传递两个类型为函数的参数来决定resolvereject。构造函数的参数解释以下:

  • resolve(obj)resolve一个Promise,并带上一个参数obj,若是obj是一个值,这个值就是传递的异步操做成功的结果。若是obj是一个Promise或一个thenable,则会进行正确处理。
  • reject(err)reject一个Promise,并带上一个参数err。它是Error对象的一个实例。

Promise对象的静态方法

  • Promise.resolve(obj): 将会建立一个resolvePromise实例
  • Promise.reject(err): 将会建立一个rejectPromise实例
  • Promise.all(iterable):返回一个新的Promise实例,而且在iterable中所 有Promise状态为reject时, 返回的Promise实例的状态会被置为reject,若是iterable中至少有一个Promise状态为reject时, 返回的Promise实例状态也会被置为reject,而且reject的缘由是第一个被rejectPromise对象的reject缘由。
  • Promise.race(iterable):返回一个Promise实例,当iterable中任何一个Promiseresolve或被reject时, 返回的Promise实例以一样的缘由resolvereject

Promise实例方法

  • Promise.then(onFulfilled, onRejected):这是Promise的基本方法。它的行为与咱们以前描述的Promises / A +标准兼容。
  • Promise.catch(onRejected):这只是Promise.then(undefined,onRejected)的语法糖。

值得一提的是,一些Promise实现提供了另外一种机制来建立新的Promise,称为deferreds。咱们不会在这里描述,由于它不是ES2015标准的一部分,可是若是您想了解更多信息,能够阅读Q文档 (github.com/kriskowal/q…) 或When.js文档 (github.com/cujojs/when…) 。

Promisifying一个Node.js回调风格的函数

JavaScript中,并非全部的异步函数和库都支持开箱即用的Promise。大多数状况下,咱们必须将一个典型的基于回调的函数转换成一个返回Promise的函数,这个过程也被称为promisification

幸运的是,Node.js中使用的回调约定容许咱们建立一个可重用的函数,咱们经过使用Promise对象的构造函数来简化任何Node.js风格的API。让咱们建立一个名为promisify()的新函数,并将其包含到utilities.js模块中(以便稍后在咱们的Web爬虫应用程序中使用它):

module.exports.promisify = function(callbackBasedApi) {
  return function promisified() {
    const args = [].slice.call(arguments);
    return new Promise((resolve, reject) => {
      args.push((err, result) => {
        if (err) {
          return reject(err);
        }
        if (arguments.length <= 2) {
          resolve(result);
        } else {
          resolve([].slice.call(arguments, 1));
        }
      });
      callbackBasedApi.apply(null, args);
    });
  }
};
复制代码

前面的函数返回另外一个名为promisified()的函数,它表示输入中给出的callbackBasedApipromisified版本。如下展现它是如何工做的:

  1. promisified()函数使用Promise构造函数建立一个新的Promise对象,并当即将其返回给调用者。
  2. 在传递给Promise构造函数的函数中,咱们确保传递给callbackBasedApi,这是一个特殊的回调函数。因为咱们知道回调老是最后调用的,咱们只需将回调函数附加到提供给promisified()函数的参数列表里(args)。
  3. 在特殊的回调中,若是咱们收到错误,咱们当即reject这个Promise
  4. 若是没有收到错误,咱们使用一个值或一个数组值来resolve这个Promise,具体取决于传递给回调的结果数量。
  5. 最后,咱们只需使用咱们构建的参数列表调用callbackBasedApi

大部分的Promise已经提供了一个开箱即用的接口来将一个Node.js风格的API转换成一个返回Promise的API。例如,Q有Q.denodeify()和Q.nbind(),Bluebird有Promise.promisify(),而When.js有node.lift()。

顺序执行

在一些必要的理论以后,咱们如今准备将咱们的Web爬虫应用程序转换为使用Promise的形式。让咱们直接从版本2开始,直接下载一个Web网页的连接。

spider.js模块中,第一步是加载咱们的Promise实现(咱们稍后会使用它)和Promisifying咱们打算使用的基于回调的函数:

const utilities = require('./utilities');
const request = utilities.promisify(require('request'));
const mkdirp = utilities.promisify(require('mkdirp'));
const fs = require('fs');
const readFile = utilities.promisify(fs.readFile);
const writeFile = utilities.promisify(fs.writeFile);
复制代码

如今,咱们开始更改咱们的download函数:

function download(url, filename) {
  console.log(`Downloading ${url}`);
  let body;
  return request(url)
    .then(response => {
      body = response.body;
      return mkdirp(path.dirname(filename));
    })
    .then(() => writeFile(filename, body))
    .then(() => {
      console.log(`Downloaded and saved: ${url}`);
      return body;
    });
}
复制代码

这里要注意的到的最重要的是咱们也为readFile()返回的Promise注册 一个onRejected()函数,用来处理一个网页没有被下载的状况(或文件不存在)。 还有,看咱们如何使用throw来传递onRejected()函数中的错误的。

既然咱们已经更改咱们的spider()函数,咱们这么修改它的调用方式:

spider(process.argv[2], 1)
  .then(() => console.log('Download complete'))
  .catch(err => console.log(err));
复制代码

注意咱们是如何第一次使用Promise的语法糖catch来处理源自spider()函数的任何错误状况。若是咱们再看看迄今为止咱们所写的全部代码,那么咱们会惊喜的发现,咱们没有包含任何错误传播逻辑,由于咱们在使用回调函数时会被迫作这样的事情。这显然是一个巨大的优点,由于它极大地减小了咱们代码中的样板文件以及丢失任何异步错误的机会。

如今,完成咱们惟一缺失的Web爬虫应用程序的第二版的spiderLinks()函数,咱们将在稍后实现它。

顺序迭代

到目前为止,Web爬虫应用程序代码库主要是对Promise是什么以及如何使用的概述,展现了使用Promise实现顺序执行流程的简单性和优雅性。可是,咱们如今考虑的代码只涉及到一组已知的异步操做的执行。因此,完成咱们对顺序执行流程的探索的缺失部分是看咱们如何使用Promise来实现迭代。一样,网络蜘蛛第二版的spiderLinks()函数也是一个很好的例子。

让咱们添加缺乏的这一块:

function spiderLinks(currentUrl, body, nesting) {
  let promise = Promise.resolve();
  if (nesting === 0) {
    return promise;
  }
  const links = utilities.getPageLinks(currentUrl, body);
  links.forEach(link => {
    promise = promise.then(() => spider(link, nesting - 1));
  });
  return promise;
}
复制代码

为了异步迭代一个网页的所有连接,咱们必须动态建立一个Promise的迭代链。

  1. 首先,咱们定义一个空的Promiseresolveundefined。这个Promise只是用来做为Promise的迭代链的起始点。
  2. 而后,咱们经过在循环中调用链中前一个Promisethen()方法得到的新的Promise来更新Promise变量。这就是咱们使用Promise的异步迭代模式。

这样,循环的结束,promise变量会包含循环中最后一个then()返回的Promise对象,因此它只有当Promise的迭代链中所有Promise对象被resolve后才能被resolve

注:在最后调用了这个then方法来resolve这个Promise对象

经过这个,咱们已使用Promise对象重写了咱们的Web爬虫应用程序。咱们如今应该能够运行它了。

顺序迭代模式

为了总结这个顺序执行的部分,让咱们提取一个模式来依次遍历一组Promise

let tasks = [ /* ... */ ]
let promise = Promise.resolve();
tasks.forEach(task => {
  promise = promise.then(() => {
    return task();
  });
});
promise.then(() => {
  // 全部任务都完成
});
复制代码

使用reduce()方法来替代forEach()方法,容许咱们写出更为简洁的代码:

let tasks = [ /* ... */ ]
let promise = tasks.reduce((prev, task) => {
  return prev.then(() => {
    return task();
  });
}, Promise.resolve());

promise.then(() => {
  //All tasks completed
});
复制代码

与往常同样,经过对这种模式的简单调整,咱们能够将全部任务的结果收集到一个数组中,咱们能够实现一个mapping算法,或者构建一个filter等等。

上述这个模式使用循环动态地创建一个链式的Promise。

并行执行

另外一个适合用Promise的执行流程是并行执行流程。实际上,咱们须要作的就是使用内置的Promise.all()。这个方法创造了另外一个Promise对象,只有在输入中的全部Promiseresolve时才能resolve。这是一个并行执行,由于在其参数Promise对象的之间没有执行顺序可言。

为了演示这一点,咱们来看咱们的Web爬虫应用程序的第三版,它将页面中的全部连接并行下载。让咱们再次使用Promise更新spiderLinks()函数来实现并行流程:

function spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return Promise.resolve();
  }
  const links = utilities.getPageLinks(currentUrl, body);
  const promises = links.map(link => spider(link, nesting - 1));
  return Promise.all(promises);
}
复制代码

这里的模式在elements.map()迭代中产生一个数组,存放全部异步任务,以后便于同时启动spider()任务。这一次,在循环中,咱们不等待之前的下载完成,而后开始一个新的下载任务:全部的下载任务在一个循环中一个接一个地开始。以后,咱们利用Promise.all()方法,它返回一个新的Promise对象,当数组中的全部Promise对象都被resolve时,这个Promise对象将被resolve。换句话说,全部的下载任务完成,这正是咱们想要的。

限制并行执行

不幸的是,ES2015Promise API并无提供一种原生的方式来限制并发任务的数量,可是咱们老是能够依靠咱们所学到的有关用普通JavaScript来限制并发。事实上,咱们在TaskQueue类中实现的模式能够很容易地被调整来支持返回Promise的任务。这很容易经过修改next()方法来完成:

class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  pushTask(task) {
    this.queue.push(task);
    this.next();
  }

  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task().then(() => {
        this.running--;
        this.next();
      });
      this.running++;
    }
  }
}
复制代码

不一样于使用一个回调函数来处理任务,咱们简单地调用Promisethen()

让咱们回到spider.js模块,并修改它以支持咱们的新版本的TaskQueue类。首先,咱们确保定义一个TaskQueue的新实例:

const TaskQueue = require('./taskQueue');
const downloadQueue = new TaskQueue(2);
复制代码

而后,是咱们的spiderLinks()函数。这里的修改也是很简单:

function spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return Promise.resolve();
  }
  const links = utilities.getPageLinks(currentUrl, body);
  // 咱们须要以下代码,用于建立Promise对象
  // 若是没有下列代码,当任务数量为0时,将永远不会resolve
  if (links.length === 0) {
    return Promise.resolve();
  }
  return new Promise((resolve, reject) => {
    let completed = 0;
    let errored = false;
    links.forEach(link => {
      let task = () => {
        return spider(link, nesting - 1)
          .then(() => {
            if (++completed === links.length) {
              resolve();
            }
          })
          .catch(() => {
            if (!errored) {
              errored = true;
              reject();
            }
          });
      };
      downloadQueue.pushTask(task);
    });
  });
}
复制代码

在上述代码中有几点值得咱们注意的:

  • 首先,咱们须要返回使用Promise构造函数建立的新的Promise对象。正如咱们将看到的,这使咱们可以在队列中的全部任务完成时手动resolve咱们的Promise对象。
  • 而后,咱们应该看看咱们如何定义任务。咱们所作的是将一个onFulfilled()回调函数的调用添加到由spider()返回的Promise对象中,因此咱们能够计算完成的下载任务的数量。当完成的下载量与当前页面中连接的数量相同时,咱们知道任务已经处理完毕,因此咱们能够调用外部Promiseresolve()函数。

Promises / A +规范规定,then()方法的onFulfilled()和onRejected()回调函数只能调用一次(仅调用onFulfilled()和onRejected())。Promise接口的实现确保即便咱们屡次手动调用resolve或reject,Promise也仅能够被resolve或reject一次。

如今,使用PromiseWeb爬虫应用程序的第4版应该已经准备好了。咱们可能再次注意到下载任务如何并行运行,并发数量限制为2。

在公有API中暴露回调函数和Promise

正如咱们在前面所学到的,Promise能够被用做回调函数的一个很好的替代品。它们使咱们的代码更具可读性和易于理解。虽然Promise带来了许多优势,但也要求开发人员理解许多不易于理解的概念,以便正确和熟练地使用。因为这个缘由和其余缘由,在某些状况下,比起Promise来讲,不少开发者更偏向于回调函数。

如今让咱们想象一下,咱们想要构建一个执行异步操做的公共库。咱们须要作什么?咱们是建立了一个基于回调函数的API仍是一个面向PromiseAPI?仍是二者均有?

这是许多知名的库所面临的问题,至少有两种方法值得一提,使咱们可以提供一个多功能的API

requestredismysql这样的库所使用的第一种方法是提供一个简单的基于回调函数的API,若是须要,开发人员能够选择公开函数。其中一些库提供工具函数来Promise化异步回调,但开发人员仍然须要以某种方式将暴露的API转换为可以使用Promise对象。

第二种方法更透明。它还提供了一个面向回调的API,但它使回调参数可选。每当回调做为参数传递时,函数将正常运行,在完成时或失败时执行回调。当回调未被传递时,函数将当即返回一个Promise对象。这种方法有效地结合了回调函数和Promise,使得开发者能够在调用时选择采用什么接口,而不须要提早进行Promise化。许多库,如mongoosesequelize,都支持这种方法。

咱们来看一个简单的例子。假设咱们要实现一个异步执行除法的模块:

module.exports = function asyncDivision(dividend, divisor, cb) {
  return new Promise((resolve, reject) => { // [1]
    process.nextTick(() => {
      const result = dividend / divisor;
      if (isNaN(result) || !Number.isFinite(result)) {
        const error = new Error('Invalid operands');
        if (cb) {
          cb(error); // [2]
        }
        return reject(error);
      }
      if (cb) {
        cb(null, result); // [3]
      }
      resolve(result);
    });
  });
};
复制代码

该模块的代码很是简单,可是有一些值得强调的细节:

  • 首先,返回使用Promise的构造函数建立的新承诺。咱们在构造函数参数函数内定义所有逻辑。
  • 在发生错误的状况下,咱们reject这个Promise,但若是回调函数在被调用时做为参数传递,咱们也执行回调来进行错误传播。
  • 在计算结果以后,咱们resolve了这个Promise,可是若是有回调函数,咱们也会将结果传播给回调函数。

咱们如今看如何用回调函数和Promise来使用这个模块:

// 回调函数的方式
asyncDivision(10, 2, (error, result) => {
  if (error) {
    return console.error(error);
  }
  console.log(result);
});

// Promise化的调用方式
asyncDivision(22, 11)
  .then(result => console.log(result))
  .catch(error => console.error(error));
复制代码

应该很清楚的是,即将开始使用相似于上述的新模块的开发人员将很容易地选择最适合本身需求的风格,而无需在但愿利用Promise时引入外部promisification功能。

Generators

ES2015规范引入了另一种机制,除了其余新功能外,还能够用来简化Node.js应用程序的异步控制流程。咱们正在谈论Generator,也被称为semi-coroutines。它们是子程序的通常化,能够有不一样的入口点。在一个正常的函数中,实际上咱们只能有一个入口点,这个入口点对应着函数自己的调用。Generator与通常函数相似,可是能够暂停(使用yield语句),而后在稍后继续执行。在实现迭代器时,Generator特别有用,由于咱们已经讨论了如何使用迭代器来实现重要的异步控制流模式,如顺序执行和限制并行执行。

Generators基础

在咱们探索使用Generator来实现异步控制流程以前,学习一些基本概念是很重要的。咱们从语法开始吧。能够经过在函数关键字以后附加*(星号)运算符来声明Generator函数:

function* makeGenerator() {
  // body
}
复制代码

makeGenerator()函数内部,咱们可使用关键字yield暂停执行并返回给调用者传递给它的值:

function* makeGenerator() {
  yield 'Hello World';
  console.log('Re-entered');
}
复制代码

在前面的代码中,Generator经过yield一个字符串Hello World暂停当前函数的执行。当Generator恢复时,执行将从下列语句开始:

console.log('Re-entered');
复制代码

makeGenerator()函数本质上是一个工厂,它在被调用时返回一个新的Generator对象:

const gen = makeGenerator();
复制代码

生成器对象的最重要的方法是next(),它用于启动/恢复Generator的执行,并返回以下形式的对象:

{
  value: <yielded value>
  done: <true if the execution reached the end>
}
复制代码

这个对象包含Generator yield的值和一个指示Generator是否已经完成执行的符号。

一个简单的例子

为了演示Generator,咱们来建立一个名为fruitGenerator.js的新模块:

function* fruitGenerator() {
  yield 'apple';
  yield 'orange';
  return 'watermelon';
}
const newFruitGenerator = fruitGenerator();
console.log(newFruitGenerator.next()); // [1]
console.log(newFruitGenerator.next()); // [2]
console.log(newFruitGenerator.next()); // [3]
复制代码

前面的代码将打印下面的输出:

{ value: 'apple', done: false }
{ value: 'orange', done: false }
{ value: 'watermelon', done: true }
复制代码

咱们能够这么解释上述现象:

  • 第一次调用newFruitGenerator.next()时,Generator函数开始执行,直到达到第一个yield语句为止,该命令暂停Generator函数执行,并将值apple返回给调用者。
  • 在第二次调用newFruitGenerator.next()时,Generator函数恢复执行,从第二个yield语句开始,这又使得执行暂停,同时将orange返回给调用者。
  • newFruitGenerator.next()的最后一次调用致使Generator函数的执行从其最后的yield恢复,一个返回语句,它终止Generator函数,返回watermelon,并将结果对象中的done属性设置为true

Generators做为迭代器

为了更好地理解为何Generator函数对实现迭代器很是有用,咱们来构建一个例子。在咱们将调用iteratorGenerator.js的新模块中,咱们编写下面的代码:

function* iteratorGenerator(arr) {
  for (let i = 0; i < arr.length; i++) {
    yield arr[i];
  }
}
const iterator = iteratorGenerator(['apple', 'orange', 'watermelon']);
let currentItem = iterator.next();
while (!currentItem.done) {
  console.log(currentItem.value);
  currentItem = iterator.next();
}
复制代码

此代码应按以下所示打印数组中的元素:

apple
orange
watermelon
复制代码

在这个例子中,每次咱们调用iterator.next()时,咱们都会恢复Generator函数的for循环,经过yield数组中的下一个项来运行另外一个循环。这演示了如何在函数调用过程当中维护Generator的状态。当继续执行时,循环和全部变量的值与Generator函数执行暂停时的状态彻底相同。

传值给Generators

如今咱们继续研究Generator的基本功能,首先学习如何将值传递回Generator函数。这其实很简单,咱们须要作的只是为next()方法提供一个参数,而且该值将做为Generator函数内的yield语句的返回值提供。

为了展现这一点,咱们来建立一个新的简单模块:

function* twoWayGenerator() {
  const what = yield null;
  console.log('Hello ' + what);
}
const twoWay = twoWayGenerator();
twoWay.next();
twoWay.next('world');
复制代码

当执行时,前面的代码会输出Hello world。咱们作以下的解释:

  • 第一次调用next()方法时,Generator函数到达第一个yield语句,而后暂停。
  • next('world')被调用时,Generator函数从上次中止的位置,也就是上次的yield语句点恢复,可是此次咱们有一个值传递到Generator函数。这个值将被赋值到what变量。生成器而后执行console.log()指令并终止。

用相似的方式,咱们能够强制Generator函数抛出异常。这能够经过使用Generator函数的throw方法来实现,以下例所示:

const twoWay = twoWayGenerator();
twoWay.next();
twoWay.throw(new Error());
复制代码

在这个最后这段代码,twoWayGenerator()函数将在yield函数返回的时候抛出异常。这就好像从Generator函数内部抛出了一个异常同样,这意味着它能够像使用try ... catch块同样进行捕获和处理异常。

Generator实现异步控制流

你必定想知道Generator函数如何帮助咱们处理异步操做。咱们能够经过建立一个接受Generator函数做为参数的特殊函数来演示这一点,并容许咱们在Generator函数内部使用异步代码。这个函数在异步操做完成时要注意恢复Generator函数的执行。咱们将调用这个函数asyncFlow()

function asyncFlow(generatorFunction) {
  function callback(err) {
    if (err) {
      return generator.throw(err);
    }
    const results = [].slice.call(arguments, 1);
    generator.next(results.length > 1 ? results : results[0]);
  }
  const generator = generatorFunction(callback);
  generator.next();
}
复制代码

前面的函数取一个Generator函数做为输入,而后当即调用:

const generator = generatorFunction(callback);
generator.next();
复制代码

generatorFunction()接受一个特殊的回调函数做为参数,当generator.throw()若是接收到一个错误,便当即返回。另外,经过将在回调函数中接收的results传值回Generator函数继续Generator函数的执行:

if (err) {
  return generator.throw(err);
}
const results = [].slice.call(arguments, 1);
generator.next(results.length > 1 ? results : results[0]);
复制代码

为了说明这个简单的辅助函数的强大,咱们建立一个叫作clone.js的新模块,这个模块只是建立它自己的克隆。粘贴咱们刚才建立的asyncFlow()函数,核心代码以下:

const fs = require('fs');
const path = require('path');
asyncFlow(function*(callback) {
  const fileName = path.basename(__filename);
  const myself = yield fs.readFile(fileName, 'utf8', callback);
  yield fs.writeFile(`clone_of_${filename}`, myself, callback);
  console.log('Clone created');
});
复制代码

明显地,有了asyncFlow()函数的帮助,咱们能够像咱们书写同步阻塞函数同样用同步的方式来书写异步代码了。而且这个结果背后的原理显得很清楚。一旦异步操做结束,传递给每一个异步函数的回调函数将继续Generator函数的执行。没有什么复杂的,可是结果确实很使人意外。

这个技术有其余两个变化,一个是Promise的使用,另一个则是thunks

在基于Generator的控制流中使用的thunk只是一个简单的函数,它除了回调以外,部分地应用了原始函数的全部参数。返回值是另外一个只接受回调做为参数的函数。例如,fs.readFile()的thunkified版本以下所示:

function readFileThunk(filename, options) {
  return function(callback) {
    fs.readFile(filename, options, callback);
  }
}
复制代码

thunkPromise都容许咱们建立不须要回调的Generator函数做为参数传递,例如,使用thunkasyncFlow()版本以下:

function asyncFlowWithThunks(generatorFunction) {
  function callback(err) {
    if (err) {
      return generator.throw(err);
    }
    const results = [].slice.call(arguments, 1);
    const thunk = generator.next(results.length > 1 ? results : results[0]).value;
    thunk && thunk(callback);
  }
  const generator = generatorFunction();
  const thunk = generator.next().value;
  thunk && thunk(callback);
}
复制代码

这个技巧是读取generator.next()的返回值,返回值中包含thunk。下一步是经过注入特殊的回调函数调用thunk自己。这容许咱们写下面的代码:

asyncFlowWithThunk(function*() {
  const fileName = path.basename(__filename);
  const myself = yield readFileThunk(__filename, 'utf8');
  yield writeFileThunk(`clone_of_${fileName}`, myself);
  console.log("Clone created")
});
复制代码

使用co的基于Gernator的控制流

你应该已经猜到了,Node.js生态系统会借助Generator函数来提供一些处理异步控制流的解决方案,例如,suspend是其中一个最老的支持PromisethunksNode.js风格回调函数和正常风格的回调函数的 库。还有,大部分咱们以前分析的Promise库都提供工具函数使得GeneratorPromise能够一块儿使用。

咱们选择co做为本章节的例子。它支持不少类型的yieldables,其中一些是:

  • Thunks
  • Promises
  • Arrays(并行执行)
  • Objects(并行执行)
  • Generators(委托)
  • Generator函数(委托)

还有不少框架或库是基于co生态系统的,包括如下一些:

  • Web框架,最流行的是koa
  • 实现特定控制流模式的库
  • 包装流行的API兼容co的库

咱们使用co从新实现咱们的Generator版本的Web爬虫应用程序

为了将Node.js风格的函数转换成thunks,咱们将会使用一个叫作thunkify的库。

顺序执行

让咱们经过修改Web爬虫应用程序的版本2开始咱们对Generator函数和co的实际探索。咱们要作的第一件事就是加载咱们的依赖包,并生成咱们要使用的函数的thunkified版本。这些将在spider.js模块的最开始进行:

const thunkify = require('thunkify');
const co = require('co');
const request = thunkify(require('request'));
const fs = require('fs');
const mkdirp = thunkify(require('mkdirp'));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);
复制代码

看上述代码,咱们能够注意到与本章前面promisify化的API的代码的一些类似之处。在这一点上,有意思的是,若是咱们使用咱们的promisified版本的函数来代替thunkified的版本,代码将保持彻底同样,这要归功于co支持thunkPromise对象做为yieldable对象。事实上,若是咱们想,甚至能够在同一个应用程序中使用thunkPromise,即便在同一个Generator函数中。就灵活性而言,这是一个巨大的优点,由于它使咱们可以使用基于Generator函数的控制流来解决咱们应用程序中的问题。

好的,如今让咱们开始将download()函数转换为一个Generator函数:

function* download(url, filename) {
  console.log(`Downloading ${url}`);
  const response = yield request(url);
  const body = response[1];
  yield mkdirp(path.dirname(filename));
  yield writeFile(filename, body);
  console.log(`Downloaded and saved ${url}`);
  return body;
}
复制代码

经过使用Generatorco,咱们的download()函数变得简单多了。当咱们须要作异步操做的时候,咱们使用异步的Generator函数做为thunk来把以前的内容转化到Generator函数,并使用yield子句。

而后咱们开始实现咱们的spider()函数:

function* spider(url, nesting) {
  cost filename = utilities.urlToFilename(url);
  let body;
  try {
    body = yield readFile(filename, 'utf8');
  } catch (err) {
    if (err.code !== 'ENOENT') {
      throw err;
    }
    body = yield download(url, filename);
  }
  yield spiderLinks(url, body, nesting);
}
复制代码

从上述代码中一个有趣的细节是咱们可使用try...catch语句块来处理异常。咱们还可使用throw来传播异常。另一个细节是咱们yield咱们的download()函数,而这个函数既不是一个thunk,也不是一个promisified函数,只是另外的一个Generator函数。这也毫无问题,因为co也支持其余Generators做为yieldables

最后转换spiderLinks(),在这个函数中,咱们递归下载一个网页的连接。在这个函数中使用Generators,显得简单多了:

function* spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return nextTick();
  }
  const links = utilities.getPageLinks(currentUrl, body);
  for (let i = 0; i < links.length; i++) {
    yield spider(links[i], nesting - 1);
  }
}
复制代码

看上述代码。虽然顺序迭代没有什么模式能够展现。Generatorco辅助咱们作了不少,方便了咱们可使用同步方式开书写异步代码。

看最重要的部分,程序的入口:

co(function*() {
  try {
    yield spider(process.argv[2], 1);
    console.log(`Download complete`);
  } catch (err) {
    console.log(err);
  }
});
复制代码

这是惟一一处须要调用co(...)来封装的一个Generator。实际上,一旦咱们这么作,co会自动封装咱们传递给yield语句的任何Generator函数,而且这个过程是递归的,因此程序的剩余部分与咱们是否使用co是彻底无关的,虽然是被co封装在里面。

如今应该能够运行使用Generator函数改写的Web爬虫应用程序了。

并行执行

不幸的是,虽然Generator很方便地进行顺序执行,可是不能直接用来并行化执行一组任务,至少不能仅仅使用yieldGenerator。以前,在种状况下咱们使用的模式只是简单地依赖于一个基于回调或者Promise的函数,但使用了Generator函数后,一切会显得更简单。

幸运的是,若是不限制并发数的并行执行,co已经能够经过yield一个Promise对象、thunkGenerator函数,甚至包含Generator函数的数组来实现。

考虑到这一点,咱们的Web爬虫应用程序第三版能够经过重写spiderLinks()函数来作以下改动:

function* spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return nextTick();
  }
  const links = utilities.getPageLinks(currentUrl, body);
  const tasks = links.map(link => spider(link, nesting - 1));
  yield tasks;
}
复制代码

可是上述函数所作的只是拿到全部的任务,这些任务本质上都是经过Generator函数来实现异步的,若是在cothunk内对一个包含Generator函数的数组使用yield,这些任务都会并行执行。外层的Generator函数会等到yield子句的全部异步任务并行执行后再继续执行。

接下来咱们看怎么用一个基于回调函数的方式来解决相同的并行流。咱们用这种方式重写spiderLinks()函数:

function spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return nextTick();
  }
  // 返回一个thunk
  return callback => {
    let completed = 0,
      hasErrors = false;
    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
      return process.nextTick(callback);
    }

    function done(err, result) {
      if (err && !hasErrors) {
        hasErrors = true;
        return callback(err);
      }
      if (++completed === links.length && !hasErrors) {
        callback();
      }
    }
    for (let i = 0; i < links.length; i++) {
      co(spider(links[i], nesting - 1)).then(done);
    }
  }
}
复制代码

咱们使用co并行运行spider()函数,调用Generator函数返回了一个Promise对象。这样,等待Promise完成后调用done()函数。一般,基于Generator控制流的库都有这一功能,所以若是须要,你老是能够将一个Generator转换成一个基于回调或基于Promise的函数。

为了并行开启多个下载任务,咱们只要重用在前面定义的基于回调的并行执行的模式。咱们应该也注意到咱们将spiderLinks()转换成一个thunk(而再也不是一个Generator函数)。这使得当所有并行任务完成时,咱们有一个回调函数能够调用。

上面讲到的是将一个Generator函数转换为一个thunk的模式,使之可以支持其余的基于回调或基于Promise的控制流算法,并能够经过同步阻塞的代码风格书写异步代码。

限制并行执行

如今咱们知道如何处理异步执行流程,应该很容易规划咱们的Web爬虫应用程序的第四版的实现,这个版本对并发下载任务的数量施加了限制。咱们有几个方案能够用来作到这一点。其中一些方案以下:

  • 使用先前实现的基于回调的TaskQueue类。咱们只须要thunkify咱们的Generator函数和其提供的回调函数便可。
  • 使用基于PromiseTaskQueue类,并确保每一个做为任务的Generator函数都被转换成一个返回Promise对象的函数。
  • 使用asyncthunkify咱们打算使用的工具函数,此外还须要把咱们用到的Generator函数转化为基于回调的模式,以便于可以被这个库较好地使用。
  • 使用基于co的生态系统中的库,特别是专门为这种场景的库,如co-limiter
  • 实现基于生产者 - 消费者模型的自定义算法,这与co-limiter的内部实现原理相同。

为了学习,咱们选择最后一个方案,甚至帮助咱们能够更好地理解一种常常与协程(也和线程和进程)同步相关的模式。

生产者 - 消费者模式

咱们的目标是利用队列来提供固定数量的workers,与咱们想要设置的并发级别同样多。为了实现这个算法,咱们将基于本章前面定义的TaskQueue类改写:

class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.taskQueue = [];
    this.consumerQueue = [];
    this.spawnWorkers(concurrency);
  }
  pushTask(task) {
    if (this.consumerQueue.length !== 0) {
      this.consumerQueue.shift()(null, task);
    } else {
      this.taskQueue.push(task);
    }
  }
  spawnWorkers(concurrency) {
    const self = this;
    for (let i = 0; i < concurrency; i++) {
      co(function*() {
        while (true) {
          const task = yield self.nextTask();
          yield task;
        }
      });
    }
  }
  nextTask() {
    return callback => {
      if (this.taskQueue.length !== 0) {
        return callback(null, this.taskQueue.shift());
      }
      this.consumerQueue.push(callback);
    }
  }
}
复制代码

让咱们分析这个TaskQueue类的新实现。首先是在构造函数中。须要调用一次this.spawnWorkers(),由于这是启动worker的方法。

咱们的worker很简单,它们只是用co()包装的当即执行的Generator函数,因此每一个Generator函数能够并行执行。在内部,每一个worker正在运行在一个死循环(while(true){})中,一直阻塞(yield)到新任务在队列中可用时(yield self.nextTask()),一旦能够执行新任务,yield这个异步任务直到其完成。您可能想知道咱们如何可以限制并行执行,并让下一个任务在队列中处于等待状态。答案是在nextTask()方法中。咱们来详细地看看在这个方法的原理:

nextTask() {
  return callback => {
    if (this.taskQueue.length !== 0) {
      return callback(null, this.taskQueue.shift());
    }
    this.consumerQueue.push(callback);
  }
}
复制代码

咱们看这个函数内部发生了什么,这才是这个模式的核心:

  1. 这个方法返回一个对于co而言是一个合法的yieldablethunk
  2. 只要taskQueue类生成的实例中还有下一个任务,thunk的回调函数会被当即调用。回调函数调用时,立马解锁一个worker的阻塞状态,yield这一个任务。
  3. 若是队列中没有任务了,回调函数自己会被放入consumerQueue中。经过这种作法,咱们将一个worker置于空闲(idle)的模式。一旦咱们有一个新的任务来要处理,在consumerQueue队列中的回调函数会被调用,立马唤醒咱们这一worker进行异步处理。

如今,为了理解consumerQueue队列中的空闲worker是如何恢复工做的,咱们须要分析pushTask()方法。若是当前有回调函数可用的话,pushTask()方法将调用consumerQueue队列中的第一个回调函数,从而将取消对worker的锁定。若是没有可用的回调函数,这意味着全部的worker都是工做状态,只须要添加一个新的任务到taskQueue任务队列中。

TaskQueue类中,worker充当消费者的角色,而调用pushTask()函数的角色能够被认为是生产者。这个模式向咱们展现了一个Generator函数实际上能够跟一个线程或进程相似。实际上,生产者 - 消费者之间问题是研究进程间通讯和同步时最多见的问题,但正如咱们已经提到的那样,它对于进程和线程来讲,也是一个常见的例子。

限制下载任务的并发量

既然咱们已经使用Generator函数和生产者 - 消费者模型实现一个限制并行算法,而且已经在Web爬虫应用程序第四版应用它来限制中下载任务的并发数。 首先,咱们加载和初始化一个TaskQueue对象:

const TaskQueue = require('./taskQueue');
const downloadQueue = new TaskQueue(2);
复制代码

而后,修改spiderLinks()函数。和以前不限制并发的版本相似,因此这里咱们只展现修改的部分,主要是经过调用新版本的TaskQueue类生成的实例的pushTask()方法来限制并行执行:

function spiderLinks(currentUrl, body, nesting) {
  //...
  return (callback) => {
    //...
    function done(err, result) {
      //...
    }
    links.forEach(function(link) {
      downloadQueue.pushTask(function*() {
        yield spider(link, nesting - 1);
        done();
      });
    });
  }
}
复制代码

在每一个任务中,咱们在下载完成后当即调用done()函数,所以咱们能够计算下载了多少个连接,而后在完成下载时通知thunk的回调函数执行。

配合Babel使用Async await新语法

回调函数、PromiseGenerator函数都是用于处理JavaScriptNode.js异步问题的方式。正如咱们所看到的,Generator的真正意义在于它提供了一种方式来暂停一个函数的执行,而后等待前面的任务完成后再继续执行。咱们可使用这样的特性来书写异步代码,而且让开发者用同步阻塞的代码风格来书写异步代码。等到异步操做的结果返回后才恢复当前函数的执行。

Generator函数是更多的是用来处理迭代器,然而迭代器在异步代码的使用显得有点笨重。代码可能难以理解,致使代码易读性和可维护性差。

但在不远的未来会有一种更加简洁的语法。实际上,这个提议即将引入到ESMASCript 2017的规范中,这项规范定义了async函数语法。

async函数规范引入两个关键字(asyncawait)到原生的JavaScript语言中,改进咱们书写异步代码的方式。

为了理解这项语法的用法和优点为,咱们看一个简单的例子:

const request = require('request');

function getPageHtml(url) {
  return new Promise(function(resolve, reject) {
    request(url, function(error, response, body) {
      resolve(body);
    });
  });
}
async function main() {
  const html = await getPageHtml('http://google.com');
  console.log(html);
}

main();
console.log('Loading...');
复制代码

在上述代码中,有两个函数:getPageHtmlmain。第一个函数的做用是提取给定URL的一个远程网页的HTML文档代码。值得注意的是,这个函数返回一个Promise对象。

重点在于main函数,由于在这里使用了asyncawait关键字。首先要注意的是函数要以async关键字为前缀。意思是这个函数执行的是异步代码而且容许它在函数体内使用await关键字。await关键字在getPageHtml调用以前,告诉JavaScript解释器在继续执行下一条指令以前,等待getPageHtml返回的Promise对象的结果。这样,main函数内部哪部分代码是异步的,它会等待异步代码的完成再继续执行后续操做,而且不会阻塞这段程序其他部分的正常执行。实际上,控制台会打印字符串Loading...,随后是Google主页的HTML代码。

是否是这种方法的可读性更好而且更容易理解呢? 不幸地是,这个提议还没有定案,即便经过这个提议,咱们须要等下一个版本 的ECMAScript规范出来并把它集成到Node.js后,才能使用这个新语法。 因此咱们今天作了什么?只是漫无目的地等待?不是,固然不是!咱们已经能够在咱们的代码中使用async await语法,只要咱们使用Babel

安装与运行Babel

Babel是一个JavaScript编译器(或翻译器),可以使用语法转换器将高版本的JavaScript代码转换成其余JavaScript代码。语法转换器容许例如咱们书写并使用ES2015ES2016JSX和其它的新语法,来翻译成日后兼容的代码,在JavaScript运行环境如浏览器或Node.js中均可以使用Babel

在项目中使用npm安装Babel,命令以下:

npm install --save-dev babel-cli
复制代码

咱们还须要安装插件以支持async await语法的解释或翻译:

npm install --save-dev babel-plugin-syntax-async-functions babel-plugin-tranform-async-to-generator
复制代码

如今假设咱们想运行咱们以前的例子(称为index.js)。咱们须要经过如下命令启动:

node_modules/.bin/babel-node --plugins "syntax-async-functions,transform-async-to-generator" index.js
复制代码

这样,咱们使用支持async await的转换器动态地转换源代码。Node.js运行的实际是保存在内存中的日后兼容的代码。

Babel也能被配置为一个代码构建工具,保存翻译或解释后的代码到本地文件系统中,便于咱们部署和运行生成的代码。

关于如何安装和配置Babel,能够到官方网站 babeljs.io 查阅相关文档。

几种方式的比较

如今,咱们应该对于怎么处理JavaScript的异步问题有了一个更好的认识和总结。在下面的表格中总结几大机制的优点和劣势:

值得一提的是,咱们选择在本章中仅介绍处理异步控制流程的最受欢迎的解决方案,或者是普遍使用的解决方案,可是例如Fibers( npmjs.org/package/fib… )和Streamline( npmjs.org/p ackage/streamline )也是值得一看的。

总结

在本章中,咱们分析了一些处理异步控制流的方法,分析了PromiseGenerator函数和即将到来的async await语法。

咱们学习了如何使用这些方法编写更简洁,更具备可读性的异步代码。咱们讨论了这些方法的一些最重要的优势和缺点,并认识到即便它们很是有用,也须要一些时间来掌握。这就是这几种方式也没有彻底取代在许多状况下仍然很是有用的回调的缘由。做为一名开发人员,应该按照实际状况分析决定使用哪一种解决方案。若是您正在构建执行异步操做的公共库,则应该提供易于使用的API,即便对于只想使用回调的开发人员也是如此。

在下一章中,咱们将探讨另外一个与异步代码执行相关的机制,这也是整个Node.js生态系统中的另外一个基本构建块:streams

相关文章
相关标签/搜索