RabbitMQ 在 Spring Boot 进阶之交换器 Direct Exchange

上篇文章中咱们只看到了的生产者的消息发送与消费者的消息消费,实际上它隐藏了rabbitMQ中一个重要的环节。 上篇文章中,咱们在生产者中直接定义了消息送达队列的名字java

@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }

}

咱们指定了将消息发送到< hello >这个队列中。可是这样在实际使用时,每每难以知足某些业务需求。 因此RabbitMQ给咱们提供了四种交换器来知足不一样的需求。 分别是:Direct、Fanout、Topic、Headersspring

####假如咱们目前有一个业务,它负责资料的上传。分布式

第一个需求是,不一样的用户上传后,后续的处理不一样。ide

  • 好比说咱们的普通用户上传资料后,能够得到必定的积分。
  • 内部人员上传资料后,不须要积分,但它会增长必定的业绩。

按照上一篇文章的作法,咱们是没法用一个队列来完成这部分操做的。spring-boot

这时候咱们可使用Direct交换器,将业绩队列与积分队列绑定到咱们的交换器上。测试


  • 首先定义一个交换器,名字随便起。
static final String ExchangeName = "spring-boot-exchange";

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(ExchangeName);
    }
  • 而后定义两个Queue,一个是pointQueue,积分队列。另外一个是performanceQueue,业绩队列。
static final String pointQueueName = "point";

    static final String performanceQueueName = "performance";
    @Bean
    Queue performanceQueue() {
        return new Queue(performanceQueueName, false);
    }

    @Bean
    Queue pointQueue() {
        return new Queue(pointQueueName, false);
    }
  • 将两个Queue绑定到交换器上。这里BindingBuilder的with方法,即routingKey是咱们的路由键,交换器将根据这里定义的路由键规则进行队列消息的分发。
@Bean
    Binding bindingPerformance(Queue performanceQueue, DirectExchange exchange) {
        return BindingBuilder.bind(performanceQueue)
                             .to(exchange)
                             .with(performanceQueueName);
    }

    @Bean
    Binding bindingPointQueue(Queue pointQueue, DirectExchange exchange) {
        return BindingBuilder.bind(pointQueue)
                             .to(exchange)
                             .with(pointQueueName);
    }
  • 定义两个消费者
@Component
public class PointReceiver {

    public void receivePointMessage(String message) {
        System.out.println("接收到来自积分队列的信息 <" + message + ">");
    }

}

@Component
public class PerformanceReceiver {

    public void receivePerformanceMessage(String message) {
        System.out.println("接收到来自业绩队列的信息 <" + message + ">");
    }
}
  • 为消费者配置监听器
@Bean
    MessageListenerAdapter pointListenerAdapter(PointReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receivePointMessage");
    }

    @Bean
    MessageListenerAdapter performanceListenerAdapter(PerformanceReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receivePerformanceMessage");
    }
  • 将监听器配置到 Container 中
@Bean
    SimpleMessageListenerContainer pointContainer(ConnectionFactory connectionFactory,
        MessageListenerAdapter pointListenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(pointQueueName);
        container.setMessageListener(pointListenerAdapter);
        return container;
    }

    @Bean
    SimpleMessageListenerContainer performanceContainer(ConnectionFactory connectionFactory,
        MessageListenerAdapter performanceListenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(performanceQueueName);
        container.setMessageListener(performanceListenerAdapter);
        return container;
    }
  • 测试类以下,在这里,咱们使用rabbitTemplate,在ExchangeName这个以前定义的交换器上,配置分发的规则,也就是第二个参数 routingKey 路由键。
@Component
public class Runner implements CommandLineRunner {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Sending message>>>");
        rabbitTemplate.convertAndSend(ExchangeName, pointQueueName, "增长积分~");
        rabbitTemplate.convertAndSend(ExchangeName, performanceQueueName, "增长业绩~");
    }
}
  • 程序运行,控制台输出
Sending message>>>
接收到来自业绩队列的信息 <增长业绩~>
接收到来自积分队列的信息 <增长积分~>

这说明分发时,咱们使用的交换器已经成功地经过路由键进行了消息的分发。 而且监听器也根据队列的不一样,将消息送达到了不一样的消费者中。ui

这就是rabbitMQ的Direct Exchange。

Direct Exchange – 处理路由键。须要将一个队列绑定到交换机上,要求该消息与一个特定的路由键彻底匹配。这是一个完整的匹配。若是一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。this

这里写图片描述

参考文章: http://blog.csdn.net/rainday0310/article/details/22082503 https://spring.io/guides/gs/messaging-rabbitmq/ 以及书籍《RabbitMQ实战 高效部署分布式消息队列》.net

相关文章
相关标签/搜索