转载:http://netstu.5iunix.net/archives/201305-835/php
最近的作一个短信群发的项目,须要用到消息队列。所以开始了我对消息队列选型的漫长路.git
为何选型会纠结呢,直接使用ActiveMQ,RabittMQ,Gearman等流行的消息队列不就能够了吗? 在这个项目中,只有单台服务器,并且我采用了redis来作系统缓存,同时开启了php apc来缓存phalcon model Metadata.若是再开启其余进程,须要很合理的分配各个应用的资源,若是分配很差,极有可能产生浪费,同时还有可能形成应用性能低下.github
所以,为了实现项目结构的简单化,同时简化维护,我就打算用Redis来实现,但短信群发过程当中,必需要有优先级别的,直接使用Redis的 List显然不是太合适,本着这一打算,我还专门google了一下,发现有人使用zset来模拟FIFO,实现LIST中的lpush,rpop操做。web
经过Google发现有一个基于Redis的比较不错的消息队列 Resque https://github.com/resque/resque 而此消息队列则是使用ruby语言实现的.我更不想一个项目中使用多种语言了,幸亏,立刻又发现了基于resque的PHP实现, php-resque https://github.com/chrisboulton/php-resqueredis
Resque还包括不少其余插件以及其余语言的实现,详细列表请看官方列表 https://github.com/resque/resque/wiki/Alternate-Implementationsjson
php-resque自己并无包括查看消息队列status的用户界面,但咱们能够经过命令行查看,同时因为php-resque的消息队列实现和resque中命名一致,能够安装ruby的web界面来进行查看.bootstrap
我一直使用的是phpredis扩展. php-resque使用Redis Client是 Credis, https://github.com/colinmollenhour/credis ,而Credis的API彻底是基于phpredis来封装的,即API同样,前者是C扩展,后者是PHP实现.数组
促使我最终决定使用Redis及PHP-Resque的一个最要缘由是PHP-Resque内置可同时开启多个进程,对于海量数据下发来讲很是必要,同时不须要本身再写代码实现多进程或多线程了.缓存
下面具体介绍一下PHP-Resque的安装与使用.ruby
PHP-Resque安装
1.环境需求
2.安装Composer
$ curl -sS https://getcomposer.org/installer | php $ mv composer.phar /usr/local/bin/composer
3.安装PHP-Resque
git clone git://github.com/chrisboulton/php-resque.git cd php-resque composer install
php-resque的队列实现
全部的消息队列都差很少,有生产者,消费者,还有Task Server.只不过名称不一样而已.
生产者:接受用户提交信息,组成消息队列中的一个任务,并将其加入消息队列服务中.
消费者:通常为一个守护进程,从消息队列服务器中取出任务,对任务作出相应的处理.PHP-Resque中的消息者为守护进程+Job Class .即由守护进程从消息队列中取出任务,具体的任务中在任务的生产过程当中已经指定了由哪一个Job Class来作具体的处理.
Task Server: 消息队列服务器,本处指redis服务.
具体实现
下载安装PHP-Resque后,在php-resque/demo目录中,有相似以下几个文件(我修改测试过,和原有目录不太同样了):
[root@myfc demo]# ll total 52 -rw-r--r-- 1 root root 113 Apr 17 17:48 bad_job.php -rw-r--r-- 1 root root 494 Apr 17 17:48 check_status.php -rw-r--r-- 1 root root 1629 Apr 28 15:43 cli-bootstrap.php -rw-r--r-- 1 root root 396 Mar 25 10:27 config.ini -rw-r--r-- 1 root root 6 Apr 18 11:26 index.php -rw-r--r-- 1 root root 680 Apr 17 17:48 init.php -rw-r--r-- 1 root root 199 Apr 18 13:13 job.php -rw-r--r-- 1 root root 78 Apr 17 17:48 long_job.php -rw-r--r-- 1 root root 325 Apr 28 15:51 MtSend_Job.php -rw-r--r-- 1 root root 97 Apr 18 12:05 php_error_jobFF.php -rw-r--r-- 1 root root 382 Apr 18 11:43 queue.php -rw-r--r-- 1 root root 160 Apr 28 18:09 resque.php -rw-r--r-- 1 root root 80 Apr 18 13:22 Test_Job.php
生成任务(建立一个Job)
// Required if redis is located elsewhere Resque::setBackend('localhost:6379'); $args = array( 'name' => 'Chris' ); Resque::enqueue('default', 'My_Job', $args);
其中第一行为链接redis服务
第二行的数组是任务的内容,可随意编写.经过Resque::enqueue 序列化后加入到消息队列中.
最后一行的第一个参数表示 消息队列的名称(可随意标记,好比 email,log等),第二个参数表示取出任务后,由My_Job这个类来处理此条任务.
同时须要说明的是: 在PHP-Resque中也没有数字标识的优化级,但能够经过消息队列名称来实现优化级,具体下面会讲到.
定义Job Class
class My_Job { public function setUp() { // ... Set up environment for this job } public function perform() { // .. Run job } public function tearDown() { // ... Remove environment for this job } }
从上面的代码中能够看出,这是一个很是简单的PHP类文件, 方法perform是必须的,主要用来处理任务.其余两个方法无关紧要,根据你本身的应用环境看是否须要.
Demo目录中的job.php (修改过)
<?php class PHP_Job { public function perform() { sleep(5); // fwrite(STDOUT, 'Hello!'); $p = $this->args['name']; echo 'ttime '.$p; // print_r(json_decode($p)); echo "string"; } } ?>
从上面的例子能够看出,若是想在Job Class中读取任务的具体内容,可直接使用 $this->args['name']来进行读取.按上面的例子来讲,它将输出 Chris
测试消息队列
(1)建立任务
[root@myfc demo]# cd php-resque/demo
[root@myfc demo]# php queue.php PHP_Job
Queued job bcd1cb6f39d72b9b786716f81b757370
queue.php代码以下:
<?php if(empty($argv[1])) { die('Specify the name of a job to add. e.g, php queue.php PHP_Job'); } require __DIR__ . '/init.php'; date_default_timezone_set('GMT'); Resque::setBackend('127.0.0.1:6379'); $args = array( 'time' => time(), 'array' => array( 'test' => 'test', ), ); $jobId = Resque::enqueue('default', $argv[1], $args, true); echo "Queued job ".$jobId."\n\n";
此时查看Redis,会发现多了三条数据
resque:queues, default resque:queue:default, {"class":"PHP_Job","args":[{"time":1368167779,"array":{"test":"test"}}],"id":"875234d18af92212a7d60056daeae910"} resque:job:875234d18af92212a7d60056daeae910:status, {"status":1,"updated":1368167779,"started":1368167779}
注意: “,”逗号前是键名,后面是键值
在开启守护进程之前,请确保PHP_Job存在,PHP_Job为PHP类名,而非PHP文件名. 若是PHP_Job不存在,则此条任务会处理失败.(由于根本不存在处理它的文件)
(2)处理任务
在demo目录中,守护进程是由目录下的resque.php来进行,代码以下:
<?php date_default_timezone_set('GMT'); require 'bad_job.php'; require 'job.php'; require 'long_job.php'; require 'MtSend_Job.php'; require '../bin/resque';
Job Class须要放到此文件头部引入,固然也可使用其余方式引入. 上面代码中job.php便是类PHP_Job文件
开启守护进程参数说明:
在上面的代码中,最后一行引入的../bin/resque 其实也是一个php cli程序,代码以下:
#!/usr/bin/env php <?php // Find and initialize Composer $files = array( __DIR__ . '/../../vendor/autoload.php', __DIR__ . '/../../../autoload.php', __DIR__ . '/../../../../autoload.php', __DIR__ . '/../vendor/autoload.php', ); $found = false; foreach ($files as $file) { if (file_exists($file)) { require_once $file; break; } } if (!class_exists('Composer\Autoload\ClassLoader', false)) { die( 'You need to set up the project dependencies using the following commands:' . PHP_EOL . 'curl -s http://getcomposer.org/installer | php' . PHP_EOL . 'php composer.phar install' . PHP_EOL ); } $QUEUE = getenv('QUEUE'); if(empty($QUEUE)) { die("Set QUEUE env var containing the list of queues to work.\n"); } $REDIS_BACKEND = getenv('REDIS_BACKEND'); $REDIS_BACKEND_DB = getenv('REDIS_BACKEND_DB'); if(!empty($REDIS_BACKEND)) { if (empty($REDIS_BACKEND_DB)) Resque::setBackend($REDIS_BACKEND); else Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB); } $logLevel = 0; $LOGGING = getenv('LOGGING'); $VERBOSE = getenv('VERBOSE'); $VVERBOSE = getenv('VVERBOSE'); if(!empty($LOGGING) || !empty($VERBOSE)) { $logLevel = Resque_Worker::LOG_NORMAL; } else if(!empty($VVERBOSE)) { $logLevel = Resque_Worker::LOG_VERBOSE; } $APP_INCLUDE = getenv('APP_INCLUDE'); if($APP_INCLUDE) { if(!file_exists($APP_INCLUDE)) { die('APP_INCLUDE ('.$APP_INCLUDE.") does not exist.\n"); } require_once $APP_INCLUDE; } $interval = 5; $INTERVAL = getenv('INTERVAL'); if(!empty($INTERVAL)) { $interval = $INTERVAL; } $count = 1; $COUNT = getenv('COUNT'); if(!empty($COUNT) && $COUNT > 1) { $count = $COUNT; } $PREFIX = getenv('PREFIX'); if(!empty($PREFIX)) { fwrite(STDOUT, '*** Prefix set to '.$PREFIX."\n"); Resque_Redis::prefix($PREFIX); } if($count > 1) { for($i = 0; $i < $count; ++$i) { $pid = Resque::fork(); if($pid == -1) { die("Could not fork worker ".$i."\n"); } // Child, start the worker else if(!$pid) { $queues = explode(',', $QUEUE); $worker = new Resque_Worker($queues); $worker->logLevel = $logLevel; fwrite(STDOUT, '*** Starting worker '.$worker."\n"); $worker->work($interval); break; } } } // Start a single worker else { $queues = explode(',', $QUEUE); $worker = new Resque_Worker($queues); $worker->logLevel = $logLevel; $PIDFILE = getenv('PIDFILE'); if ($PIDFILE) { file_put_contents($PIDFILE, getmypid()) or die('Could not write PID information to ' . $PIDFILE); } fwrite(STDOUT, '*** Starting worker '.$worker."\n"); $worker->work($interval); }
从上面的代码能够看出,在命令行启动守护进程时,能够传递一些环境变量.而 ../bin/resque经过getenv函数来获取环境变量.
支持的环境变量有:
========================================================================================================
QUEUE – 這个是必要的,会決定 worker 要執行什麼任務,重要的在前,例如 QUEUE=default,notify,mail,log 。也能够設定為 QUEUE=* 表示執行全部任務。
APP_INCLUDE – 这也能够说是必要的,因為 Resque 的 Job 都是写成PHP类,那 worker 執行的時候當然要把物件的檔案引入進來。能够設成 APP_INCLUDE=require.php 再在 require.php 中引入全部 Job 的 Class 文件便可。
COUNT – 設定 worker 數量,預設是1 COUNT=5 。
REDIS_BACKEND – 設定 Redis 的 ip, port。若是沒設定,預設是連 localhost:6379 。
LOGGING, VERBOSE – 設定 log, VERBOSE=1 便可。
VVERBOSE – 比較詳細的 log, VVERBOSE=1 debug 的時候能够開出來看。
INTERVAL – worker 檢查 queue 的間隔,預設是五秒 INTERVAL=5 。
PIDFILE – 若是你是開單 worker,能够指定 PIDFILE 把 pid 寫入,例如 PIDFILE=/var/run/resque.pid 。
=============================================================================
启动守护进程:
QUEUE=* APP_INCLUDE=require.php COUNT=5 VVERBOSE=1 php resque.php
QUEUE=* 代表处理全部的消息队列,此时则没有优化级别.若是须要优化级别,能够按前后顺序列出,如 QUEUE=mail1,mail2,mail3
APP_INCLUDE=require.php 文件中,能够引入其余的一些Job Class或其余辅助类,固然也能够直接在 demo/resque.php中引入
启动守护进程后:
[root@myfc demo]# QUEUE=* COUNT=5 VVERBOSE=1 php resque.php #!/usr/bin/env php *** Starting worker myfc:22143:* *** Starting worker myfc:22144:* ** [07:25:09 2013-05-10] Registered signals *** Starting worker myfc:22145:* ** [07:25:09 2013-05-10] Registered signals ** [07:25:09 2013-05-10] Registered signals *** Starting worker myfc:22146:* ** [07:25:09 2013-05-10] Registered signals *** Starting worker myfc:22148:* ** [07:25:09 2013-05-10] Registered signals [root@myfc demo]# ** [07:25:09 2013-05-10] Checking default ** [07:25:09 2013-05-10] Found job on default ** [07:25:09 2013-05-10] got (Job{default} | ID: 875234d18af92212a7d60056daeae910 | PHP_Job | [{"time":1368167779,"array":{"test":"test"}}]) ** [07:25:09 2013-05-10] Forked 22163 at 2013-05-10 07:25:09 ** [07:25:09 2013-05-10] Processing default since 2013-05-10 07:25:09 ** [07:25:09 2013-05-10] Checking default ** [07:25:09 2013-05-10] Sleeping for 5 ** [07:25:09 2013-05-10] Checking default ** [07:25:09 2013-05-10] Sleeping for 5 ** [07:25:09 2013-05-10] Checking default ** [07:25:09 2013-05-10] Sleeping for 5 ** [07:25:09 2013-05-10] Checking default ** [07:25:09 2013-05-10] Sleeping for 5 ttime 1368167779string** [07:25:14 2013-05-10] done (Job{default} | ID: 875234d18af92212a7d60056daeae910 | PHP_Job | [{"time":1368167779,"array":{"test":"test"}}]) ** [07:26:29 2013-05-10] Checking default ** [07:26:29 2013-05-10] Checking default ** [07:26:29 2013-05-10] Sleeping for 5 ** [07:26:29 2013-05-10] Sleeping for 5 ** [07:26:34 2013-05-10] Checking default ** [07:26:34 2013-05-10] Checking default ** [07:26:34 2013-05-10] Sleeping for 5 ** [07:26:34 2013-05-10] Sleeping for 5 ** [07:26:34 2013-05-10] Checking default ** [07:26:34 2013-05-10] Sleeping for 5 ** [07:26:34 2013-05-10] Checking default ** [07:26:34 2013-05-10] Checking default ** [07:26:34 2013-05-10] Sleeping for 5 ** [07:26:34 2013-05-10] Sleeping for 5 ** [07:26:39 2013-05-10] Checking default ** [07:26:39 2013-05-10] Checking default ** [07:26:39 2013-05-10] Sleeping for 5 ** [07:26:39 2013-05-10] Sleeping for 5 ** [07:26:39 2013-05-10] Checking default ** [07:26:39 2013-05-10] Sleeping for 5 ** [07:26:39 2013-05-10] Checking default ** [07:26:39 2013-05-10] Checking default ** [07:26:39 2013-05-10] Sleeping for 5 ** [07:26:39 2013-05-10] Sleeping for 5
能够看出,已经成功取出任务,并使用PHP_Job处理.而后守护进程一直在运行,等待处理新的任务
此时看一下进程文件:
[root@myfc sop]# ps -aux|grep php root 22143 0.0 0.0 310700 11496 pts/2 S 15:25 0:00 php resque.php root 22144 0.0 0.0 310700 11580 pts/2 S 15:25 0:00 php resque.php root 22145 0.0 0.0 310700 11496 pts/2 S 15:25 0:00 php resque.php root 22146 0.0 0.0 310700 11496 pts/2 S 15:25 0:00 php resque.php root 22148 0.0 0.0 310700 11496 pts/2 S 15:25 0:00 php resque.php
查看任务的状态
在生成任务的时候,使用 $token = Resque::enqueue('default', 'My_Job', $args, true); 生成一个任务,第四个参数设置为true,会返回任务的JobID,经过JobID能够查看任务的状态.
$status = new Resque_Job_Status($token); echo $status->get(); // Outputs the status
在类文件Resque_Job_Status中定义了全部类型
任务状态
Resque_Job_Status::STATUS_WAITING - 等待被取出处理 Resque_Job_Status::STATUS_RUNNING - 任务正在被处理 Resque_Job_Status::STATUS_FAILED - 任务处理失败 Resque_Job_Status::STATUS_COMPLETE - 任务处理完成
事件触发接口 暂时没用到,可查看官方文档
原本想把选型,使用,以及整合Phalcon写到一篇文章中,看来太长,仍是分开吧.抽空把集成到Phalcon以及消息队列实际应用写一下.
参考文档:
http://avnpc.com/pages/run-background-task-by-php-resque
http://blog.hsatac.net/2012/01/php-resque-introduction/
http://kamisama.me/2012/10/12/background-jobs-with-php-and-resque-part-4-managing-worker/
https://github.com/kamisama/Fresque
http://stackoverflow.com/questions/11814445/what-is-the-proper-way-to-setup-and-use-php-resque