RabbitMQ :主题(Topic)

一、 主题转发(Topic Exchange)

发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符能够是任何东西,可是通常都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你能够定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式同样。主题类型的转发器背后的逻辑和直接类型的转发器很相似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。须要注意的是:关于绑定键有两种特殊的状况。
*能够匹配一个标识符。
#能够匹配0个或多个标识符。
java

二、 图解:

咱们准备发送关于动物的消息。消息会附加一个选择键包含3个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个标识符描述动物的颜色,第三个标识符描述动物的物种:<speed>.<color>.<species>。
咱们建立3个绑定键:Q1与*.orange.*绑定Q2与*.*.rabbit和lazy.#绑定。
能够简单的认为:
Q1对全部的橙色动物感兴趣。
Q2想要知道关于兔子的一切以及关于懒洋洋的动物的一切。
一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。另外一方面quick.orange.fox只会被转发到Q1,lazy.brown.fox将会被转发到Q2。lazy.pink.rabbit虽然与两个绑定键匹配,可是也只会被转发到Q2一次。quick.brown.fox不能与任何绑定键匹配,因此会被丢弃。
若是咱们违法咱们的约定,发送一个或者四个标识符的选择键,相似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,因此消息将会被丢弃。
另外一方面,lazy.orange.male.rabbit,虽然是四个标识符,也能够与lazy.#匹配,从而转发至Q2。
注:主题类型的转发器很是强大,能够实现其余类型的转发器。
当一个队列与绑定键#绑定,将会收到全部的消息,相似fanout类型转发器。
当绑定键中不包含任何#与*时,相似direct类型转发器。web

发送端:dom

 

package event;ui

import java.util.UUID; spa

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
 
public class EmitLogTopic 

 
    private static final String EXCHANGE_NAME = "topic_logs"; 
 
    public static void main(String[] argv) throws Exception 
    { 
        // 建立链接和频道 
        ConnectionFactory factory = new ConnectionFactory(); 
        factory.setHost("localhost"); 
        Connection connection = factory.newConnection(); 
        Channel channel = connection.createChannel(); 
        //与direct模式有相似之处,都使用routing key做为路由
        //不一样之处在于direct模式只能指定固定的字符串,而topic能够指定一个字符串模式
        channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
 
        String[] routing_keys = new String[] { "kernal.info", "cron.warning", 
                "auth.info", "kernel.critical" }; 
        for (String routing_key : routing_keys) 
        { 
            String msg = UUID.randomUUID().toString(); 
            channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg 
                    .getBytes()); 
            System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + "."); 
        } 
 
        channel.close(); 
        connection.close(); 
    } 
orm

接收端1:rabbitmq

package event;队列

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.QueueingConsumer; 
 
public class ReceiveLogsTopicForKernel 

 
    private static final String EXCHANGE_NAME = "topic_logs"; 
 
    public static void main(String[] argv) throws Exception 
    { 
        // 建立链接和频道 
        ConnectionFactory factory = new ConnectionFactory(); 
        factory.setHost("localhost"); 
        Connection connection = factory.newConnection(); 
        Channel channel = connection.createChannel(); 
        // 声明转发器 
        channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
        // 随机生成一个队列 
        String queueName = channel.queueDeclare().getQueue(); 
         
        //接收全部与kernel相关的消息 
        channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*"); 
 
        System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C"); 
 
        QueueingConsumer consumer = new QueueingConsumer(channel); 
        channel.basicConsume(queueName, true, consumer); 
 
        while (true) 
        { 
            QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
            String message = new String(delivery.getBody()); 
            String routingKey = delivery.getEnvelope().getRoutingKey(); 
 
            System.out.println(" [x] Received routingKey = " + routingKey 
                    + ",msg = " + message + "."); 
        } 
    } 
ci

接收端2:路由

 

package event;

import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.QueueingConsumer;    public class ReceiveLogsTopicForCritical  {        private static final String EXCHANGE_NAME = "topic_logs";        public static void main(String[] argv) throws Exception      {          // 建立链接和频道          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          // 声明转发器          channel.exchangeDeclare(EXCHANGE_NAME, "topic");          // 随机生成一个队列          String queueName = channel.queueDeclare().getQueue();            // 接收全部与kernel相关的消息          channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");            System.out                  .println(" [*] Waiting for critical messages. To exit press CTRL+C");            QueueingConsumer consumer = new QueueingConsumer(channel);          channel.basicConsume(queueName, true, consumer);            while (true)          {              QueueingConsumer.Delivery delivery = consumer.nextDelivery();              String message = new String(delivery.getBody());              String routingKey = delivery.getEnvelope().getRoutingKey();                System.out.println(" [x] Received routingKey = " + routingKey                      + ",msg = " + message + ".");          }      }  } 

相关文章
相关标签/搜索