RabbitMQ中,全部生产者提交的消息都由Exchange(交换机)来接受,而后Exchange按照特定的策略转发到Queue进行存储,RabbitMQ提供了四种Exchange:fanout、direct、topic、header,其中header模式在实际使用中较少,本文只对前三种模式进行比较。java
性能方面比较:fanout > direct >> topic,比例大约为11:10:6spring
任何生产者发送到Fanout Exchange交换机的消息都被转发到与其绑定(binging)的全部的Queue队列中存储,并由监听该队列Queue的消费者所消费,整个过程通常状况下关系为:生产者与Fanout Exchange(交换机)为多对一,Exchange(交换机)与队列Queue为一对多,队列Queue与消费者为一对多;性能
(1)定义Queue队列、Fanout Exchange交换机,并将队列绑定在交换机上ui
package com.neo.rabbit; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutRabbitConfig { //定义三个Queue队列A、B、C @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } //定义FanoutWxchange交换机 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } //将定义好Queue队列绑定在该交换机上 @Bean Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
(2)定义生产者,发布消息到具体的交换机上,再由交换机自动转发到绑定的全部队列Queue中存储this
package com.neo.rabbit.fanout; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(int i) { String context = "hi, fanout msg is " +i; //指定名字为fanoutExchange的交换机,第二个参数为具体的Queue名,忽略不写 this.rabbitTemplate.convertAndSend("fanoutExchange","", context); } }
(3)定义消费者,实现MessageListener监听器接口,或SpringBoot中经过注解的方式根据具体的Queue队列名(RouteKey)进行实时的监听,从队列中获取消息code
package com.neo.rabbit.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA { @RabbitHandler public void process(String message) { System.out.println("fanout Receiver A : " + message); } }
注意:
Topic Exchange 转发消息主要是根据通配符方式匹配Queue队列中的routing_key(即为队列名)。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键(routing_key)之间匹配后交换机才能转发消息。(任何发送到Topic Exchange的消息都会被转发到全部关心RouteKey中指定话题的Queue上)blog
在这种交换机模式下:(*:表示一个词,#:表示零个或多个词)接口
定义的过程跟前面的 Fanout模式差很少,所不一样的是:Queue队列绑定到交换机时,须要with带有某种规则的去绑定,即BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"),topic.#为匹配规则,只要生产者发布消息时的路由键routing_key知足这一规则, 便可将消息转发到该Queue(queueMessages)队列中队列
package com.neo.rabbit; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicRabbitConfig { final static String message = "topic.message"; final static String messages = "topic.messages"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue,即无需定义交换机(Exchange),直接定义好队列(Queue)便可,生产者发布消息时,经过abbitMQ自带的交换机Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)转发到具体的RouteKey队列中存储,若是vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。路由
该模式为默认的、简单的一种,应用的方式为(都为在同一个队列中进行操做):