[译]基于NodeJS和Redis的任务调度

在uSTADIUM,咱们使用任务调度系统发送成千上万个推送通知。起初,对任务队列和调度程序的需求并不明显。咱们的服务器经过一个请求处理通知就能知足咱们的需求。但随着时间的推移,系统开始承受不住负载。一开始我不肯定可否解决这个问题,因此解决这个问题过程是一段有趣的经历。在本文中,我将讨论这个方案,咱们如何使用Redis构建它,以及咱们在系统扩展过程当中的经验。

问题

一旦了解了基本知识,构建API就没那么复杂了。咱们向服务器发送HTTP请求,它作一些工做,而后返回请求的数据。这个过程很简单。可是当请求须要完成超出其范围的工做时会发生什么呢?例如,当我提醒一个用户,系统须要向受影响的全部用户发送一个推送通知。在请求周期内处理这些通知将延迟最终的响应。随着咱们的通知系统变得愈来愈复杂,很明显咱们须要更多的等待时间。javascript

处理通知而后推送通知须要调用数据库和外部api。该过程拆解以下:java

  1. 发生须要生成通知的操做。
  2. 构造通知并将其插入数据库。
  3. 该通知被映射到将接收它的一组用户。
  4. 咱们为须要通知的用户检索全部设备的列表。
  5. 咱们向他们在咱们这里注册的每台设备发送推送通知。
  6. 咱们更新该通知的发送状态并删除无效的设备令牌。

这6个步骤中的每个都至少有一个与之关联的数据库查询。当须要将单个通知发送到单个用户的设备时,这个过程能够很是快地完成,可是若是须要更长的时间,那么请求就有超时的风险。咱们必须将这个逻辑分离出来,以即可以在请求/响应周期以外处理它。node

任务队列

任务队列管理了一份须要在单独进程中完成的工做列表。一个系统将工做添加到队列的末尾,而另外一个系统将工做项从顶部弹出。咱们须要建立一个表示上述工做的任务对象,而后将其添加到任务队列中。在咱们开始以前,我须要问几个基本问题。redis

1. 任务队列将位于何处?

咱们已经在使用Redis做为缓存系统,因此当我开始寻找构建队列的方法时,Redis是一个显而易见的选择。它不只可以很好地处理这种模式,并且有不少在线资源讨论它是如何构建的。对此还有许多其余选项,好比若是你正在使用谷歌应用程序引擎(GAE),你应该研究谷歌云任务队列,它提供了更多内置功能。数据库

2. 咱们如何知道什么时候将项添加到队列中?

我花了一点时间想弄明白。我不想每n毫秒轮询一次Redis来查找新做业。我发现了两种方法。第一个是Redis的发布/订阅系统。对于这个方法,我将有一个订阅通道并在其上接收消息的函数。这些消息将提醒我准备运行一个新任务。第二种方法是使用一个简单的Redis列表做为队列,使用阻塞列表pop原语(BLPOP),等待直到一个项目准备好并将其从队列中移除。api

在这个方案的第一次迭代中,咱们使用了Pub/Sub模式,可是它增长了一层不须要的复杂性。此外,当系统扩展时,咱们必须作额外的工做来验证消息没有在多台机器上处理。所以,咱们切换到List和BLPOP方法。缓存

3.咱们向任务队列插入什么?

“嗯,咱们把任务对象插进去,嗯……”你可能会这么想,可是队列只支持添加字符串,因此咱们不能真正插入一个对象。咱们必须把关键值推到末端。这个问题困扰着我,主要是由于我不肯定“最好”的方法是什么。键应该是数据库的主ID,仍是对Redis中的某个对象的引用?咱们应该在哪里画出这条线呢?我决定将events主键ID发送到队列,并容许任务决定如何处理它。例如,若是用户为一篇文章进行了upvote,我将把vote操做的ID推到vote_queue中,一旦它从队列中弹出,服务将知道如何处理它。服务器

方案

好了,我已经描述这个问题,并回答了个人一些问题(但愿这些问题也回答了你的一些问题),如今让咱们看一下这将如何工做的,如图:async

从图中能够看到,咱们有两个服务在服务器上运行。TaskScheduler将建立一个新任务,将其添加到数据库,而后将任务的ID推到任务队列的末尾。TaskManager等待任务添加到队列后适当地处理它。函数

代码示例

TaskScheduler.js是一个基本的例子,演示了如何将任务添加到数据库中,而后将其推到任务队列的末尾。一旦将其推入队列,当TaskManager开始监听时,它将开始处理。

/// TaskScheduler.js is an example of how one would schedule tasks on the task queue. 

var redis = require('redis');
var redisClient = redis.createClient();

const TaskScheduler = async function(work){
  // If you're using MySQL we would add the "Task" to the database.
  let task = await Database.query("INSERT INTO Task ...");  
  let taskID = task.insertId;
  
  await redisClient.rpush("task_queue", taskID);
}
复制代码

TaskQueue.js演示如何在NodeJS中使用async/await实现它的基本示例。

/// TaskQueue.js would be placed in your server and when it's launched 
/// to begin listening for tasks. Or, it can be extracted out to a seperate service.
var redis = require("redis");

/// TaskManager for listening to the queue and running work.
const TaskManager = async function(redisClient){
  while(true){
    let task;
    
    try{
      task = await redisClient.blpopAsync("task_queue", 0);
    } catch(error) {
      // Redis connect could have closed. Handle those cases here. 
      process.exit(1);
    }    
    
    try {
      await HandleTask(task);
    } catch (error) {
      // Handling the task failed. Try rerunning it or adding it to a "Failure" queue. 
    }
  }
}

/// Function that handles all the work for this task.
const HandleTask = async function(task){
  // Do the work!
}

// Run the TaskManager function
(async function() {
  // Initialize redis
  let redisClient = redis.createClient();
  await TaskManager(redisClient)
})()
复制代码

改进

因为我给出的代码只是一个基本的示例,因此还有不少地方须要改进。你可能想问,TaskManager应该放在哪里?若是直接将其添加到服务器,则在高使用率期间可能会使系统过载,但这取决于你的任务执行的工做类型。在咱们的系统中,咱们将全部这些提取到一个新的微服务中,并使用一个简单的API来检查它的状态。

一样,在示例代码中,咱们一次运行一个任务。这并不理想,由于长时间运行的任务可能得备份整个队列。所以,咱们应该有一个运行任务池,根据须要添加和删除这些任务。一旦池被填满,while循环将等待一个新的空间。

概述

本文所描述的方法并不太复杂,可是它将业务逻辑与应用程序逻辑解耦。有了这个小的更改,咱们就能够开始迭代系统的性能,并构建更健壮的队列和服务。咱们还能够复制此方法来处理各类长时间运行的流程,如推荐系统、文本处理等。

原文:Task Scheduling with NodeJS and Redis

相关文章
相关标签/搜索