转载请标明出处:http://blog.csdn.net/lmj623565791/article/details/37706355 java
上一篇博客中,咱们进步改良了咱们的日志系统。咱们使用direct类型转发器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只可以无脑的转发,若是你还不了解:RabbitMQ (四) 路由选择 (Routing)。 dom
虽然使用direct类型改良了咱们的系统,可是仍然存在一些局限性:它不可以基于多重条件进行路由选择。
在咱们的日志系统中,咱们有可能但愿不只根据日志的级别并且想根据日志的来源进行订阅。这个概念相似unix工具:syslog,它转发日志基于严重性(info/warning/crit…)和设备(auth/cron/kern…)
这样可能给咱们更多的灵活性:咱们可能只想订阅来自’cron’的致命错误日志,而不是来自’kern’的。
为了在咱们的系统中实现上述的需求,咱们须要学习稍微复杂的主题类型的转发器(topic exchange)。
一、 主题转发(Topic Exchange)
发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符能够是任何东西,可是通常都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你能够定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式同样。主题类型的转发器背后的逻辑和直接类型的转发器很相似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。须要注意的是:关于绑定键有两种特殊的状况。
*能够匹配一个标识符。
#能够匹配0个或多个标识符。
二、 图解:
咱们准备发送关于动物的消息。消息会附加一个选择键包含3个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个标识符描述动物的颜色,第三个标识符描述动物的物种:<speed>.<color>.<species>。
咱们建立3个绑定键:Q1与*.orange.*绑定Q2与*.*.rabbit和lazy.#绑定。
能够简单的认为:
Q1对全部的橙色动物感兴趣。
Q2想要知道关于兔子的一切以及关于懒洋洋的动物的一切。
一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。另外一方面quick.orange.fox只会被转发到Q1,lazy.brown.fox将会被转发到Q2。lazy.pink.rabbit虽然与两个绑定键匹配,可是也只会被转发到Q2一次。quick.brown.fox不能与任何绑定键匹配,因此会被丢弃。
若是咱们违法咱们的约定,发送一个或者四个标识符的选择键,相似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,因此消息将会被丢弃。
另外一方面,lazy.orange.male.rabbit,虽然是四个标识符,也能够与lazy.#匹配,从而转发至Q2。
注:主题类型的转发器很是强大,能够实现其余类型的转发器。
当一个队列与绑定键#绑定,将会收到全部的消息,相似fanout类型转发器。
当绑定键中不包含任何#与*时,相似direct类型转发器。
三、 完整的例子
发送端EmitLogTopic.java: 工具
- package com.zhy.rabbit._05_topic_exchange;
-
- import java.util.UUID;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class EmitLogTopic
- {
-
- private static final String EXCHANGE_NAME = "topic_logs";
-
- public static void main(String[] argv) throws Exception
- {
- // 建立链接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
-
- String[] routing_keys = new String[] { "kernal.info", "cron.warning",
- "auth.info", "kernel.critical" };
- for (String routing_key : routing_keys)
- {
- String msg = UUID.randomUUID().toString();
- channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
- .getBytes());
- System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
- }
-
- channel.close();
- connection.close();
- }
- }
咱们发送了4条消息,分别设置了不一样的选择键。
接收端1,ReceiveLogsTopicForKernel.java 学习
- package com.zhy.rabbit._05_topic_exchange;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
-
- public class ReceiveLogsTopicForKernel
- {
-
- private static final String EXCHANGE_NAME = "topic_logs";
-
- public static void main(String[] argv) throws Exception
- {
- // 建立链接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明转发器
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- // 随机生成一个队列
- String queueName = channel.queueDeclare().getQueue();
-
- //接收全部与kernel相关的消息
- channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
-
- System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
-
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
-
- System.out.println(" [x] Received routingKey = " + routingKey
- + ",msg = " + message + ".");
- }
- }
- }
直接收和Kernel相关的日志消息。
接收端2,ReceiveLogsTopicForCritical.java ui
- package com.zhy.rabbit._05_topic_exchange;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
-
- public class ReceiveLogsTopicForCritical
- {
-
- private static final String EXCHANGE_NAME = "topic_logs";
-
- public static void main(String[] argv) throws Exception
- {
- // 建立链接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明转发器
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- // 随机生成一个队列
- String queueName = channel.queueDeclare().getQueue();
-
- // 接收全部与kernel相关的消息
- channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
-
- System.out
- .println(" [*] Waiting for critical messages. To exit press CTRL+C");
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
-
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
-
- System.out.println(" [x] Received routingKey = " + routingKey
- + ",msg = " + message + ".");
- }
- }
- }
只接收致命错误的日志消息。
运行结果: spa
[x] Sent routingKey = kernal.info ,msg = a7261f0d-18cc-4c85-ba80-5ecd9283dae7.
[x] Sent routingKey = cron.warning ,msg = 0c7e4484-66e0-4846-a869-a7a266e16281.
[x] Sent routingKey = auth.info ,msg = 3273f21f-6e6e-42f2-83df-1f2fafa7a19a.
[x] Sent routingKey = kernel.critical ,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9. .net
-------------------------------------------------------------------------------------------------------------------- 3d
[*] Waiting for messages about kernel. To exit press CTRL+C
[x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9. unix
-------------------------------------------------------------------------------------------------------------------- 日志
[*] Waiting for critical messages. To exit press CTRL+C
[x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.
能够看到,咱们经过使用topic类型的转发器,成功实现了多重条件选择的订阅。