在第一篇教程中咱们写程序从一个命名队列中发送和接收消息。在这篇中,咱们将创建一个在多个工做者之间用于分发耗时任务的工做队列。php
工做队列(也称为:任务队列)背后的主要思想是避免当即作一项资源密集型任务而且不得不等候它完成。而是咱们计划这个任务在稍后被完成。咱们封装一个任务为一条消息而且发送它到一个队列。一个在后台运行的工做进程将当即获取这个任务并最终执行它。当你运行多个工做进程时,任务将在它们之间被分配。html
这个概念在web应用中尤为有用,在一个短HTTP请求窗口中,处理一项复杂的任务是不太可能的。git
在教程的前一篇中,咱们发送一条包含“Hello World!”的消息。如今咱们将发送字符串来表明复杂任务。咱们没有一个真实的任务,相似于图片大小被调整或pdf文件被渲染,所以让咱们来伪装这个任务,经过假装咱们正忙——利用sleep()函数。咱们将用字符串中的逗号数量做为它的复杂性。每一个逗号将占用一秒的工做。一项被描述为Hello...的伪装的任务将占用三秒钟。github
咱们稍微修改一下咱们先前例子send.php的代码,容许从命令行发送任意的消息。这个程序将把任务发送到咱们的工做队列中,因此咱们命名它为new_task.php:web
$data = implode(' ', array_slice($argv, 1)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent ", $data, "\n";
咱们旧的receive.php脚本也须要一些改变:它须要假装在消息体内每个逗号有一秒钟的工做。它将从队列里获取信息而且执行任务,因此咱们称它为worker.php:shell
$callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback);
注意咱们的假装任务模拟执行时间。数组
像在第一篇教程讲述的同样运行它们:缓存
# shell 1
php worker.php
# shell 2 php new_task.php "A very hard task which takes two seconds.."
使用任务队列的优势之一就是可以并行工做。若是咱们正在创建一项积压的工做,咱们只要添加更多的工做者就能够轻松地扩大规模。bash
首先,让咱们试着同时运行两个worker.php脚本。这两个都将获得来自队列里的信息,但具体怎样?让咱们看看。函数
你须要打开三个控制台程序。两个将运行worker.php程序。这些控制台程序将使咱们的两个消费者——C1和C2。
# shell 1 php worker.php # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 php worker.php # => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台里,咱们将发布新的任务。一旦你启动消费者你就能发布一些信息了:
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
让咱们看看有什么被传给了咱们的工做者:
# shell 1 php worker.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
# shell 2 php worker.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
默认状况下,RabbitMQ将依次发送每一条消息到下一个消费者,平均每一个消费者将获得相同数量的消息。这种分发消息的方式就被称为轮询。试试用三个或更多的工做者。
作一项任务会花几秒钟时间。你可能会想若是其中的一个消费者执行一项长时间的任务而只执行了一部分就死掉了那会怎样。在咱们目前的代码中,一旦RabbitMQ传递一条消息给消费者后,它当即就会标记这条消息为删除。在这种状况下,若是你结束掉一个工做者,咱们将丢失它正处理的消息。咱们也会丢失被分发到这个工做者而还没有被处理的全部消息。
可是咱们不想丢失任何任务。若是一个工做者死掉了,咱们会想让这个任务交给另外一个工做者。
为了确保一条消息不会丢失,RabbitMQ支持消息确认机制。一个ack(acknowledgement)被消费者发回以告知RabbitMQ一条特定的消息已被接收并处理,RabbitMQ能够删除它了。
若是一个消费者死掉(它的通道被关闭,链接也被关闭,或者TCP链接丢失)没有发送一个ack,RabbitMQ就会知道,一条消息没有被彻底处理,则会将这条消息将从新排入队列。若是有其它的消费者同时在线,那么它将会迅速的从新传递这条消息给另外一个消费者。这样你就能够确信没有消息被丢失,即便工做者偶尔死掉。
若是没有任何消息超时,当消费者死掉时,RabbitMQ将从新传送消息。这样即便处理一条消息要花很长很长时间也没事。
消息确认机制默认是被关闭的。如今是时间将它们设置为打开了,经过设置 basic_consume的第四个参数为false(true是不ack),而后,一旦咱们完成一项任务就从工做者发送一个合适的确认。
$callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用这些代码咱们能肯定即便在一个工做者正在处理一条消息时,你用CTRL+C结束掉这个进程,也没有什么丢失。这个工做者死掉后的不久全部未应答的消息将会从新被投递。
被忘记的确认
忘记确认是一个常见的错误。虽然这是一个简单的错误,可是后果是严重的。当你的客户端退出的时候消息将会被从新投递(这可能看起来像是随机的从新投递),可是由于不能释听任何未被确认的消息,RabbitMQ将会消耗愈来愈多的内存。
为了调试这种类型的错误,你可使用rabbitmqctl来打印 messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在Windows系统上,去掉sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱们已经学习了怎样确保即便消费者死亡任务也不会丢失。可是若是RabbitMQ服务中止了,咱们的任务将仍然会被丢失。
当RabbitMQ退出或崩溃时它将忘记队列和消息除非你让它不要这么作。要确保消息不被丢失,有两件事被要求去作:咱们须要标记队列和消息为持久的。
第一,咱们须要确保RabbitMQ不会丢失咱们的队列。为了这样,咱们须要定义它为持久的。要这么作,咱们就须要传递第三个参数为true到queue_declare:
$channel->queue_declare('hello', false, true, false, false);
尽管这条命令自己是正确的,可是在咱们目前的设置中是不起做用的。这是由于咱们已经定义了一个叫作hello的而不是持久的队列。RabbitMQ不容许你用不一样的参数从新定义一个已经存在的队列,这将返回一个错误到任何这么作的程序中。可是有一个快速的解决办法——咱们来用不一样的名字定义一个队列,例如task_queue:
$channel->queue_declare('task_queue', false, true, false, false);
At this point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by setting the delivery_mode = 2 message property which AMQPMessage takes as part of the property array.
这个设置为true的标志须要应用到生产者和消费者代码中。
这时咱们就能确信及时RabbitMQ从新启动,task_queue队列也不会被丢失。如今咱们须要标记咱们的消息为持久的——经过设置deliver_mode = 2消息属性,用做AMQPMessage属性数组的一部分。
$msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
关于消息持久的注意事项
标记消息为持久也不能彻底保证消息不被丢失。尽管这使得RabbitMQ保存消息懂啊磁盘,也仍然会有一个间隙窗口当RabbitMQ已经接收一条消息而还没有保存它时。再者,RabbitMQ不会为每一条消息调用fsync(2) ——它可能仅被保存在缓存中而不是真的写到磁盘上。虽然持久的保证不是很强,可是对于咱们的简单任务队列已是足够了。若是你须要一个更强的保证,那么你可使用 publisher confirms。
你大概已经注意到了调度仍然不像咱们想的那样工做。例如,在有两个工做者的状况下,当全部的奇数消息是重量级的而偶数消息是轻量级的,一个工做者将一直处于繁忙状态而另外一个将几乎没有任何工做。对于这种状况RabbitMQ一无所知,仍然均匀地分发消息。
发生这种状况是由于RabbitMQ只在消息进入队列时才调度消息。它不看一个消费者未确认的消息数量。它仅仅盲目地分发每个第n调消息到第n个消费者。
为了不这种状况,咱们可使用设置prefetch_count = 1 的basic_qos方法 。这会让RabbitMQ不会一次去分配多余一条消息给工做者。或者,换句话说,不分发一条新的消息给一个工做者直到这个工做者已经处理完而且确认了前一条消息。转而分发消息到下一个不忙的消费者。
$channel->basic_qos(null, 1, null);
关于队列大小的注意事项
若是全部工做者都很忙,你的队列能够填满。你会想要关注这个,可能添加更多的工做者,或者有一些其它策略。
咱们的new_task.php文件的最终代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); $data = implode(' ', array_slice($argv, 1)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, '', 'task_queue'); echo " [x] Sent ", $data, "\n"; $channel->close(); $connection->close(); ?>
咱们的worker.php文件:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('task_queue', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
使用消息确认和预取你能设置一个工做队列。持久选项让任务存活即便RabbitMQ被重启。
如今咱们可以前往下一篇文章,学习怎样传递相同的消息给多个消费者。