使用Redis keyspace notifications来实现延迟任务(NodeJS版)

1.使用场景

在不少系统中,特别是电商系统经常存在须要执行延迟任务。例如一个待支付订单,须要在30分钟后自动关闭。虽然有不少方式能够实现,好比说Job等,这里主要介绍利用Redis的新特性 keyspace notifications来实现。html

2.基础知识

重点!!!   Redis 2.8.0版本开始支持 keyspace notifications。若是你的Redis版本过低,能够洗洗睡了……redis

若是你还不了解Redis的Pub/Sub,强烈建议你先阅读该篇文章: Redis发布与订阅bash

接下来讲说咱们的主角:keyspace notificationsui

keyspace notifications默认是关闭状态,开启则须要修改redis.conf文件或经过CONFIG SET来开启或关闭该功能。这里咱们使用CONFIG SET来开启:spa

$ redis-cli config set notify-keyspace-events Ex复制代码

这里有人会问了, Ex 是什么意思呢?这是notify-keyspace-events的参数,完整的参数列表看看下面的表格:code

字符 发送的通知
K 键空间通知,全部通知以 __keyspace@<db>__ 为前缀
E 键事件通知,全部通知以 __keyevent@<db>__ 为前缀
g DELEXPIRERENAME 等类型无关的通用命令的通知
$ 字符串命令的通知
l 列表命令的通知
s 集合命令的通知
h 哈希命令的通知
z 有序集合命令的通知
x 过时事件:每当有过时键被删除时发送
e 驱逐(evict)事件:每当有键由于 maxmemory 政策而被删除时发送
A 参数 g$lshzxe 的别名

能够看出,咱们只开启了键事件通知和过时事件。由于咱们实现延时任务只须要这两个就足够了。话很少说,直接看代码。cdn

3. 实现方案

一个延迟任务应该具有哪些属性? 我以为至少有如下属性:htm

  • 任务类型。(例如:关闭订单)
  • 任务ID。(例如:订单ID)
  • 任务延迟时间。(例如:30分钟)
  • 任务额外数据。(例如:订单其余相关数据)

肯定好后,咱们能够继续往下走。blog

3.1 注册事件处理器

首先在工程启动后,咱们须要根据不一样的事件注册不一样的处理器:seo

const _ = require('lodash')
// 任务处理器map
const handlers = {}
// 事件类型map
const events = {}
const registerEventHandler = (type, handler) => {  
  if (!type) {    
      throw new Error('type不能为空')  
  }  
  if (!_.isFunction(handler)) {    
      throw new Error('handler类型非function')  
  }  
  handlers[type] = handler  
  events[type] = true
}复制代码

3.2 建立延迟任务

const redis = require('redis')
const client = redis.createClient()
const eventKeyPrefix = 'custom_event_'// 任务列表
const jobs = {}
const addDelayEvent = (type, id, body = {}, delay = 10 * 60) => {  
  const key = `${eventKeyPrefix}${type}_${id}`  
  const jobKey = `${type}_${id}`  
  client.setex(key, delay, 'delay event', (err) => {    
    if (err) {      
      return console.log('添加延迟事件失败:', err);    
    }    
    console.log('添加延迟事件成功');    
    jobs[jobKey] = body  
  })
}
复制代码

这里比较关键的点就是client.setex(key, expired, value)这个方法,咱们须要给key添加一个过时时间,那么当key过时后redis才会发出一个过时事件。

3.3 订阅过时事件

实现了前两个步骤后,咱们已经能够往redis里写入带有过时时间的key了。接下来关键的就是订阅过时事件并处理。

const redis = require('redis')
const sub = redis.createClient()

sub.on('pmessage', (pattern, channel, message) => {  
  // match key  
  const keyMatcher = new RegExp(`^${eventKeyPrefix}(${_.keys(events).join('|')})_(\\S+)$`)  
  const result = message.match(keyMatcher)  
  if (result) {    
    const type = result[1];    
    const id = result[2];    
    const handler = handlers[type]    
    console.log('订阅消息:type=%s, id=%s', type, id);    
    if (_.isFunction(handler)) {      
      const jobKey = `${type}_${id}`      
      if (jobs[jobKey]) {        
        handler(id, jobs[jobKey])      
      } else {        
        console.log('未找到延迟事件,type=%s,id=%s', type, id);      
      }    
    } else {      
      console.log('未找到事件处理器。type=%s', type)    
    }  
  }
})
// 订阅频道
sub.psubscribe('__key*__:expired')
复制代码

3.4 编写Demo

最后咱们写一个Demo来验证下咱们的功能。

const eventManager = require('./utils/eventManager')

eventManager.registerEventHandler('closeorder', (id, body) => {  
    console.log('关闭订单 id=%s, body=%o', id, body);
})

eventManager.addDelayEvent('closeorder', 1111, {name: 'test'}, 5)复制代码


Done! 

相关文章
相关标签/搜索