在上一篇中使用直接交换器改进了咱们的系统,使得它可以有选择的进行接收消息,但它仍然有局限性——它不能基于多个条件进行路由。本节咱们就进行可以基于多个条件进行路由的topics exchange学习。java
发送给主题交换器的消息不能是任意的routing_key—它必须是一个单词列表,由点分隔。这些词能够是任意的,但一般它们指定与消息相关的一些特性。几个有效的路由示例:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".。路由键中能够有任意多的字,最多能够有255个字节。
ide
路由键也须要是相同的形式,topic交换器背后的逻辑相似于direct交换器——发送带有特定路由键的消息将被传送到绑定匹配路由键的全部队列中,学习
然而,有两个重要的特殊状况须要绑定键:
ui
We created three bindings: Q1 is bound with binding key "*.orange.*" and Q2 with "*.*.rabbit" and "lazy.#".spa
These bindings can be summarised as:code
Q1对全部的橙色动物都感兴趣。
blog
Q2但愿听到关于兔子的一切,以及关于懒惰动物的一切。
three
设置了路由键为 "quick.orange.rabbit"的消息将被投递到两个队列,消息 "lazy.orange.elephant" 也被投递到他们两个,而"quick.orange.fox"将被投递到第一个队列,"lazy.brown.fox"仅被投递到第二个队列,"quick.brown.fox"没有匹配将被舍弃。rabbitmq
经过上面的学习,咱们知道,topic主题的交换器投递消息与redict交换器的不一样在于,交换器类型和路由键的模糊匹配,如今咱们就去把以前的代码进行改变,只须要将代码中的交换器类型改成topic,并将绑定的路由键更改一下,投递消息用的是肯定的路由键,接收消息经过设置匹配的模糊绑定键,能够订阅到多条件的消息,接下来上代码,并将改动的代码如下划线形式标记出来。队列
发送方代码:
package com.rabbitmq.HelloWorld;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Publish {
private static final String EXCHANGE_NAME = "exchangeC";
public static void main(String[] args) throws IOException, TimeoutException {
// TODO Auto-generated method stub
// 建立工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.185");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setPort(5672);
// 建立链接
Connection connetion = factory.newConnection();
// 得到信道
Channel channel = connetion.createChannel();
// 声明交换器(声明了一个名字位exchangeA,类型修改fanout为direct类型的交换器)
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message = "555,2,2,33,66";
// 发送消息,将第二项参数routingkey
channel.basicPublish(EXCHANGE_NAME, "my.hello.haha", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connetion.close();
}
}
接收方一:
package com.rabbitmq.HelloWorld;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Subscribe {
private static final String EXCHANGE_NAME = "exchangeC";
private static final String QUEUE_NAME = "queueA";
public static void main(String[] args) throws IOException, TimeoutException {
// TODO Auto-generated method stub
// 建立工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.185");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setPort(5672);
// 建立链接
Connection connetion = factory.newConnection();
// 得到信道
Channel channel = connetion.createChannel();
// 声明交换器(声明了一个名字位exchangeA,类型修改fanout为direct的交换器)
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 声明一个队列,在此采用临时队列
String queueName = channel.queueDeclare().getQueue();
// channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 队列和交换器进行绑定,并设定路由键为error
channel.queueBind(queueName, EXCHANGE_NAME, "my.#");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
// TODO Auto-generated method stub
String message = new String(body,"utf-8");
System.out.println("[x] received'"+message+"'");
}
};
channel.basicConsume(queueName, consumer);
}
}
接收方二:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Subscribe {
private static final String EXCHANGE_NAME = "exchangeC";
private static final String QUEUE_NAME = "queueA";
public static void main(String[] args) throws IOException, TimeoutException {
// TODO Auto-generated method stub
// 建立工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.185");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setPort(5672);
// 建立链接
Connection connetion = factory.newConnection();
// 得到信道
Channel channel = connetion.createChannel();
// 声明交换器(声明了一个名字位exchangeA,修改fanout类型为direct类型的交换器�?
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 声明�?个队列,在此采用临时队列
String queueName = channel.queueDeclare().getQueue();
// channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 队列和交换器进行绑定,未设定路由键
channel.queueBind(queueName, EXCHANGE_NAME, "my.hello.*");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
// TODO Auto-generated method stub
String message = new String(body,"utf-8");
System.out.println("[x] received'"+message+"'");
}
};
channel.basicConsume(queueName, consumer);
}
}
运行后,两个接收方均接收道理发送方发送的消息,尽管咱们在两个接收方配置的绑定键并不相同,可是其模糊匹配规则都可以匹配到发送方发送消息的路由键,若是有大量接收方,咱们就能够经过设置不一样的绑定键来有选择的接收较多的消息或者是不接受消息。