(using php-amqplib)php
本教程假设RabbitMQ是安装在标准端口上运行(5672)。若是您使用不一样的主机、端口或凭据,则链接设置须要调整。html
在哪里获得帮助git
若是您在本教程中遇到困难,能够经过邮件列表与咱们联系。github
在前面的教程中,咱们改进了日志系统。咱们使用的是一种直接广播方式,而不是只使用一种直接(direct)广播方式的fanout交换机,从而得到了有选择地接收日志的可能性。segmentfault
虽然使用直接direct
交换机改进了咱们的系统,但它仍然有局限性——它不能根据多个标准进行路由。工具
在咱们的日志系统中,咱们可能但愿订阅基于严重性的日志,但也要基于发出日志的源。你可能从syslog UNIX工具知道这个概念,路由日志基于严重性(info/warn/crit…)和设备(auth/cron/kern…)。ui
这会给咱们很大的灵活性,咱们可能要听关键的错误来自kern
, 全部日志来自kern
”。spa
为了在日志系统中实现这一点,咱们须要了解一个更复杂的主题topic
交换机。翻译
发送到一个话题交换机(topic exchange)信息,不能是任意routing_key -它必须是一个单词的列表,用逗号分隔。这些词能够是任何东西,但一般它们指定链接到消息的某些特性。一些有效的路由键的例子:stock.usd.nyse
、nyse.vmw
、"quick.orange.rabbit"
。在你喜欢的路由键中,最多能够有255个字节的单词。日志
绑定键也必须是相同的形式。主题交换背后的逻辑相似于一个直接的消息,用特定的路由键发送的消息将被发送到绑定到绑定键的全部队列中。可是有两个重要的绑定键的特殊状况:
*(星号)能够代替一个词。
#(哈希)能够代替零个或更多的单词。
在一个例子中解释这一点是最容易的:
在这个示例中,咱们将发送全部描述动物的消息。消息将用一个包含三个单词(两个点)的路由键发送。路由键中的第一个字将描述速度,第二个颜色和第三个种:<speed>.<colour>.<species>
。
咱们建立三的绑定:Q1绑定绑定键*.orange.*
和 Q2 with *.*.rabbit
和 lazy.#
。
这些绑定能够归纳为:
Q1对全部的橙色(orange
)动物很感兴趣。
Q2想听关于兔子(rabbits
)的一切,关于懒惰(lazy
)动物的一切。
带有quick.orange.rabbit
的路由键的消息将传送到两个队列中。信息lazy.orange.elephant
也将去他们俩。另外一方面,quick.orange.fox
只会进入第一排,而lazy.brown.fox
只到第二个。lazy.pink.rabbit
将被送到第二个队列只有一次,即便它匹配两个绑定。quick.brown.fox
不匹配任何绑定,因此它将被丢弃。
若是咱们违背合同,用一个或四个词,如orange
或quick.orange.male.rabbit
?那么,这些消息将不匹配任何绑定并将丢失。
另外一方面,lazy.orange.male.rabbit
,即便它有四个词,将匹配最后的绑定,并将交付给第二个队列。
Topic exchange
主题交换(Topic exchange)功能强大,能够像其余交换机同样。
当队列绑定
#
(hash)绑定键-它将收到的全部邮件,无论路由关键同样的fanout交换机。当特殊字符
*
(star)和#
(hash)中不使用绑定,主题交换机会表现的像一个direct交换机。
咱们将在日志系统中使用主题交换机(topic exchange)。咱们将从一个工做假设开始,假设日志的路由键有两个词:<facility>.<severity>
。
代码与前面的教程几乎相同。
emit_log_topic.php代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('topic_logs', 'topic', false, false, false); $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info'; $data = implode(' ', array_slice($argv, 2)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'topic_logs', $routing_key); echo " [x] Sent ",$routing_key,':',$data," \n"; $channel->close(); $connection->close(); ?>
receive_logs_topic.php代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('topic_logs', 'topic', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $binding_keys = array_slice($argv, 1); if( empty($binding_keys )) { file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n"); exit(1); } foreach($binding_keys as $binding_key) { $channel->queue_bind($queue_name, 'topic_logs', $binding_key); } echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function($msg){ echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
接收全部日志:
php receive_logs_topic.php "#"
接受全部的日志来自kern
:
php receive_logs_topic.php "kern.*"
或者,若是您只想听关于critical
的日志:
php receive_logs_topic.php "*.critical"
你能够建立多个绑定:
php receive_logs_topic.php "kern.*" "*.critical"
触发一个日志来自路由键kern.critical
类型
php emit_log_topic.php "kern.critical" "A critical kernel error"
这些程序让咱们以为很好玩。请注意,代码对路由或绑定键不做任何假设,您可能但愿使用两个以上的路由键参数。
(所有源码:emit_log_topic.php 和 receive_logs_topic)
接下来,了解如何经过一个远程过程调用来执行往返消息,你能够阅读下一章节:RabbitMQ+PHP 教程六(RPC)。