路由模式与发布订阅相似,区别是发布订阅交换机会将全部消息广播给全部消费者绑定的队列,消费者消费所有消息;而路由模式交换机只会将与路由键彻底匹配的消息发送给消费者绑定的队列,消费者只消费该类消息。php
# composer.json { "require": { "php-amqplib/php-amqplib": ">=3.0" } } > composer.phar install
路由模式使用的是direct
交换机,这个交换机的路由算法也很简单,将它收到的全部消息进入绑定键与消息的路由键彻底匹配的队列 。算法
生产者链接到RabbitMQ,发送一条消息,而后退出。json
# send.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立链接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 定义一个名为 direct_logs 的 direct 交换机 $channel->exchange_declare('direct_logs', 'direct', false, false, false); // 从参数中获取交换机的路由键 $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info'; $data = implode(' ', array_slice($argv, 2)); if (empty($data)) { $data = "Hello World!"; } // $msg = new AMQPMessage($data); // 经过名为 direct_logs 的 direct 交换机发送消息到队列 (消息内容, 交换机, 路由键); $channel->basic_publish($msg, 'direct_logs', $severity); echo ' [x] Sent ', $severity, ':', $data, "\n"; $channel->close(); $connection->close();
消费者监听来自RabbitMQ的消息,一般须要一直保持运行状态以监听消息。composer
# receive.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立链接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 定义一个名为 direct_logs 的 direct 交换机 $channel->exchange_declare('direct_logs', 'direct', false, false, false); // 建立一个随机命名的新队列,第三个参数为关闭队列持久化,第四个参数为当声明它的链接关闭时队列会被自动删除 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); // 定义全部的绑定键 $severities = array_slice($argv, 1); if (empty($severities)) { file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n"); exit(1); } // 将随机命名的队列绑定到 direct 交换机,生产者向交换机发送消息将被放到绑定的队列中 foreach ($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); } echo " [*] Waiting for logs. To exit press CTRL+C\n"; // 定义回调函数 $callback = function ($msg) { echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; // 第四个参数设为true开启自动消息确认,即投递消息后马上标记为删除 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while ($channel->is_open()) { $channel->wait(); } $channel->close(); $connection->close();
打开一个终端,运行消费者,接收警告和错误的消息:函数
php receive.php warning error > logs_from_rabbit.log
打开另外一个终端,运行消费者,接收信息、警告和错误的消息:ui
php receive.php info warning error # => [*] Waiting for logs. To exit press CTRL+C
打开另外一个终端,运行生产者,发送错误消息:spa
php send.php error "Run. Run. Or it will explode."
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings