人生终将是场单人旅途,孤独以前是迷茫,孤独事后是成长。
本篇是消息队列RabbitMQ
的第三弹。java
RabbitMQ的入门和RabbitMQ+SpringBoot的整合能够点此连接进去回顾,今天要讲的是RabbitMQ
的交换机。git
本篇是理解RabbitMQ
很重要的一篇,交换机是消息的第一站,只有理解了交换机的分发模式,咱们才能知道不一样交换机根据什么规则分发消息,才能明白在面对不一样业务需求的时候应采用哪一种交换机。程序员
祝有好收获,先赞后看,快乐无限。github
先来放上几乎每篇都要出现一遍的我画了很久的RabbitMQ
架构图。spring
前两篇文中咱们一直没有显式的去使用Exchange
,都是使用的默认Exchange
,其实Exchange
是一个很是关键的组件,有了它才有了各类消息分发模式。编程
我先简单说说Exchange
有哪几种类型:segmentfault
Fanout-Exchange
会将它接收到的消息发往全部与他绑定的Queue中。Direct-Exchange
会把它接收到的消息发往与它有绑定关系且Routingkey
彻底匹配的Queue中(默认)。Topic-Exchange
与Direct-Exchange类似,不过Topic-Exchange不须要全匹配,能够部分匹配,它约定:Routingkey
为一个句点号“. ”分隔的字符串(咱们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Header-Exchange
不依赖于RoutingKey或绑定关系来分发消息,而是根据发送的消息内容中的headers属性进行匹配。此模式已经再也不使用,本文中也不会去讲,你们知道便可。本文中咱们主要讲前三种Exchange
方式,相信凭借着我简练的文字和灵魂的画技给你们好好讲讲,争取老妪能解。架构
Tip:本文的代码演示直接使用SpringBoot+RabbitMQ的模式。spring-boot
先来看看Fanout-Exchange
,Fanout-Exchange
又称扇形交换机,这个交换机应该是最容易理解的。
Exchange
和Queue
创建一个绑定关系,Exchange
会分发给全部和它有绑定关系的Queue
中,绑定了十个Queue
就把消息复制十份进行分发。
这种绑定关系为了效率确定都会维护一张表,从算法效率上来讲通常是O(1),因此Fanout-Exchange
是这几个交换机中查找须要被分发队列最快的交换机。
下面是一段代码演示:
@Bean public Queue fanout1() { return new Queue("fanout1"); } @Bean public Queue fanout2() { return new Queue("fanout2"); } @Bean public FanoutExchange fanoutExchange() { // 三个构造参数:name durable autoDelete return new FanoutExchange("fanoutExchange", false, false); } @Bean public Binding binding1() { return BindingBuilder.bind(fanout1()).to(fanoutExchange()); } @Bean public Binding binding2() { return BindingBuilder.bind(fanout2()).to(fanoutExchange()); }
为了清晰明了,我新建了两个演示用的队列,而后建了一个FanoutExchange
,最后给他们都设置上绑定关系,这样一组队列和交换机的绑定设置就算完成了。
紧接着编写一下生产者和消费者:
public void sendFanout() { Client client = new Client(); // 应读者要求,之后代码打印的地方都会改为log方式,这是一种良好的编程习惯,用System.out.println通常是不推荐的。 log.info("Message content : " + client); rabbitTemplate.convertAndSend("fanoutExchange",null,client); System.out.println("消息发送完毕。"); } @Test public void sendFanoutMessage() { rabbitProduce.sendFanout(); }
@Slf4j @Component("rabbitFanoutConsumer") public class RabbitFanoutConsumer { @RabbitListener(queues = "fanout1") public void onMessage1(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已确认"); } @RabbitListener(queues = "fanout2") public void onMessage2(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已确认"); } }
这两段代码都很好理解,再也不赘述,有遗忘的能够去看RabbitMQ第一弹的内容。
其中发送消息的代码有三个参数,第一个参数是Exchange
的名称,第二个参数是routingKey
的名称,这个参数在扇形交换机里面用不到,在其余两个交换机类型里面会用到。
代码的准备到此结束,咱们能够运行发送方法以后run一下了~
项目启动后,咱们能够先来观察一下队列与交换机的绑定关系有没有生效,咱们在RabbitMQ控制台使用rabbitmqctl list_bindings
命令查看绑定关系。
关键部分我用红框标记了起来,这就表明着名叫fanoutExchange
的交换机绑定着两个队列,一个叫fanout1
,另外一个叫fanout2
。
紧接着,咱们来看控制台的打印状况:
能够看到,一条信息发送出去以后,两个队列都接收到了这条消息,紧接着由咱们的两个消费者消费。
Tip: 若是你的演示应用启动以后没有消费信息,能够尝试从新运行一次生产者的方法发送消息。
Direct-Exchange
是一种精准匹配的交换机,咱们以前一直使用默认的交换机,其实默认的交换机就是Direct类型。
若是将Direct交换机都比做一所公寓的管理员,那么队列就是里面的住户。(绑定关系)
管理员天天都会收到各类各样的信件(消息),这些信件的地址不光要标明地址(ExchangeKey)还须要标明要送往哪一户(routingKey),否则消息没法投递。
以上图为例,准备一条消息发往名为SendService
的直接交换机中去,这个交换机主要是用来作发送服务,因此其绑定了两个队列,SMS队列和MAIL队列,用于发送短信和邮件。
咱们的消息除了指定ExchangeKey
还须要指定routingKey
,routingKey
对应着最终要发送的是哪一个队列,咱们的示例中的routingKey
是sms,这里这条消息就会交给SMS队列。
听了上面这段,可能你们对routingKey
还不是很理解,咱们上段代码实践一下,你们应该就明白了。
准备工做:
@Bean public Queue directQueue1() { return new Queue("directQueue1"); } @Bean public Queue directQueue2() { return new Queue("directQueue2"); } @Bean public DirectExchange directExchange() { // 三个构造参数:name durable autoDelete return new DirectExchange("directExchange", false, false); } @Bean public Binding directBinding1() { return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms"); } @Bean public Binding directBinding2() { return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail"); }
新建两个队列,新建了一个直接交换机,并设置了绑定关系。
这里的示例代码和上面扇形交换机的代码很像,惟一能够说不一样的就是绑定的时候多调用了一个with
将routingKey
设置了上去。
因此是交换机和队列创建绑定关系的时候设置的routingKey
,一个消息到达交换机以后,交换机经过消息上带来的routingKey
找到本身与队列创建绑定关系时设置的routingKey
,而后将消息分发到这个队列去。
生产者:
public void sendDirect() { Client client = new Client(); log.info("Message content : " + client); rabbitTemplate.convertAndSend("directExchange","sms",client); System.out.println("消息发送完毕。"); }
消费者:
@Slf4j @Component("rabbitDirectConsumer") public class RabbitDirectConsumer { @RabbitListener(queues = "directQueue1") public void onMessage1(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已确认"); } @RabbitListener(queues = "directQueue2") public void onMessage2(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已确认"); } }
效果图以下:
只有一个消费者进行了消息,符合咱们的预期。
Topic-Exchange
是直接交换机的模糊匹配版本,Topic类型的交换器,支持使用"*"和"#"通配符定义模糊bindingKey,而后按照routingKey
进行模糊匹配队列进行分发。
*
:可以模糊匹配一个单词。#
:可以模糊匹配零个或多个单词。由于加入了两个通配定义符,因此Topic交换机的routingKey
也有些变化,routingKey
可使用.
将单词分开。
这里咱们直接来用一个例子说明会更加的清晰:
准备工做:
// 主题交换机示例 @Bean public Queue topicQueue1() { return new Queue("topicQueue1"); } @Bean public Queue topicQueue2() { return new Queue("topicQueue2"); } @Bean public TopicExchange topicExchange() { // 三个构造参数:name durable autoDelete return new TopicExchange("topicExchange", false, false); } @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*"); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#"); }
新建两个队列,新建了一个Topic交换机,并设置了绑定关系。
这里的示例代码咱们主要看设置routingKey
,这里的routingKey
用上了通配符,且中间用.
隔开,这就表明topicQueue1
消费sms
开头的消息,topicQueue2
消费mail
开头的消息,具体不一样往下看。
生产者:
public void sendTopic() { Client client = new Client(); log.info("Message content : " + client); rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client); System.out.println("消息发送完毕。"); }
消费者:
@Slf4j @Component("rabbitTopicConsumer") public class RabbitTopicConsumer { @RabbitListener(queues = "topicQueue1") public void onMessage1(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已确认"); } @RabbitListener(queues = "topicQueue2") public void onMessage2(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已确认"); } }
这里咱们的生产者发送的消息routingKey
是sms.liantong
,它就会被发到topicQueue1
队列中去,这里消息的routingKey
也须要用.
隔离开,用其余符号没法正确识别。
若是咱们的routingKey
是sms.123.liantong
,那么它将没法找到对应的队列,由于topicQueue1
的模糊匹配用的通配符是*
而不是#
,只有#
是能够匹配多个单词的。
Topic-Exchange
和Direct-Exchange
很类似,我就再也不赘述了,通配符*
和#
的区别也很简单,你们能够本身试一下。
周一没更文实在惭愧,去医院抽血了,抽了三管~,吃多少才能补回来~
RabbitMQ已经更新了三篇了,这三篇的内容有些偏基础,下一篇将会更新高级部份内容:包括防止消息丢失,防止消息重复消费等等内容,但愿你们持续关注。
最近这段时间压力挺大,优狐令我八月底以前升级到三级,因此各位读者的赞对我很重要,但愿你们可以高抬贵手,帮我一哈~
好了,以上就是本期的所有内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,👍大家的每一个点赞都是我创做的最大动力。
我是耳朵,一个一直想作知识输出的伪文艺程序员,咱们下期见。