【RabbitMQ】Spring-Boot 整合 使用教程 可靠性投递 顺序消费 高可用方案 事务消息 幂等性 SET化架构 等等

目录

1-简介

1.1 RabbitMQ定义

RabbitMQ是一个消息代理和队列服务器,用来在不同应用之间共享数据,是Erlang语言开发的,基于AMQP协议。

1.2 AMQP定义

是一个二进制协议。

1.3 AMQP协议模型

AMQP协议模型

1.4 核心概念

1. Server:Broker,接受客户端连接
2. Connection:连接,应用程序与Broker的网络连接
3. Channel:网络信道,Channel是消息读写的通道
4. Message:消息,传递的数据,有properties何body组成,properties是消息的属性(可以设置顺序ID),body是消息内容
5. Virtual-Host:虚拟地址,用于"逻辑隔离",最上层的"消息路由",一个Virtual-Host中有多个Exchange和Queue,但是不能有同名的
6. Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列
7. Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing-key
8. Routing-key:一个路由规则,虚拟机可用他来确定如何路由一个特定消息
9. Queue:消息队列,保存消息并将它们转发给消费者

1.5 整体架构图

整体架构图

1.6 消息流转图

消息流转图

1.7 交换机图

交换机图

2-安装与配置

2.1 准备

1. rabbitMQ版本要与erlang版本对应起来
2. rabbitMQ-rpm和erlang-rpm可以去官网下载,tcp_wrappers、socat可以去https://pkgs.org下载

2.2 安装+启动

1. rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
2. rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
3. rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm    如果上一步提示缺少socat
4. rpm -ivh tcp_wrappers-7.6-77.el7.x86_64.rpm  如果上一步提示缺少tcp_wrappers
5. rabbitmq-server start &
6. rabbitmqctl stop_app
7. rabbitmq-plugins enable rabbitmq_managment

2.3 常用命令

1. rabbitmqctl stop_app
2. rabbitmqctl start_app
3. rabbitmqctl status 节点状态
4. rabbitmqctl list_users 列出所有用户
5. rabbitmqctl list_user_permissions username 列出用户权限
6. rabbitmqctl change_password username newpwd 修改用户密码
7. rabbitmqctl list_vhosts 列出所有虚拟主机
8. rabbitmqctl list_permissions -p vhostpath 列出该虚拟主机的所有权限
9. rabbitmqctl list_queues 列出所有队列
10. rabbitmqctl reset 移除所有数据
11. rabbitmqctl join_cluster <cluster-node> [--ram] 组成集群命令
12. rabbitmqctl cluster_status 查看集群状态

3-Exchange

3.1 交换机属性

1. name:交换机名称
2. type:交换机类型(direct、topic、fanout、headers)
3. durability:是否需要持久化,true为持久化
4. auto-delete:Exchange上的最后一个Queue被删除后,自动删除该Exchange
5. arguments:自定义参数

3.2 DirectExchange-直连

发送到DirectExchange的消息,会被转发到RouteKey中指定的Queue。
Direct可以使用Default-Exchange,不需要进行任何的binding操作,消息传递时,RouteKey必须完全匹配。

DirectExchange

3.3 TopicExchange-匹配

发送到TopicExchange的消息,会被转发到,匹配RouteKey中指定的Queue。
#:匹配多个词
*:匹配一个词

TopicExchange

3.4 FanoutExchange

不处理路由键,只要将队列绑定到交换机上;
发送到交换机上的消息,都会被转发到,与该交换机绑定的所有队列上;
FanoutExchange转发消息是最快的;

FanoutExchange

4-Binding+Queue+Message+Virtual

4.1 Binding-绑定

Exchange<-->Exchange,Exchange<-->Queue,他们之间的绑定关系
Binding中可以包含RouteKey或者参数

4.2 Queue-消息队列

实际存储消息数据
Durability:是否持久化,Durable:是,Transient:否
Auto-Delete:如果yes,则最后一个监听被移除后,该Queue也会自动被删除

4.3 Message-消息

应该程序和服务器之间传递的数据,由Properties(可以设置顺序ID)和Body组成
常用属性:delivery_mode、headers(自定义属性)、correlation_id:唯一id、expiration:过期时间

4.4 Virtual-Host-虚拟主机

虚拟地址,用于逻辑隔离,最上层的消息路由
一个Virtual-Host可以有若干个Exchange和Queue,但是同一个Virtual-Host中不能有同名的Exchange和Queue

5-高级特性

5.1 消息如何保证100%的投递成功

1. 消息落库,对消息状态进行打标

2. 消息延迟投递,做二次确认,回调检查

5.2 幂等性

1. 定义

幂等性 就是防止高并发的情况下,执行结果都是唯一的。
消费端实现幂等性,就是消息永远被消费一次。

2. 解决方案

1. 唯一ID+指纹码,利用数据库主键去重
    SELECT COUNT(1) FROM T_ORDER WHERE 唯一ID + 指纹码
    COUNT(1) == 0,则INSERT;
    好处:简单
    坏处:高并发下有数据库写入的性能瓶颈
    解决:根据ID进行分库分表,进行算法路由
    
2. 利用redis的原子性实现
    setnx key value、exists key、redis的自增
    问题:
        数据是否需要落库,落库的话,缓存和数据库如何保证原子性?
        数据不落库,如何设置定时同步策略?

5.3 confirm确认消息

1. 在channel中开启确认模式:channel.confirmSelect();
2. 在channel中添加监听:addConfirmListener();
3. 发生Nack的情况:磁盘写满、Queue达到上线、MQ其他异常
4. ack和Nack都收不到的情况:就要定时任务去处理

5.4 return消息机制

如果发送的消息,Exchange不存在或者RouteKey路由不到,这时就需要returnListener。
Mandatory:true-监听器接受到这些不可达的消息,false-broker会自动删除这些消息。
消费端自定义监听:继承DefaultConsumer

5.5 消费端限流

生产端不会限流,只有消费端限流;当机器突然有上万条消息,不做限流,可能会导致消费端服务器崩溃。
RabbitMQ提供了qos功能:非自动签收消息的情况下,一定数量消息未被确认前(通过consumer或channel设置qos值),不进行消费新的消息
void BasicQos(uint prefetchSize = 0 不限制消息大小, 
              ushort prefetchCount = 1 一次处理1条,手动ack后,在处理另一条, 
              bool global = false 这个限制是channel级别还是consumer级别);
consumer-->handleDelivery-->channel.basicAck(envelope.getDeliveryTag(), false);
consumer-->handleDelivery-->channel.basicNack(envelope.getDeliveryTag(), false, true-->重发);

5.6 TTL队列/消息

Time To Live 生存时间
支持消息的过期时间和队列的过期时间

5.7 DLX-死信队列

当消息变成死信(没有被消费者消费掉)的时候,他将被重新发送到另一个Exchange,这个Exchange就是死信队列
消息变成死信的情况:
    1. 消息被拒绝(basic.reject/basic.nack)并且requeue=false
    2. TTL过期
    3. 队列打到最大长度
在队列上添加:arguments.put("s-dead-letter-exchange", "dlx.exchange");

6-Spring-Boot-Demo

6.1 pom依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

6.2 application.yml

1. 公共配置
spring:
  rabbitmq:
    addresses: 192.168.11.76:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000

2. 生产端配置
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true # 保证监听有效

3. 消费端配置
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 10
      order:
        key: springboot.*
        queue:
          name: queue-1
          durable: true
        exchange:
          name: exchange-1
          durable: true
          type: topic
          ignoreDeclarationExceptions: true 

6.3 生产端

@Component
public class RabbitSender {

	@Autowired
	private RabbitTemplate rabbitTemplate;  
	
	// 回调函数: confirm确认
	final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
		@Override
		public void confirm(CorrelationData correlationData, boolean ack, String cause) {
			System.err.println("correlationData: " + correlationData);
			System.err.println("ack: " + ack);
			if(!ack){
				System.err.println("异常处理...");
			}
		}
	};
	
	// 回调函数: return返回
	final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
		@Override
		public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
				String exchange, String routingKey) {
			System.err.println("return exchange: " + exchange + ", routingKey: " 
				+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
		}
	};
	
	// 发送消息方法调用: 构建Message消息
	public void send(Object message, Map<String, Object> properties) throws Exception {
		MessageHeaders mhs = new MessageHeaders(properties);
		Message msg = MessageBuilder.createMessage(message, mhs);
		rabbitTemplate.setConfirmCallback(confirmCallback);
		rabbitTemplate.setReturnCallback(returnCallback);
		CorrelationData correlationData = new CorrelationData("1234567890"); // id + 时间戳 全局唯一 
		rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
	}
	
	// 发送消息方法调用: 构建自定义对象消息
	public void sendOrder(Order order) throws Exception {
		rabbitTemplate.setConfirmCallback(confirmCallback);
		rabbitTemplate.setReturnCallback(returnCallback);
		CorrelationData correlationData = new CorrelationData("0987654321"); //id + 时间戳 全局唯一 
		rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
	}
}

6.4 消费端

@Component
public class RabbitReceiver {

	@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1", durable="true"),
			exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"), key = "springboot.*"))
	@RabbitHandler
	public void onMessage(Message message, Channel channel) throws Exception {
		System.err.println("消费端Payload: " + message.getPayload());
		Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
		// 手动ACK
		channel.basicAck(deliveryTag, false);
	}
	
	@Value("spring.rabbitmq.listener.order.key")
	private String orderKey;
	
	@Value("spring.rabbitmq.listener.order.queue.name")
	private String orderQueueName;
	
	@Value("spring.rabbitmq.listener.order.queue.durable")
	private String orderQueueDurable;
	
	@Value("spring.rabbitmq.listener.order.exchange.name")
	private String orderExchangeName;
	
	@Value("spring.rabbitmq.listener.order.exchange.durable")
	private String orderExchangeDurable;
	
	@Value("spring.rabbitmq.listener.order.exchange.type")
	private String orderExchangeType;
	
	@Value("spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions")
	private String orderExchangeIgnoreDeclarationExceptions;
	
	@RabbitListener(bindings = @QueueBinding(value = @Queue(value = orderQueueName, durable = orderQueueDurable),
			exchange = @Exchange(value = orderExchangeName, durable = orderExchangeDurable, type = orderExchangeType, ignoreDeclarationExceptions = orderExchangeIgnoreDeclarationExceptions),
			key = orderKey))
	@RabbitHandler
	public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, 
			Channel channel, 
			@Headers Map<String, Object> headers) throws Exception {
		System.err.println("消费端order: " + order.getId());
		Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
		// 手动ACK
		channel.basicAck(deliveryTag, false);
	}
}

7-Spring-Cloud-Stream

7.1 架构图



7.2 概念

Barista接口:用来定义通道的类型和名称,通道名称作为配置用,通道类型作为该通道是发送消息还是接受消息
@output:输出注解
@input:输入注解
@StreamListener:监听消息注解

7.3 Demo

7.3.1 pom依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>1.3.4.RELEASE</version>
</dependency>

7.3.2 producer-application.yml

spring:
  cloud:
    stream:
      bindings:
        output_channel:
          destination: exchange-3
          group: queue-3
          binder: rabbit-cluster
      binders: 
        rabbit-cluster: 
          type: rabbit
          environment: 
            spring:
              rabbitmq:
                addresses: 192.168.11.76:5672
                username: guest
                password: guest
                virtual-host: /

7.3.3 定义通道

public interface Barista {

    String OUTPUT_CHANNEL = "output_channel";  
   
    // @Output声明了它是一个输出类型的通道,名字是output_channel。 
    @Output(Barista.OUTPUT_CHANNEL)
    MessageChannel logoutput();  
}

7.3.4 发送消息

@EnableBinding(Barista.class)
@Service  
public class RabbitmqSender {  
  
    @Autowired  
    private Barista barista;  
    
    // 发送消息
    public String sendMessage(Object message, Map<String, Object> properties) throws Exception {  
        try{
        	MessageHeaders mhs = new MessageHeaders(properties);
        	Message msg = MessageBuilder.createMessage(message, mhs);
            boolean sendStatus = barista.logoutput().send(msg);
            System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
        }catch (Exception e){  
        	e.printStackTrace();
        }  
        return null;
    }  
}

7.3.5 consumer-application.yml

spring:
  cloud:
    stream:
      bindings:
        input_channel:
          destination: exchange-3
          group: queue-3
          binder: rabbit-cluster
          consumer:
            concurrency: 1

      rabbit:
        bindings:
          input_channel:
            consumer:
              requeue-rejected: false # 是否支持重发
              acknowledge-mode: MANUAL # 手动签收
              recovery-interval: 3000 # 3s重连
              durable-subscription: true # 是否启用持久化订阅
              max-concurrency: 5 # 最大监听数
              
      binders:
        rabbit-cluster:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: 192.168.11.76:5672
                username: guest
                password: guest
                virtual-host: /

7.3.6 定义通道

public interface Barista {
	  
    String INPUT_CHANNEL = "input_channel";  

    // @Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL。
    @Input(Barista.INPUT_CHANNEL)  
    SubscribableChannel loginput();
}

7.3.7 消费消息

@EnableBinding(Barista.class)
@Service
public class RabbitmqReceiver {  

    @StreamListener(Barista.INPUT_CHANNEL)  
    public void receiver(Message message) throws Exception {  
		Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
		Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    	System.out.println("Input Stream 1 接受数据:" + message);
    	channel.basicAck(deliveryTag, false);
    }  
}

8-RabbitMQ集群架构模式

8.1 简介

1. 主备模式:实现高可用集群,一般在并发和数据量不高的情况下使用,也称Warren模式。
2. 远程模式:实现双活的模式,也称Shovel模式,消息进行不同数据中心的复制工作,可以跨地域的两个MQ集群互联。
3. 镜像模式:也称Mirror模式,保证100%数据不丢失,简单、用的多。
            镜像队列:保证数据高可靠性方案,主要是实现数据同步

8.2 架构模式图

8.2.1 镜像模式

8.2.2 多活模式


9-架构设计

9.1 SET化架构

业务:解决业务遇到的扩展性和容灾等需求,支撑业务的高速发展
通用性:架构形成统一解决方案,岸边各业务线接入使用



9.2 集群架构图

集群架构图

9.3 RabbitMQ-架构设计方案

RabbitMQ-架构设计方案

9.4 批量消息发送

9.5 顺序消息

9.6 事务消息发送

9.7 消息幂等性设计