本系列文章为《Node.js Design Patterns Second Edition》的原文翻译和读书笔记,在GitHub连载更新,同步翻译版连接。javascript
欢迎关注个人专栏,以后的博文将在专栏同步:html
Node.js
这类语言习惯于同步的编程风格,其CPS
风格和异步特性的API
是其标准,对于新手来讲可能难以理解。编写异步代码多是一种不一样的体验,尤为是对异步控制流而言。异步代码可能让咱们难以预测在Node.js
中执行语句的顺序。例如读取一组文件,执行一串任务,或者等待一组操做完成,都须要开发人员采用新的方法和技术,以免最终编写出效率低下和不可维护的代码。一个常见的错误是回调地狱,代码量急剧上升又不可读,使得简单的程序也难以阅读和维护。在本章中,咱们将看到如何经过使用一些规则和一些模式来避免回调,并编写干净、可管理的异步代码。咱们将看到控制流库,如async
,能够极大地简化咱们的问题,提高咱们的代码可读性,更易于维护。前端
JavaScript
中异步代码的顺序错乱无疑是很容易的。闭包和对匿名函数的定义可使开发人员有更好的编程体验,而并不须要开发人员手动对异步操做进行管理和跳转。这是符合KISS
原则的。简单且能保持异步代码控制流,让它在更短的时间内工做。但不幸的是,回调嵌套是以牺牲诸如模块性、可重用性和可维护性,增大整个函数的大小,致使糟糕的代码结构为代价的。大多数状况下,建立闭包在功能上是不须要的,但这更可能是一种约束,而不是与异步编程相关的问题。认识到回调嵌套会使得咱们的代码变得笨拙,而后根据最适合的解决方案采起相应的方法解决回调地狱,这是新手与专家的区别。java
为了解释上述问题,咱们建立了一个简单的Web爬虫,一个命令行应用,其接受一个URL
为输入,而后能够把其内容下载到一个文件中。在下列代码中,咱们会依赖如下两个npm
库。node
此外,咱们还将引用一个叫作./utilities
的本地模块。git
咱们的应用程序的核心功能包含在一个名为spider.js
的模块中。以下所示,首先加载咱们所须要的依赖包:github
const request = require('request'); const fs = require('fs'); const mkdirp = require('mkdirp'); const path = require('path'); const utilities = require('./utilities');
接下来,咱们将建立一个名为spider()
的新函数,该函数接受URL
为参数,并在下载过程完成时调用一个回调函数。算法
function spider(url, callback) { const filename = utilities.urlToFilename(url); fs.exists(filename, exists => { if (!exists) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { callback(err); } else { mkdirp(path.dirname(filename), err => { if (err) { callback(err); } else { fs.writeFile(filename, body, err => { if (err) { callback(err); } else { callback(null, filename, true); } }); } }); } }); } else { callback(null, filename, false); } }); }
上述函数执行如下任务:数据库
URL
的文件是否已经下载过,即验证相应文件是否已经被建立:fs.exists(filename, exists => ...
npm
request(url, (err, response, body) => ...
mkdirp(path.dirname(filename), err => ...
HTTP
请求返回的报文主体写入文件系统:mkdirp(path.dirname(filename), err => ...
要完成咱们的Web爬虫
应用程序,只需提供一个URL
做为输入(在咱们的例子中,咱们从命令行参数中读取它),咱们只需调用spider()
函数便可。
spider(process.argv[2], (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of "${filename}"`); } else { console.log(`"${filename}" was already downloaded`); } });
如今,咱们开始尝试运行Web爬虫
应用程序,可是首先,确保已有utilities.js
模块和package.json
中的全部依赖包已经安装到你的项目中:
npm install
以后,咱们执行咱们这个爬虫模块来下载一个网页,使用如下命令:
node spider http://www.example.com
咱们的Web爬虫
应用程序要求在咱们提供的URL
中老是包含协议类型(例如,http://
)。另外,不要指望HTML
连接被从新编写,也不要指望下载像图片这样的资源,由于这只是一个简单的例子来演示异步编程是如何工做的。
看看咱们的spider()
函数,咱们能够发现,尽管咱们实现的算法很是简单,可是生成的代码有几个级别的缩进,并且很难读懂。使用阻塞式的同步API
实现相似的功能是很简单的,并且不多有机会让它看起来如此错误。然而,使用异步CPS
是另外一回事,使用闭包可能会致使出现难以阅读的代码。
大量闭包和回调将代码转换成不可读的、难以管理的状况称为回调地狱。它是Node.js
中最受承认和最严重的反模式之一。通常来讲,对于JavaScript
而言。受此问题影响的代码的典型结构以下:
asyncFoo(err => { asyncBar(err => { asyncFooBar(err => { //... }); }); });
咱们能够看到,用这种方式编写的代码是如何造成金字塔形状的,因为深嵌的缘由致使的难以阅读,称为“末日金字塔”。
像前面的代码片断这样的代码最明显的问题是可读性差。因为嵌套太深,几乎不可能跟踪回调函数的结束位置和另外一个回调函数开始的位置。
另外一个问题是由每一个做用域中使用的变量名的重叠引发的。一般,咱们必须使用相似甚至相同的名称来描述变量的内容。最好的例子是每一个回调接收到的错误参数。有些人常常尝试使用相同名称的变体来区分每一个范围内的对象,例如,error
、err
、err1
、err2
等等。另外一些人则倾向于隐藏在范围中定义的变量,老是使用相同的名称。例如,err
。这两种选择都远非完美,并且会形成混淆,并增长致使bug
的可能性。
此外,咱们必须记住,虽然闭包在性能和内存消耗方面的代价很小。此外,它们还能够建立不易识别的内存泄漏,由于咱们不该该忘记,由闭包引用的任何上下文变量都不会被垃圾收集所保留。
关于对于V8
的闭包工做原理,能够参考Vyacheslav Egorov的博客文章。
若是咱们看一下咱们的spider()
函数,咱们会清楚地注意到它即是一个典型的回调地狱的场景,而且在这个函数中有咱们刚才描述的全部问题。这正是咱们将在本章中学习的模式和技巧所要解决的问题。
既然咱们已经遇到了第一个回调地狱的例子,咱们知道咱们应该避免什么。然而,在编写异步代码时,这并非唯一的关注点。事实上,有几种状况下,控制一组异步任务的流须要使用特定的模式和技术,特别是若是咱们只使用普通的JavaScript
而没有任何外部库的帮助的状况下。例如,经过按顺序应用异步操做来遍历集合并不像在数组中调用forEach()
那样简单,但实际上它须要一种相似于递归的技术。
在本节中,咱们将学习如何避免回调地狱,以及如何使用简单的JavaScript
实现一些最多见的控制流模式。
在编写异步代码时,要记住的第一个规则是在定义回调时不要滥用闭包。滥用闭包一时很爽,由于它不须要对诸如模块化和可重用性这样的问题进行额外的思考。可是,咱们已经看到,这种作法弊大于利。大多数状况下,修复回调地狱问题并不须要任何库、花哨的技术或范式的改变,只是一些常识。
如下是一些基本原则,能够帮助咱们更少的嵌套,并改进咱们的代码的组织:
return
、continue
或break
,以便当即退出当前代码块,而不是使用if...else
代码块。其余语句。这将有助于优化咱们的代码结构。为了展现上述原则,咱们经过重构Web爬虫
应用程序来讲明。
对于第一步,咱们能够经过删除else
语句来重构咱们的错误检查方式。这是在咱们收到错误后当即从函数中返回。所以,看如下代码:
if (err) { callback(err); } else { // 若是没有错误,执行该代码块 }
咱们能够经过编写下面的代码来改进咱们的代码结构:
if (err) { return callback(err); } // 若是没有错误,执行该代码块
有了这个简单的技巧,咱们当即减小了函数的嵌套级别,它很简单,不须要任何复杂的重构。
在执行咱们刚才描述的优化时,一个常见的错误是在调用回调函数以后忘记终止函数,即return
。对于错误处理场景,如下代码是bug
的典型来源:
if (err) { callback(err); } // 若是没有错误,执行该代码块
在这个例子中,即便在调用回调以后,函数的执行也会继续。那么避免这种状况的出现,return
语句是十分必要的。还要注意,函数返回的输出是什么并不重要,实际结果(或错误)是异步生成的,并传递给回调。异步函数的返回值一般被忽略。该属性容许咱们编写以下的代码:
return callback(...);
不然咱们必须拆成两条语句来写:
callback(...); return;
接下来咱们继续重构咱们的spider()
函数,咱们能够尝试识别可复用的代码片断。例如,将给定字符串写入文件的功能能够很容易地分解为一个单独的函数:
function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); }
遵循一样的原则,咱们能够建立一个名为download()
的通用函数,它将URL
和文件名
做为输入,并将URL
的内容下载到给定的文件中。在内部,咱们可使用前面建立的saveFile()
函数。
function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }); }
最后,修改咱们的spider()
函数:
function spider(url, callback) { const filename = utilities.urlToFilename(url); fs.exists(filename, exists => { if (exists) { return callback(null, filename, false); } download(url, filename, err => { if (err) { return callback(err); } callback(null, filename, true); }) }); }
spider()
函数的功能和接口仍然是彻底相同的,改变的仅仅是代码的组织方式。经过应用上述基本原则,咱们可以极大地减小代码的嵌套,同时增长了它的可重用性和可测试性。实际上,咱们能够考虑导出saveFile()
和download()
,这样咱们就能够在其余模块中重用它们。这也使咱们可以更容易地测试他们的功能。
咱们在这一节中进行的重构清楚地代表,大多数时候,咱们所须要的只是一些规则,并确保咱们不滥用闭包和匿名函数。它的工做很是出色,只需最少的工做量,而且只使用原始的JavaScript
。
如今开始探寻异步控制流的执行顺序,咱们会经过开始分析一串异步代码来探寻其控制流。
按顺序执行一组任务意味着一次一个接一个地运行它们。执行顺序很重要,必须保证其正确性,由于列表中一个任务的结果可能会影响下一个任务的执行。下图说明了这个概念:
上述异步控制流有一些不一样的变化:
chain
,pipeline
,或者waterfall
)对于顺序执行而言,尽管在使用直接样式阻塞API
实现很简单,但一般状况下使用异步CPS
时会致使回调地狱问题。
在上一节中实现spider()
函数时,咱们已经遇到了顺序执行的问题。经过研究以下方式,咱们能够更好地控制异步代码。以该代码为准则,咱们能够用如下模式来解决上述问题:
function task1(callback) { asyncOperation(() => { task2(callback); }); } function task2(callback) { asyncOperation(result() => { task3(callback); }); } function task3(callback) { asyncOperation(() => { callback(); //finally executes the callback }); } task1(() => { //executed when task1, task2 and task3 are completed console.log('tasks 1, 2 and 3 executed'); });
上述模式显示了在完成一个异步操做后,再调用下一个异步操做。该模式强调任务的模块化,而且避免在处理异步代码使用闭包。
咱们前面描述的模式若是咱们预先知道要执行什么和有多少个任务,这些模式是完美的。这使咱们可以对序列中下一个任务的调用进行硬编码,可是若是要对集合中的每一个项目执行异步操做,会发生什么?在这种状况下,咱们不能对任务序列进行硬编码。相反的是,咱们必须动态构建它。
为了显示顺序迭代的例子,让咱们为Web爬虫
应用程序引入一个新功能。咱们如今想要递归地下载网页中的全部连接。要作到这一点,咱们将从页面中提取全部连接,而后按顺序逐个地触发咱们的Web爬虫
应用程序。
第一步是修改咱们的spider()
函数,以便经过调用一个名为spiderLinks()
的函数触发页面全部连接的递归下载。
此外,咱们如今尝试读取文件,而不是检查文件是否已经存在,并开始爬取其连接。这样,咱们就能够恢复中断的下载。最后还有一个变化是,咱们确保咱们传递的参数是最新的,还要限制递归深度。结果代码以下:
function spider(url, nesting, callback) { const filename = utilities.urlToFilename(url); fs.readFile(filename, 'utf8', (err, body) => { if (err) { if (err.code! == 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); }
如今咱们能够建立这个新版本的Web爬虫
应用程序的核心,即spiderLinks()
函数,它使用顺序异步迭代算法下载HTML
页面的全部连接。注意咱们在下面的代码块中定义的方式:
function spiderLinks(currentUrl, body, nesting, callback) { if(nesting === 0) { return process.nextTick(callback); } let links = utilities.getPageLinks(currentUrl, body); //[1] function iterate(index) { //[2] if(index === links.length) { return callback(); } spider(links[index], nesting - 1, function(err) { //[3] if(err) { return callback(err); } iterate(index + 1); }); } iterate(0); //[4] }
从这个新功能中的重要步骤以下:
utilities.getPageLinks()
函数获取页面中包含的全部连接的列表。此函数仅返回指向相同主机名的连接。iterate()
的本地函数来遍历连接,该函数须要下一个连接的索引进行分析。在这个函数中,咱们首先要检查索引是否等于连接数组的长度,若是等于则是迭代完成,在这种状况下咱们当即调用callback()
函数,由于这意味着咱们处理了全部的项目。spider()
函数。spiderLinks()
函数的最后一步也是最重要的一步,咱们经过调用iterate(0)
来开始迭代。咱们刚刚提出的算法容许咱们经过顺序执行异步操做来迭代数组,在咱们的例子中是spider()
函数。
咱们如今能够尝试这个新版本的Web爬虫
应用程序,并观看它一个接一个地递归地下载网页的全部连接。要中断这个过程,若是有不少连接可能须要一段时间,请记住咱们能够随时使用Ctrl + C
。若是咱们决定恢复它,咱们能够经过启动Web爬虫
应用程序并提供与上次结束时相同的URL
来恢复执行。
如今咱们的网络Web爬虫
应用程序可能会触发整个网站的下载,请仔细考虑使用它。例如,不要设置高嵌套级别或离开爬虫运行超过几秒钟。用数千个请求重载服务器是不道德的。在某些状况下,这也被认为是非法的。须要考虑后果!
咱们以前展现的spiderLinks()
函数的代码是一个清楚的例子,说明了如何在应用异步操做时迭代集合。咱们还能够注意到,这是一种能够适应任何其余状况的模式,咱们须要在集合的元素或一般的任务列表上按顺序异步迭代。该模式能够推广以下:
function iterate(index) { if (index === tasks.length) { return finish(); } const task = tasks[index]; task(function() { iterate(index + 1); }); } function finish() { // 迭代完成的操做 } iterate(0);
注意到,若是task()
是同步操做,这些类型的算法变得真正递归。在这种状况下,可能形成调用栈的溢出。
咱们刚刚提出的模式是很是强大的,由于它能够适应几种状况。例如,咱们能够映射数组的值,或者咱们能够将迭代的结果传递给迭代中的下一个,以实现一个reduce算法,若是知足特定的条件,咱们能够提早退出循环,或者甚至能够迭代无限数量的元素。
咱们还能够选择将解决方案进一步推广:
iterateSeries(collection, iteratorCallback, finalCallback);
经过建立一个名为iterator
的函数来执行任务列表,该函数调用集合中的下一个可执行的任务,并确保在当前任务完成时调用迭代器结束的回调函数。
在某些状况下,一组异步任务的执行顺序并不重要,咱们只须要在全部这些运行的任务完成时通知咱们。使用并行执行流更好地处理这种状况,以下图所示:
若是咱们认为Node.js
是单线程的话,这可能听起来很奇怪,可是若是咱们记住咱们在第一章中讨论过的内容,咱们意识到即便咱们只有一个线程,咱们仍然能够实现并发,因为Node.js
的非阻塞性质。实际上,在这种状况下,并行字不正确地使用,由于这并不意味着任务同时运行,而是它们的执行由底层的非阻塞API
执行,并由事件循环进行交织。
咱们知道,当一个任务容许事件循环执行另外一个任务时,或者是说一个任务容许控制回到事件循环。这种工做流的名称为并发,但为了简单起见,咱们仍然会使用并行。
下图显示了两个异步任务能够在Node.js
程序中并行运行:
经过上图,咱们有一个Main
函数执行两个异步任务:
Main
函数触发Task 1
和Task 2
的执行。因为这些触发异步操做,这两个函数会当即返回,并将控制权返还给主函数,以后等到事件循环完成再通知主线程。Task 1
的异步操做完成时,事件循环给与其线程控制权。当Task 1
同步操做完成时,它通知Main
函数。Task 2
的异步操做完成时,事件循环给与其线程控制权。当Task 2
同步操做完成时,它再次通知Main
函数。在这一点上,Main
函数知晓Task 1
和Task 2
都已经执行完毕,因此它能够继续执行其后操做或将操做的结果返回给另外一个回调函数。简而言之,这意味着在Node.js
中,咱们只能执行并行异步操做,由于它们的并发性由非阻塞API
在内部处理。在Node.js
中,同步阻塞操做不能同时运行,除非它们的执行与异步操做交错,或者经过setTimeout()
或setImmediate()
延迟。咱们将在第九章中更详细地看到这一点。
上边的Web爬虫
在并行异步操做上彷佛也算表现得很完美。到目前为止,应用程序正在递归地执行连接页面的下载。但性能不是最佳的,想要提高这个应用的性能很容易。
要作到这一点,咱们只须要修改spiderLinks()
函数,确保spider()
任务只执行一次,当全部任务都执行完毕后,调用最后的回调,因此咱们对spiderLinks()
作以下修改:
function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; function done(err) { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { return callback(); } } links.forEach(link => { spider(link, nesting - 1, done); }); }
上述代码有何变化?,如今spider()
函数的任务所有同步启动。能够经过简单地遍历连接数组和启动每一个任务,咱们没必要等待前一个任务完成再进行下一个任务:
links.forEach(link => { spider(link, nesting - 1, done); });
而后,使咱们的应用程序知晓全部任务完成的方法是为spider()
函数提供一个特殊的回调函数,咱们称之为done()
。当爬虫任务完成时,done()
函数设定一个计数器。当完成的下载次数达到连接数组的大小时,调用最终回调:
function done(err) { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { callback(); } }
经过上述变化,若是咱们如今试图对网页运行咱们的爬虫,咱们将注意到整个过程的速度有很大的改进,由于每次下载都是并行执行的,而没必要等待以前的连接被处理。
此外,对于并行执行流程,咱们能够提取咱们方案,以便适应于不一样的状况提升代码的可复用性。咱们可使用如下代码来表示模式的通用版本:
const tasks = [ /* ... */ ]; let completed = 0; tasks.forEach(task => { task(() => { if (++completed === tasks.length) { finish(); } }); }); function finish() { // 全部任务执行完成后调用 }
经过小的修改,咱们能够调整模式,将每一个任务的结果累积到一个list
中,以便过滤或映射数组的元素,或者一旦完成了一个或必定数量的任务便可调用finish()
回调。
注意:若是是没有限制的状况下,并行执行的一组异步任务,而后等待全部异步任务完成后执行回调这种方式,其方法是计算它们的执行完成的数目。
当使用阻塞I/O
与多线程组合的方式时,并行运行一组任务可能会致使一些问题。可是,咱们刚刚看到,在Node.js
中却不同,并行运行多个异步任务实际上在资源方面消耗较低。这是Node.js
最重要的优势之一,所以在Node.js
中并行化成为一种常见的作法,并且这并是多么复杂的技术。
Node.js
的并发模型的另外一个重要特征是咱们处理任务同步和竞争条件的方式。在多线程编程中,这一般使用诸如锁,互斥条件,信号量和观察器之类的构造来实现,这些是多线程语言并行化的最复杂的方面之一,对性能也有很大的影响。在Node.js
中,咱们一般不须要一个花哨的同步机制,由于全部运行在单个线程上!可是,这并不意味着咱们没有竞争条件。相反,他们能够至关广泛。问题的根源在于异步操做的调用与其结果通知之间的延迟。举一个具体的例子,咱们能够再次参考咱们的Web爬虫
应用程序,特别是咱们建立的最后一个版本,其实际上包含一个竞争条件。
问题在于在开始下载相应的URL
的文档以前,检查文件是否已经存在的spider()
函数:
function spider(url, nesting, callback) { if(spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); const filename = utilities.urlToFilename(url); fs.readFile(filename, 'utf8', function(err, body) { if(err) { if(err.code !== 'ENOENT') { return callback(err); } return download(url, filename, function(err, body) { if(err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); }
如今的问题是,在同一个URL
上操做的两个爬虫任务可能会在两个任务之一完成下载并建立一个文件,致使第二个任务开始下载以前,在同一个文件上调用fs.readFile()
的结果不对,导致下载两次。这种状况以下图所示:
上图显示了Task 1
和Task 2
如何在Node.js
的单个线程中交错执行,以及异步操做如何实际引入竞争条件。在咱们的状况下,两个爬虫任务最终会下载相同的文件。
咱们如何解决这个问题?答案比咱们想象的要简单得多。实际上,咱们所须要的只是一个变量(互斥变量),能够相互排除运行在同一个URL
上的多个spider()
任务。这能够经过如下代码来实现:
const spidering = new Map(); function spider(url, nesting, callback) { if (spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); // ... }
一般,若是不控制并行任务频率,并行任务就会致使过载。想象一下,有数千个文件要读取,访问的URL
或数据库查询并行运行。在这种状况下,常见的问题是系统资源不足,例如,当尝试一次打开太多文件时,利用可用于应用程序的全部文件描述符。在Web应用程序
中,它还可能会建立一个利用拒绝服务(DoS
)攻击的漏洞。在全部这种状况下,最好限制同时运行的任务数量。这样,咱们能够为服务器的负载增长一些可预测性,并确保咱们的应用程序不会耗尽资源。下图描述了一个状况,咱们将五个任务并行运行并发限制为两段:
从上图能够清楚咱们的算法如何工做:
咱们如今提出一种模式,以有限的并发性并行执行一组给定的任务:
const tasks = ... let concurrency = 2, running = 0, completed = 0, index = 0; function next() { while (running < concurrency && index < tasks.length) { task = tasks[index++]; task(() => { if (completed === tasks.length) { return finish(); } completed++, running--; next(); }); running++; } } next(); function finish() { // 全部任务执行完成 }
该算法能够被认为是顺序执行和并行执行之间的混合。事实上,咱们可能会注意到咱们以前介绍的两种模式的类似之处:
next()
,有一个内部循环,并行执行尽量多的任务,同时保持并发限制。next()
来执行下一个任务。咱们的Web爬虫
应用程序很是适合应用咱们所学到的限制一组任务的并发性。事实上,为了不同时爬上数千个连接的状况,咱们能够经过在并发下载数量上增长一些措施来限制并发量。
0.11以前的Node.js版本已经将每一个主机的并发HTTP链接数限制为5.然而,这能够改变以适应咱们的须要。请查看官方文档http://nodejs.org/docs/v0.10.... axsockets中的更多内容。从Node.js 0.11开始,并发链接数没有默认限制。
咱们能够将咱们刚刚学到的模式应用到咱们的spiderLinks()
函数,可是咱们将得到的只是限制一个页面中的一组连接的并发性。若是咱们选择了并发量为2,咱们最多能够为每一个页面并行下载两个连接。然而,因为咱们能够一次下载多个连接,所以每一个页面都会产生另外两个下载,这样递归下去,其实也没有彻底作到并发量的限制。
咱们真正想要的是限制咱们能够并行运行的全局下载操做数量。咱们能够略微修改以前展现的模式,可是咱们宁愿把它做为一个练习,由于咱们想借此机会引入另外一个机制,它利用队列来限制多个任务的并发性。让咱们看看这是如何工做的。
咱们如今要实现一个名为TaskQueue
类,它将队列与咱们以前提到的算法相结合。咱们建立一个名为taskQueue.js
的新模块:
class TaskQueue { constructor(concurrency) { this.concurrency = concurrency; this.running = 0; this.queue = []; } pushTask(task) { this.queue.push(task); this.next(); } next() { while (this.running < this.concurrency && this.queue.length) { const task = this.queue.shift(); task(() => { this.running--; this.next(); }); this.running++; } } };
上述类的构造函数只做为输入的并发限制,但除此以外,它初始化运行和队列的变量。前一个变量是用于跟踪全部正在运行的任务的计数器,然后者是将用做队列以存储待处理任务的数组。
pushTask()
方法简单地将新任务添加到队列中,而后经过调用this.next()
来引导任务的执行。
next()
方法从队列中生成一组任务,确保它不超过并发限制。
咱们可能会注意到,这种方法与限制咱们前面提到的并发性的模式有一些类似之处。它基本上从队列开始尽量多的任务,而不超过并发限制。当每一个任务完成时,它会更新运行任务的计数,而后再次调用next()
来启动另外一轮任务。 TaskQueue
类的有趣属性是它容许咱们动态地将新的项目添加到队列中。另外一个优势是,如今咱们有一个中央实体负责限制咱们任务的并发性,这能够在函数执行的全部实例中共享。在咱们的例子中,它是spider()
函数,咱们将在稍后看到。
如今咱们有一个通用的队列来执行有限的并行流程中的任务,咱们能够在咱们的Web爬虫
应用程序中直接使用它。咱们首先加载新的依赖关系并经过将并发限制设置为2来建立TaskQueue
类的新实例:
const TaskQueue = require('./taskQueue'); const downloadQueue = new TaskQueue(2);
接下来,咱们使用新建立的downloadQueue
更新spiderLinks()
函数:
function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; links.forEach(link => { downloadQueue.pushTask(done => { spider(link, nesting - 1, err => { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { callback(); } done(); }); }); }); }
这个函数的这种新的实现是很是容易的,它与这本章前面提到的无限并行执行的算法很是类似。这是由于咱们将并发控制委托给TaskQueue
对象,咱们惟一要作的就是检查全部任务是否完成。看上述代码中如何定义咱们的任务:
spider()
函数。spiderLinks()
函数执行相关的全部任务是否完成。当这个条件为真时,咱们调用spiderLinks()函数的最后回调。done()
回调,以便队列能够继续执行。在咱们进行这些小的变化以后,咱们如今能够尝试再次运行Web爬虫
应用程序。这一次,咱们应该注意到,同时不会有两个以上的下载。
若是咱们到目前为止咱们分析的每个控制流程模式看一下,咱们能够看到它们能够用做构建可重用和更通用的解决方案的基础。例如,咱们能够将无限制的并行执行算法包装到一个接受任务列表的函数中,并行运行它们,而且当它们都完成时调用给定的回调函数。将控制流算法转化为可重用功能的这种方式能够致使更具声明性和表达性的方式来定义异步控制流,这正是async所作的。async
库是一个很是流行的解决方案,在Node.js
和JavaScript
中来讲,用于处理异步代码。它提供了一组功能,能够大大简化不一样配置中一组任务的执行,并为异步处理集合提供了有用的帮助。即便有其余几个具备类似目标的库,因为它的受欢迎程度,所以async
是Node.js
中的一个事实上的标准。
async
库能够在实现复杂的异步控制流程时大大帮助咱们,可是一个难题就是选择正确的库来解决问题。例如,对于顺序执行,有大约20个不一样的函数可供选择,包括eachSeries()
, mapSeries()
, filterSeries()
, rejectSeries()
, reduce()
, reduceRight()
, detectSeries()
, concatSeries()
, series()
, whilst()
, doWhilst()
, until()
, doUntil()
, forever()
, waterfall()
, compose()
, seq()
, applyEachSeries()
, iterator()
, 和timesSeries()
。
选择正确的函数是编写更稳固和可读的代码的重要一步,但这也须要一些经验和实践。在咱们的例子中,咱们将仅介绍其中的一些状况,但它们仍将为理解和有效地使用库的其他部分提供坚实的基础。
下面,经过例子说明async
库如何工做,咱们将用于咱们的Web爬虫
应用程序。咱们直接从版本2开始,按顺序递归地下载全部的连接。
可是,首先咱们确保将async
库安装到咱们当前的项目中:
npm install async
而后咱们须要从spider.js
模块加载新的依赖项:
const async = require('async');
咱们先修改download()
函数。以下所示,它依次作了如下三件事:
URL
的内容。URL
的内容保存到文件中。async.series()
能够实现顺序执行一组任务:
async.series(tasks, [callback])
async.series()
接受一个任务列表和一个在全部任务完成后调用的回调函数做为参数。每一个任务只是一个接受回调函数的函数,当任务完成执行时,这个回调函数被调用:
function task(callback) {}
async
的优点是它使用与Node.js
相同的回调约定,它会自动处理错误传播。因此,若是任何一个任务调用它的回调而且产生了一个错误,async
将跳过列表中剩余的任务,直接跳转到最后的回调。
考虑到这一点,让咱们看看如何经过使用async
来修改上述的download()
函数:
function download(url, filename, callback) { console.log(`Downloading ${url}`); let body; async.series([ callback => { request(url, (err, response, resBody) => { if (err) { return callback(err); } body = resBody; callback(); }); }, mkdirp.bind(null, path.dirname(filename)), callback => { fs.writeFile(filename, body, callback); } ], err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }
对比起这段代码的回调地狱版本,使用async
方式使咱们可以更好地组织咱们的异步任务。而且不会嵌套回调,由于咱们只须要提供一个的任务列表,一般对于用于每一个异步操做,而后异步任务将依次执行:
URL
的内容。咱们将响应体保存到一个闭包变量(body
)中,以便它能够与其余任务共享。mkdirp()
函数实现,并和建立的目录路径绑定。这样,咱们能够节省几行代码并增长其可读性。URL
的内容写入文件。在这种状况下,咱们没法执行部分应用程序(就像咱们在第二个任务中所作的那样),由于变量body
只在系列中的下载任务完成后才可用。可是,经过将任务的回调直接传递到fs.writeFile()
函数,咱们仍然能够经过利用异步的自动错误管理来保存一些代码行。4.完成全部任务后,将调用async.series()
的最后回调。在咱们的例子中,咱们只是作一些错误管理,而后返回body
变量来回调download()
函数。
对于上述状况,async.series()
的一个可替代的方法是async.waterfall()
,它仍然按顺序执行任务,但另外还提供每一个任务的输出做为下一个输入。在咱们的状况下,咱们可使用这个特征来传播body
变量直到序列结束。
在前面讲了如何按顺序执行一组任务。上面的例子async.series()
来作到这一点。可使用相同的功能来实现Web爬虫版本2
的spiderLinks()
函数。然而,async
为特定的状况提供了一个更合适的API
,遍历一个集合,这个API
是async.eachSeries()
。咱们来使用它来从新实现咱们的spiderLinks()
函数(版本2,串行下载),以下所示:
function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } async.eachSeries(links, (link, callback) => { spider(link, nesting - 1, callback); }, callback); }
若是咱们将使用async
的上述代码与使用纯JavaScript
模式实现的相同功能的代码进行比较,咱们将注意到async
在代码组织和可读性方面给咱们带来的巨大优点。
async
不具备处理并行流的功能,其中能够找到each()
,map()
,filter()
,reject()
,detect()
,some()
,every()
,concat()
,parallel()
,applyEach()
和times()
。它们遵循与咱们已经看到的用于顺序执行的功能相同的逻辑,区别在于所提供的任务是并行执行的。
为了证实这一点,咱们能够尝试应用上述功能之一来实现咱们的Web爬虫
应用程序的第三版,即便用无限制的并行流程来执行下载。
若是咱们记住咱们以前使用的代码来实现spiderLinks()
函数的顺序版本,那么调整它使其并行工做就比较简单:
function spiderLinks(currentUrl, body, nesting, callback) { // ... async.each(links, (link, callback) => { spider(link, nesting - 1, callback); }, callback); }
这个函数与咱们用于顺序下载的功能彻底相同,可是使用的是async.each()
而非async.eachSeries()
。这清楚地代表了使用库(例如async
)抽象异步流的功能。代码再也不绑定到特定的执行流程了,没有专门为此写的代码。大多数只是应用逻辑。
若是你想知道async
还能够用来限制并行任务的并发性,答案是确定的。咱们有一些咱们可使用的函数,即eachLimit()
,mapLimit()
,parallelLimit()
,queue()
和cargo()
。
咱们试图利用其中的一个来实现Web爬虫
应用程序的第4版,以有限的并发性并行执行连接的下载。幸运的是,async
有async.queue()
,它的工做方式与本章前面建立的TaskQueue
相似。 async.queue()
函数建立一个新的队列,它使用一个worker()
函数来执行一组具备指定并发限制的任务:
const q = async.queue(worker, concurrency);
worker()
函数做为输入接收要运行的任务和一个回调函数做为参数,当任务完成时执行回调:
function worker(task, callback);
咱们应该注意到在这个例子中 task
能够是任何类型,而不只仅只能是函数。实际上, worker
有责任以最适当的方式处理任务。新建任务,能够经过q.push(task, callback)
将任务添加到队列中。一个任务处理完后,关联一个任务的回调函数必须被worker
调用。
如今,咱们再次修改咱们的代码实现一个全面并行的有并发限制的执行流,利用async.queue()
,首先,咱们须要建立一个队列:
const downloadQueue = async.queue((taskData, callback) => { spider(taskData.link, taskData.nesting - 1, callback); }, 2);
代码很简单。咱们正在建立一个并发限制为2的新队列,让一个工做人员只需使用与任务关联的数据调用咱们的spider()
函数。接下来,咱们实现spiderLinks()
函数:
function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } const completed = 0, hasErrors = false; links.forEach(function(link) { const taskData = { link: link, nesting: nesting }; downloadQueue.push(taskData, err => { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { callback(); } }); }); }
前面的代码应该看起来很是熟悉,由于它几乎和使用TaskQueue
对象来实现相同流程的代码相同。此外,在这种状况下,要分析的重要部分是将新任务推入队列的位置。在这一点上,咱们确保咱们传递一个回调,使咱们可以检查当前页面的全部下载任务是否完成,并最终调用最终回调。
辛亏有async.queue()
,咱们能够轻松地复制咱们的TaskQueue
对象的功能,再次证实了经过async
,咱们能够避免从头开始编写异步控制流模式,减小咱们的工做量,代码量更加简洁。
在本章开始的时候,咱们说Node.js
的编程可能很难由于它的异步性,特别是对于之前在其余平台上开发的人而言。然而,在本章中,咱们展现了异步API
如何能够从简单原生JavaScript
开始,从而为咱们分析更复杂的技术奠基了基础。而后咱们看到,除了为每一种口味提供编程风格,咱们所掌握的工具确实是多样化的,并为咱们大部分的问题提供了很好的解决方案。例如,咱们能够选择async
库来简化最多见的流程。
还有更为先进的技术,如Promise
和Generator
函数,这将是下一章的重点。当了解全部这些技术时,可以根据需求选择最佳解决方案,或者在同一个项目中使用多种技术。