当前笔记中的内容针对的是 thinkphp-queue 的 v1.1.2 版本,如今官方已经更新到了 v1.1.3 版本, 下文中提到的几个Bug在最新的master分支上均已修复。 笔记中的部份内容还未更新。php
传统的程序执行流程通常是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中,通常会引入消息队列的组件,将流程中部分任务抽离出来放入消息队列,并由专门的消费者做针对性的处理,从而下降系统耦合度,提升系统性能和可用性。html
通常来讲,能够抽离的任务具备如下的特色:laravel
容许延后|异步|并行处理 (相对于传统的 即时|同步|串行 的执行方式)git
容许延后:github
抢购活动时,先快速缓冲有限的参与人数到消息队列,后续再排队处理实际的抢购业务;redis
容许异步:sql
业务处理过程当中的邮件,短信等通知thinkphp
容许并行:shell
用户支付成功以后,邮件通知,微信通知,短信通知能够由多个不一样的消费者并行执行,通知到达的时间不要求前后顺序。数据库
容许失败和重试
thinkphp-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:
thinkphp-queue 内置了 Redis,Database,Topthink ,Sync这四种驱动。本文主要介绍 thinkphp-queue 结合其内置的 redis 驱动的使用方式和基本原理。
注1:如无特殊说明,下文中的 ‘消息’ 和 ‘任务’两个词指代的是同一个概念,即队列中的一个成员。该成员对消息队列而言是其内部保存的消息; 对业务应用而言是一个待执行的任务。请根据语境区分。
注2:本文编写时(2017-02-15)使用的 thinkphp-queue 的版本号是 v1.1.2 。该版本中部分功能并未所有完成,如 subscribe 模式,以及存在几个bug(稍后会说起)。若有变动,请以官方最新版为准。
先经过一段代码,了解一下 thinkphp-queue 的基本使用流程。
目标:
在业务控制器中推送一个新消息到一个名为 ‘helloJobQueue’ 的队列中,该消息中包含咱们自定义的业务数据,而后,编写一个名为 Hello 的消费者类,并经过命令行去调用该消费者类获取这个消息,拿到定义的数据。
composer install thinkphp-queue
使用 Redis [推荐]
安装并启动 Redis 服务
使用数据库 [不推荐]
CREATE TABLE `prefix_jobs` ( `id` int(11) NOT NULL AUTO_INCREMENT, `queue` varchar(255) NOT NULL, `payload` longtext NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserved` tinyint(3) unsigned NOT NULL, `reserved_at` int(10) unsigned DEFAULT NULL, `available_at` int(10) unsigned NOT NULL, `created_at` int(10) unsigned NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
根据选择的存储方式,在 \application\extra\queue.php
这个配置文件中,添加消息队列对应的驱动配置
return [ 'connector' => 'Redis', // Redis 驱动 'expire' => 60, // 任务的过时时间,默认为60秒; 若要禁用,则设置为 null 'default' => 'default', // 默认的队列名称 'host' => '127.0.0.1', // redis 主机ip 'port' => 6379, // redis 端口 'password' => '', // redis 密码 'select' => 0, // 使用哪个 db,默认为 db0 'timeout' => 0, // redis链接的超时时间 'persistent' => false, // 是不是长链接 // 'connector' => 'Database', // 数据库驱动 // 'expire' => 60, // 任务的过时时间,默认为60秒; 若要禁用,则设置为 null // 'default' => 'default', // 默认的队列名称 // 'table' => 'jobs', // 存储消息的表名,不带前缀 // 'dsn' => [], // 'connector' => 'Topthink', // ThinkPHP内部的队列通知服务平台 ,本文不做介绍 // 'token' => '', // 'project_id' => '', // 'protocol' => 'https', // 'host' => 'qns.topthink.com', // 'port' => 443, // 'api_version' => 1, // 'max_retries' => 3, // 'default' => 'default', // 'connector' => 'Sync', // Sync 驱动,该驱动的实际做用是取消消息队列,还原为同步执行 ];
1.3.1 配置文件中的 expire 参数说明
expire 参数指的是任务的过时时间。 过时的任务,其准确的定义是
expire 不为null
时 ,thinkphp-queue 会在每次获取下一个任务以前检查并重发过时(执行超时)的任务。
expire 为null
时,thinkphp-queue 不会检查过时的任务,性能相对较高一点。可是须要注意:
对expire 参数理解或者使用不当时,很容易产生一些bug,后面会举例提到。
咱们在业务控制器中建立一个新的消息,并推送到 helloJobQueue
队列
新增 \application\index\controller\JobTest.php
控制器,在该控制器中添加 actionWithHelloJob
方法
<?php /** * 文件路径: \application\index\controller\JobTest.php * 该控制器的业务代码中借助了thinkphp-queue 库,将一个消息推送到消息队列 */ namespace application\index\controller; use think\Exception; use think\Queue; class JobTest { /** * 一个使用了队列的 action */ public function actionWithHelloJob(){ // 1.当前任务将由哪一个类来负责处理。 // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法 $jobHandlerClassName = 'application\index\job\Hello'; // 2.当前任务归属的队列名称,若是为新队列,会自动建立 $jobQueueName = "helloJobQueue"; // 3.当前任务所需的业务数据 . 不能为 resource 类型,其余类型最终将转化为json形式的字符串 // ( jobData 为对象时,须要在先在此处手动序列化,不然只存储其public属性的键值对) $jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ; // 4.将该任务推送到消息队列,等待对应的消费者去执行 $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false if( $isPushed !== false ){ echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>"; }else{ echo 'Oops, something went wrong.'; } } }
注意: 在这个例子当中,咱们是手动指定的 $jobHandlerClassName
,更合理的作法是先定义好消息名称与消费者类名的映射关系,而后由某个能够获取该映射关系的类来推送这个消息。这样,生产者只须要知道消息的名称,而无需指定哪一个消费者类来处理。
除了
Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
这种方式以外,还能够直接传入Queue::push( $jobHandlerObject ,null , $jobQueueName );
这时,须要在 $jobHandlerObject 中定义一个handle()
方法,消息队列在执行到该任务时会自动反序列化该对象,并调用其handle()
方法。 该方式的缺点是没法传入自定义数据。
编写 Hello 消费者类,用于处理 helloJobQueue
队列中的任务
新增 \application\index\job\Hello.php
消费者类,并编写其 fire()
方法
<?php /** * 文件路径: \application\index\job\Hello.php * 这是一个消费者类,用于处理 helloJobQueue 队列中的任务 */ namespace application\index\job; use think\queue\Job; class Hello { /** * fire方法是消息队列默认调用的方法 * @param Job $job 当前的任务对象 * @param array|mixed $data 发布任务时自定义的数据 */ public function fire(Job $job,$data){ // 若有必要,能够根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行. $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!isJobStillNeedToBeDone){ $job->delete(); return; } $isJobDone = $this->doHelloJob($data); if ($isJobDone) { //若是任务执行成功, 记得删除任务 $job->delete(); print("<info>Hello Job has been done and deleted"."</info>\n"); }else{ if ($job->attempts() > 3) { //经过这个方法能够检查这个任务已经重试了几回了 print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n"); $job->delete(); // 也能够从新发布这个任务 //print("<info>Hello Job will be availabe again after 2s."."</info>\n"); //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行 } } } /** * 有些消息在到达消费者时,可能已经再也不须要执行了 * @param array|mixed $data 发布任务时自定义的数据 * @return boolean 任务执行的结果 */ private function checkDatabaseToSeeIfJobNeedToBeDone($data){ return true; } /** * 根据消息中的数据进行实际的业务处理 * @param array|mixed $data 发布任务时自定义的数据 * @return boolean 任务执行的结果 */ private function doHelloJob($data) { // 根据消息中的数据进行实际的业务处理... print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n"); print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n"); print("<info>Hello Job is Done!"."</info> \n"); return true; } }
至此,全部的代码都已准备完毕,在运行消息队列以前,咱们先看一下如今的目录结构:
在浏览器中访问 http://your.project.domain/index/job_test/actionWithHelloJob ,能够看到消息推送成功。
切换当前终端窗口的目录到项目根目录下,执行
php think queue:work --queue helloJobQueue
能够看到执行的结果相似以下:
至此,咱们成功地经历了一个消息的 建立 -> 推送 -> 消费 -> 删除 的基本流程
下文,将介绍 thinkphp-queue 的详细使用方法。如配置介绍,基本原理,各类特殊状况的处理等
queue:subscribe 命令 [截至2017-02-15,做者暂未实现该模式,略过]
queue:work 命令
work 命令: 该命令将启动一个 work 进程来处理消息队列。
php think queue:work --queue helloJobQueue
queue:listen 命令
listen 命令: 该命令将会建立一个 listen 父进程 ,而后由父进程经过 proc_open(‘php think queue:work’)
的方式来建立一个work 子 进程来处理消息队列,且限制该work进程的执行时间。
php think queue:listen --queue helloJobQueue
Work 模式
php think queue:work \
--daemon //是否循环执行,若是不加该参数,则该命令处理完下一个消息就退出
--queue helloJobQueue //要处理的队列的名称
--delay 0 \ //若是本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--force \ //系统处于维护状态时是否仍然处理任务,并未找到相关说明 --memory 128 \ //该进程容许使用的内存上限,以 M 为单位 --sleep 3 \ //若是队列中无任务,则sleep多少秒后从新检查(work+daemon模式)或者退出(listen或非daemon模式) --tries 2 //若是任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
Listen 模式
php think queue:listen \
--queue helloJobQueue \ //监听的队列的名称
--delay 0 \ //若是本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0 --memory 128 \ //该进程容许使用的内存上限,以 M 为单位 --sleep 3 \ //若是队列中无任务,则多长时间后从新检查,daemon模式下有效 --tries 0 \ //若是任务已经超太重发次数上限,则进入失败处理逻辑,默认为0 --timeout 60 //建立的work子进程的容许执行的最长时间,以秒为单位
能够看到 listen 模式下,不包含 --deamon
参数,缘由下面会说明
二者均可以用于处理消息队列中的任务
区别在于:
2.3.1 执行原理不一样
work 命令是单进程的处理模式。
按照是否设置了 --daemon
参数,work命令又可分为单次执行和循环执行两种模式。
--daemon
参数,该模式下,work进程在处理完下一个消息后直接结束当前进程。当不存在新消息时,会sleep一段时间而后退出。--daemon
参数,该模式下,work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当不存在新消息时,会在每次循环中sleep一段时间。listen 命令是 父进程 + 子进程 的处理模式。
listen命令所在的父进程会建立一个单次执行模式的work子进程,并经过该work子进程来处理队列中的下一个消息,当这个work子进程退出以后,listen命令所在的父进程会监听到该子进程的退出信号,并从新建立一个新的单次执行的work子进程
2.3.2 退出时机不一样
--timeout
参数配置,此时work子进程会被强制结束,listen所在的父进程也会抛出一个 ProcessTimeoutException
异常并退出。开发者能够选择捕获该异常,让父进程继续执行,也能够选择经过 supervisor 等监控软件重启一个新的listen命令。--memory
参数配置时,父子进程均会退出。正常状况下,listen进程自己占用的内存是稳定不变的。2.3.3 性能不一样
work 命令是在脚本内部作循环,框架脚本在命令执行的初期就已加载完毕;
而listen模式则是处理完一个任务以后新开一个work进程,此时会从新加载框架脚本。
所以: work 模式的性能会比listen模式高。
注意:当代码有更新时,work 模式下须要手动去执行 php think queue:restart
命令重启队列来使改动生效;而listen 模式会自动生效,无需其余操做。
2.3.4 超时控制能力
work 模式本质上既不能控制进程自身的运行时间,也没法限制执行中的任务的执行时间。
举例来讲,假如你在某次上线以后,在上文中的 \application\index\job\Hello.php
消费者的fire
方法中添加了一段死循环 :
public function fire(){ while(true){ //死循环 $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n"); sleep(1); } }
那么这个循环将永远不能中止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会有任何的告警。更严重的是,若是你配置了expire ,那么这个死循环的任务可能会污染到一样处理 helloJobQueue
队列的其余work进程,最后好几个work进程将被卡死在这段死循环中。详情后文会说明。
work 模式下的超时控制能力,实际上应该理解为 多个work 进程配合下的过时任务重发能力。
而 listen命令能够限制其建立的work子进程的超时时间。
listen 命令可经过 --timeout
参数限制work子进程容许运行的最长时间,超过该时间限制仍未结束的子进程会被强制结束;
这里有必要补充一下 expire 和 timeout 之间的区别:
expire 在配置文件中设置,timeout 在 listen命令 的命令行参数中设置,并且,expire 和 timeout 是两个不一样层次上的概念:
expire 是指任务的过时时间。这个时间是全局的,影响到全部的work进程。(无论是独立的work命令仍是 listen 模式下建立的的work子进程) 。expire 针对的对象是 任务。
timeout 是指work子进程的超时时间。这个时间只对当前执行的listen 命令有效。timeout 针对的对象是 work子进程。
2.3.5 使用场景不一样
根据上面的介绍,能够看到,
work 命令的适用场景是:
listen命令的适用场景是:
开始一个消息队列:
php think queue:work
中止全部的消息队列:
php think queue:restart
重启全部的消息队列:
php think queue:restart
php think queue:work
多模块
单模块项目推荐使用
app\job
做为任务类的命名空间多模块项目可用使用
app\module\job
做为任务类的命名空间 也能够放在任意能够自动加载到的地方
多任务
若是一个任务类里有多个小任务的话,在发布任务时,须要用
任务的类名@方法名
如app\lib\job\Job2@task1
、app\lib\job\Job2@task2
注意:命令行中的 --queue 参数不支持@解析
多任务例子:
\application\index\controller\JobTest.php
控制器中,添加 actionWithMultiTask()
方法:public function actionWithMultiTask(){ $taskType = $_GET['taskType']; switch ($whichTask) { case 'taskA': $jobHandlerClassName = 'application\index\job\MultiTask@taskA'; $jobDataArr = ['a' => '1']; $jobQueueName = "multiTaskJobQueue"; break; case 'taskB': $jobHandlerClassName = 'application\index\job\MultiTask@taskB'; $jobDataArr = ['b' => '2']; $jobQueueName = "multiTaskJobQueue"; break; default: break; } $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName); if ($isPushed !== false) { echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>"); }else{ throw new Exception("push a new $taskType of MultiTask Job Failed!"); } }
\application\index\job\MultiTask.php
消费者类,并编写其 taskA()
和 taskB()
方法<?php /** * 文件路径: \application\index\job\MultiTask.php * 这是一个消费者类,用于处理 multiTaskJobQueue 队列中的任务 */ namespace application\index\job; use think\queue\Job; class MultiTask { public function taskA(Job $job,$data){ $isJobDone = $this->_doTaskA($data); if ($isJobDone) { $job->delete(); print("Info: TaskA of Job MultiTask has been done and deleted"."\n"); }else{ if ($job->attempts() > 3) { $job->delete(); } } } public function taskB(Job $job,$data){ $isJobDone = $this->_doTaskA($data); if ($isJobDone) { $job->delete(); print("Info: TaskB of Job MultiTask has been done and deleted"."\n"); }else{ if ($job->attempts() > 2) { $job->release(); } } } private function _doTaskA($data) { print("Info: doing TaskA of Job MultiTask "."\n"); return true; } private function _doTaskB($data) { print("Info: doing TaskB of Job MultiTask "."\n"); return true; }
延迟执行,相对于即时执行,是用来限制某个任务的最先可执行时刻。在到达该时刻以前,该任务会被跳过。
能够利用该功能实现定时任务。
使用方式:
// 即时执行 $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName); // 延迟 2 秒执行 $isPushed = Queue::later( 2, $jobHandlerClassName, $jobDataArr, $jobQueueName); // 延迟到 2017-02-18 01:01:01 时刻执行 $time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now'); $isPushed = Queue::later($time2wait,$jobHandlerClassName, $jobDataArr, $jobQueueName);
// 重发,即时执行 $job->release(); // 重发,延迟 2 秒执行 $job->release(2); // 延迟到 2017-02-18 01:01:01 时刻执行 $time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now'); $job->release($time2wait);
//若是消费者类的fire()方法抛出了异常且任务未被删除时,将自动重发该任务,重发时,会设置其下次执行前延迟多少秒,默认为0
php think queue:work --delay 3
thinkphp-queue 中,消息的重发时机有3种:
if( $isJobDone === false){ $job->release(); }
null
时,work 进程内部每次查询可用任务以前,会先自动重发已过时的任务。补充:
在database 模式下,2.7.1 和 2.7.2 中的重发逻辑是先删除原来的任务,而后插入一个新的任务。2.7.3 中的重发时机是直接更新原任务。
而在redis 模式下,3种重发都是先删除再插入。
无论是哪一种重发方式,重发以后,任务的已尝试次数会在原来的基础上 +1 。
此外,消费者类中须要注意,若是 fire()
方法中可能抛出异常,那么
$job->delete()
,以避免产生bug。fire()
方法中又手动使用 $job->release()
, 这样会致使该任务被重发两次,产生两个同样的新任务。当同时知足如下条件时,将触发任务失败回调:
--tries
参数的值大于0--tries
参数queue_failed
事件标签及其对应的回调代码failed()
方法,用于接收任务失败的通知注意, queue_failed
标签须要在安装了 thinkphp-queue
以后 手动 去 \application\tags.php
文件中添加。
注意:该版本有bug,若想实现失败任务回调功能,须要先修改位于 think-queue\src\queue\Worker.php
中的 logFailedJob
方法 , 修改方式以下:
/** * Log a failed job into storage. * @param \Think\Queue\Job $job * @return array */ protected function logFailedJob(Job $job) { // 将原来的 queue.failed' 修改成 'queue_failed' 才能够触发任务失败回调 if (Hook::listen('queue.failed', $job, null, true)) { $job->delete(); $job->failed(); } return ['job' => $job, 'failed' => true]; }
首先,咱们添加 queue_failed
事件标签, 及其对应的回调方法
// 文件路径: \application\tags.php // 应用行为扩展定义文件 return [ // 应用初始化 'app_init' => [], // 应用开始 'app_begin' => [], // 模块初始化 'module_init' => [], // 操做开始执行 'action_begin' => [], // 视图内容过滤 'view_filter' => [], // 日志写入 'log_write' => [], // 应用结束 'app_end' => [], // 任务失败统一回调,有四种定义方式 'queue_failed'=> [ // 数组形式,[ 'ClassName' , 'methodName'] ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues'] // 字符串(静态方法),'StaicClassName::methodName' // 'MyQueueFailedLogger::logAllFailedQueues' // 字符串(对象方法),'ClassName',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法 // 'application\\behavior\\MyQueueFailedLogger' // 闭包形式 /* function( &$jobObject , $extra){ // var_dump($jobObject); return true; } */ ] ];
这里,咱们选择数组形式的回调方式,新增 \application\behavior\MyQueueFailedLogger
类,添加一个 logAllFailedQueues()
方法
<?php /** * 文件路径: \application\behavior\MyQueueFailedLogger.php * 这是一个行为类,用于处理全部的消息队列中的任务失败回调 */ namespace application\behavior; class MyQueueFailedLogger { const should_run_hook_callback = true; /** * @param $jobObject \think\queue\Job //任务对象,保存了该任务的执行状况和业务数据 * @return bool true //是否须要删除任务并触发其failed() 方法 */ public function logAllFailedQueues(&$jobObject){ $failedJobLog = [ 'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello' 'queueName' => $jobObject->getQueue(), // 'helloJobQueue' 'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }' 'attempts' => $jobObject->attempts(), // 3 ]; var_export(json_encode($failedJobLog,true)); // $jobObject->release(); //重发任务 //$jobObject->delete(); //删除任务 //$jobObject->failed(); //通知消费者类任务执行失败 return self::should_run_hook_callback; } }
须要注意该回调方法的返回值:
failed()
方法failed()
方法,须要开发者另行处理失败任务的删除和通知。最后,在消费者类中,添加 failed()
方法
/** * 文件路径: \application\index\job\HelloJob.php */ /** * 该方法用于接收任务执行失败的通知,你能够发送邮件给相应的负责人员 * @param $jobData string|array|... //发布任务时传递的 jobData 数据 */ public function failed($jobData){ send_mail_to_somebody() ; print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n"; }
这样,就能够作到任务失败的记录与告警
过时这个概念用文字比较难描述清楚,建议先看一下 深刻理解 中 3.4 消息处理的详细流程图
Redis
在 Redis 中,每个 队列 都三个key 与之对应 ,以 helloJobQueue 队列举例,其在redis 中的保存方式为:
key名 | 类型 | 说明 |
---|---|---|
queues:helloJobQueue | List , 列表 | 待执行的任务列表 |
queues:helloJobQueue:delayed | Sorted Set,有序集合 | 延迟执行和定时执行的任务集合 |
queues:helloJobQueue:reserved | Sorted Set,有序集合 | 执行中的任务集合 |
使用的
:
分隔符, 只是用来表示相关key的关联性。自己没有特殊含义。使用分隔符是一种常见的组织key的方式。
其中,在queues:helloJobQueue
列表中,每一个元素的形式以下:
在 queues:helloJobQueue:delayed
和 queues:helloJobQueue:delayed
有序集合中,每一个元素的形式以下:
能够看到,在有序集合中,每一个元素表明一个任务,该元素的 Score 为该任务的入队时间戳,任务的 value 为json 格式,保存了任务的执行状况和业务数据。将value decode 为数组后形式以下:
[
'job' => 'application\\index\\job\\Hello' , // jobHandlerClassName,消费者类的类名 'data' => [ // 生产者传入的业务数据 'time' => '2017-02-18 16:20:10', 'data' => 'I have 648 apples' ], 'id' => '77IasdasadIasdadadadKL8t', // 一个随机的32位字符串 'attempts' => 2 // 任务的已尝试次数 ]
redis驱动下,为了实现任务的延迟执行和过时重发,任务将在这三个key中来回转移,详情可见 3.5
Database
在 Database 中,每一个任务对应到表中的一行,queue 字段用来区分不一样的队列。
表的字段结构以下:
其中,payLoad 字段保存了消息的执行者和业务数据,payLoad 字段采用 json 格式的字符串来保存消息,将其 decode 为数组后形式以下:
[
'job' => 'application\\index\\job\\Hello', // jobHandlerClassName,消费者类的类名 'data' => string|array|integer|object // 生产者传入的业务数据 ]
这些类构成了消息队列中的几个角色:
角色 | 类名 | 说明 |
---|---|---|
命令行 | Command + Worker | 负责解析命令行参数,控制队列的启动,重启 |
驱动 | Queue + Connector | 负责队列的建立,以及消息的入队,出队等操做 |
任务 | Job | 用于将消息转化为一个任务对象,供消费者使用 |
生产者 | 业务代码 | 负责消息的建立与发布 |
消费者 | 业务代码 | 负责任务的接收与执行 |
各个类之间的关系图以下:
下图中,展现了database 模式下消息处理的详细流程,redis 驱动下大致相似
在redis驱动下,为了实现任务的延迟执行和过时重发,任务将在这三个key中来回转移。
在3.4 Database模式下消息处理的消息流程中,咱们知道,若是配置的expire 不是null ,那么 thinkphp-queue的work进程每次在获取下一个可执行任务以前,会先尝试重发全部过时的任务。而在redis驱动下,这个步骤则作了更多的事情,详情以下:
queue:xxx:delayed
的key中查询出有哪些任务在当前时刻已经能够开始执行,而后将这些任务转移到 queue:xxx
的key的尾部。queue:xxx:reserved
的key中查询出有哪些任务在当前时刻已通过期,而后将这些任务转移到 queue:xxx
的key的尾部。queue:xxx
的key的头部取出一个任务,若是取出成功,那么,将这个任务转移到 queue:xxx:reserved
的key 的头部,同时将这个任务实例化成任务对象,交给消费者去执行。用图来表示这个步骤的具体过程以下:
redis队列中的过时任务重发步骤--执行前:
redis队列中的过时任务重发步骤--执行后:
测试环境 :
虚拟机 Ubuntu 16.04 , PHP 7.1 ,TP5,Redis 3.2 , 双核 I5 6400,3G 内存
测试方式 :
使用 Redis 驱动,在一个控制器中循环推送 40000 条消息到消息队列;
使用php think queue:work --daemon
去消费这些消息,计算推送和消费各自所耗的时间。
测试结果 :
在最简单的逻辑下,平均每秒中可推送8000个消息,平均每秒可消费200个消息。
注意:因为在测试时,Host 机自己的cpu和内存长期100%,而且虚拟机中的各项服务并未专门调优,所以该测试结果并不具有参考性。
3.7.1 在 消费者类的 fire()
方法中,忘记使用 $job->delete()
去删除消息,这种状况下,会产生一系列的bug:
配置的 expire 为 null
, 则该任务被执行一次后会永远留在消息队列中,占用消息队列的空间 , 除非开发者另行处理。
配置的 expire 不为 null
,该任务在 expire 秒后被认为是过时任务,并被消息队列还原为待执行状态,在消息队列的后面的循环中继续被获取,这时,若是
--tries
参数为0 或者未设置,那么每隔 一段时间该任务就会被执行一次。--tries
参数 n 大于0 , 那么当这个任务被误执行的次数超过n 时,会由消息队列尝试去触发失败回调事件:
所以,在 使用 thinkphp-queue 时,请记得:
$job->delete()
删除任务fire()
方法中,使用 $job->attempt()
检查任务已执行次数,对于次数异常的,做相应的处理。fire()
方法中,根据业务数据来判断该任务是否已经执行过,以免该任务被重复执行。3.7.2 使用了 queue:work --daemon
,但更新代码后没有使用 queue:restart
重启 work 进程, 使得 work 进程中的代码与最新的代码不一样,出现各类问题。
3.7.3 使用了 queue:work --daemon
,可是消费者类的 fire() 方法中存在死循环,或 sleep(n)
等逻辑,致使消息队列被堵塞;或者使用了 exit()
, die()
这样的逻辑,致使work进程直接终止 。
3.7.4 配置的 expire 为 null
,这时若是采用的是 Redis 驱动且使用了延迟功能,如 later(n)
, release(n)
方法或者 --delay
参数不为0 , 那么将致使被延迟的任务永远没法处理。(这个可能属于框架的Bug)
3.7.5 配置的 expire 为null
,但并无自行处理过时的任务,致使过时的任务得不处处理,且一直占用消息队列的空间。
3.7.6 配置的 expire 不为null
,但配置的 expire 时间过短,以致于 expire 时间 < 消费者的 fire()
方法所需时间 + 删除该任务所需的时间 ,那么任务将被误认为执行超时,从而被消息队列还原为待执行状态。
3.7.7 使用 Queue::push($jobHandlerClassName , $jobData, $jobQueueName );
推送任务时,$jobData
中包含未序列化的对象。这时,在消费者端拿到的 $jobData
中拿到的是该对象的public 属性的键值对数组。所以,须要在推送前手动序列化对象,在消费者端再手动反序列化还原为对象。
//TBD
//TBD
TP5的消息队列与Laravel的消息队列比较类似,下面是与laravel 中的消息队列的一些对比:
thinkphp-queue (v1.1.2) | laravel-queue (v5.3) | |
---|---|---|
内置的驱动 | Database,Redis,Sync,TopThink | Database,Redis, Sync(在laravel中称为 null)。 |
Redis驱动要求 | 安装redis的C扩展 | 安装 predis 包 + LUA脚本 |
推送任务 | 容许推送 消费者类名,消费者对象 | 容许推送消费者类名,消费者对象,闭包 |
失败任务处理 | 触发失败回调事件 (有Bug) | 触发失败回调事件 + 移动任务到 failed_jobs表? |
消息订阅 | subscribe 命令+ Topthink驱动(注:未实现/未提供) | subscribe 命令 + 安装IronMQ 驱动 |
删除任务 | 消费者类中手动删除 | 任务完成后自动删除 |
推送到多个队列 | 需本身实现 | 原生支持 |
延迟执行 | 支持 (有Bug) | 支持 |
消息重发 | 支持 | 支持 |
检查已执行次数 | 原生支持 | 需在消费者类中显式 use 相关的 trait |
执行方式 | work 模式 + listen 模式 | work 模式 + listen 模式 |
进程命令 | 开启,中止,重启 | 开启,中止,重启 |
任务命令 | 无 | 展现失败任务列表,重试某个失败任务,删除某个失败任务 |
支持的事件 | 失败回调事件 | 失败回调事件,支持消费前事件,消 |