上一篇博文中,咱们使用direct exchange 代替了fanout exchange,此次咱们来看下topic exchange。html
1、Topic Exchange介绍sql
topic exchange和direct exchange相似,都是经过routing key和binding key进行匹配,不一样的是topic exchange能够为routing key设置多重标准。ide
direct路由器相似于sql语句中的精确查询;topic 路由器有点相似于sql语句中的模糊查询。ui
还记得吗?咱们在《RabbitMQ入门:发布/订阅(Publish/Subscribe)》中对exchange的分类进行过介绍:spa
Direct:彻底根据key进行投递的,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。 Topic:对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。 Fanout:不须要key,它采起广播模式,一个消息进来时,投递到与该交换机绑定的全部队列。 Headers:咱们能够不考虑它。
下面是官网给出的工做模型(P表明生产者,X表明exhange,红色的Q表明队列,C表明消费者):code
咱们来分析下这个模型。htm
它发送的消息是用来描述动物的。路由键有三个单词:<speed>.<color>.<species>,第一个单词描述了速度,第二个描述了颜色,第三个描述了物种。
有三个绑定键,Q1绑定键为*.orange.*(关注全部颜色为orange的动物); Q2的绑定键有两个,分别是*.*.rabbit(关注全部的兔子)和lazy.#(关注全部速度为lazy的动物)。blog
所以,路由键为quick.orange.rabbit的消息将发送到Q1和Q2,路由键为quick.orange.fox的消息将发送到Q1,路由键为lazy.brown.fox的消息将发送到Q2。路由键为lazy.pink.rabbit的消息将发送到Q2,可是注意,它只会到达Q2一次,尽管它匹配了两个绑定键。路由键为quick.brown.fox的消息由于不和任意的绑定键匹配,因此将会被丢弃。rabbitmq
若是有人手一抖发了个lazy.orange.male.rabbit这种四个单词的,这个怎么办呢? 因为它和lazy.#匹配,所以将发送到Q2。队列
2、代码示例
接下来咱们看下代码
public class LogTopicSender { // exchange名字 public static String EXCHANGE_NAME = "topicExchange"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立链接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.为通道声明topic类型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 3.发送消息到指定的exchange,队列指定为空,由exchange根据状况判断须要发送到哪些队列 String routingKey = "info"; // String routingKey = "log4j.error"; // String routingKey = "logback.error"; // String routingKey = "log4j.warn"; String msg = " hello rabbitmq, I am " + routingKey; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 4.关闭链接 if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
public class LogTopicReciver { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立链接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.为通道声明topic类型的exchange channel.exchangeDeclare(LogTopicSender.EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 3.建立随机名字的队列 String queueName = channel.queueDeclare().getQueue(); // 4.创建exchange和队列的绑定关系 String[] bindingKeys = { "#" }; // String[] bindingKeys = { "log4j.*", "#.error" }; // String[] bindingKeys = { "*.error" }; // String[] bindingKeys = { "log4j.warn" }; for (int i = 0; i < bindingKeys.length; i++) { channel.queueBind(queueName, LogTopicSender.EXCHANGE_NAME, bindingKeys[i]); System.out.println(" **** LogTopicReciver keep alive ,waiting for " + bindingKeys[i]); } // 5.经过回调生成消费者并进行监听 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { // 获取消息内容而后处理 String msg = new String(body, "UTF-8"); System.out.println("*********** LogTopicReciver" + " get message :[" + msg + "]"); } }; // 6.消费消息 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
生产者: product send a msg: hello rabbitmq, I am info product send a msg: hello rabbitmq, I am log4j.error product send a msg: hello rabbitmq, I am logback.error product send a msg: hello rabbitmq, I am log4j.warn 消费者1: **** LogTopicReciver keep alive ,waiting for # *********** LogTopicReciver get message :[ hello rabbitmq, I am info] *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn] 消费者2: **** LogTopicReciver keep alive ,waiting for log4j.* **** LogTopicReciver keep alive ,waiting for #.error *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
*********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn] 消费者3: **** LogTopicReciver keep alive ,waiting for *.error *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] 消费者4: **** LogTopicReciver keep alive ,waiting for log4j.warn *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]