在第一篇教程中,咱们已经写了一个从已知队列中发送和获取消息的程序。在这篇教程中,咱们将建立一个工做队列(Work Queue),它会发送一些耗时的任务给多个工做者(Works )。php
工做队列(又称:任务队列——Task Queues)是为了不等待一些占用大量资源、时间的操做。当咱们把任务(Task)看成消息发送到队列中,一个运行在后台的工做者(worker)进程就会取出任务而后处理。当你运行多个工做者(workers),任务就会在它们之间共享。web
这个概念在网络应用中是很是有用的,它能够在短暂的HTTP请求中处理一些复杂的任务。shell
以前的教程中,咱们发送了一个包含“Hello World!”的字符串消息。如今,咱们将发送一些字符串,把这些字符串看成复杂的任务。咱们没有真是的例子,例如图片缩放、pdf文件转换。因此使用 sleep()函数来模拟这种状况。咱们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。比 如”Hello…”就会耗时3秒钟。缓存
咱们对以前教程的send.php作些简单的调整,以即可以发送随意的消息。这个程序会按照计划发送任务到咱们的工做队列中。咱们把它命名为new_task.php:网络
$message = empty($argv[1]) ? 'Hello World!' : ' '.$argv[1]; $exchange->publish($message, $routeKey); var_dump("[x] Sent $message");
咱们的旧脚本(receive.php)一样须要作一些改动:它须要为消息体中每个点号(.)模拟1秒钟的操做。它会从队列中获取消息并执行,咱们把它命名为worker.php:函数
function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); sleep(substr_count($msg,'.')); $queue->ack($envelope->getDeliveryTag()); }
使用工做队列的一个好处就是它可以并行的处理队列。若是堆积了不少任务,咱们只须要添加更多的工做者(workers)就能够了,扩展很简单。学习
首先,咱们先同时运行两个worker.php脚本,它们都会从队列中获取消息,究竟是不是这样呢?咱们看看。fetch
你须要打开三个终端,两个用来运行worker.php脚本,这两个终端就是咱们的两个消费者(consumers)—— C1 和 C2。spa
shell1code
$php worker.php [*] Waiting for messages. To exit press CTRL+C
shell2
$ php worker.php [*] Waiting for messages. To exit press CTRL+C
第三个终端,咱们用来发布新任务。你能够发送一些消息给消费者(consumers):
shell3
$ php new_task.php First message.
shell3
$ php new_task.php Second message..
shell3
$ php new_task.php Third message...
shell3
$ php new_task.php Fourth message....
shell3
$ php new_task.php Fifth message.....
看看到底发送了什么给咱们的工做者(workers):
shell1
$ php worker.php [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
shell2
$ php worker.php [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默认来讲,RabbitMQ会按顺序得把消息发送给每一个消费者(consumer)。平均每一个消费者都会收到同等数量得消息。这种发送消息得方式叫作——轮询(round-robin)。试着添加三个或更多得工做者(workers)。
当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给 消费者(consumers)以后,立刻就会在内存中移除。这种状况,你只要把一个工做者(worker)中止,正在处理的消息就会丢失。同时,全部发送 到这个工做者的尚未处理的消息都会丢失。
咱们不想丢失任何任务消息。若是一个工做者(worker)挂掉了,咱们但愿任务会从新发送给其余的工做者(worker)。
为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会经过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,而后RabbitMQ就会释放并删除这条消息。
若是消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被彻底处理,而后从新发送给其余消费者(consumer)。这样,及时工做者(workers)偶尔的挂掉,也不会丢失消息。
消息是没有超时这个概念的;当工做者与它断开连的时候,RabbitMQ会从新发送消息。这样在处理一个耗时很是长的消息任务的时候就不会出问题了。
消息是没有超时这个概念的;当工做者与它断开连的时候,RabbitMQ会从新发送消息。这样在处理一个耗时很是长的消息任务的时候就不会出问题了。 以前的例子中咱们使用$queue->ack()。当工做者(worker)完成了任务,就发送一个响应。
function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); sleep(substr_count($msg,'.')); $queue->ack($envelope->getDeliveryTag()); } $queue->consume('callback');
运行上面的代码,咱们发现即便使用CTRL+C杀掉了一个工做者(worker)进程,消息也不会丢失。当工做者(worker)挂掉这后,全部没有响应的消息都会从新发送。
忘了响应
一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出以后就会从新发送,若是它不可以释放没响应的消息,RabbitMQ就会占用愈来愈多的内存。
为了排除这种错误,你可使用rabbitmqctl命令,输出messages_unacknowledged字段:
``` $ sudo rabbitmqctl listqueues name messagesready messages_unacknowledged Listing queues ... hello 0 0 ...done.
```
若是你没有特地告诉RabbitMQ,那么在它退出或者崩溃的时候,它将会流失全部的队列和消息。为了确保信息不会丢失,有两个事情是须要注意的:咱们必须把“队列”和“消息”设为持久化。
首先,为了避免让队列丢失,须要把它声明为持久化(durable):
$queue->setFlags(AMQP_DURABLE);
尽管这行代码自己是正确的,可是仍然不会正确运行。由于咱们已经定义过一个叫hello的非持久化队列。RabbitMq不容许你使用不一样的参数从新定义一个队列,它会返回一个错误。但咱们如今使用一个快捷的解决方法——用不一样的名字,例如task_queue。
$queue->setName('task_queue'); $queue->setFlags(AMQP_DURABLE); $queue->declare();
这个$queue->declare();必须在生产者(producer)和消费者(consumer)对应的代码中修改。
这时候,咱们就能够确保在RabbitMq重启以后queue_declare队列不会丢失。
注意:消息持久化
将消息设为持久化并不能彻底保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间仍是有一 个很小的间隔时间。由于RabbitMq并非全部的消息都使用fsync(2)——它有可能只是保存到缓存中,并不必定会写到硬盘中。并不能保证真正的 持久化,但已经足够应付咱们的简单工做队列。若是你必定要保证持久化,你须要改写你的代码来支持事务(transaction)。
你应该已经发现,它仍旧没有按照咱们指望的那样进行分发。好比有两个工做者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。
这时由于RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有做出响应。它盲目的把第n-th条消息发给第n-th个消费者。
咱们可使用$channel->qos();方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工做者(worker),直到它已经处理了上一条消息而且做出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工做者(worker)。
$channel->qos(0,1);
关于队列大小
若是全部的工做者都处理繁忙状态,你的队列就会被填满。你须要留意这个问题,要么添加更多的工做者(workers),要么使用其余策略。
new_task.py的完整代码:
<?php /** * PHP amqp(RabbitMQ) Demo-2 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'task_queue'; $routeKey = 'task_queue'; $message = empty($argv[1]) ? 'Hello World!' : ' '.$argv[1]; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declare(); $exchange->publish($message, $routeKey); var_dump("[x] Sent $message"); $connection->disconnect();
咱们的worker:
<?php /** * PHP amqp(RabbitMQ) Demo-2 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'task_queue'; $routeKey = 'task_queue'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declare(); $queue->bind($exchangeName, $routeKey); var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); $channel->qos(0,1); } $connection->disconnect(); function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); sleep(substr_count($msg,'.')); $queue->ack($envelope->getDeliveryTag()); }
使用消息响应和prefetch_count你就能够搭建起一个工做队列了。这些持久化的选项使得在RabbitMQ重启以后仍然可以恢复。
如今咱们能够移步教程3学习如何发送相同的消息给多个消费者(consumers)