在上篇博文 译:3.RabbitMQ 之Publish/Subscribe(发布和订阅) 咱们构建了一个简单的日志系统 咱们可以向许多接收者广播日志消息。html
在本篇博文中,咱们将为其添加一个功能 - 咱们将只能订阅一部分消息。 例如,咱们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然可以在控制台上打印全部日志消息。java
本文是译文,英文原文请移步:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
算法
在前面的例子中,咱们已经在建立绑定。您可能会记得如下代码:app
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定是交换和队列之间的关系。这能够简单地理解为:队列对来自此交换的消息感兴趣。ide
绑定能够采用额外的routingKey参数。为了不与basic_publish参数混淆,咱们将其称为 绑定密钥。这就是咱们如何使用键建立绑定:ui
channel.queueBind(queueName,EXCHANGE_NAME,“black”);
绑定密钥的含义取决于交换类型。咱们以前使用的 扇出交换只是忽略了它的价值。spa
咱们上一个教程中的日志记录系统向全部消费者广播全部消息。咱们但愿扩展它以容许根据消息的严重性过滤消息。例如,咱们可能须要一个程序将日志消息写入磁盘以仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。日志
咱们使用的是扇出交换,它没有给咱们太大的灵活性 - 它只能进行无心识的广播。code
咱们将使用直接交换。直接交换背后的路由算法很简单 - 消息进入队列,其 绑定密钥与消息的路由密钥彻底匹配。htm
咱们上一个教程中的日志记录系统向全部消费者广播全部消息。咱们但愿扩展它以容许根据消息的严重性过滤消息。例如,咱们可能须要一个程序将日志消息写入磁盘以仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。
咱们使用的是扇出交换,它没有给咱们太大的灵活性 - 它只能进行无心识的广播。
咱们将使用直接交换。直接交换背后的路由算法很简单 - 消息进入队列,其 绑定密钥与消息的路由密钥彻底匹配。
为了说明这一点,请考虑如下设置:
在此设置中,咱们能够看到直接交换X与两个绑定到它的队列。第一个队列绑定orange 绑定,第二个绑定有两个绑定,一个绑定密钥为black,另外一个绑定为green。
在这样的设置中,使用路由密钥orange发布到交换机的消息 将被路由到队列Q1。路由键为black 或green的消息将转到Q2。全部其余消息将被丢弃。
使用相同的绑定密钥绑定多个队列是彻底合法的。
在咱们的例子中,咱们能够在X和Q1之间添加绑定键黑色的绑定。
在这种状况下, direct 直接交换将表现得像 fanout同样,并将消息广播到全部匹配的队列。路由密钥为black的消息将传送到 Q1和Q2。
咱们将此模型用于咱们的日志系统。咱们会将消息发送给直接交换,而不是扇出。
咱们将提供日志严重性做为路由密钥。这样接收程序将可以选择它想要接收的严重性。
让咱们首先关注发送日志。一如既往,咱们须要先建立一个交换:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
咱们已准备好发送消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
为简化起见,咱们假设“严重性”能够是“信息”,“警告”,“错误”之一。
接收消息将像上一个教程同样工做,但有一个例外 - 咱们将为咱们感兴趣的每一个严重性建立一个新的绑定。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }
EmitLogDirect.java:
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } private static String getSeverity(String[] strings) { if (strings.length < 1) return "info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length < startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
ReceiveLogsDirect.java
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for (String severity : argv) { channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }