发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符能够是任何东西,可是通常都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你能够定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式同样。主题类型的转发器背后的逻辑和直接类型的转发器很相似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。须要注意的是:关于绑定键有两种特殊的状况。
*能够匹配一个标识符。
#能够匹配0个或多个标识符。
java
咱们准备发送关于动物的消息。消息会附加一个选择键包含3个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个标识符描述动物的颜色,第三个标识符描述动物的物种:<speed>.<color>.<species>。
咱们建立3个绑定键:Q1与*.orange.*绑定Q2与*.*.rabbit和lazy.#绑定。
能够简单的认为:
Q1对全部的橙色动物感兴趣。
Q2想要知道关于兔子的一切以及关于懒洋洋的动物的一切。
一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。另外一方面quick.orange.fox只会被转发到Q1,lazy.brown.fox将会被转发到Q2。lazy.pink.rabbit虽然与两个绑定键匹配,可是也只会被转发到Q2一次。quick.brown.fox不能与任何绑定键匹配,因此会被丢弃。
若是咱们违法咱们的约定,发送一个或者四个标识符的选择键,相似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,因此消息将会被丢弃。
另外一方面,lazy.orange.male.rabbit,虽然是四个标识符,也能够与lazy.#匹配,从而转发至Q2。
注:主题类型的转发器很是强大,能够实现其余类型的转发器。
当一个队列与绑定键#绑定,将会收到全部的消息,相似fanout类型转发器。
当绑定键中不包含任何#与*时,相似direct类型转发器。web
发送端:dom
package event;ui
import java.util.UUID; spa
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic
{
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception
{
// 建立链接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//与direct模式有相似之处,都使用routing key做为路由
//不一样之处在于direct模式只能指定固定的字符串,而topic能够指定一个字符串模式
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String[] routing_keys = new String[] { "kernal.info", "cron.warning",
"auth.info", "kernel.critical" };
for (String routing_key : routing_keys)
{
String msg = UUID.randomUUID().toString();
channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
.getBytes());
System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
}
channel.close();
connection.close();
}
} orm
接收端1:rabbitmq
package event;队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogsTopicForKernel
{
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception
{
// 建立链接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
//接收全部与kernel相关的消息
channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received routingKey = " + routingKey
+ ",msg = " + message + ".");
}
}
} ci
接收端2:路由
package event;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsTopicForCritical { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { // 建立链接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明转发器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 随机生成一个队列 String queueName = channel.queueDeclare().getQueue(); // 接收全部与kernel相关的消息 channel.queueBind(queueName, EXCHANGE_NAME, "*.critical"); System.out .println(" [*] Waiting for critical messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received routingKey = " + routingKey + ",msg = " + message + "."); } } }