在前面的教程中,咱们建立了一个工做队列。工做队列背后的假设是每一个任务都交付给一个工做人员处理。在这一部分中,咱们将作一些彻底不一样的事情——咱们将向多个消费者发送消息。此模式称为“发布/订阅”。php
为了说明这个模式,咱们将构建一个简单的日志系统。它将由两个程序组成,第一个程序将发出日志消息,第二个程序将接收并打印它们。html
在咱们的日志系统中,接收程序的每一个运行副本都会收到消息。这样咱们就能够运行一个接收器,并将日志引导到磁盘;同时,咱们还能够运行另外一个接收器,并在屏幕上看到日志。git
本质上,已发布的日志消息将被广播到全部接收器。github
在本教程的前几部分中,咱们从队列中发送和接收消息。如今是在Rabbit中引入完整消息传递模型的时候了。segmentfault
让咱们快速浏览一下前面教程中介绍的内容:安全
RabbitMQ消息传递模型的核心思想是,生产者不发送任何信息直接到队列。事实上,生产者甚至不知道消息是否会发送到任何队列。服务器
相反,生产商只能向交换机(Exchange)发送消息。交换机作的事情很简单。一方面,它接收来自生产者的信息,另外一边则推他们排队。Exchange必须知道如何处理接收到的消息。应该附加到特定队列吗?它应该被添加到多个队列?仍是应该被抛弃?。这个规则是由交换类型定义的。ui
有几种交换类型可用:direct, topic, headers 和 fanout。咱们将集中讨论最后一个——fanout。让咱们建立这种类型的交换,并称之为日志:spa
$channel->exchange_declare('logs', 'fanout', false, false, false);
fanout交换很是简单。正如你可能从这个名字猜到的,它只广播它收到的全部消息给它所知道的全部队列。这正是咱们须要的记录器。翻译
Listing exchanges
列出服务器上的交换机,你能够运行rabbitmqctl:
sudo rabbitmqctl list_exchanges在这个列表中会有一些amq. *交流和默认(未命名)交换。默认状况下建立这些>,但目前不太可能使用它们。
默认的交换机
在本教程的前几部分中,咱们对交换机一无所知,但仍然可以将消息发送到队列中。这是可能的,由于咱们使用的是默认的交换,咱们经过空字符串(“”)来标识它们。
回想一下咱们以前如何发布消息:
$channel->basic_publish($msg, '', 'hello');咱们在这里使用默认的或无名的交换:消息路由到指定的
routing_key
名称的队列,若是它存在的话。路由键是第三个参数:basic_publish
如今,咱们能够将其发布到咱们命名的Exchange中:
$channel->exchange_declare('logs', 'fanout', false, false, false); $channel->basic_publish($msg, 'logs');
也许你还记得之前咱们使用的队列所指定的名称(记得hello
和task_queue
?). 可以说出一个队列对咱们来讲相当重要 -- 咱们须要把工人指向同一个队列。当你想在生产者和消费者之间共享一个队列时,给队列一个名字是很重要的。
但咱们的记录器不是这样的。咱们想了解全部日志消息,而不单单是其中的一个子集。咱们也只对当前流动的消息感兴趣,而不是旧消息。为了解决这个问题,咱们须要两件事。
首先,每当咱们与Rabbit链接时,咱们须要一个新的空队列。为此,咱们能够建立一个带有随机名称的队列,或者更好 - 让服务器为咱们选择一个随机队列名。
第二,一旦断开消费者,队列应该自动删除。
在php客户端中,当咱们将队列名称做为空字符串提供时,咱们建立一个带有生成名称的非持久队列:
list($queue_name, ,) = $channel->queue_declare("");
方法返回时,queue_name
变量包含一个随机生成的RabbitMQ队列名称。例如,它可能看起来像amq.gen-jzty20brgko-hjmujj0wlg
当声明它关闭的链接时,队列将被删除,由于它被声明为独占。
咱们已经建立了fanout交换机和队列。如今咱们须要告诉Exchange发送消息到咱们的队列中。交换和队列之间的关系称为绑定。
$channel->queue_bind($queue_name, 'logs');
从如今开始,日志交换将向队列添加消息。
列出绑定列表(Listing bindings)
您能够使用现有的绑定列表,使用下面命令:
rabbitmqctl list_bindings
生成日志消息的生成程序与前面的教程没有多大区别。最重要的变化是,咱们如今但愿把消息发布到咱们的日志交换,而不是无名的。这里给出emit_log.php
代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); $data = implode(' ', array_slice($argv, 1)); if(empty($data)) $data = "info: Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'logs'); echo " [x] Sent ", $data, "\n"; $channel->close(); $connection->close(); ?>
如您所见,在创建链接以后,咱们声明交换。这一步是必要的,由于发布到一个不存在的交换机是禁止的。
若是没有队列绑定到Exchange,消息将丢失,但这对咱们来讲是好的;若是没有用户正在监听,咱们能够安全地丢弃消息。
receive_logs.php代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, 'logs'); echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function($msg){ echo ' [x] ', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
若是要将日志保存到文件中,只需打开控制台并键入:
php receive_logs.php > logs_from_rabbit.log
若是您但愿看到屏幕上的日志,生成一个新的终端并运行:
php receive_logs.php
固然,而后触发日志类型:
php emit_log.php
使用rabbitmqctl list_bindings
能够验证代码其实是建立绑定和队列是咱们想要的。两receive_logs.php程序运行你应该看到:
sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.
对结果的解释很简单:来自Exchange日志的数据使用服务器分配的名称到两个队列中。这正是咱们想要的。
要了解如何侦听一个消息的子集,让咱们转到RabbitMQ+PHP 教程四(Routing)。