发布订阅的应用场景一般为一个生产者、一个交换机,多个消费者各自建立队列绑定到交换机上订阅消息并消费。php
发布订阅一般配合如下设置共同使用:json
# composer.json { "require": { "php-amqplib/php-amqplib": ">=3.0" } } > composer.phar install
在 RabbitMQ 消息传递模型中,生产者是不会向队列直接发送消息的,只能将消息发送给交换机。composer
交换机接收来自生产者的消息,而后将它们推送到队列中。函数
发布订阅使用的是fanout
交换机,这个交换机很是简单,将它收到的全部消息广播到它绑定的全部队列。ui
生产者链接到RabbitMQ,发送一条消息,而后退出。spa
# send.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立链接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 定义一个名为 logs 的 fanout 广播交换机 $channel->exchange_declare('logs', 'fanout', false, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "info: Hello World!"; } // $msg = new AMQPMessage($data); // 将消息发送到名为 logs 的 fanout 广播交换机 (消息内容, 交换机, 路由键); $channel->basic_publish($msg, 'logs'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close();
消费者监听来自RabbitMQ的消息,一般须要一直保持运行状态以监听消息。日志
# receive.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立链接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 定义一个名为 logs 的 fanout 广播交换机 $channel->exchange_declare('logs', 'fanout', false, false, false); // 建立一个随机命名的新队列,第三个参数为关闭队列持久化,第四个参数为当声明它的链接关闭时队列会被自动删除 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); // 将随机命名的队列绑定到 fanout 广播交换机,生产者向交换机发送消息将被广播到绑定的队列中 $channel->queue_bind($queue_name, 'logs'); echo " [*] Waiting for messages. To exit press CTRL+C\n"; // 定义回调函数 $callback = function ($msg) { echo ' [x] ', $msg->body, "\n"; }; // 第四个参数设为true开启自动消息确认,即投递消息后马上标记为删除 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while ($channel->is_open()) { $channel->wait(); } $channel->close(); $connection->close();
打开一个终端,运行消费者,将日志放到文件中:code
php receive.php > logs_from_rabbit.log
打开另外一个终端,运行消费者,将日志输出到终端:rabbitmq
php receive.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
打开另外一个终端,运行生产者:队列
php send.php
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.