上手了RabbitMQ?再来看看它的交换机(Exchange)吧

人生终将是场单人旅途,孤独以前是迷茫,孤独事后是成长。

楔子

本篇是消息队列RabbitMQ的第三弹。java

RabbitMQ的入门RabbitMQ+SpringBoot的整合能够点此连接进去回顾,今天要讲的是RabbitMQ的交换机。git

本篇是理解RabbitMQ很重要的一篇,交换机是消息的第一站,只有理解了交换机的分发模式,咱们才能知道不一样交换机根据什么规则分发消息,才能明白在面对不一样业务需求的时候应采用哪一种交换机。程序员


祝有好收获,先赞后看,快乐无限。github

本文代码: 码云地址GitHub地址算法

1. 🔍Exchange

rabbit架构图

先来放上几乎每篇都要出现一遍的我画了很久的RabbitMQ架构图。spring

前两篇文中咱们一直没有显式的去使用Exchange,都是使用的默认Exchange,其实Exchange是一个很是关键的组件,有了它才有了各类消息分发模式。编程

我先简单说说Exchange有哪几种类型:segmentfault

  1. fanoutFanout-Exchange会将它接收到的消息发往全部与他绑定的Queue中。
  2. directDirect-Exchange会把它接收到的消息发往与它有绑定关系且Routingkey彻底匹配的Queue中(默认)。
  3. topicTopic-Exchange与Direct-Exchange类似,不过Topic-Exchange不须要全匹配,能够部分匹配,它约定:Routingkey为一个句点号“. ”分隔的字符串(咱们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。
  4. headerHeader-Exchange不依赖于RoutingKey或绑定关系来分发消息,而是根据发送的消息内容中的headers属性进行匹配。此模式已经再也不使用,本文中也不会去讲,你们知道便可。

本文中咱们主要讲前三种Exchange方式,相信凭借着我简练的文字和灵魂的画技给你们好好讲讲,争取老妪能解。架构

Tip:本文的代码演示直接使用SpringBoot+RabbitMQ的模式。spring-boot

2. 📕Fanout-Exchange

先来看看Fanout-ExchangeFanout-Exchange又称扇形交换机,这个交换机应该是最容易理解的。

扇形交换机

ExchangeQueue创建一个绑定关系,Exchange会分发给全部和它有绑定关系的Queue中,绑定了十个Queue就把消息复制十份进行分发。

这种绑定关系为了效率确定都会维护一张表,从算法效率上来讲通常是O(1),因此Fanout-Exchange是这几个交换机中查找须要被分发队列最快的交换机。


下面是一段代码演示:

@Bean
    public Queue fanout1() {
        return new Queue("fanout1");
    }

    @Bean
    public Queue fanout2() {
        return new Queue("fanout2");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        // 三个构造参数:name durable autoDelete
        return new FanoutExchange("fanoutExchange", false, false);
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(fanout1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(fanout2()).to(fanoutExchange());
    }

为了清晰明了,我新建了两个演示用的队列,而后建了一个FanoutExchange,最后给他们都设置上绑定关系,这样一组队列和交换机的绑定设置就算完成了。

紧接着编写一下生产者和消费者:

public void sendFanout() {
        Client client = new Client();

        // 应读者要求,之后代码打印的地方都会改为log方式,这是一种良好的编程习惯,用System.out.println通常是不推荐的。
        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("fanoutExchange",null,client);
        System.out.println("消息发送完毕。");
    }

    @Test
    public void sendFanoutMessage() {
        rabbitProduce.sendFanout();
    }
@Slf4j
@Component("rabbitFanoutConsumer")
public class RabbitFanoutConsumer {
    @RabbitListener(queues = "fanout1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = "fanout2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

这两段代码都很好理解,再也不赘述,有遗忘的能够去看RabbitMQ第一弹的内容。

其中发送消息的代码有三个参数,第一个参数是Exchange的名称,第二个参数是routingKey的名称,这个参数在扇形交换机里面用不到,在其余两个交换机类型里面会用到。

代码的准备到此结束,咱们能够运行发送方法以后run一下了~

项目启动后,咱们能够先来观察一下队列与交换机的绑定关系有没有生效,咱们在RabbitMQ控制台使用rabbitmqctl list_bindings命令查看绑定关系。

扇形交换机绑定关系

关键部分我用红框标记了起来,这就表明着名叫fanoutExchange的交换机绑定着两个队列,一个叫fanout1,另外一个叫fanout2

紧接着,咱们来看控制台的打印状况:

扇形交换机确认消息

能够看到,一条信息发送出去以后,两个队列都接收到了这条消息,紧接着由咱们的两个消费者消费。

Tip: 若是你的演示应用启动以后没有消费信息,能够尝试从新运行一次生产者的方法发送消息。

3. 📗Direct-Exchange

Direct-Exchange是一种精准匹配的交换机,咱们以前一直使用默认的交换机,其实默认的交换机就是Direct类型。

若是将Direct交换机都比做一所公寓的管理员,那么队列就是里面的住户。(绑定关系)

管理员天天都会收到各类各样的信件(消息),这些信件的地址不光要标明地址(ExchangeKey)还须要标明要送往哪一户(routingKey),否则消息没法投递。

扇形交换机

以上图为例,准备一条消息发往名为SendService的直接交换机中去,这个交换机主要是用来作发送服务,因此其绑定了两个队列,SMS队列和MAIL队列,用于发送短信和邮件。

咱们的消息除了指定ExchangeKey还须要指定routingKeyroutingKey对应着最终要发送的是哪一个队列,咱们的示例中的routingKey是sms,这里这条消息就会交给SMS队列。


听了上面这段,可能你们对routingKey还不是很理解,咱们上段代码实践一下,你们应该就明白了。

准备工做:

@Bean
    public Queue directQueue1() {
        return new Queue("directQueue1");
    }

    @Bean
    public Queue directQueue2() {
        return new Queue("directQueue2");
    }

    @Bean
    public DirectExchange directExchange() {
        // 三个构造参数:name durable autoDelete
        return new DirectExchange("directExchange", false, false);
    }

    @Bean
    public Binding directBinding1() {
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding directBinding2() {
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail");
    }

新建两个队列,新建了一个直接交换机,并设置了绑定关系。

这里的示例代码和上面扇形交换机的代码很像,惟一能够说不一样的就是绑定的时候多调用了一个withroutingKey设置了上去。

因此是交换机和队列创建绑定关系的时候设置的routingKey,一个消息到达交换机以后,交换机经过消息上带来的routingKey找到本身与队列创建绑定关系时设置的routingKey,而后将消息分发到这个队列去。

生产者:

public void sendDirect() {
        Client client = new Client();

        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("directExchange","sms",client);
        System.out.println("消息发送完毕。");
    }

消费者:

@Slf4j
@Component("rabbitDirectConsumer")
public class RabbitDirectConsumer {
    @RabbitListener(queues = "directQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = "directQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

效果图以下:

扇形交换机

只有一个消费者进行了消息,符合咱们的预期。

4. 📙Topic-Exchange

Topic-Exchange是直接交换机的模糊匹配版本,Topic类型的交换器,支持使用"*"和"#"通配符定义模糊bindingKey,而后按照routingKey进行模糊匹配队列进行分发。

  • *:可以模糊匹配一个单词。
  • #:可以模糊匹配零个或多个单词。

由于加入了两个通配定义符,因此Topic交换机的routingKey也有些变化,routingKey可使用.将单词分开。


这里咱们直接来用一个例子说明会更加的清晰:

准备工做:

// 主题交换机示例
    @Bean
    public Queue topicQueue1() {
        return new Queue("topicQueue1");
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue("topicQueue2");
    }

    @Bean
    public TopicExchange topicExchange() {
        // 三个构造参数:name durable autoDelete
        return new TopicExchange("topicExchange", false, false);
    }

    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*");
    }

    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#");
    }

新建两个队列,新建了一个Topic交换机,并设置了绑定关系。

这里的示例代码咱们主要看设置routingKey,这里的routingKey用上了通配符,且中间用.隔开,这就表明topicQueue1消费sms开头的消息,topicQueue2消费mail开头的消息,具体不一样往下看。

生产者:

public void sendTopic() {
        Client client = new Client();

        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client);
        System.out.println("消息发送完毕。");
    }

消费者:

@Slf4j
@Component("rabbitTopicConsumer")
public class RabbitTopicConsumer {
    @RabbitListener(queues = "topicQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = "topicQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

这里咱们的生产者发送的消息routingKeysms.liantong,它就会被发到topicQueue1队列中去,这里消息的routingKey也须要用.隔离开,用其余符号没法正确识别。

若是咱们的routingKeysms.123.liantong,那么它将没法找到对应的队列,由于topicQueue1的模糊匹配用的通配符是*而不是#,只有#是能够匹配多个单词的。

Topic-ExchangeDirect-Exchange很类似,我就再也不赘述了,通配符*#的区别也很简单,你们能够本身试一下。

后记

周一没更文实在惭愧,去医院抽血了,抽了三管~,吃多少才能补回来~

RabbitMQ已经更新了三篇了,这三篇的内容有些偏基础,下一篇将会更新高级部份内容:包括防止消息丢失,防止消息重复消费等等内容,但愿你们持续关注。


最近这段时间压力挺大,优狐令我八月底以前升级到三级,因此各位读者的赞对我很重要,但愿你们可以高抬贵手,帮我一哈~

好了,以上就是本期的所有内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,👍大家的每一个点赞都是我创做的最大动力。

我是耳朵,一个一直想作知识输出的伪文艺程序员,咱们下期见。

本文代码:码云地址GitHub地址

相关文章
相关标签/搜索