本文所说的定时任务或者说计划任务并非不少人想象中的那样,好比说天天凌晨三点自动运行起来跑一个脚本。这种都已经烂大街了,随便一个 Crontab 就能搞定了。php
这里所说的定时任务能够说是计时器任务,好比说用户触发了某个动做,那么从这个点开始过二十四小时咱们要对这个动做作点什么。那么若是有 1000 个用户触发了这个动做,就会有 1000 个定时任务。因而这就不是 Cron 范畴里面的内容了。html
举个最简单的例子,一个用户推荐了另外一个用户,咱们定一个二十四小时以后的任务,看看被推荐的用户有没有来注册,若是没注册就给他搞一条短信过去mysql
设置了生存时间的Key,在过时时能不能有所提示?web
若是能对过时Key有个监听,如何对过时Key进行一个回调处理?redis
如何使用 Redis 来实现定时任务?sql
更具体需求:数据库
如今须要作一个拍卖活动,如何在拍卖结束那一刻,就执行任务进行相关逻辑;数组
如何在订单交易有效期时间结束的那一刻,进行相关逻辑服务器
在 Redis 的 2.8.0 版本以后,其推出了一个新的特性——键空间消息(Redis Keyspace Notifications),它配合 2.0.0 版本以后的 SUBSCRIBE 就能完成这个定时任务websocket
的操做了,不过定时的单位是秒。
(1)Publish / Subscribe
Redis 在 2.0.0 以后推出了 Pub / Sub 的指令,大体就是说一边给 Redis 的特定频道发送消息,另外一边从 Redis 的特定频道取值——造成了一个简易的消息队列。
(2)Redis Keyspace Notifications
在 Redis 里面有一些事件,好比键到期、键被删除等。而后咱们能够经过配置一些东西来让 Redis 一旦触发这些事件的时候就往特定的 Channel 推一条消息。
大体的流程就是咱们给 Redis 的某一个 db 设置过时事件,使其键一旦过时就会往特定频道推消息,我在本身的客户端这边就一直消费这个频道就行了。
之后一来一条定时任务,咱们就把这个任务状态压缩成一个键,而且过时时间为距这个任务执行的时间差。那么当键一旦到期,就到了任务该执行的时间,Redis 天然会把过时消息推去,咱们的客户端就能接收到了。这样一来就起到了定时任务的做用。
由于开启键空间通知功能须要消耗一些 CPU , 因此在默认配置下, 该功能处于关闭状态。
能够经过修改 redis.conf
文件, 或者直接使用 CONFIG SET
命令来开启或关闭键空间通知功能:
当 notify-keyspace-events
选项的参数为空字符串时,功能关闭。
另外一方面,当参数不是空字符串时,功能开启。
notify-keyspace-events
的参数能够是如下字符的任意组合, 它指定了服务器该发送哪些类型的通知:
字符 | 发送的通知 |
---|---|
K |
键空间通知,全部通知以 __keyspace@<db>__ 为前缀 |
E |
键事件通知,全部通知以 __keyevent@<db>__ 为前缀 |
g |
DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知 |
$ |
字符串命令的通知 |
l |
列表命令的通知 |
s |
集合命令的通知 |
h |
哈希命令的通知 |
z |
有序集合命令的通知 |
x |
过时事件:每当有过时键被删除时发送 |
e |
驱逐(evict)事件:每当有键由于 maxmemory 政策而被删除时发送 |
A |
参数 g$lshzxe 的别名 |
输入的参数中至少要有一个 K
或者 E
, 不然的话, 无论其他的参数是什么, 都不会有任何通知被分发。
举个例子, 若是只想订阅键空间中和列表相关的通知, 那么参数就应该设为 Kl
, 诸如此类。
将参数设为字符串 "AKE"
表示发送全部类型的通知。
监听过时事件须要设置Redis 配置文件
notify-keyspace-events "Ex"
如下列表记录了不一样命令所产生的不一样通知:
[DEL key key …] 命令为每一个被删除的键产生一个 del
通知。
RENAME key newkey 产生两个通知:为来源键(source key)产生一个 rename_from
通知,并为目标键(destination key)产生一个 rename_to
通知。
EXPIRE key seconds 和 EXPIREAT key timestamp 在键被正确设置过时时间时产生一个 expire
通知。当 EXPIREAT key timestamp 设置的时间已通过期,或者 EXPIRE key seconds传入的时间为负数值时,键被删除,并产生一个 del
通知。
[SORT key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern …]] [ASC | DESC] [ALPHA] [STORE destination]](http://redisdoc.com/database/sort.html#sort) 在命令带有 STORE
参数时产生一个 sortstore
事件。若是 STORE
指示的用于保存排序结果的键已经存在,那么程序还会发送一个 del
事件。
SET key value [EX seconds] [PX milliseconds] [NX|XX] 以及它的全部变种(SETEX key seconds value 、 SETNX key value 和 GETSET key value)都产生 set
通知。其中 SETEX key seconds value 还会产生 expire
通知。
[MSET key value key value …] 为每一个键产生一个 set
通知。
SETRANGE key offset value 产生一个 setrange
通知。
INCR key 、 DECR key 、 INCRBY key increment 和 DECRBY key decrement 都产生 incrby
通知。
INCRBYFLOAT key increment 产生 incrbyfloat
通知。
APPEND key value 产生 append
通知。
[LPUSH key value value …] 和 LPUSHX key value 都产生单个 lpush
通知,即便有多个输入元素时,也是如此。
[RPUSH key value value …] 和 RPUSHX key value 都产生单个 rpush
通知,即便有多个输入元素时,也是如此。
RPOP key 产生 rpop
通知。若是被弹出的元素是列表的最后一个元素,那么还会产生一个 del
通知。
LPOP key 产生 lpop
通知。若是被弹出的元素是列表的最后一个元素,那么还会产生一个 del
通知。
LINSERT key BEFORE|AFTER pivot value 产生一个 linsert
通知。
LSET key index value 产生一个 lset
通知。
LTRIM key start stop 产生一个 ltrim
通知。若是 LTRIM key start stop 执行以后,列表键被清空,那么还会产生一个 del
通知。
RPOPLPUSH source destination 和 BRPOPLPUSH source destination timeout 产生一个 rpop
通知,以及一个 lpush
通知。两个命令都会保证 rpop
的通知在 lpush
的通知以前分发。若是从键弹出元素以后,被弹出的列表键被清空,那么还会产生一个 del
通知。
HSET hash field value 、 HSETNX hash field value 和 HMSET 都只产生一个 hset
通知。
HINCRBY 产生一个 hincrby
通知。
HINCRBYFLOAT 产生一个 hincrbyfloat
通知。
[SADD key member member …] 产生一个 sadd
通知,即便有多个输入元素时,也是如此。
[SREM key member member …] 产生一个 srem
通知,若是执行 [SREM key member member …] 以后,集合键被清空,那么还会产生一个 del
通知。
SMOVE source destination member 为来源键(source key)产生一个 srem
通知,并为目标键(destination key)产生一个 sadd
事件。
SPOP key 产生一个 spop
事件。若是执行 SPOP key 以后,集合键被清空,那么还会产生一个 del
通知。
[SINTERSTORE destination key key …] 、 [SUNIONSTORE destination key key …] 和 [SDIFFSTORE destination key key …] 分别产生 sinterstore
、 sunionostore
和 sdiffstore
三种通知。若是用于保存结果的键已经存在,那么还会产生一个 del
通知。
ZINCRBY key increment member 产生一个 zincr
通知。(译注:非对称,请注意。)
[ZADD key score member [[score member] [score member] …]](http://redisdoc.com/sorted_set/zadd.html#zadd) 产生一个 zadd
通知,即便有多个输入元素时,也是如此。
[ZREM key member member …] 产生一个 zrem
通知,即便有多个输入元素时,也是如此。若是执行 [ZREM key member member …] 以后,有序集合键被清空,那么还会产生一个 del
通知。
ZREMRANGEBYSCORE key min max 产生一个 zrembyscore
通知。(译注:非对称,请注意。)若是用于保存结果的键已经存在,那么还会产生一个 del
通知。
ZREMRANGEBYRANK key start stop 产生一个 zrembyrank
通知。(译注:非对称,请注意。)若是用于保存结果的键已经存在,那么还会产生一个 del
通知。
[ZINTERSTORE destination numkeys key [key …] [WEIGHTS weight [weight …]] [AGGREGATE SUM|MIN|MAX]](http://redisdoc.com/sorted_set/zinterstore.html#zinterstore) 和 [ZUNIONSTORE destination numkeys key [key …] [WEIGHTS weight [weight …]] [AGGREGATE SUM|MIN|MAX]](http://redisdoc.com/sorted_set/zunionstore.html#zunionstore) 分别产生 zinterstore
和 zunionstore
两种通知。若是用于保存结果的键已经存在,那么还会产生一个 del
通知。
每当一个键由于过时而被删除时,产生一个 expired
通知。
每当一个键由于 maxmemory
政策而被删除以回收内存时,产生一个 evicted
通知。
全部命令都只在键真的被改动了以后,才会产生通知。
好比说,当 [SREM key member member …] 试图删除不存在于集合的元素时,删除操做会执行失败,由于没有真正的改动键,因此这一操做不会发送通知。
若是对命令所产生的通知有疑问, 最好仍是使用如下命令, 本身来验证一下:
$ redis-cli config set notify-keyspace-events KEA $ redis-cli --csv psubscribe '__key*__:*' Reading messages... (press Ctrl-C to quit) "psubscribe","__key*__:*",1
而后, 只要在其余终端里用 Redis 客户端发送命令, 就能够看到产生的通知了:
"pmessage","__key*__:*","__keyspace@0__:foo","set" "pmessage","__key*__:*","__keyevent@0__:set","foo" ...
Redis 使用如下两种方式删除过时的键:
当一个键被访问时,程序会对这个键进行检查,若是键已通过期,那么该键将被删除。
底层系统会在后台渐进地查找并删除那些过时的键,从而处理那些已通过期、可是不会被访问到的键。
当过时键被以上两个程序的任意一个发现、 而且将键从数据库中删除时, Redis 会产生一个 expired
通知。
Redis 并不保证生存时间(TTL)变为 0
的键会当即被删除: 若是程序没有访问这个过时键, 或者带有生存时间的键很是多的话, 那么在键的生存时间变为 0
, 直到键真正被删除这中间, 可能会有一段比较显著的时间间隔。
所以, Redis 产生 expired
通知的时间为过时键被删除的时候, 而不是键的生存时间变为 0
的时候。
由于 Redis 目前的订阅与发布功能采起的是发送即忘(fire and forget)策略, 因此若是你的程序须要可靠事件通知(reliable notification of events), 那么目前的键空间通知可能并不适合你:当订阅事件的客户端断线时, 它会丢失全部在断线期间分发给它的事件。并不能确保消息送达。将来有计划容许更可靠的事件传递,但可能这将在更通常的层面上解决,或者为Pub / Sub自己带来可靠性,或者容许Lua脚本拦截Pub / Sub消息来执行诸如推送将事件列入清单。
对于每一个修改数据库的操做,键空间通知都会发送两种不一样类型的事件消息:keyspace 和 keyevent。以 keyspace 为前缀的频道被称为键空间通知(key-space notification), 而以 keyevent 为前缀的频道则被称为键事件通知(key-event notification)。
事件是用 __keyspace@DB__:KeyPattern 或者 __keyevent@DB__:OpsType 的格式来发布消息的。
DB表示在第几个库;KeyPattern则是表示须要监控的键模式(能够用通配符,如:__key*__:*);OpsType则表示操做类型。所以,若是想要订阅特殊的Key上的事件,应该是订阅keyspace。
好比说,对 0 号数据库的键 mykey 执行 DEL 命令时, 系统将分发两条消息, 至关于执行如下两个 PUBLISH 命令:
PUBLISH __keyspace@0__:sampleKey del
PUBLISH __keyevent@0__:del sampleKey
订阅第一个频道 __keyspace@0__:mykey 能够接收 0 号数据库中全部修改键 mykey 的事件, 而订阅第二个频道 __keyevent@0__:del 则能够接收 0 号数据库中全部执行 del 命令的键。
为了高可用性,为了确保解决过时事件的执行,将 定时事件存入MySQL数据库。触发键过时事件后,再查询一次数据库,检查一下过时事件是否所有执行了。
CREATE TABLE `tb_time_limit_task` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `key` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT 'Redis键', `status` tinyint(3) unsigned NOT NULL COMMENT '状态,0未处理,1已处理', `start_time` decimal(13,3) unsigned NOT NULL COMMENT '开始时间(小数部分为毫秒)', `end_time` decimal(13,3) unsigned NOT NULL COMMENT '结束时间(小数部分为毫秒)', PRIMARY KEY (`id`), KEY `we` (`key`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='这个表用于记录须要时间控制的任务Key,配合Redis、以及回调脚本使用'; key存储规则是 类名@方法名@参数... (参数可为空,多个参数以@分隔) 例子: PTCountdown@countdown@218
(查询数据库)任务状态检查,执行未正常执行的任务
任务状态检查
查询 ”结束时间 < 当前时间“ 的未处理的任务
若是存在,则执行任务,
1.先解析key,类名@方法名@参数... 2.而后根据类名去执行相应方法
链接redis
链接成功
(查询数据库)任务状态检查,查看在脚本未运行期间是否有部分任务未处理,可能很长时间才连上redis,须要查看链接时间内的任务情况;
可能会永远连不上,则每10s,尝试重连
生成订阅消息丢失控制键
向redis初始新增 10个有效期(900/1800/...)的键
#SILCK`1 900
#SILCK`2 1800
#SILCK`3 2700
...
#SILCK`10 9000
这一步的目的是 每900秒(15)分钟,查询数据库,检查任务执行状况
订阅过时事件
正常键过时
执行任务
订阅消息控制键过时
检查任务状态
若是超过一半的控制键都过时了,那么从新生成10个
<?php /** * Description:时间结点任务监听 * Created by dong.cx * DateTime: 2019-03-15 10:58 */ namespace wladmin\cmd; \think\Loader::addNamespace('wlmis', './wlmis/'); use wlmis\logic\timeLimitTask\base\TimeLimitTaskLogic; use think\Config; use think\console\Input; use think\console\Output; use think\console\Command; use think\Log; use wlmis\common\redis\Redis; use wlmis\logic\timeLimitTask\base\LogRecord; class TimeLimitTask extends Command { use LogRecord; /** * 订阅信息丢失控制键最大数量 * @var int */ protected $subscription_info_loss_control_key_max = 10; /** * 订阅信息丢失控制键最后执行的索引,键的索引从1开始,为0表示未执行过,这个变量用于控制订阅信息控制键自动生成 * @var int */ protected $subscription_info_loss_control_key_last = 0; public function __construct($name = null) { parent::__construct($name); // 日志记录初始化 Log::init([ 'type' => 'File', 'path' => RUNTIME_PATH . 'redis-logs/', // error和sql日志单独记录 'apart_level' => ['log', 'error', 'sql', 'debug', 'info', 'notice'], ]); } /** * 运行方式 php tp5cornnew.php TimeLimitTask * @author dong.cx 2019-04-02 10:59 */ protected function configure() { $this->setName('TimeLimitTask')->setDescription('Redis keyspace notification subscription script'); } protected function execute(Input $input, Output $output) { // 配置断线重连 Config::set('database.break_reconnect', true); $config = Config::get('redis_db'); $reconnect_str = ''; RedisReconnect: try { $this->logRecord('info', "ThinkPHP Version: " . THINK_VERSION); $this->logRecord('info', $reconnect_str . "Redis host: " . $config['host'], true, true); // 进行任务状态检查 $this->taskStatusCheck(); $redis = new Redis(get_class($this), true); if ($redis->ping() == '+PONG') { $this->logRecord('info', 'Connection succeeded', true, true); // 查看在脚本未运行期间是否有部分任务未处理 $this->taskStatusCheck(); } // 生成订阅消息丢失控制键 $this->subscription_info_loss_control(true); $this->logRecord('info', 'Start listening', true, true); // 订阅消息 $redis->psubscribe(array( '__keyevent@' . $config['db'] . '__:expired' ), function ($redis, $pattern, $channelName, $message) { $msg_split = explode('`', $message); if (count($msg_split) == 2 && $msg_split[0] == '#SILCK' && is_numeric($msg_split[1])) { $this->subscription_info_loss_control_key_last = $msg_split[1]; $this->taskStatusCheck(); if ($this->subscription_info_loss_control_key_last > ($this->subscription_info_loss_control_key_max / 2)) { $this->subscription_info_loss_control(); $this->subscription_info_loss_control_key_last = 0; } } else { // 这里表明是Redis回调执行 $this->task($message); } }); } catch (\RedisException $redisThrow) { // Redis抛出异常,通常的状况是失去链接,执行从新链接 $this->logRecord('notice', "Redis loses connection and is reconnecting...", true, true); try { $redis->close(); } catch (\Exception $ee) { } sleep(10); $reconnect_str = 'Reconnect '; goto RedisReconnect; } catch (\Exception $e) { // 运行错误,这里抛出错误的缘由为这个文件中的代码有误,其余任务执行代码抛出错误,不会致使运行中断 - 执行到这里运行中断 $this->logRecord('error', 'Run-time error' . PHP_EOL . 'File location: ' . $e->getFile() . PHP_EOL . 'Line: ' . $e->getLine() . PHP_EOL . 'Error Message: ' . $e->getMessage() . PHP_EOL, true, true); } } /** * 任务执行 * @param string $key 任务键名,记录于Redis中的键名 * 键名规则:类名@方法名@参数...(后续的多个参数都用@分隔),在时间限制任务基类中有生成键的封装函数 * @author: dong.cx */ private function task($key) { try { $params = explode('@', $key, 3); if (count($params) < 2) { return; } $class = new \ReflectionClass('wlmis\\logic\\timeLimitTask\\' . $params[0]); $instance = $class->newInstance(); $transfer = array(); if (count($params) == 3) { $transfer = explode('@', $params[2]); } $instance->call_func($params[1], $transfer); } catch (\Exception $e) { $this->logRecord('notice', 'Task execution class or method not found! Or call the method to throw an error.' . PHP_EOL . 'Pass Key Parameter: ' . $key . PHP_EOL . 'File location: ' . get_class($this) . PHP_EOL . 'Line: ' . $e->getLine() . PHP_EOL . 'Error Message: ' . $e->getMessage() . PHP_EOL . PHP_EOL); } } /** * 任务状态检查,执行未正常执行的任务 * @author dong.cx 2019-04-02 10:57 */ private function taskStatusCheck() { try { $result = (new TimeLimitTaskLogic())->getNotPerformedTask(); if (!empty($result)) { $this->logRecord('info', 'Find ' . count($result) . ' unprocessed task:'); foreach ($result as $value) { $this->task($value['key']); } } } catch (\Exception $e) { $this->logRecord('notice', 'An exception occurred during task status checking.'); } } /** * 生成订阅消息丢失控制键 * @param boolean $always_output_screen 无论不否在调试模式都输出到屏幕 * * @author dong.cx 2019-04-02 10:58 */ private function subscription_info_loss_control($always_output_screen = false) { try { $this->logRecord('info', 'Generates subscription information loss control keys.', true, $always_output_screen); $success = 0; $error = 0; $redis = new Redis(); for ($i = 1; $i <= $this->subscription_info_loss_control_key_max; $i++) { $redis->setex('#SILCK`' . $i, $i * 900, '') ? $success++ : $error++; } $this->logRecord('info', 'Generates loss control keys: ' . $this->subscription_info_loss_control_key_max . ' total, ' . $success . ' success, ' . $error . ' error', true, $always_output_screen); $redis->close(); } catch (\Exception $e) { $this->logRecord('notice', 'An exception occurs when the subscription information loss control key is created.', true, $always_output_screen); } } }
<?php /** * Description:拍卖倒计时操做 * Created by dong.cx * DateTime: 2019-03-18 10:04 */ namespace wlmis\logic\timeLimitTask; use think\Config; use think\Exception; use wlmis\common\redis\Redis; use wlmis\dao\addons\auction\AuctionGoodsDao; use wlmis\logic\oper\addons\auction\AuctionLogic; use wlmis\logic\timeLimitTask\base\TimeLimitBaseLogic; class AuctionCutDownLogic extends TimeLimitBaseLogic { private $auctionGoodsDao; public function __construct() { parent::__construct(); $this->auctionGoodsDao = new AuctionGoodsDao(); } /** * 拍卖结束, 更新拍品表/保单表 操做 * @param $params * * @author dong.cx 2019-03-18 18:39 */ public function auctionEndCutDown($params) { $auctionId = $params[0]; $auctionLogic = new AuctionLogic(); try { if (!$auctionId || !is_numeric($auctionId)) throw new Exception('Params error'); $goodsInfo = $this->auctionGoodsDao->load($auctionId, 'final_end_time'); if (!$goodsInfo) { $this->logRecord('notice', 'tb_auction_goods主键:' . $auctionId . '不存在'); } else { parent::startTrans(); // 拍卖结束 $result = $auctionLogic->auctionEnded($auctionId); if ($result['code'] == 0) { $this->logRecord('notice', $result['msg']); } // 更改mysql中键的状态为已处理 $this->recording_process_mysql($this->key_splice(__FUNCTION__, [$auctionId])); // 删除 redis 当前价 $redis = new Redis(); $redis->del('auction_gid@' . $auctionId . '@current_bid'); websocket_send($auctionId . 'bid/index', true, 2, '拍卖结束'); } parent::commit(); } catch (Exception $e) { parent::rollback(); $this->throw_message(__FUNCTION__, $e); } } /** * 拍卖交易结束 * 无订单/未付款,不释放保证金 * @param $params * * @author dong.cx 2019-03-18 20:15 */ public function dealCutDown($params) { $auctionId = $params[0]; $auctionLogic = new AuctionLogic(); try { parent::startTrans(); if (!$auctionId || !is_numeric($auctionId)) throw new Exception('Params error'); $goodsInfo = $this->auctionGoodsDao->load($auctionId, 'final_end_time'); if (!$goodsInfo) { $this->logRecord('notice', 'tb_auction_goods主键:' . $auctionId . '不存在'); } elseif (!$goodsInfo['final_end_time']) { $this->logRecord('notice', 'tb_auction_goods主键:' . $auctionId . '的拍品还未结束或最终结束时间为空'); } else { $result = $auctionLogic->checkStatus($auctionId); if ($result['code'] == 0) $this->logRecord('notice', $result['msg']); // 更改mysql中键的状态为已处理 $this->recording_process_mysql($this->key_splice(__FUNCTION__, [$auctionId])); } parent::commit(); } catch (Exception $e) { parent::rollback(); $this->throw_message(__FUNCTION__, $e); } } /** * 建立拍卖结束倒计时任务 * @param $auctionId * @param int $ttl * * @throws Exception * @author dong.cx 2019-04-01 09:49 */ public function auction_end_countdown_create($auctionId, $ttl=0) { return $this->create('auctionEndCutDown', $ttl, [$auctionId]); } /** * 删除拍卖结束倒计时任务 * @param int $auctionId 拍卖商品表主键 * * @return bool|int * @throws Exception * @author dong.cx 2019-04-01 10:08:49 */ public function auction_end_countdown_delete($auctionId) { return $this->del_key('auctionEndCutDown', [$auctionId]); } /** * 建立交易倒计时任务 * @param int $auctionId 拍卖商品表主键 * @param int $ttl 生存时间 * * @throws Exception * 异常代码: * 500 redis操做失败 * @author dong.cx 2019-03-22 15:36 */ public function deal_countdown_create($auctionId, $ttl=0) { $this->create('dealCutDown', $ttl + Config::get('auction_deal_limit_time'), [$auctionId]); } /** * 删除交易倒计时任务 * @param int $auctionId 拍卖商品表主键 * * @return bool|int * @throws Exception * @author dong.cx 2019-03-22 15:36 */ public function deal_countdown_delete($auctionId) { return $this->del_key('countdown', [$auctionId]); } }
<?php /** * Created by dong.cx * Date: 2019/3/27 17:13 * Description: 时间限制任务基类 * 每个子类继承这个基类实现时间任务调度 * 子类中开放给Redis调度的函数设置访问权限为protected,防止外部误触发 * 子类中其余开放给内部调用的访问权限为public * ************************************************ * 存储到Redis中的键名规则为:类名@方法名@参数...(参数可为空,多个参数则以@分隔) key_splice 函数可生成键 * 全部的参数经过一个数组传入方法(一维索引数组,跟存储函数 create 传入参数时同样) * 类名、方法名,尽可能精简,能节约带宽以及Redis查询速度 * 参数设计也尽可能精简,全部操做都在服务端内部完成,因此能用1个条件准确查询数据库的,不要用两个条件查询 * * 存储键直接使用 create 方法,以秒为单位,会自动拼接键键 * 若是以毫秒为单位则 create_ms 方法 * ************************************************ */ namespace wlmis\logic\timeLimitTask\base; use think\Exception; use wlmis\common\redis\Redis; use wlmis\model\sys\TimeLimitTaskModel; use wlmis\logic\BaseLogic; class TimeLimitBaseLogic extends BaseLogic { use LogRecord; /** * Redis链接实例 * @var Redis */ protected $redis; /** * TimeLimitBaseLogic constructor. * @author dong.cx */ public function __construct() { parent::__construct(); $this->redis = new Redis(); } /** * 任务调度入口 * @param string $funcName 调用方法名 * @param array $params 传递参数 * @author: dong.cx */ public function call_func($funcName, $params = array()) { call_user_func(array($this, $funcName), $params); } /** * 键拼接 * 键用 @ 符号做为分隔符,因此方法名、参数中不可出现 * 键名规则中的类名会自动生成 * @param string $funcName 方法名 * @param array $params 参数(必须传入一维索引数组,请勿传入关联数组,按照顺序生成参数,关联数组不保证顺序) * @return string 返回键 * @author: dong.cx */ protected function key_splice($funcName, $params = array()) { $class = explode('\\', get_class($this)); $paramsStr = ''; foreach ($params as $value) { $paramsStr .= '@' . $value; } return $class[count($class) - 1] . '@' . $funcName . $paramsStr; } /** * 向Redis存储键(延时单位秒) * 会自动将参数进行拼接,而后存入Redis * @param string $funcName 调用方法名 * @param int $ttl 延时(秒) * @param array $params 参数(必须传入一维索引数组,请勿传入关联数组,按照顺序生成参数,关联数组不保证顺序) * @throws Exception * ********************* * 异常代码: * 500 redis操做失败 * ********************* * @author: dong.cx */ public function create($funcName, $ttl = 0, $params = array()) { $key = $this->key_splice($funcName, $params); $this->recording_mysql($key, $ttl); if (!($this->redis->setex($key, $ttl, ''))) { throw new Exception('Redis存储失败', 500); } } /** * 向Redis存储键(延时单位毫秒) * 会自动将参数进行拼接,而后存入Redis * @param string $funcName 调用方法名 * @param int $ttl 延时(毫秒) * @param array $params 参数(必须传入一维索引数组,请勿传入关联数组,按照顺序生成参数,关联数组不保证顺序) * @throws Exception * ********************* * 异常代码: * 500 redis操做失败 * ********************* * @author: dong.cx */ public function create_ms($funcName, $ttl = 0, $params = array()) { $key = $this->key_splice($funcName, $params); $this->recording_mysql($key, $ttl, true); if (!($this->redis->psetex($key, $ttl, ''))) { throw new Exception('Redis存储失败', 500); } } /** * 获取指定键的剩余生存时间(秒) * @param string $funcName 任务方法名 * @param array $params 任务参数 * @return bool|int 若是为false,说明Redis链接失败 * 若是为-1,说明改键不是定时键 * 若是为-2,说明键不存在(已消失) * 其余为剩余生存时间(秒) * @throws Exception * @author: dong.cx */ protected function getTTL($funcName, $params = array()) { $key = $this->key_splice($funcName, $params); return $this->redis->ttl($key); } /** * 获取指定键的剩余生存时间(毫秒) * @param string $funcName 任务方法名 * @param array $params 任务参数 * @return bool|int 若是为false,说明Redis链接失败 * 若是为-1,说明改键不是定时键 * 若是为-2,说明键不存在(已消失) * 其余为剩余生存时间(秒) * @throws Exception * @author: dong.cx */ protected function getPTTL($funcName, $params = array()) { $key = $this->key_splice($funcName, $params); return $this->redis->pttl($key); } /** * 删除指定键 * *********************************************** * 删除不会触发事件,用于无用记录的删除 * 如生成支付订单二次提交时删除前面一个未处理任务。 * 通常在设计任务处理流程时须要考虑到无用任务的触发,并进行规避,必要时进行主动删除任务能够减轻服务器负担 * 任务处理流程应该作到无用记录的触发不会影响到系统正常运行 * *********************************************** * @param $funcName * @param array $params * @return bool|int 返回false则Redis实例获取失败,链接不上,返回int则为影响的记录条数 * @throws Exception * @author: dong.cx */ protected function del_key($funcName, $params = array()) { $key = $this->key_splice($funcName, $params); TimeLimitTaskModel::where('key', $key)->update([ 'sts' => 1 ]); return $this->redis->del($key); } /** * 记录键到mysql中, * @param string $key 键 * @param int $ttl 触发时间 * @param bool $mode 当为false时,触发时间为秒,当为true时,触发时间为毫秒 * @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\ModelNotFoundException * @throws \think\exception\DbException * @author: dong.cx */ private function recording_mysql($key, $ttl, $mode = false) { if ($mode) { // 这里说明 TTL 以毫秒为单位 $currentTime = bcmul(microtime(true), '1', 3); $endTime = bcadd($currentTime, bcdiv($ttl, '1000', 3), 3); } else { // 这里说明 TTL 以秒为单位 $currentTime = time(); $endTime = $currentTime + $ttl; } if (TimeLimitTaskModel::field('id')->where('key', $key)->find() !== null) { TimeLimitTaskModel::where('key', $key)->update([ 'status' => 0, 'start_time' => $currentTime, 'end_time' => $endTime ]); } else { TimeLimitTaskModel::create([ 'key' => $key, 'status' => 0, 'start_time' => $currentTime, 'end_time' => $endTime, 'sts' => 0 ]); } } /** * 更改键在mysql中的状态为已处理 * @param $key * @author: dong.cx */ protected function recording_process_mysql($key) { $tlm = new TimeLimitTaskModel(); $tlm->where('key', $key)->update([ 'status' => 1 ]); } /** * 抛出错误信息 * @param string $funcName 出错方法名(__FUNCTION__) * @param \Exception $e 错误信息 * @author: dong.cx */ protected function throw_message($funcName, \Exception $e) { $this->logRecord('error', 'The task logic has made an error:' . PHP_EOL . 'Class:' . get_class($this) . PHP_EOL . 'Method name:' . $funcName . PHP_EOL . 'File:' . $e->getFile() . PHP_EOL . 'Line: ' . $e->getLine() . PHP_EOL . 'Error Message:' . $e->getMessage() . PHP_EOL); } /** * 析构函数 * @author dong.cx */ public function __destruct() { $this->redis->close(); } }
✘ ~/Documents/card253 php tp5cornnew.php TimeLimitTask 【2019-04-08 11:40:02】ThinkPHP Version: 5.0.7 【2019-04-08 11:40:02】Redis host: 127.0.0.1 【2019-04-08 11:40:02】Connection succeeded 【2019-04-08 11:40:02】Generates subscription information loss control keys. 【2019-04-08 11:40:02】Generates loss control keys: 10 total, 10 success, 0 error 【2019-04-08 11:40:02】Start listening
使用:
只须要启动脚本,
在须要的时候,新增任务便可
参考资料:
Redis实践操做之—— keyspace notification(键空间通知)
原文出处:https://www.cnblogs.com/martini-d/p/10675945.html