首先先介绍下rabbitmq的一些基础概念git
一、队列、生产者、消费者github
队列是RabbitMQ的内部对象,用于存储消息。P(生产者)生产消息并投递到队列中,C(消费者)能够从队列中获取消息并消费。spring
多个消费者能够订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每一个消费者都收到全部的消息并处理。express
二、Exchange、Bindingspringboot
刚才咱们看到生产者将消息投递到队列中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的状况是,生产者将消息发送到Exchange(交换器,下图中的X),再经过Binding将Exchange与Queue关联起来。网络
三、Exchange Type、Bingding key、routing keyapp
在绑定(Binding)Exchange与Queue的同时,通常会指定一个binding key。在绑定多个Queue到同一个Exchange的时候,这些Binding容许使用相同的binding key。ide
生产者在将消息发送给Exchange的时候,通常会指定一个routing key,来指定这个消息的路由规则,生产者就能够在发送消息给Exchange时,经过指定routing key来决定消息流向哪里。spring-boot
RabbitMQ经常使用的Exchange Type有三种:fanout、direct、topic。测试
fanout:把全部发送到该Exchange的消息投递到全部与它绑定的队列中。
direct:把消息投递到那些binding key与routing key彻底匹配的队列中。
topic:将消息路由到binding key与routing key模式匹配的队列中。
示例代码 git springboot-rabbitmq-exchange
四、direct模式实例
4.一、添加pom文件
<!-- rabbitmq依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
4.二、添加application.yml配置
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: xxx password: xxxx publisher-confirms: true #消息发送到交换机确认机制,是否确认回调
4.3 ExchangeConfig配置
@Configuration public class ExchangeConfig { /** * 1.定义direct exchange,绑定queueTest * 2.durable="true" rabbitmq重启的时候不须要建立新的交换机 * 3.direct交换器相对来讲比较简单,匹配规则为:若是路由键匹配,消息就被投送到相关的队列 * fanout交换器中没有路由键的概念,他会把消息发送到全部绑定在此交换器上面的队列中。 * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中 * key: queue在该direct-exchange中的key值,当消息发送给direct-exchange中指定key为设置值时, * 消息将会转发给queue参数指定的消息队列 */ @Bean public DirectExchange directExchange(){ DirectExchange directExchange = new DirectExchange(RabbitMqConfig.EXCHANGE,true,false); return directExchange; } @Bean public TopicExchange topicExchange(){ TopicExchange topicExchange = new TopicExchange(RabbitMqConfig.EXCHANGE_TOPIC,true,false); return topicExchange; } @Bean public FanoutExchange fanoutExchange (){ FanoutExchange fanoutExchange = new FanoutExchange(RabbitMqConfig.EXCHANGE_FANOUT,true,false); return fanoutExchange; } }
4.4 QueueConfig配置
@Configuration public class QueueConfig { @Bean public Queue firstQueue() { /** durable="true" 持久化 rabbitmq重启的时候不须要建立新的队列 auto-delete 表示消息队列没有在使用时将被自动删除 默认是false exclusive 表示该消息队列是否只在当前connection生效,默认是false */ return new Queue("first-queue",true,false,false); } @Bean public Queue secondQueue() { return new Queue("second-queue",true,false,false); } @Bean public Queue topicQueue() { return new Queue("topic-queue",true,false,false); } @Bean public Queue topicQueue1() { return new Queue("topic-queue1",true,false,false); } @Bean public Queue fanoutQueue1() { return new Queue("fanout-queue1",true,false,false); } @Bean public Queue fanoutQueue() { return new Queue("fanout-queue",true,false,false); } }
4.五、RabbitMqConfig配置
@Configuration public class RabbitMqConfig { /** 消息交换机的名字*/ public static final String EXCHANGE = "exchangeTest"; /** 消息交换机的名字*/ public static final String EXCHANGE_TOPIC = "exchangeTopic"; public static final String EXCHANGE_FANOUT = "exchangeFanout"; /** 队列key1*/ public static final String ROUTINGKEY1 = "queue_one_key1"; /** 队列key2*/ public static final String ROUTINGKEY2 = "queue_one_key2"; public static final String ROUTINGKEY3 = "*.topic.*"; public static final String ROUTINGKEY_TOPIC = "aaa.topic.*"; @Autowired private QueueConfig queueConfig; @Autowired private ExchangeConfig exchangeConfig; /** * 链接工厂 */ @Autowired private ConnectionFactory connectionFactory; /** 将消息队列1和交换机进行绑定 */ @Bean public Binding binding_one() { return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY1); } /** * 将消息队列2和交换机进行绑定 */ @Bean public Binding binding_two() { return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY2); } @Bean public Binding binding_topic() { return BindingBuilder.bind(queueConfig.topicQueue()).to(exchangeConfig.topicExchange()).with(RabbitMqConfig.ROUTINGKEY3); } @Bean public Binding binding_topic1() { return BindingBuilder.bind(queueConfig.topicQueue1()).to(exchangeConfig.topicExchange()).with(RabbitMqConfig.ROUTINGKEY_TOPIC); } @Bean public Binding binding_fanout() { return BindingBuilder.bind(queueConfig.fanoutQueue()).to(exchangeConfig.fanoutExchange()); } @Bean public Binding binding_fanout_for_third() { return BindingBuilder.bind(queueConfig.fanoutQueue1()).to(exchangeConfig.fanoutExchange()); } /** * queue listener 观察 监听模式 * 当有消息到达时会通知监听在对应的队列上的监听对象 * @return */ @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){ SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); simpleMessageListenerContainer.addQueues(queueConfig.firstQueue()); simpleMessageListenerContainer.setExposeListenerChannel(true); simpleMessageListenerContainer.setMaxConcurrentConsumers(5); simpleMessageListenerContainer.setConcurrentConsumers(1); simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 return simpleMessageListenerContainer; } /** * 定义rabbit template用于数据的接收和发送 * @return */ @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory); /**若使用confirm-callback或return-callback, * 必需要配置publisherConfirms或publisherReturns为true * 每一个rabbitTemplate只能有一个confirm-callback和return-callback */ template.setConfirmCallback(msgSendConfirmCallBack()); //template.setReturnCallback(msgSendReturnCallback()); /** * 使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true, * 可针对每次请求的消息去肯定’mandatory’的boolean值, * 只能在提供’return -callback’时使用,与mandatory互斥 */ // template.setMandatory(true); return template; } /** * 消息确认机制 * Confirms给客户端一种轻量级的方式,可以跟踪哪些消息被broker处理, * 哪些可能由于broker宕掉或者网络失败的状况而从新发布。 * 确认而且保证消息被送达,提供了两种方式:发布确认和事务。(二者不可同时使用) * 在channel为事务时,不可引入确认模式;一样channel为确认模式下,不可以使用事务。 * @return */ @Bean public MsgSendConfirmCallBack msgSendConfirmCallBack(){ return new MsgSendConfirmCallBack(); } }
4.6 DirectExchange 模式
绑定关系以下
/** 将消息队列1和交换机进行绑定 */ @Bean public Binding binding_one() { return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY1); } /** * 将消息队列2和交换机进行绑定 */ @Bean public Binding binding_two() { return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY2); }
生产者发送对应消息
/** * DirectExchange 生产者 发送消息 * @param uuid * @param message 消息 */ public void send(String uuid,Object message) { CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY2, message, correlationId); }
消费者分别消费对应队列信息
@Component public class SecondConsumer { @RabbitListener(queues = {"second-queue"}, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { // 处理消息 System.out.println("Second Consumer {} handleMessage :"+message); } }
消费结果
4.8 TopicExchange模式
因为在绑定队列时,绑定关系以下
public static final String ROUTINGKEY3 = "*.topic.*"; public static final String ROUTINGKEY_TOPIC = "aaa.topic.*"; @Bean public Binding binding_topic() { return BindingBuilder.bind(queueConfig.topicQueue()).to(exchangeConfig.topicExchange()).with(RabbitMqConfig.ROUTINGKEY3); } @Bean public Binding binding_topic1() { return BindingBuilder.bind(queueConfig.topicQueue1()).to(exchangeConfig.topicExchange()).with(RabbitMqConfig.ROUTINGKEY_TOPIC); }
那么此时生产者,发送aaa.topic.bbb的routing_key时。topicConsumer和topicConsumer1都能消费信息
/** * TopicExchange 生产者 * @param uuid * @param message */ public void sendTopic(String uuid,Object message) { CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_TOPIC, "aaa.topic.bbb", message, correlationId); }
方法的第一个参数是交换机名称,第二个参数是发送的key,第三个参数是内容,RabbitMQ将会根据第二个参数去寻找有没有匹配此规则的队列,若是有,则把消息给它,若是有不止一个,则把消息分发给匹配的队列(每一个队列都有消息!),显然在咱们的测试中,参数2匹配了两个队列,所以消息将会被发放到这两个队列中,而监听这两个队列的监听器都将收到消息!那么若是把参数2改成bbb.topic.bbb呢?显然只会匹配到一个队列,那么TopicConsumer方法对应的监听器收到消息!
消费分别消费对应队列信息
@Component public class TopicConsumer { @RabbitListener(queues = {"topic-queue"}, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { // 处理消息 System.out.println("TopicConsumer {} handleMessage :"+message); } }
@Component public class TopicConsumer1 { @RabbitListener(queues = {"topic-queue1"}, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { // 处理消息 System.out.println("TopicConsumer1 {} handleMessage :"+message); } }
消费者消费结果
4.9 fanoutExchange 模式
绑定关系以下
@Bean public Binding binding_fanout() { return BindingBuilder.bind(queueConfig.fanoutQueue()).to(exchangeConfig.fanoutExchange()); } @Bean public Binding binding_fanout_for_third() { return BindingBuilder.bind(queueConfig.fanoutQueue1()).to(exchangeConfig.fanoutExchange()); }
消费者配置
@Component public class FanoutConsumer { @RabbitListener(queues = {"fanout-queue"}, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { // 处理消息 System.out.println("FanoutConsumer {} handleMessage :"+message); } }
@Component public class FanoutConsumer1 { @RabbitListener(queues = {"fanout-queue1"}, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { // 处理消息 System.out.println("FanoutConsumer1 {} handleMessage :"+message); } }
生产者
public void sendFanout(String uuid,Object message) { CorrelationData correlationId = new CorrelationData(uuid); //中间是设置路由规则,因为是广播模式,这个规则会被抛弃 rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_FANOUT, "", message); }
Fanout Exchange形式又叫广播形式,所以咱们发送到路由器的消息会使得绑定到该路由器的每个Queue接收到消息,这个时候就算指定了Key,或者规则(即上文中convertAndSend方法的参数2),也会被忽略!
消费结果
5.0 消息确认回调
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData); if (b) { System.out.println("消息发送成功"); } else { System.out.println("消息发送失败:" + s+"\n从新发送"); } } }