深刻浅出 RabbitMQ

什么是 RabbitMQ

简介(优势)

  • 基于 ErLang 语言开发有高可用高并发的优势,适合集群。
  • 开源、稳定、易用、跨平台、支持多种语言、文档齐全。
  • 有消息确认机制和持久化机制,可靠性高。

概念

生产者和消费者

  • Producer:消息的生产者
  • Consumer:消息的消费者

Queue

  • 消息队列提供了 FIFO 的处理机制,具备缓存消息的能力。在 RabbitMQ 中,队列消息能够设置为持久化,临时或者自动删除。
  • 若是是持久化的队列,Queue 中的消息会在 Server 本地硬盘存储一份,防止系统 Crash 数据丢失。
  • 若是是临时的队列,Queue 中的数据在系统重启以后就会丢失。
  • 如实是自动删除的队列,当不存在用户链接到 Server,队列中的数据会被自动删除。

ExChange

ExChange 相似于数据通讯网络中的交换机,提供消息路由策略。java

RabbitMQ 中,生产者不是将消息直接发送给 Queue,而是先发送给 ExChangeExChange 根据生产者传递的 key 按照特定的路由算法将消息给指定的 Queue。一个 ExChange 能够绑定多个 Queue。和 Queue 同样,ExChange 也能够设置为持久化、临时或者自动删除。算法

Binding

所谓绑定就是将一个特定的 ExChange 和一个特定的 Queue 绑定起来。ExChangeQueue 的绑定能够是多对多的关系。spring

Virtual Host

RabbitMQ Server上能够建立多个虚拟的 Message Broker(又叫作 Virtual Hosts)。每个 vhost 本质上是一个迷你的 RabbitMQ Server,分别管理各自的 ExChangebinding。生产者和消费者链接 RabbitMQ Server 须要指定一个 Virtual Hostdocker

使用过程

  1. 客户端链接到消息队列服务器,打开一个 Channel
  2. 客户端声明一个 ExChange,并设置相关属性。
  3. 客户端声明一个 Queue,并设置相关属性。
  4. 客户端使用 Routing Key,在 ExChangeQueue 之间创建好绑定关系。
  5. 客户端投递消息到 ExChange
  6. ExChange 接收到消息后,就根据消息的 key 和已经设置的 bingding,进行消息路由,将消息投递到一个或多个队列里。

部署 RabbitMQ

使用 Docker Compose 部署

建立 docker-compose.yml缓存

version: '3.1'
services:
  rabbitmq:
    restart: always
    image: rabbitmq:management
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      TZ: Asia/Shanghai
      RABBITMQ_DEFAULT_USER: rabbit
      RABBITMQ_DEFAULT_PASS: 123456
    volumes:
      - ./data:/var/lib/rabbitmq

RabbitMQ WebUI 界面

  • 访问地址:http://{ip}:15672服务器

  • 首页网络

    深刻浅出 RabbitMQ

  • Global counts并发

    深刻浅出 RabbitMQ

  • 交换机页app

    深刻浅出 RabbitMQ

  • 队列页ide

    深刻浅出 RabbitMQ

    • Name:消息队列的名称,这里是经过程序建立的

    • Features:消息队列的类型,durable:true 为会持久化消息

    • Ready:准备好的消息

    • Unacked:未确认的消息

    • Total:所有消息

    若是都为 0 则说明所有消息处理完成

使用 RabbitMQ

建立生产者

建立一个名为 spring-boot-amqp-provider 的生产者项目。

相关配置

  • 建立 application.yml 文件

    spring:
    application:
      name: spring-boot-amqp
    rabbitmq:
      host: 192.168.75.133
      port: 5672
      username: rabbit
      password: 123456
  • 建立队列

    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
    * 队列配置
    */
    @Configuration
    public class RabbitMQConfiguration {
    
      @Bean
      public Queue queue() {
          return new Queue("helloRabbitMQ");
      }
    }
  • 建立消息提供者

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
    * 消息提供者
    */
    @Component
    public class RabbitMQProvider {
    
      @Autowired
      private AmqpTemplate amqpTemplate;
    
      public void send() {
          String context = "hello" + new Date();
          System.out.println("Provider: " + context);
          amqpTemplate.convertAndSend("helloRabbitMQ", context);
      }
    }

发送消息

建立测试用例

import com.lusifer.spring.boot.amqp.Application;
import com.lusifer.spring.boot.amqp.provider.HelloRabbitProvider;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class AmqpTest {

    @Autowired
    private HelloRabbitProvider helloRabbitProvider;

    @Test
    public void testSender() {
        for (int i = 0; i < 10; i++) {
            RabbitMQProvider.send();
        }
    }
}

建立消费者

建立一个名为 spring-boot-amqp-consumer 的消费者项目。

相关配置

建立 application.yml 文件

spring:
  application:
    name: spring-boot-amqp-consumer
  rabbitmq:
    host: 192.168.75.133
    port: 5672
    username: rabbit
    password: 123456

接收消息

建立消息的消费监听组件

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "helloRabbitMQ")
public class HelloRabbitConsumer {

    @RabbitHandler
    public void process(String message) {
        System.out.println("Consumer: " + message);
    }
}
相关文章
相关标签/搜索