rabbitmq的基本使用

如今微服务盛行, 咱们一般会进行解耦, 这时候就须要异步的消息队列来帮助各个服务之间解耦spring

rabbitmq的基本概念介绍

rabbitmq的基本概念有消息producer(消息生产者)、exchange(交换机)、queue(队列)、consumer(消费者)、routingKeysegmentfault

clipboard.png
(图中的P是producer, 即消息生产者, 中简的Server是Exchange(交换机) 和 Queue(队列))浏览器

  • Queue(队列)

queue是存放消息的队列, 实际上就是一个存放消息数据结构为队列的一个容器数据结构

  • exchange(交换机)

咱们可能会简单的觉得发送者会把消息发送到队列中, 而后消费者对队列进行监听。事实上, 消息发送者永远不会将消息直接发送到队列中, 而是将消息发送到exhang中, 再由exchange经过必定的路由规则路由到对应的消息队列中。
交换机有四种类型:异步

Direct
Topic
Headers
Fanout

Direct Exchange:

clipboard.png

  • routingKey

在上面介绍exchange中说到消息经过必定的路由规则路由到对应的队列中, routingKey就是起着这样的一个做用,一般咱们发送消息到exchane中的时候会携带一个routingKey, 而这个routingKey就是exchange和queue绑定的一个规则, 由此即可以将消息从exchange再发送到对应的queue上spring-boot

参考文章https://segmentfault.com/a/11...微服务

SpringBoot中使用rabbitmq

首先添加如下依赖:spa

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置项以下:3d

spring:
  rabbitmq:
    port: 5672
    password: guest
    username: guest
    host: localhost
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 1
        retry:
          enabled: true

在浏览器输入http://localhost:15672/, 在mq上咱们新建了一个名为exchange1, routingKey为exhcange1-queue1的exchange, 而且映射到名为queue1的队列, code

图片描述

发送消息代码Prodcuer:

public class Sender{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object object) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("exchange1-queue1-id");
        String message = "hello world";
        rabbitTemplate.convertAndSend("exchange1", "exchange1-queue1", "helloworld", new CorrelationData());
    }
}

在上面的代码中咱们发送了一个消息到名为"exchange1", routingKey为"exchange1-queue1"的消息。咱们启动rabbitmq。
发送后能够在mq上看到以下图已经成功发送了。

图片描述

接下来贴上接受消息的receiver代码:

@Component
public class Receiver {

    @RabbitListener(queues = "queue1")
    public void receive(Message message, Channel channel) {
        try {
            System.out.println(message.getBody());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接受消息后再看mq上以下图:
图片描述

能够看出queue1上Ready一栏是0,可是Unacked一栏和Total一栏依然有消息, 这是由于咱们再配置文件中设置的是手动的ack,这时候代码中没有进行ack, mq会认为消费者没有成功消费掉这条消息, 这时候就须要在配置文件中设置成自动ack, 或者在代码中手动进行ack,在消费者后添加以下代码:

channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
相关文章
相关标签/搜索