RabbitMQ 工做队列的使用

工做队列的应用场景一般为一个生产者、一个队列、多个消费者公平消费队列。php

工做队列一般配合如下设置共同使用:json

  • 开启队列持久化
  • 开启消息持久化
  • 开启公平分发
  • 关闭自动消息确认
  • 手动确认消息

安装依赖

# composer.json
{
    "require": {
        "php-amqplib/php-amqplib": ">=3.0"
    }
}
> composer.phar install

模式结构

image.png

生产者

生产者链接到RabbitMQ,发送一条消息,而后退出。composer

# send.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立链接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 建立通道
$channel = $connection->channel();

// 建立队列,已存在的不会重复建立,第三个参数为开启队列持久化
$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}

// 第二个参数 delivery_mode = AMQPMessage::DELIVERY_MODE_PERSISTENT 为设置消息持久化
$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

// 经过默认的交换机发送消息到队列 (消息内容, 默认交换机, 路由键);
$channel->basic_publish($msg, '', 'task_queue');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

消费者

消费者监听来自RabbitMQ的消息,一般须要一直保持运行状态以监听消息。函数

# receive.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立链接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 建立通道
$channel = $connection->channel();

// 建立队列,已存在的不会重复建立,第三个参数为开启队列持久化
$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

// 定义回调函数
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";
    // 手动消息确认
    $msg->ack();
};

// 设置 prefetch_count = 1,开启公平分发(默认为循环分发)
// 在处理并确认上一条消息以前,不要将新消息发送给消费者,而发送给其余消费者
$channel->basic_qos(null, 1, null);

// 第四个参数设为false关闭自动消息确认,为true打开自动消息确认即投递消息后马上标记为删除
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

运行

打开一个终端,运行消费者:fetch

php receive.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'

打开另外一个终端,运行消费者:ui

php receive.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

打开另外一个终端,运行生产者:spa

php send.php First message.
php send.php Second message..
php send.php Third message...
php send.php Fourth message....
php send.php Fifth message.....

注意事项

  1. 关闭自动消息确认必须记得要手动确认,不然会致使消息没法释放,内存消耗愈来愈大。
  2. RabbitMQ 不容许使用不一样的参数从新定义已有队列,将返回错误,如非持久化的队列设置为持久化的队列等。

查看未确认消息

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
相关文章
相关标签/搜索