《Node.js设计模式》高级异步准则

本系列文章为《Node.js Design Patterns Second Edition》的原文翻译和读书笔记,在GitHub连载更新,同步翻译版连接javascript

欢迎关注个人专栏,以后的博文将在专栏同步:前端

Advanced Asynchronous Recipes

几乎全部咱们迄今为止看到的设计模式均可以被认为是通用的,而且适用于应用程序的许多不一样的领域。可是,有一套更具体的模式,专一于解决明确的问题。咱们能够调用这些模式。就像现实生活中的烹饪同样,咱们有一套明确的步骤来实现预期的结果。固然,这并不意味着咱们不能用一些创意来定制设计模式,以配合咱们的客人的口味,对于书写Node.js程序来讲是必要的。在本章中,咱们将提供一些常见的解决方案来解决咱们在平常Node.js开发中遇到的一些具体问题。这些模式包括如下内容:java

  • 异步引入模块并初始化
  • 在高并发的应用程序中使用批处理和缓存异步操做的性能优化
  • 运行与Node.js处理并发请求的能力相悖的阻塞事件循环的同步CPU绑定操做

异步引入模块并初始化

Chapter2-Node.js Essential Patterns中,当咱们讨论Node.js模块系统的基本属性时,咱们提到了require()是同步的,而且module.exports也不能异步设置。node

这是在核心模块和许多npm包中存在同步API的主要缘由之一,是否同步加载会被做为一个option参数被提供,主要用于初始化任务,而不是替代异步APIgit

不幸的是,这并不老是可能的。同步API可能并不老是可用的,特别是对于在初始化阶段使用网络的组件,例如执行三次握手协议或在网络中检索配置参数。 许多数据库驱动程序和消息队列等中间件系统的客户端都是如此。github

普遍适用的解决方案

咱们举一个例子:一个名为db的模块,它将会链接到远程数据库。 只有在链接和与服务器的握手完成以后,db模块才可以接受请求。在这种状况下,咱们一般有两种选择:web

  • 在开始使用以前确保模块已经初始化,不然则等待其初始化。每当咱们想要在异步模块上调用一个操做时,都必须完成这个过程:
const db = require('aDb'); //The async module
module.exports = function findAll(type, callback) {
  if (db.connected) { //is it initialized?
    runFind();
  } else {
    db.once('connected', runFind);
  }

  function runFind() {
    db.findAll(type, callback);
  };
};
复制代码
  • 使用依赖注入(Dependency Injection)而不是直接引入异步模块。经过这样作,咱们能够延迟一些模块的初始化,直到它们的异步依赖被彻底初始化。 这种技术将管理模块初始化的复杂性转移到另外一个组件,一般是它的父模块。 在下面的例子中,这个组件是app.js
// 模块app.js
const db = require('aDb'); // aDb是一个异步模块
const findAllFactory = require('./findAll');
db.on('connected', function() {
  const findAll = findAllFactory(db);
  // 以后再执行异步操做
});


// 模块findAll.js
module.exports = db => {
  //db 在这里被初始化
  return function findAll(type, callback) {
    db.findAll(type, callback);
  }
}
复制代码

咱们能够看出,若是所涉及的异步依赖的数量过多,第一种方案便不太适用了。redis

另外,使用DI有时也是不理想的,正如咱们在Chapter7-Wiring Modules中看到的那样。在大型项目中,它可能很快变得过于复杂,尤为对于手动完成并使用异步初始化模块的状况下。若是咱们使用一个设计用于支持异步初始化模块的DI容器,这些问题将会获得缓解。算法

可是,咱们将会看到,还有第三种方案可让咱们轻松地将模块从其依赖关系的初始化状态中分离出来。mongodb

预初始化队列

将模块与依赖项的初始化状态分离的简单模式涉及到使用队列和命令模式。这个想法是保存一个模块在还没有初始化的时候接收到的全部操做,而后在全部初始化步骤完成后当即执行这些操做。

实现一个异步初始化的模块

为了演示这个简单而有效的技术,咱们来构建一个应用程序。首先建立一个名为asyncModule.js的异步初始化模块:

const asyncModule = module.exports;

asyncModule.initialized = false;
asyncModule.initialize = callback => {
  setTimeout(() => {
    asyncModule.initialized = true;
    callback();
  }, 10000);
};

asyncModule.tellMeSomething = callback => {
  process.nextTick(() => {
    if(!asyncModule.initialized) {
      return callback(
        new Error('I don\'t have anything to say right now')
      );
    }
    callback(null, 'Current time is: ' + new Date());
  });
};
复制代码

在上面的代码中,asyncModule展示了一个异步初始化模块的设计模式。 它有一个initialize()方法,在10秒的延迟后,将初始化的flag变量设置为true,并通知它的回调调用(10秒对于真实应用程序来讲是很长的一段时间了,可是对于具备互斥条件的应用来讲可能会显得力不从心)。

另外一个方法tellMeSomething()返回当前的时间,可是若是模块尚未初始化,它抛出产生一个异常。 下一步是根据咱们刚刚建立的服务建立另外一个模块。 咱们设计一个简单的HTTP请求处理程序,在一个名为routes.js的文件中实现:

const asyncModule = require('./asyncModule');

module.exports.say = (req, res) => {
  asyncModule.tellMeSomething((err, something) => {
    if(err) {
      res.writeHead(500);
      return res.end('Error:' + err.message);
    }
    res.writeHead(200);
    res.end('I say: ' + something);
  });
};
复制代码

handler中调用asyncModuletellMeSomething()方法,而后将其结果写入HTTP响应中。 正如咱们所看到的那样,咱们没有对asyncModule的初始化状态进行任何检查,这可能会致使问题。

如今,建立app.js模块,使用核心http模块建立一个很是基本的HTTP服务器:

const http = require('http');
const routes = require('./routes');
const asyncModule = require('./asyncModule');

asyncModule.initialize(() => {
  console.log('Async module initialized');
});

http.createServer((req, res) => {
  if (req.method === 'GET' && req.url === '/say') {
    return routes.say(req, res);
  }
  res.writeHead(404);
  res.end('Not found');
}).listen(8000, () => console.log('Started'));
复制代码

上述模块是咱们应用程序的入口点,它所作的只是触发asyncModule的初始化并建立一个HTTP服务器,它使用咱们之前建立的handlerroutes.say())来对网络请求做出相应。

咱们如今能够像往常同样经过执行app.js模块来尝试启动咱们的服务器。

在服务器启动后,咱们能够尝试使用浏览器访问URLhttp://localhost:8000/并查看从asyncModule返回的内容。 和预期的同样,若是咱们在服务器启动后当即发送请求,结果将是一个错误,以下所示:

Error:I don't have anything to say right now 复制代码

显然,在异步模块加载好了以后:

这意味着asyncModule还没有初始化,但咱们仍尝试使用它,则会抛出一个错误。

根据异步初始化模块的实现细节,幸运的状况是咱们可能会收到一个错误,乃至丢失重要的信息,崩溃整个应用程序。 总的来讲,咱们刚刚描述的状况老是必需要避免的。

大多数时候,可能并不会出现上述问题,毕竟初始化通常来讲很快,以致于在实践中,它永远不会发生。 然而,对于设计用于自动调节的高负载应用和云服务器,状况就彻底不一样了。

用预初始化队列包装模块

为了维护服务器的健壮性,咱们如今要经过使用咱们在本节开头描述的模式来进行异步模块加载。咱们将在asyncModule还没有初始化的这段时间内对全部调用的操做推入一个预初始化队列,而后在异步模块加载好后处理它们时当即刷新队列。这就是状态模式的一个很好的应用!咱们将须要两个状态,一个在模块还没有初始化的时候将全部操做排队,另外一个在初始化完成时将每一个方法简单地委托给原始的asyncModule模块。

一般,咱们没有机会修改异步模块的代码;因此,为了添加咱们的排队层,咱们须要围绕原始的asyncModule模块建立一个代理。

接下来建立一个名为asyncModuleWrapper.js的新文件,让咱们依照每一个步骤逐个构建它。咱们须要作的第一件事是建立一个代理,并将原始异步模块的操做委托给这个代理:

const asyncModule = require('./asyncModule');
const asyncModuleWrapper = module.exports;
asyncModuleWrapper.initialized = false;
asyncModuleWrapper.initialize = () => {
  activeState.initialize.apply(activeState, arguments);
};
asyncModuleWrapper.tellMeSomething = () => {
  activeState.tellMeSomething.apply(activeState, arguments);
};
复制代码

在前面的代码中,asyncModuleWrapper将其每一个方法简单地委托给activeState。 让咱们来看看这两个状态是什么样子

notInitializedState开始,notInitializedState是指还没初始化的状态:

// 当模块没有被初始化时的状态
let pending = [];
let notInitializedState = {

  initialize: function(callback) {
    asyncModule.initialize(function() {
      asyncModuleWrapper.initalized = true;
      activeState = initializedState;
      
      pending.forEach(function(req) {
        asyncModule[req.method].apply(null, req.args);
      });
      pending = [];
      
      callback();
    });
  },
  
  tellMeSomething: function(callback) {
    return pending.push({
      method: 'tellMeSomething',
      args: arguments
    });
  }
  
};
复制代码

initialize()方法被调用时,咱们触发初始化asyncModule模块,提供一个回调函数做为参数。 这使咱们的asyncModuleWrapper知道何时原始模块被初始化,在初始化后执行预初始化队列的操做,以后清空预初始化队列,再调用做为参数的回调函数,如下为具体步骤:

  1. initializedState赋值给activeState,表示预初始化已经完成了。
  2. 执行先前存储在待处理队列中的全部命令。
  3. 调用原始回调。

因为此时的模块还没有初始化,此状态的tellMeSomething()方法仅建立一个新的Command对象,并将其添加到预初始化队列中。

此时,当原始的asyncModule模块还没有初始化时,代理应该已经清楚,咱们的代理将简单地把全部接收到的请求防到预初始化队列中。 而后,当咱们被通知初始化完成时,咱们执行全部预初始化队列的操做,而后将内部状态切换到initializedState。来看这个代理模块最后的定义:

let initializedState = asyncModule;
复制代码

不出意外,initializedState对象只是对原始的asyncModule的引用!事实上,初始化完成后,咱们能够安全地将任何请求直接发送到原始模块。

最后,设定异步模块还没加载好的的状态,即notInitializedState

let activeState = notInitializedState;
复制代码

咱们如今能够尝试再次启动咱们的测试服务器,但首先,咱们不要忘记用咱们新的asyncModuleWrapper对象替换原始的asyncModule模块的引用; 这必须在app.jsroutes.js模块中完成。

这样作以后,若是咱们试图再次向服务器发送一个请求,咱们会看到在asyncModule模块还没有初始化的时候,请求不会失败; 相反,他们会挂起,直到初始化完成,而后才会被实际执行。咱们固然能够确定,比起以前,容错率变得更高了。

能够看到,在刚刚初始化异步模块的时候,服务器会等待请求的响应:

在异步模块加载完成后,服务器才会返回响应的信息:

模式:若是模块是须要异步初始化的,则对每一个操做进行排队,直到模块彻底初始化释放队列。

如今,咱们的服务器能够在启动后当即开始接受请求,并保证这些请求都不会因为其模块的初始化状态而失败。咱们可以在不使用DI的状况下得到这个结果,也不须要冗长且容易出错的检查来验证异步模块的状态。

其它场景的应用

咱们刚刚介绍的模式被许多数据库驱动程序和ORM库所使用。 最值得注意的是Mongoose,它是MongoDBORM。使用Mongoose,没必要等待数据库链接打开,以便可以发送查询,由于每一个操做都排队,稍后与数据库的链接彻底创建时执行。 这显然提升了其API的可用性。

看一下Mongoose的源码,它的每一个方法是如何经过代理添加预初始化队列。 能够看看实现这中模式的代码片断:https://github.com/Automattic/mongoose/blob/21f16c62e2f3230fe616745a40f22b4385a11b11/lib/drivers/node-mongodb-native/collection.js#L103-138

for (var i in Collection.prototype) {
  (function(i){
    NativeCollection.prototype[i] = function () {
      if (this.buffer) {
        // mongoose中,在缓冲区不为空时,只是简单地把这个操做加入缓冲区内
        this.addQueue(i, arguments);
        return;
      }

      var collection = this.collection
        , args = arguments
        , self = this
        , debug = self.conn.base.options.debug;

      if (debug) {
        if ('function' === typeof debug) {
          debug.apply(debug
            , [self.name, i].concat(utils.args(args, 0, args.length-1)));
        } else {
          console.error('\x1B[0;36mMongoose:\x1B[0m %s.%s(%s) %s %s %s'
            , self.name
            , i
            , print(args[0])
            , print(args[1])
            , print(args[2])
            , print(args[3]))
        }
      }

      return collection[i].apply(collection, args);
    };
  })(i);
}

复制代码

异步批处理和缓存

在高负载的应用程序中,缓存起着相当重要的做用,几乎在网络中的任何地方,从网页,图像和样式表等静态资源到纯数据(如数据库查询的结果)都会使用缓存。 在本节中,咱们将学习如何将缓存应用于异步操做,以及如何充分利用缓存解决高请求吞吐量的问题。

实现没有缓存或批处理的服务器

在这以前,咱们来实现一个小型的服务器,以便用它来衡量缓存和批处理等技术在解决高负载应用程序的优点。

让咱们考虑一个管理电子商务公司销售的web服务器,特别是对于查询咱们的服务器全部特定类型的商品交易的总和的状况。 为此,考虑到LevelUP的简单性和灵活性,咱们将再次使用LevelUP。咱们要使用的数据模型是存储在sales这一个sublevel中的简单事务列表,它是如下的形式:

transactionId {amount, item}
复制代码

keytransactionId表示,value则是一个JSON对象,它包含amount,表示销售金额和item,表示项目类型。 要处理的数据是很是基本的,因此让咱们当即在名为的totalSales.js文件中实现API,将以下所示:

const level = require('level');
const sublevel = require('level-sublevel');

const db = sublevel(level('example-db', {valueEncoding: 'json'}));
const salesDb = db.sublevel('sales');

module.exports = function totalSales(item, callback) {
  console.log('totalSales() invoked');
  let sum = 0;
  salesDb.createValueStream()  // [1]
    .on('data', data => {
      if(!item || data.item === item) {  // [2]
        sum += data.amount;
      }
    })
    .on('end', () => {
      callback(null, sum);  // [3]
    });
};
复制代码

该模块的核心是totalSales函数,它也是惟一exportsAPI;它进行以下工做:

  1. 咱们从包含交易信息的salesDbsublevel建立一个StreamStream将从数据库中提取全部条目。
  2. 监听data事件,这个事件触发时,将从数据库Stream中提取出每一项,若是这一项的item参数正是咱们须要的item,就去累加它的amount到总的sum里面。
  3. 最后,end事件触发时,咱们最终调用callback()方法。

上述查询方式可能在性能方面并很差。理想状况下,在实际的应用程序中,咱们可使用索引,甚至使用增量映射来缩短实时计算的时间;可是,因为咱们须要体现缓存的优点,对于上述例子来讲,慢速的查询实际上更好,由于它会突出显示咱们要分析的模式的优势。

为了完成总销售应用程序,咱们只须要从HTTP服务器公开totalSalesAPI;因此,下一步是构建一个(app.js文件):

const http = require('http');
const url = require('url');
const totalSales = require('./totalSales');

http.createServer((req, res) => {
  const query = url.parse(req.url, true).query;
  totalSales(query.item, (err, sum) => {
    res.writeHead(200);
    res.end(`Total sales for item ${query.item} is ${sum}`);
  });
}).listen(8000, () => console.log('Started'));
复制代码

咱们建立的服务器是很是简单的;咱们只须要它暴露totalSales API。 在咱们第一次启动服务器以前,咱们须要用一些示例数据填充数据库;咱们可使用专用于本节的代码示例中的populate_db.js脚原本执行此操做。该脚本将在数据库中建立100K个随机销售交易。 好的! 如今,一切都准备好了。 像往常同样,启动服务器,咱们执行如下命令:

node app
复制代码

请求这个HTTP接口,访问至如下URL

http://localhost:8000/?item=book
复制代码

可是,为了更好地了解服务器的性能,咱们须要连续发送多个请求;因此,咱们建立一个名为loadTest.js的脚本,它以200 ms的间隔发送请求。它已经被配置为链接到服务器的URL,所以,要运行它,执行如下命令:

node loadTest
复制代码

咱们会看到这20个请求须要一段时间才能完成。注意测试的总执行时间,由于咱们如今开始咱们的服务,并测量咱们能够节省多少时间。

批量异步请求

在处理异步操做时,最基本的缓存级别能够经过将一组调用集中到同一个API来实现。这很是简单:若是咱们在调用异步函数的同时在队列中还有另外一个还没有处理的回调,咱们能够将回调附加到已经运行的操做上,而不是建立一个全新的请求。看下图的状况:

前面的图像显示了两个客户端(它们能够是两台不一样的机器,或两个不一样的Web请求),使用彻底相同的输入调用相同的异步操做。 固然,描述这种状况的天然方式是由两个客户开始两个单独的操做,这两个操做将在两个不一样的时刻完成,如前图所示。如今考虑下一个场景,以下图所示:

上图向咱们展现了如何对API的两个请求进行批处理,或者换句话说,对两个请求执行到相同的操做。经过这样作,当操做完成时,两个客户端将同时被通知。这表明了一种简单而又很是强大的方式来下降应用程序的负载,而没必要处理更复杂的缓存机制,这一般须要适当的内存管理和缓存失效策略。

在电子商务销售的Web服务器中使用批处理

如今让咱们在totalSales API上添加一个批处理层。咱们要使用的模式很是简单:若是在API被调用时已经有另外一个相同的请求挂起,咱们将把这个回调添加到一个队列中。当异步操做完成时,其队列中的全部回调当即被调用。

如今,让咱们来改变以前的代码:建立一个名为totalSalesBatch.js的新模块。在这里,咱们将在原始的totalSales API之上实现一个批处理层:

const totalSales = require('./totalSales');

const queues = {};
module.exports = function totalSalesBatch(item, callback) {
  if(queues[item]) {  // [1]
    console.log('Batching operation');
    return queues[item].push(callback);
  }
  
  queues[item] = [callback];  // [2]
  totalSales(item, (err, res) => {
    const queue = queues[item];  // [3]
    queues[item] = null;
    queue.forEach(cb => cb(err, res));
  });
};
复制代码

totalSalesBatch()函数是原始的totalSales() API的代理,它的工做原理以下:

  1. 若是请求的item已经存在队列中,则意味着该特定item的请求已经在服务器任务队列中。在这种状况下,咱们所要作的只是将回调push到现有队列,并当即从调用中返回。不进行后续操做。
  2. 若是请求的item没有在队列中,这意味着咱们必须建立一个新的请求。为此,咱们为该特定item的请求建立一个新队列,并使用当前回调函数对其进行初始化。 接下来,咱们调用原始的totalSales() API
  3. 当原始的totalSales()请求完成时,则执行咱们的回调函数,咱们遍历队列中为该特定请求的item添加的全部回调,并分别调用这些回调函数。

totalSalesBatch()函数的行为与原始的totalSales() API的行为相同,不一样之处在于,如今对于相同内容的请求API进行批处理,从而节省时间和资源。

想知道相比于totalSales() API原始的非批处理版本,在性能方面的优点是什么?而后,让咱们将HTTP服务器使用的totalSales模块替换为咱们刚刚建立的模块,修改app.js文件以下:

//const totalSales = require('./totalSales');
const totalSales = require('./totalSalesBatch');
http.createServer(function(req, res) {
// ...
});
复制代码

若是咱们如今尝试再次启动服务器并进行负载测试,咱们首先看到的是请求被批量返回。

除此以外,咱们观察到请求的总时间大大减小;它应该至少比对原始totalSales() API执行的原始测试快四倍!

这是一个惊人的结果,证实了只需应用一个简单的批处理层便可得到巨大的性能提高,比起缓存机制,也没有显得太复杂,由于,无需考虑缓存淘汰策略。

批处理模式在高负载应用程序和执行较为缓慢的API中发挥巨大做用,正是因为这种模式的运用,能够批量处理大量的请求。

异步请求缓存策略

异步批处理模式的问题之一是对于API的答复越快,咱们对于批处理来讲,其意义就越小。有人可能会争辩说,若是一个API已经很快了,那么试图优化它就没有意义了。然而,它仍然是一个占用应用程序的资源负载的因素,总结起来,仍然能够有解决方案。另外,若是API调用的结果不会常常改变;所以,这时候批处理将并不会有较好的性能提高。在这种状况下,减小应用程序负载并提升响应速度的最佳方案确定是更好的缓存模式。

缓存模式很简单:一旦请求完成,咱们将其结果存储在缓存中,该缓存能够是变量,数据库中的条目,也能够是专门的缓存服务器。所以,下一次调用API时,能够当即从缓存中检索结果,而不是产生另外一个请求。

对于一个有经验的开发人员来讲,缓存不该该是多么新的技术,可是异步编程中这种模式的不一样之处在于它应该与批处理结合在一块儿,以达到最佳效果。缘由是由于多个请求可能并发运行,而没有设置缓存,而且当这些请求完成时,缓存将会被设置屡次,这样作则会形成缓存资源的浪费。

基于这些假设,异步请求缓存模式的最终结构以下图所示:

上图给出了异步缓存算法的两个步骤:

  1. 与批处理模式彻底相同,与在未设置高速缓存时接收到的任何请求将一块儿批处理。这些请求完成时,缓存将会被设置一次。
  2. 当缓存最终被设置时,任何后续的请求都将直接从缓存中提供。

另外咱们须要考虑Zalgo的副作用(咱们已经在Chapter 2-Node.js Essential Patterns中看到了它的实际应用)。在处理异步API时,咱们必须确保始终以异步方式返回缓存的值,即便访问缓存只涉及同步操做。

在电子商务销售的Web服务器中使用异步缓存请求

实践异步缓存模式的优势,如今让咱们将咱们学到的东西应用到totalSales() API

与异步批处理示例程序同样,咱们建立一个代理,其做用是添加缓存层。

而后建立一个名为totalSalesCache.js的新模块,代码以下:

const totalSales = require('./totalSales');

const queues = {};
const cache = {};

module.exports = function totalSalesBatch(item, callback) {
  const cached = cache[item];
  if (cached) {
    console.log('Cache hit');
    return process.nextTick(callback.bind(null, null, cached));
  }
  
  if (queues[item]) {
    console.log('Batching operation');
    return queues[item].push(callback);
  }
  
  queues[item] = [callback];
  totalSales(item, (err, res) => {
    if (!err) {
      cache[item] = res;
      setTimeout(() => {
        delete cache[item];
      }, 30 * 1000); //30 seconds expiry
    }
    
    const queue = queues[item];
    queues[item] = null;
    queue.forEach(cb => cb(err, res));
  });
};
复制代码

咱们能够看到前面的代码与咱们异步批处理的不少地方基本相同。 其实惟一的区别是如下几点:

  • 咱们须要作的第一件事就是检查缓存是否被设置,若是是这种状况,咱们将当即使用callback()返回缓存的值,这里必需要使用process.nextTick(),由于缓存多是异步设定的,须要等到下一次事件轮询时才可以保证缓存已经被设定。

  • 继续异步批处理模式,可是此次,当原始API成功完成时,咱们将结果保存到缓存中。此外,咱们还设置了一个缓存淘汰机制,在30秒后使缓存失效。 一个简单而有效的技术!

如今,咱们准备尝试咱们刚建立的totalSales模块。 先更改app.js模块,以下所示:

// const totalSales = require('./totalSales');
// const totalSales = require('./totalSalesBatch');
const totalSales = require('./totalSalesCache');
   http.createServer(function(req, res) {
     // ...
});
复制代码

如今,从新启动服务器,并使用loadTest.js脚本进行配置,就像咱们在前面的例子中所作的那样。使用默认的测试参数,与简单的异步批处理模式相比,很明显地有了更好的性能提高。 固然,这很大程度上取决于不少因素;例如收到的请求数量,以及一个请求和另外一个请求之间的延迟等。当请求数量较高且跨越较长时间时,使用高速缓存批处理的优点将更为显著。

Memoization被称作缓存函数调用的结果的算法。 在npm中,你能够找到许多包来实现异步的memoization,其中最著名的之一之一是memoizee

有关实现缓存机制的说明

咱们必须记住,在实际应用中,咱们可能想要使用更先进的失效技术和存储机制。 这多是必要的,缘由以下:

  • 大量的缓存值可能会消耗大量内存。 在这种状况下,能够应用最近最少使用(LRU)算法来保持恒定的存储器利用率。
  • 当应用程序分布在多个进程中时,对缓存使用简单变量可能会致使每一个服务器实例返回不一样的结果。若是这对于咱们正在实现的特定应用程序来讲是不但愿的,那么解决方案就是使用共享存储来存储缓存。 经常使用的解决方案是RedisMemcached
  • 与定时淘汰缓存相比,手动淘汰高速缓存可以使得高速缓存使用寿命更长,同时提供更新的数据,但固然,管理起缓存来要复杂得多。

使用Promise进行批处理和缓存

Chapter4-Asynchronous Control Flow Patterns with ES2015 and Beyond中,咱们看到了Promise如何极大地简化咱们的异步代码,可是在处理批处理和缓存时,它则能够提供更大的帮助。

利用Promise进行异步批处理和缓存策略,有以下两个优势:

  • 多个then()监听器能够附加到相同的Promise实例。
  • then()监听器最多保证被调用一次,即便在Promise已经被resolve了以后,then()也能正常工做。 此外,then()老是会被保证其是异步调用的。

简而言之,第一个优势正是批处理请求所须要的,而第二个优势则在Promise已是解析值的缓存时,也会提供一样的的异步返回缓存值的机制。

下面开始看代码,咱们能够尝试使用PromisestotalSales()建立一个模块,在其中添加批处理和缓存功能。建立一个名为totalSalesPromises.js的新模块:

const pify = require('pify');  // [1]
const totalSales = pify(require('./totalSales'));

const cache = {};
module.exports = function totalSalesPromises(item) {
  if (cache[item]) {  // [2]
    return cache[item];
  }

  cache[item] = totalSales(item)  // [3]
    .then(res => {  // [4]
      setTimeout(() => {delete cache[item]}, 30 * 1000); //30 seconds expiry
      return res;
    })
    .catch(err => {  // [5]
      delete cache[item];
      throw err;
    });
  return cache[item];  // [6]
};
复制代码

Promise确实很好,下面是上述函数的功能描述:

  1. 首先,咱们须要一个名为pify的模块,它容许咱们对totalSales()模块进行promisification。这样作以后,totalSales()将返回一个符合ES2015标准的Promise实例,而不是接受一个回调函数做为参数。
  2. 当调用totalSalesPromises()时,咱们检查给定的项目类型是否已经在缓存中有相应的Promise。若是咱们已经有了这样的Promise,咱们直接返回这个Promise实例。
  3. 若是咱们在缓存中没有针对给定项目类型的Promise,咱们继续经过调用原始(promisified)的totalSales()来建立一个Promise实例。
  4. Promise正常resolve了,咱们设置了一个清除缓存的时间(假设为30秒),咱们返回res将操做的结果返回给应用程序。
  5. 若是Promise被异常reject了,咱们当即重置缓存,并再次抛出错误,将其传播到Promise chain中,因此任何附加到相同Promise的其余应用程序也将收到这一异常。
  6. 最后,咱们返回咱们刚才建立或者缓存的Promise实例。

很是简单直观,更重要的是,咱们使用Promise也可以实现批处理和缓存。 若是咱们如今要尝试使用totalSalesPromise()函数,稍微调整app.js模块,由于如今使用Promise而不是回调函数。 让咱们经过建立一个名为appPromises.js的app模块来实现:

const http = require('http');
const url = require('url');
const totalSales = require('./totalSalesPromises');

http.createServer(function(req, res) {
  const query = url.parse(req.url, true).query;
  totalSales(query.item).then(function(sum) {
    res.writeHead(200);
    res.end(`Total sales for item ${query.item} is ${sum}`);
  });
}).listen(8000, function() {console.log('Started')});
复制代码

它的实现与原始应用程序模块几乎彻底相同,不一样的是如今咱们使用的是基于Promise的批处理/缓存封装版本; 所以,咱们调用它的方式也略有不一样。

运行如下命令开启这个新版本的服务器:

node appPromises
复制代码

运行与CPU-bound的任务

虽然上面的totalSales()在系统资源上面消耗较大,可是其也不会影响服务器处理并发的能力。 咱们在Chapter1-Welcome to the Node.js Platform中了解到有关事件循环的内容,应该为此行为提供解释:调用异步操做会致使堆栈退回到事件循环,从而使其免于处理其余请求。

可是,当咱们运行一个长时间的同步任务时,会发生什么状况,从不会将控制权交还给事件循环?

这种任务也被称为CPU-bound,由于它的主要特色是CPU利用率较高,而不是I/O操做繁重。 让咱们当即举一个例子上看看这些类型的任务在Node.js中的具体行为。

解决子集总和问题

如今让咱们作一个CPU占用比较高的高计算量的实验。下面来看的是子集总和问题,咱们计算一个数组中是否具备一个子数组,其总和为0。例如,若是咱们有数组[1, 2, -4, 5, -3]做为输入,则知足问题的子数组是[1, 2, -3][2, -4, 5, -3]

最简单的算法是把每个数组元素作遍历而后依次计算,时间复杂度为O(2^n),或者换句话说,它随着输入的数组长度成指数增加。这意味着一组20个整数则会有多达1, 048, 576中状况,显然不可以经过穷举来作到。固然,这个问题的解决方案可能并不算复杂。为了使事情变得更加困难,咱们将考虑数组和问题的如下变化:给定一组整数,咱们要计算全部可能的组合,其总和等于给定的任意整数。

const EventEmitter = require('events').EventEmitter;
class SubsetSum extends EventEmitter {
  constructor(sum, set) {
      super();
      this.sum = sum;
      this.set = set;
      this.totalSubsets = 0;
    } //...
}
复制代码

SubsetSum类是EventEmitter类的子类;这使得咱们每次找到一个匹配收到的总和做为输入的新子集时都会发出一个事件。 咱们将会看到,这会给咱们很大的灵活性。

接下来,让咱们看看咱们如何可以生成全部可能的子集组合:

开始构建一个这样的算法。建立一个名为subsetSum.js的新模块。在其中声明一个SubsetSum类:

_combine(set, subset) {
  for(let i = 0; i < set.length; i++) {
    let newSubset = subset.concat(set[i]);
    this._combine(set.slice(i + 1), newSubset);
    this._processSubset(newSubset);
  }
}
复制代码

无论算法其中究竟是什么内容,但有两点要注意:

  • _combine()方法是彻底同步的;它递归地生成每个可能的子集,而不把CPU控制权交还给事件循环。若是咱们考虑一下,这对于不须要任何I/O的算法来讲是很是正常的。
  • 每当生成一个新的组合时,咱们都会将这个组合提供给_processSubset()方法以供进一步处理。

_processSubset()方法负责验证给定子集的元素总和是否等于咱们要查找的数字:

_processSubset(subset) {
  console.log('Subset', ++this.totalSubsets, subset);
  const res = subset.reduce((prev, item) => (prev + item), 0);
  if (res == this.sum) {
    this.emit('match', subset);
  }
}
复制代码

简单地说,_processSubset()方法将reduce操做应用于子集,以便计算其元素的总和。而后,当结果总和等于给定的sum参数时,会发出一个match事件。

最后,调用start()方法开始执行算法:

start() {
  this._combine(this.set, []);
  this.emit('end');
}
复制代码

经过调用_combine()触发算法,最后触发一个end事件,代表全部的组合都被检查过,而且任何可能的匹配都已经被计算出来。 这是可能的,由于_combine()是同步的; 所以,只要前面的函数返回,end事件就会触发,这意味着全部的组合都被计算出来了。

接下来,咱们在网络上公开刚刚建立的算法。可使用一个简单的HTTP服务器对响应的任务做出响应。 特别是,咱们但愿以/subsetSum?data=<Array>&sum=<Integer>这样的请求格式进行响应,传入给定的数组和sum,使用SubsetSum算法进行匹配。

在一个名为app.js的模块中实现这个简单的服务器:

const http = require('http');
const SubsetSum = require('./subsetSum');

http.createServer((req, res) => {
  const url = require('url').parse(req.url, true);
  if(url.pathname === '/subsetSum') {
    const data = JSON.parse(url.query.data);
    res.writeHead(200);
    const subsetSum = new SubsetSum(url.query.sum, data);
    subsetSum.on('match', match => {
      res.write('Match: ' + JSON.stringify(match) + '\n');
    });
    subsetSum.on('end', () => res.end());
    subsetSum.start();
  } else {
    res.writeHead(200);
    res.end('I\m alive!\n');
  }
}).listen(8000, () => console.log('Started'));
复制代码

因为SubsetSum实例使用事件返回结果,因此咱们能够在算法生成后当即对匹配的结果使用Stream进行处理。另外一个须要注意的细节是,每次咱们的服务器都会返回I'm alive!,这样咱们每次发送一个不一样于/subsetSum的请求的时候。能够用来检查咱们服务器是否挂掉了,这在稍后将会看到。

开始运行:

node app
复制代码

一旦服务器启动,咱们准备发送咱们的第一个请求;让咱们尝试发送一组17个随机数,这将致使产生131,071个组合,那么服务器将会处理一段时间:

curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]"--data-urlencode "sum=0"
复制代码

这是若是咱们在第一个请求仍在运行的时候在另外一个终端中尝试输入如下命令,咱们将发现一个巨大的问题:

curl -G http://localhost:8000
复制代码

咱们会看到直到第一个请求结束以前,最后一个请求一直处于挂起的状态。服务器没有返回响应!这正如咱们所想的那样。Node.js事件循环运行在一个单独的线程中,若是这个线程被一个长的同步计算阻塞,它将不能再执行一个循环来响应I'm alive!, 咱们必须知道,这种代码显然不可以用于同时接收到多个请求的应用程序。

可是不要对Node.js中绝望,咱们能够经过几种方式来解决这种状况。咱们来分析一下最多见的两种方案:

使用setImmediate

一般,CPU-bound算法是创建在必定规则之上的。它能够是一组递归调用,一个循环,或者基于这些的任何变化/组合。 因此,对于咱们的问题,一个简单的解决方案就是在这些步骤完成后(或者在必定数量的步骤以后),将控制权交还给事件循环。这样,任何待处理的I / O仍然能够在事件循环在长时间运行的算法产生CPU的时间间隔中处理。对于这个问题而言,解决这一问题的方式是把算法的下一步在任何可能致使挂起的I/O请求以后运行。这听起来像是setImmediate()方法的完美用例(咱们已经在Chapter2-Node.js Essential Patterns中介绍过这一API)。

模式:使用setImmediate()交错执行长时间运行的同步任务。

使用setImmediate进行子集求和算法的步骤

如今咱们来看看这个模式如何应用于子集求和算法。 咱们所要作的只是稍微修改一下subsetSum.js模块。 为方便起见,咱们将建立一个名为subsetSumDefer.js的新模块,将原始的subsetSum类的代码做为起点。 咱们要作的第一个改变是添加一个名为_combineInterleaved()的新方法,它是咱们正在实现的模式的核心:

_combineInterleaved(set, subset) {
  this.runningCombine++;
  setImmediate(() => {
    this._combine(set, subset);
    if(--this.runningCombine === 0) {
      this.emit('end');
    }
  });
}
复制代码

正如咱们所看到的,咱们所要作的只是使用setImmediate()调用原始的同步的_combine()方法。然而,如今的问题是由于该算法再也不是同步的,咱们更难以知道什么时候已经完成了全部的组合的计算。

为了解决这个问题,咱们必须使用很是相似于咱们在Chapter3-Asynchronous Control Flow Patterns with Callbacks看到的异步并行执行的模式来追溯_combine()方法的全部正在运行的实例。 当_combine()方法的全部实例都已经完成运行时,触发end事件,通知任何监听器,进程须要作的全部动做都已经完成。

对于最终子集求和算法的重构版本。首先,咱们须要将_combine()方法中的递归步骤替换为异步:

_combine(set, subset) {
  for(let i = 0; i < set.length; i++) {
    let newSubset = subset.concat(set[i]);
    this._combineInterleaved(set.slice(i + 1), newSubset);
    this._processSubset(newSubset);
  }
}
复制代码

经过上面的更改,咱们确保算法的每一个步骤都将使用setImmediate()在事件循环中排队,在事件循环队列中I / O请求以后执行,而不是同步运行形成阻塞。

另外一个小调整是对于start()方法:

start() {
  this.runningCombine = 0;
  this._combineInterleaved(this.set, []);
}
复制代码

在前面的代码中,咱们将_combine()方法的运行实例的数量初始化为0.咱们还经过调用_combineInterleaved()来将调用替换为_combine(),并移除了end的触发,由于如今_combineInterleaved()是异步处理的。 经过这个最后的改变,咱们的子集求和算法如今应该可以经过事件循环能够运行的时间间隔交替地运行其可能大量占用CPU的代码,而且不会再形成阻塞。

最后更新app.js模块,以便它可使用新版本的SubsetSum

const http = require('http');
// const SubsetSum = require('./subsetSum');
const SubsetSum = require('./subsetSumDefer');
http.createServer(function(req, res) {
  // ...
})
复制代码

和以前同样的方式开始运行,结果以下:

此时,使用异步的方式运行,再也不会阻塞CPU了。

interleaving模式

正如咱们所看到的,在保持应用程序的响应性的同时运行一个CPU-bound的任务并不复杂,只须要使用setImmediate()把同步执行的代码变为异步执行便可。可是,这不是效率最好的模式;实际上,延迟执行一个任务会额外带来一个小的开销,在这样的算法中,聚沙成塔,则会产生重大的影响。这一般是咱们在运行CPU限制任务时所须要的最后一件事情,特别是若是咱们必须将结果直接返回给用户,这应该在合理的时间内进行响应。 缓解这个问题的一个可能的解决方案是只有在必定数量的步骤以后使用setImmediate(),而不是在每一步中使用它。可是这仍然不能解决问题的根源。

记住,这并非说一旦咱们想要经过异步的模式来执行CPU-bound的任务,咱们就应该不惜一切代价来避免这样的额外开销,事实上,从更广阔的角度来看,同步任务并不必定很是漫长和复杂,以致于形成麻烦。在繁忙的服务器中,即便是阻塞事件循环200毫秒的任务也会产生不但愿的延迟。 在那些并发量并不高的服务器来讲,即便产生必定短时的阻塞,也不会影响性能,使用交错执行setImmediate()多是避免阻塞事件循环的最简单也是最有效的方法。

process.nextTick()不能用于交错长时间运行的任务。正如咱们在Chapter1-Welcome to the Node.js Platform中看到的,nextTick()会在任何未返回的I / O以前调度,而且在重复调用process.nextTick()最终会致使I / O饥饿。 你能够经过在前面的例子中用process.nextTick()替换setImmediate()来验证。

使用多个进程

使用interleaving模式并非咱们用来运行CPU-bound任务的惟一方法;防止事件循环阻塞的另外一种模式是使用子进程。咱们已经知道Node.js在运行I / O密集型应用程序(如Web服务器)的时候是最好的,由于Node.js可使得咱们能够经过异步来优化资源利用率。

因此,咱们必须保持应用程序响应的最好方法是不要在主应用程序的上下文中运行昂贵的CPU-bound任务,而是使用单独的进程。这有三个主要的优势:

  • 同步任务能够全速运行,而不须要交错执行的步骤
  • Node.js中处理进程很简单,可能比修改一个使用setImmediate()的算法更容易,而且多进程容许咱们轻松使用多个处理器,而无需扩展主应用程序自己。
  • 若是咱们真的须要超高的性能,可使用低级语言,如性能良好的C

Node.js有一个充足的API库带来与外部进程交互。 咱们能够在child_process模块中找到咱们须要的全部东西。 并且,当外部进程只是另外一个Node.js程序时,将它链接到主应用程序是很是容易的,咱们甚至不以为咱们在本地应用程序外部运行任何东西。这得益于child_process.fork()函数,该函数建立一个新的子Node.js进程,并自动建立一个通讯管道,使咱们可以使用与EventEmitter很是类似的接口交换信息。来看如何用这个特性来重构咱们的子集求和算法。

将子集求和任务委托给其余进程

重构SubsetSum任务的目标是建立一个单独的子进程,负责处理CPU-bound的任务,使服务器的事件循环专一于处理来自网络的请求:

  1. 咱们将建立一个名为processPool.js的新模块,它将容许咱们建立一个正在运行的进程池。建立一个新的进程代价昂贵,须要时间,所以咱们须要保持它们不断运行,尽可能不要产生中断,时刻准备好处理请求,使咱们能够节省时间和CPU。此外,进程池须要帮助咱们限制同时运行的进程数量,以免将使咱们的应用程序受到拒绝服务(DoS)攻击。
  2. 接下来,咱们将建立一个名为subsetSumFork.js的模块,负责抽象子进程中运行的SubsetSum任务。 它的角色将与子进程进行通讯,并将任务的结果展现为来自当前应用程序。
  3. 最后,咱们须要一个worker(咱们的子进程),一个新的Node.js程序,运行子集求和算法并将其结果转发给父进程。

DoS攻击是企图使其计划用户没法使用机器或网络资源,例如临时或无限中断或暂停链接到Internet的主机的服务。

实现一个进程池

先从构建processPool.js模块开始:

const fork = require('child_process').fork;
class ProcessPool {
  constructor(file, poolMax) {
      this.file = file;
      this.poolMax = poolMax;
      this.pool = [];
      this.active = [];
      this.waiting = [];
    } //...
}
复制代码

在模块的第一部分,引入咱们将用来建立新进程的child_process.fork()函数。 而后,咱们定义ProcessPool的构造函数,该构造函数接受表示要运行的Node.js程序的文件参数以及池中运行的最大实例数poolMax做为参数。而后咱们定义三个实例变量:

  • pool表示的是准备运行的进程
  • active表示的是当前正在运行的进程列表
  • waiting包含全部这些请求的任务队列,保存因为缺乏可用的资源而没法当即实现的任务

ProcessPool类的acquire()方法,它负责取出一个准备好被使用的进程:

acquire(callback) {
  let worker;
  if(this.pool.length > 0) {  // [1]
    worker = this.pool.pop();
    this.active.push(worker);
    return process.nextTick(callback.bind(null, null, worker));
  }

  if(this.active.length >= this.poolMax) {  // [2]
    return this.waiting.push(callback);
  }

  worker = fork(this.file);  // [3]
  this.active.push(worker);
  process.nextTick(callback.bind(null, null, worker));
}
复制代码

函数逻辑以下:

  1. 若是在进程池中有一个准备好被使用的进程,咱们只需将其移动到active数组中,而后经过异步的方式调用其回调函数。
  2. 若是池中没有可用的进程,或者已经达到运行进程的最大数量,必须等待。经过把当前回调放入waiting数组。
  3. 若是咱们尚未达到运行进程的最大数量,咱们将使用child_process.fork()建立一个新的进程,将其添加到active列表中,而后调用其回调。

ProcessPool类的最后一个方法是release(),其目的是将一个进程放回进程池中:

release(worker) {
  if(this.waiting.length > 0) {  // [1]
    const waitingCallback = this.waiting.shift();
    waitingCallback(null, worker);
  }
  this.active = this.active.filter(w => worker !==  w);  // [2]
  this.pool.push(worker);
}
复制代码

前面的代码也很简单,其解释以下:

  • 若是在waiting任务队列里面有任务须要被执行,咱们只需为这个任务分配一个进程worker执行。
  • 不然,若是在waiting任务队列中都没有须要被执行的任务,咱们则把active的进程列表中的进程放回进程池中。

正如咱们所看到的,进程历来没有中断,只在为其不断地从新分配任务,使咱们能够经过在每一个请求不从新启动一个进程达到节省时间和空间的目的。然而,重要的是要注意,这可能并不老是最好的选择,这很大程度上取决于咱们的应用程序的要求。为减小进程池长期占用内存,可能的调整以下:

  • 在一个进程空闲一段时间后,终止进程,释放内存空间。
  • 添加一个机制来终止或重启没有响应的或者崩溃了的进程。
父子进程通讯

如今咱们的ProcessPool类已经准备就绪,咱们可使用它来实现SubsetSumFork模块,SubsetSumFork的做用是与子进程进行通讯获得子集求和的结果。前面曾说到,用child_process.fork()启动一个进程也给了咱们建立了一个简单的基于消息的管道,经过实现subsetSumFork.js模块来看看它是如何工做的:

const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPool');
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);

class SubsetSumFork extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
  }

  start() {
    workers.acquire((err, worker) => {  // [1]
      worker.send({sum: this.sum, set: this.set});

      const onMessage = msg => {
        if (msg.event === 'end') {  // [3]
          worker.removeListener('message', onMessage);
          workers.release(worker);
        }

        this.emit(msg.event, msg.data);  // [4]
      };

      worker.on('message', onMessage);  // [2]
    });
  }
}

module.exports = SubsetSumFork;
复制代码

首先注意,咱们在subsetSumWorker.js调用ProcessPool的构造函数建立ProcessPool实例。 咱们还将进程池的最大容量设置为2

另外,咱们试图维持原来的SubsetSum类相同的公共API。实际上,SubsetSumForkEventEmitter的子类,它的构造函数接受sumset,而start()方法则触发算法的执行,而这个SubsetSumFork实例运行在一个单独的进程上。调用start()方法时会发生的状况:

  1. 咱们试图从进程池中得到一个新的子进程。在建立进程成功以后,咱们尝试向子进程发送一条消息,包含sumsetsend()方法是Node.js自动提供给child_process.fork()建立的全部进程,这实际上与父子进程之间的通讯管道有关。
  2. 而后咱们开始监听子进程返回的任何消息,咱们使用on()方法附加一个新的事件监听器(这也是全部以child_process.fork()建立的进程提供的通讯通道的一部分)。
  3. 在事件监听器中,咱们首先检查是否收到一个end事件,这意味着SubsetSum全部任务已经完成,在这种状况下,咱们删除onMessage监听器并释放worker,并将其放回进程池中,再也不让其占用内存资源和CPU资源。
  4. worker{event,data}格式生成消息,使得任什么时候候一旦子进程处理完毕任务,咱们在外部都能接收到这一消息。

这就是SubsetSumFork模块如今咱们来实现这个worker应用程序。

与父进程进行通讯

如今咱们来建立subsetSumWorker.js模块,咱们的应用程序,这个模块的所有内容将在一个单独的进程中运行:

const SubsetSum = require('./subsetSum');

process.on('message', msg => {  // [1]
  const subsetSum = new SubsetSum(msg.sum, msg.set);
  
  subsetSum.on('match', data => {  // [2]
    process.send({event: 'match', data: data});
  });
  
  subsetSum.on('end', data => {
    process.send({event: 'end', data: data});
  });
  
  subsetSum.start();
});
复制代码

因为咱们的handler处于一个单独的进程中,咱们没必要担忧这类CPU-bound任务阻塞事件循环,全部的HTTP请求将继续由主应用程序的事件循环处理,而不会中断。

当子进程开始启动时,父进程:

  1. 子进程当即开始监听来自父进程的消息。这能够经过process.on()函数轻松实现。咱们指望从父进程中惟一的消息是为新的SubsetSum任务提供输入的消息。只要收到这样的消息,咱们建立一个SubsetSum类的新实例,并注册matchend事件监听器。最后,咱们用subsetSum.start()开始计算。
  2. 每次子集求和算法收到事件时,把结果它封装在格式为{event,data}的对象中,并将其发送给父进程。这些消息而后在subsetSumFork.js模块中处理,就像咱们在前面的章节中看到的那样。

注意:当子进程不是Node.js进程时,则上述的通讯管道就不可用了。在这种状况下,咱们仍然能够经过在暴露于父进程的标准输入流和标准输出流之上实现咱们本身的协议来创建父子进程通讯的接口。

多进程模式

尝试新版本的子集求和算法,咱们只须要替换HTTP服务器使用的模块(文件app.js):

运行结果以下:

更有趣的是,咱们也能够尝试同时启动两个subsetSum任务,咱们能够充分看到多核CPU的做用。 相反,若是咱们尝试同时运行三个subsetSum任务,结果应该是最后一个启动将挂起。这不是由于主进程的事件循环被阻塞,而是由于咱们为subsetSum任务设置了两个进程的并发限制。

正如咱们所看到的,多进程模式比interleaving模式更增强大和灵活;然而,因为单个机器提供的CPU和内存资源量仍然是一个硬性限制,因此它仍然不可扩展。在这种状况下,将负载分配到多台机器上,则是更优秀的解决办法。

值得一提的是,在运行CPU-bound任务时,多线程能够成为多进程的替代方案。目前,有几个npm包公开了一个用于处理用户级模块的线程的API;其中最流行的是webworker-threads。可是,即便线程更轻量级,完整的进程也能够提供更大的灵活性,并具有更高更可靠的容错处理。

总结

本章讲述如下三点:

  • 异步初始化模块
  • 批处理和缓存在Node.js异步中的运用
  • 使用异步或者多进程来处理CPU-bound的任务
相关文章
相关标签/搜索