在上篇 RabbitMQ 之Work Queues (工做队列) 教程中,咱们建立了一个工做队列,工做队列背后的假设是每一个任务都交付给一个工做者。html
在这一部分,咱们将作一些彻底不一样的事情 - 咱们将向多个消费者传递信息。此模式称为“发布/订阅”。java
这篇为译文加上本身的理解,英文原文请移步:http://www.rabbitmq.com/tutorials/tutorial-three-java.html安全
为了说明这种模式,咱们将构建一个简单的日志记录系统。服务器
它将包含两个程序 - 第一个将发出日志消息,第二个将接收和打印它们。app
在咱们的日志记录系统中,接收程序的每一个运行副本都将获取消息。less
这样咱们就能够运行一个接收器并将日志定向到磁盘; 同时咱们将可以运行另外一个接收器并在屏幕上看到日志。ide
基本上,发布的日志消息将被广播给全部接收者。学习
在本教程的前几部分中,咱们向队列发送消息和从队列接收消息。如今是时候在Rabbit中引入完整的消息传递模型了。ui
让咱们快速回顾一下前面教程中介绍的内容:spa
RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者一般甚至不知道消息是否会被传递到任何队列。
相反,生产者只能向交换器发送消息。交换是一件很是简单的事情。一方面,它接收来自生产者的消息,另外一方面将它们推送到队列。
交换所必须确切知道如何处理收到的消息。它应该附加到特定队列吗?它应该附加到许多队列吗?或者它应该被丢弃。其规则由交换类型定义 。
Tips: 能够看出,这节课咱们多了一个Exchanges ,生产者产生的消息将再也不直接发送给队列,而是由Exchange来处理这件事情。
有几种交换类型可供选择:direct, topic, headers and fanout. 咱们将专一于最后这个-- fanout.
让咱们建立一个这种类型的交换,并将其称为日志:
channel.exchangeDeclare(“logs”,“fanout”);
fanout (扇出交换)很是简单。 正如您可能从名称中猜到的那样,它只是将收到的全部消息广播到它知道的全部队列中。而这正是咱们记录器所须要的。
要列出服务器上的交换,您能够运行有用的rabbitmqctl:
Linux 执行下列命令
sudo rabbitmqctl list_exchanges
Windows 执行下列命令
rabbitmqctl list_exchanges
在这个列表中有一些 amq.* exchanges(交换) 和一些默认的 (没有命名的) exchange(交换)
他们是默认建立的,可是你可能不须要使用他们如今。
在本教程的前几部分中,咱们对交换一无所知,但仍可以向队列发送消息。 这是可能的,由于咱们使用的是默认交换,咱们经过空字符串(“”)来识别。
回想一下咱们以前是如何发布消息的:
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数是交换的名称。 空字符串表示默认或无名交换:消息被路由到具备routingKey指定名称的队列(若是存在)
如今,咱们能够发布到咱们的命名交换:
channel.basicPublish( "logs", "", null, message.getBytes());
Temporary queues 临时队列
您可能还记得之前咱们使用的是具备指定名称的队列(请记住hello和task_queue?)。
可以命名队列对咱们来讲相当重要 - 咱们须要将工做人员指向同一个队列。当您想要在生产者和消费者之间共享队列时,为队列命名很重要。
但咱们的记录器并不是如此。咱们但愿了解全部日志消息,而不只仅是它们的一部分。咱们也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,咱们须要两件事。
首先,每当咱们链接到Rabbit时,咱们都须要一个新的空队列。为此,咱们可使用随机名称建立队列,或者更好 - 让服务器为咱们选择随机队列名称。
其次,一旦咱们断开消费者,就应该自动删除队列。
在Java客户端中,当咱们没有向queueDeclare()提供参数时,咱们 使用生成的名称建立一个非持久的,独占的自动删除队列:
String queueName = channel.queueDeclare().getQueue();
你也能够学习更多关于 exclusive flag和其余队列属性 在 guide on queues.
此时,queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
咱们已经建立了一个扇出交换和一个队列。
如今咱们须要告诉交换机将消息发送到咱们的队列。交换和队列之间的关系称为绑定。
channel.queueBind(queueName, "logs", "");
从如今开始,日志交换会将消息附加到咱们的队列中。
rabbitmqctl list_bindings
生成日志消息的生产者程序与前一个教程没有太大的不一样。
最重要的变化是咱们如今想要将消息发布到咱们的日志交换而不是无名交换。
Tips:
这里简单谈下个人理解:
假设P是咱们平时工做的领导,X是秘书(某任务自动分配系统),C1 是员工张三,C2 是员工李四,
领导制定(发布)好任务列表后,交给秘书(X, 任务分配系统(Exchange)),秘书(X, 任务分配 系统Exchange)将任务发送到这两个邮箱(消息队列)中便可。
张三,李四都绑定(订阅)了不一样的邮箱(不一样的队列名称),那么张三和李四取消息便从本身绑定的邮箱(队列)中取便可。
上篇博文中的工做队列所谓的无名交换能够理解为没有秘书(exchange)这个角色,并且共用同一个消息队列,如此而已。
咱们须要在发送时提供routingKey,可是对于扇出交换,它的值会被忽略。这里是EmitLog.java程序的代码 :
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME,"", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] strings) { if (strings.length < 1) return "info: Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
如您所见,在创建链接后咱们宣布了交换。此步骤是必要的,由于禁止发布到不存在的交换。
若是没有队列绑定到交换机,消息将会丢失,但这对咱们没有问题; 若是没有消费者在听,咱们能够安全地丢弃该消息。
ReceiveLogs.java的代码:
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
使用rabbitmqctl list_bindings,您能够验证代码是否实际建立了咱们想要的绑定和队列。
rabbitmqctl list_bindings
运行两个ReceiveLogs.java程序时,您应该看到以下内容:
Tips: amq.gen-JzTY20BRgKO-HjmUJj0wLg 是随机生成的队列名称。
本篇完~