基于redis的 sorted set + hash,实现定时执行任务的Demophp
sorted set 介绍:git
思路:github
php实现代码DEMOweb
<?php class DoTest { private const YIELD_KEY = 'yield:list'; private const YIELD_DATA_KEY = 'yield:data'; public function run() { $bbj = RedisClient::instance()->bbj(); //获取排序(低到高)中第一个task_id $data = $bbj->zRange(self::YIELD_KEY, 0, 0, true); if (empty($data)) { echo "无数据" . PHP_EOL; return null; } $mem = array_keys($data)[0]; $ts = array_values($data)[0]; $now = time(); //校验是否到时 if ($ts > $now) { echo "还未到时间,无需操做" . PHP_EOL;; return null; } //移除集合,多进程并执行到时任务(只能被成功移除一次) $row = $bbj->zRem(self::YIELD_KEY, $mem); if (empty($row)) { echo "已经被剔除" . PHP_EOL;; return false; } //获取当前要执行的任务数据JSON $dataJson = $bbj->hGet(self::YIELD_DATA_KEY, $mem); //todo 执行定时任务业务逻辑 var_export($data); var_export($dataJson); //使用完后删除任务数据JSON $bbj->hdel(self::YIELD_DATA_KEY, $mem); return true; } public function add(int $time, $i, string $content) { $data = [ 'msg' => $content . $time ]; $bbj = RedisClient::instance()->bbj(); $dataJson = json_encode($data); $taskId = $time . '_' . $i; $isSc = $bbj->zAdd(self::YIELD_KEY, $time, $taskId); if ($isSc) $isSc = $bbj->hSet(self::YIELD_DATA_KEY, $taskId, $dataJson); var_export($isSc); } public function addFeature($i) { $time = Carbon::now()->addMinute()->timestamp; $this->add($time, $i, '将来执行内容'); } public function addCurrent($i) { $time = time(); $this->add($time, $i, '立刻执行内容'); } /** * 取消定时任务,根据任务ID * @param $taskId */ public function removeYield($taskId) { $bbj = RedisClient::instance()->bbj(); $bbj->zRem(self::YIELD_KEY, $taskId); $bbj->hDel(self::YIELD_DATA_KEY, $taskId); } }
监控定时任务队列redis
<?php $dt = new DoTest(); while (true) { $rt = $dt->run(); if (is_null($rt)) { sleep(1); } }
添加当前执行和将来执行任务json
<?php $dt = new DoTest(); while (true) { for ($i = 0; $i < 100000; $i++) { // 添加当即执行当前任务 $dt->addCurrent($i); // 添加待执行将来任务 $dt->addFeature($i); } }
取消定时任务异步
<?php $dt = new DoTest(); $dt->removeYield('taskId');
首发于Github🌈大话WEB开发,欢迎Star 🥰this