Node.js 中使用 Redis 来实现定时任务

原文连接:http://xcoder.in/2015/06/05/scheduled-task-using-redis/javascript

很久没写博文了,最近在跟随着公司大牛们的脚步秘密研发新产品中。php

不过前几天有一个小需求的东西能够提出来写一点点小干货儿跟你们分享分享。米娜桑会的就能够忽略了,反正我也是随便写的;若是以为本文对你有用的话还请多多支持喵。(●´ω`●)ゞjava

本文所说的定时任务或者说计划任务并非不少人想象中的那样,好比说天天凌晨三点自动运行起来跑一个脚本。这种都已经烂大街了,随便一个 Crontab 就能搞定了。node

这里所说的定时任务能够说是计时器任务,好比说用户触发了某个动做,那么从这个点开始过二十四小时咱们要对这个动做作点什么。那么若是有 1000 个用户触发了这个动做,就会有 1000 个定时任务。因而这就不是 Cron 范畴里面的内容了。linux

举个最简单的例子,一个用户推荐了另外一个用户,咱们定一个二十四小时以后的任务,看看被推荐的用户有没有来注册,若是没注册就给他搞一条短信过去。Σ>―(〃°ω°〃)♡→git

最初的设想

一开始我是想把这个计时器作在内存里面直接调用的。github

考虑到 Node.js 的定时并非那么准确(不管是 setTimeout 仍是 setInterval),因此原本打算本身维护这个定时器队列。redis

又考虑到 Node.js 原生对象比较耗内存。以前我用 JSON 对象存了一本字典,约十二万多的词条,原文件大概也就五六兆,用 Node.js 的原生对象一存竟然有五六百兆的内存占用——因此打算这个定时器队列用 C++ 来写 addon。npm

考虑到任什么时候候插入的任务都有可能在已有的任务以前或者以后,因此原本想用 C++ 来写一个小根堆。每次用户来一个任务的时候就将这个任务插入到堆中。数组

若是按照上述方法的话,再加上对时间要求掐得也不是那么紧,因而就是一个不断的 process.nextTick() 的过程。

process.nextTick() 当中执行这么一个函数:

  1. 从小根堆中不断获取堆顶的任务并处理,一直处理到堆顶任务的执行时间大于当前时间为止。
  2. 继续 process.nextTick() 来让下一个 tick 执行步骤 1 中的流程。

因此最后就是一边往小根堆插入任务,另外一边经过不断 process.nextTick() 消费任务的这么一个过程。

最后,为了考虑到程序重启的时候内存数据会丢失,还应该作一个持久化的事情——在每次插入任务的时候顺便往持久化中间件中插一条副本,好比 MySQL、MongoDB、Redis、Riak 等等任何三方依赖。消费任务的时候顺便把中间件中的这条任务数据给删除。

也就是说中间件中永远存的就是当前还没有完成的任务。每当程序重启的时候都先从中间件中把全部任务读取进来重建一下堆,而后就能继续工做了。

若是当时我没有发现 Redis 的这个妙用的话,上述的流程将会是我实现咱们定时任务的流程了。

Redis 妙用

在 Redis 的 2.8.0 版本以后,其推出了一个新的特性——键空间消息(Redis Keyspace Notifications),它配合 2.0.0 版本以后的 SUBSCRIBE 就能完成这个定时任务的操做了,不过定时的单位是秒

Publish / Subscribe

Redis 在 2.0.0 以后推出了 Pub / Sub 的指令,大体就是说一边给 Redis 的特定频道发送消息,另外一边从 Redis 的特定频道取值——造成了一个简易的消息队列

好比咱们能够往 foo 频道推一个消息 bar,那么就能够直接:

PUBLISH foo bar

另外一边咱们在客户端订阅 foo 频道就能接受到这个消息了。

举个例子,若是在 Node.js 里面使用 ioredis 这个包那么看起来就会像这样:

javascriptvar Redis = require("ioredis");
var sub = new Redis(/** 链接信息 */);
sub.once("connect", function() {
    // 假设咱们须要选择 redis 的 db,由于实际上咱们不会去污染默认的 db 0
    sub.select(DB_NUMBER, function(err) {
        if(err) process.exit(4);
        sub.subscribe("foo", function() {
            //... 订阅频道成功
        });
    });
});

// 监遵从 `foo` 来的消息
sub.on("message", function(channel, msg) {
    console.log(channel, msg);
});

Redis Keyspace Notifications

在 Redis 里面有一些事件,好比键到期、键被删除等。而后咱们能够经过配置一些东西来让 Redis 一旦触发这些事件的时候就往特定的 Channel 推一条消息。

本文所涉及到的需求的话咱们所须要关心的事件是 EXPIRE 即过时事件。

大体的流程就是咱们给 Redis 的某一个 db 设置过时事件,使其键一旦过时就会往特定频道推消息,我在本身的客户端这边就一直消费这个频道就行了。

之后一来一条定时任务,咱们就把这个任务状态压缩成一个键,而且过时时间为距这个任务执行的时间差。那么当键一旦到期,就到了任务该执行的时间,Redis 天然会把过时消息推去,咱们的客户端就能接收到了。这样一来就起到了定时任务的做用。

消息类型

当达到必定条件后,有两种类型的这种消息会被触发,用哪一个须要本身选了。举个例子,咱们删除了在 db 0 中一个叫 foo 的键,那么系统会往两个频道推消息,一个是 del 事件频道推 foo 消息,另外一个是 foo 频道推 del 消息,它们小俩口被系统推送的指令分别等价于:

PUBLISH __keyspace@0__:foo del
PUBLISH __keyevent@0__:del foo

其中往 foo 推送 del 的频道名为 __keyspace@0__:foo,便是 "__keyspace@" + DB_NUMBER + "__:" + KEY_NAME;而 del 的频道名为 "__keyevent@" + DB_NUMBER + "__:" + EVENT_NAME

配置

即便你的 Redis 版本达标了,可是 Redis 默认是关闭这个功能的,你须要修改配置文件来打开它,或者直接在 CLI 里面经过指令修改。这里就说说配置文件的修改吧。

若是不想看我在这里罗里吧嗦的,也能够直接去看 Redis 的相关文档

首先打开 Redis 的配置文件,在不一样的系统和安装方式下文件位置可能不一样,好比经过 brew 安装的 MacOS 下多是在 /usr/local/etc/redis.conf 下面,经过 apt-get 安装的 Ubuntu 下多是在 /etc/redis/redis.conf 下,总之找到配置文件。或者本身写一个配置文件,启动的时候指定配置文件地址就好。

而后找到一项叫 notify-keyspace-events 的地方,若是找不到则自行添加,其值能够是 ExKlg 等等。这些字母的具体含义以下所示:

  • K,表示 keyspace 事件,有这个字母表示会往 __keyspace@<db>__ 频道推消息。
  • E,表示 keyevent 事件,有这个字母表示会往 __keyevent@<db>__ 频道推消息。
  • g,表示一些通用指令事件支持,如 DELEXPIRERENAME 等等。
  • $,表示字符串(String)相关指令的事件支持。
  • l,表示列表(List)相关指令事件支持。
  • s,表示集合(Set)相关指令事件支持。
  • h,哈希(Hash)相关指令事件支持。
  • z,有序集(Sorted Set)相关指令事件支持。
  • x,过时事件,与 g 中的 EXPIRE 不一样的是,gEXPIRE 是指执行 EXPIRE key ttl 这条指令的时候顺便触发的事件,而这里是指那个 key 恰好过时的这个时间点触发的事件。
  • e,驱逐事件,一个 key 因为内存上限而被驱逐的时候会触发的事件。
  • Ag$lshzxe 的别名。也就是说 AKE 的意思就表明了全部的事件。

结合上述列表咱们就能拼凑出本身所须要的事件支持字符串了,在个人需求中我只须要 Ex 就能够知足了,因此配置项就是这样的:

notify-keyspace-events Ex

而后保存配置文件,启动 Redis 就启用了过时事件的支持了。

实践

咱们先说任务的创造者吧。因为这里 Redis 的事件只会传键名,并不会传键值,而过时事件触发的时候那个键已经没了,你也没法获取键值,加上个人主系统和任务系统是分布式的,因此就把全部须要的信息往键名塞。

一个最简单的键名设计就是 任务类型 + ":" + JSON.stringify 化后的参数数组;更有甚者能够直接把任务类型替换成所需的函数路径,好比须要执行这个任务的函数在 task/foo/bar 文件下面的 baz 函数,参数 arguments 数组为 [ 1, 2 ],那么键名的设计能够是 task/foo/bar.baz:[1,2],反正咱们只须要触发这个键,用不着去查询这个键。等到真正过时了任务系统接收到这个键名的时候再一一解析,获得须要执行 task/foo/bar.baz 这个消息,而且网函数里面传入 [1,2] 这个 arguments

因此当接收到一个定时任务的时候,咱们获得消息、函数名、过时时间参数,这个函数能够以下设计:

javascript/** 咱们假设 redis 是一个 ioredis 的对象 */

var sampleTaskMaker = function(message, func, timeout) {
    message = JSON.stringify(message);
    console.log("Received a new task:", func, message, "after " + timeout + ".");

    // 这里的 uuid 是 npm 一个包
    // 生成一个惟一 uuid 的目的是为了防止两个任务用了相同的函数和参数,那么
    // 键名可能会重复并覆盖的状况
    // uuid 的文档为 https://www.npmjs.com/package/node-uuid
    //
    // 这里的 ❤️ 是一个分隔符,冒号是分割 uuid 和后面内容的,而 ❤️ 是分割函数名
    // 和消息的
    var key = uuid.v1().replace(/-/g, "") +
        ":❤️" + func + "❤️" + message;
    var content = "";

    redis.multi()
        .set(key, content)
        .expire(key, timeout)
        .exec(function(err) {
            if(err) {
                console.error("Failed to publish EXPIRE EVENT for " + content);
                console.error(err);
                return;
            }
        });
};

Ioredis 的稳定能够点此查看。

而后在任务系统里面的一开始监听这个过时频道:

javascript// assign 是 sugarjs 里面的函数
// 把 db 塞到字符串里面的 {db} 里去
var subscribeKey = "__keyevent@{db}__:expired".assign({ db: 1 });

// 假设 sub 是 ioredis 的对象
sub.once("connect", function() {
    // 假设咱们须要选择 redis 的 db,由于实际上咱们不会去污染默认的 db 0
    sub.select(1, function(err) {
        if(err) process.exit(4);
        sub.subscribe("foo", function() {
            //... 订阅频道成功
        });
    });
});

// 监遵从 `foo` 来的消息
sub.on("message", sampleOnExpired);

注意: 咱们这里选择 db 1 是由于一旦开启过时事件监听,那么这个 db 的全部过时事件都会被发送。为了避免跟正常使用的 redis 过时键混淆,咱们为这个事情专门用一个新的 db。好比咱们在本身正常使用的 db 0 里面监听了,那么不是咱们任务触发的过时事件也会传过来,这个时候咱们解析的键名就不对了。

最后就是咱们的 sampleOnExpired 函数了。

javascriptvar sampleOnExpired = function(channel, key) {
    // UUID:❤️func❤️params
    var body = key.split("❤️");
    if(body.length < 3) return;

    // 取出 body 第一位为 func
    var func = body[1];

    // 推出前两位,后面剩下的有多是参数里面自带 ❤️ 而被分割,因此要拼回去
    body.shift(); body.shift();
    var params = body.join("❤️");

    // 而后把 params 传入 func 去执行
    // func:
    //   path1/path2.func
    func = func.split(".");
    if(func.length !== 2) {
        console.error("Bad params for task:", func.join("."), "-", params);
        return;
    }

    var path = func[0];
    func = func[1];

    var mod;
    try {
        mod = require("./tasks/" + path);
    } catch(e) {
        console.error("Failed to load module", path);
        console.error(e.stack);
        return;
    }

    process.nextTick(function() {
        try {
            mod[func].apply(null, JSON.parse(params));
        } catch(e) {
            console.error("Failed to call function", path, "-", func, "-", params);
            console.error(e.stack);
        }
    });
};

这个简易的架子搭好后,你只须要去写一堆任务执行函数,而后在生成任务的时候把相应参数传给 sampleTaskMaker 就行了。Redis 会自动过时而且触发事件给你的 sampleOnExpired 函数,而后就会去执行相应的任务处理函数了。

小结

其实这个需求在咱们项目目前就是给用户定时发提醒短信用的。若是没有发现 Redis 的这个妙用,我仍是会去用第二节里面的方法来写的。其实这期间也有考虑过用 RabbitMQ,不过貌似它的定时消息须要作一些 Hack,比较麻烦,最后就放弃了。

Redis 的这个方法实际上是我在谷歌搜出来的,别人在 StackOverflow 回答的答案。我参考了以后用我本身的方法实现了出来,而且把代码的关键部分提取出来整理成这篇小文,还但愿能给各位看官一些用吧,望打赏。

若是没有什么用也憋喷我,毕竟我是个蒟蒻。有更好的方法但愿留个言,望告知。谢谢。(´,,•ω•,,)♡

相关文章
相关标签/搜索