RabbitMQ是一个消息代理和队列服务器,用来在不同应用之间共享数据,是Erlang语言开发的,基于AMQP协议。
是一个二进制协议。
1. Server:Broker,接受客户端连接 2. Connection:连接,应用程序与Broker的网络连接 3. Channel:网络信道,Channel是消息读写的通道 4. Message:消息,传递的数据,有properties何body组成,properties是消息的属性(可以设置顺序ID),body是消息内容 5. Virtual-Host:虚拟地址,用于"逻辑隔离",最上层的"消息路由",一个Virtual-Host中有多个Exchange和Queue,但是不能有同名的 6. Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列 7. Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing-key 8. Routing-key:一个路由规则,虚拟机可用他来确定如何路由一个特定消息 9. Queue:消息队列,保存消息并将它们转发给消费者
1. rabbitMQ版本要与erlang版本对应起来 2. rabbitMQ-rpm和erlang-rpm可以去官网下载,tcp_wrappers、socat可以去https://pkgs.org下载
1. rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm 2. rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm 3. rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm 如果上一步提示缺少socat 4. rpm -ivh tcp_wrappers-7.6-77.el7.x86_64.rpm 如果上一步提示缺少tcp_wrappers 5. rabbitmq-server start & 6. rabbitmqctl stop_app 7. rabbitmq-plugins enable rabbitmq_managment
1. rabbitmqctl stop_app 2. rabbitmqctl start_app 3. rabbitmqctl status 节点状态 4. rabbitmqctl list_users 列出所有用户 5. rabbitmqctl list_user_permissions username 列出用户权限 6. rabbitmqctl change_password username newpwd 修改用户密码 7. rabbitmqctl list_vhosts 列出所有虚拟主机 8. rabbitmqctl list_permissions -p vhostpath 列出该虚拟主机的所有权限 9. rabbitmqctl list_queues 列出所有队列 10. rabbitmqctl reset 移除所有数据 11. rabbitmqctl join_cluster <cluster-node> [--ram] 组成集群命令 12. rabbitmqctl cluster_status 查看集群状态
1. name:交换机名称 2. type:交换机类型(direct、topic、fanout、headers) 3. durability:是否需要持久化,true为持久化 4. auto-delete:Exchange上的最后一个Queue被删除后,自动删除该Exchange 5. arguments:自定义参数
发送到DirectExchange的消息,会被转发到RouteKey中指定的Queue。 Direct可以使用Default-Exchange,不需要进行任何的binding操作,消息传递时,RouteKey必须完全匹配。
发送到TopicExchange的消息,会被转发到,匹配RouteKey中指定的Queue。 #:匹配多个词 *:匹配一个词
不处理路由键,只要将队列绑定到交换机上; 发送到交换机上的消息,都会被转发到,与该交换机绑定的所有队列上; FanoutExchange转发消息是最快的;
Exchange<-->Exchange,Exchange<-->Queue,他们之间的绑定关系 Binding中可以包含RouteKey或者参数
实际存储消息数据 Durability:是否持久化,Durable:是,Transient:否 Auto-Delete:如果yes,则最后一个监听被移除后,该Queue也会自动被删除
应该程序和服务器之间传递的数据,由Properties(可以设置顺序ID)和Body组成 常用属性:delivery_mode、headers(自定义属性)、correlation_id:唯一id、expiration:过期时间
虚拟地址,用于逻辑隔离,最上层的消息路由 一个Virtual-Host可以有若干个Exchange和Queue,但是同一个Virtual-Host中不能有同名的Exchange和Queue
1. 消息落库,对消息状态进行打标
2. 消息延迟投递,做二次确认,回调检查
1. 定义
幂等性 就是防止高并发的情况下,执行结果都是唯一的。 消费端实现幂等性,就是消息永远被消费一次。
2. 解决方案
1. 唯一ID+指纹码,利用数据库主键去重 SELECT COUNT(1) FROM T_ORDER WHERE 唯一ID + 指纹码 COUNT(1) == 0,则INSERT; 好处:简单 坏处:高并发下有数据库写入的性能瓶颈 解决:根据ID进行分库分表,进行算法路由 2. 利用redis的原子性实现 setnx key value、exists key、redis的自增 问题: 数据是否需要落库,落库的话,缓存和数据库如何保证原子性? 数据不落库,如何设置定时同步策略?
1. 在channel中开启确认模式:channel.confirmSelect(); 2. 在channel中添加监听:addConfirmListener(); 3. 发生Nack的情况:磁盘写满、Queue达到上线、MQ其他异常 4. ack和Nack都收不到的情况:就要定时任务去处理
如果发送的消息,Exchange不存在或者RouteKey路由不到,这时就需要returnListener。 Mandatory:true-监听器接受到这些不可达的消息,false-broker会自动删除这些消息。 消费端自定义监听:继承DefaultConsumer
生产端不会限流,只有消费端限流;当机器突然有上万条消息,不做限流,可能会导致消费端服务器崩溃。 RabbitMQ提供了qos功能:非自动签收消息的情况下,一定数量消息未被确认前(通过consumer或channel设置qos值),不进行消费新的消息 void BasicQos(uint prefetchSize = 0 不限制消息大小, ushort prefetchCount = 1 一次处理1条,手动ack后,在处理另一条, bool global = false 这个限制是channel级别还是consumer级别); consumer-->handleDelivery-->channel.basicAck(envelope.getDeliveryTag(), false); consumer-->handleDelivery-->channel.basicNack(envelope.getDeliveryTag(), false, true-->重发);
Time To Live 生存时间 支持消息的过期时间和队列的过期时间
当消息变成死信(没有被消费者消费掉)的时候,他将被重新发送到另一个Exchange,这个Exchange就是死信队列 消息变成死信的情况: 1. 消息被拒绝(basic.reject/basic.nack)并且requeue=false 2. TTL过期 3. 队列打到最大长度 在队列上添加:arguments.put("s-dead-letter-exchange", "dlx.exchange");
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
1. 公共配置 spring: rabbitmq: addresses: 192.168.11.76:5672 username: guest password: guest virtual-host: / connection-timeout: 15000 2. 生产端配置 publisher-confirms: true publisher-returns: true template: mandatory: true # 保证监听有效 3. 消费端配置 listener: simple: acknowledge-mode: manual concurrency: 5 max-concurrency: 10 order: key: springboot.* queue: name: queue-1 durable: true exchange: name: exchange-1 durable: true type: topic ignoreDeclarationExceptions: true
@Component public class RabbitSender { @Autowired private RabbitTemplate rabbitTemplate; // 回调函数: confirm确认 final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println("correlationData: " + correlationData); System.err.println("ack: " + ack); if(!ack){ System.err.println("异常处理..."); } } }; // 回调函数: return返回 final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { System.err.println("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); } }; // 发送消息方法调用: 构建Message消息 public void send(Object message, Map<String, Object> properties) throws Exception { MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); CorrelationData correlationData = new CorrelationData("1234567890"); // id + 时间戳 全局唯一 rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData); } // 发送消息方法调用: 构建自定义对象消息 public void sendOrder(Order order) throws Exception { rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); CorrelationData correlationData = new CorrelationData("0987654321"); //id + 时间戳 全局唯一 rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData); } }
@Component public class RabbitReceiver { @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1", durable="true"), exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"), key = "springboot.*")) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { System.err.println("消费端Payload: " + message.getPayload()); Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); // 手动ACK channel.basicAck(deliveryTag, false); } @Value("spring.rabbitmq.listener.order.key") private String orderKey; @Value("spring.rabbitmq.listener.order.queue.name") private String orderQueueName; @Value("spring.rabbitmq.listener.order.queue.durable") private String orderQueueDurable; @Value("spring.rabbitmq.listener.order.exchange.name") private String orderExchangeName; @Value("spring.rabbitmq.listener.order.exchange.durable") private String orderExchangeDurable; @Value("spring.rabbitmq.listener.order.exchange.type") private String orderExchangeType; @Value("spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions") private String orderExchangeIgnoreDeclarationExceptions; @RabbitListener(bindings = @QueueBinding(value = @Queue(value = orderQueueName, durable = orderQueueDurable), exchange = @Exchange(value = orderExchangeName, durable = orderExchangeDurable, type = orderExchangeType, ignoreDeclarationExceptions = orderExchangeIgnoreDeclarationExceptions), key = orderKey)) @RabbitHandler public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception { System.err.println("消费端order: " + order.getId()); Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); // 手动ACK channel.basicAck(deliveryTag, false); } }
Barista接口:用来定义通道的类型和名称,通道名称作为配置用,通道类型作为该通道是发送消息还是接受消息 @output:输出注解 @input:输入注解 @StreamListener:监听消息注解
7.3.1 pom依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>1.3.4.RELEASE</version> </dependency>
7.3.2 producer-application.yml
spring: cloud: stream: bindings: output_channel: destination: exchange-3 group: queue-3 binder: rabbit-cluster binders: rabbit-cluster: type: rabbit environment: spring: rabbitmq: addresses: 192.168.11.76:5672 username: guest password: guest virtual-host: /
7.3.3 定义通道
public interface Barista { String OUTPUT_CHANNEL = "output_channel"; // @Output声明了它是一个输出类型的通道,名字是output_channel。 @Output(Barista.OUTPUT_CHANNEL) MessageChannel logoutput(); }
7.3.4 发送消息
@EnableBinding(Barista.class) @Service public class RabbitmqSender { @Autowired private Barista barista; // 发送消息 public String sendMessage(Object message, Map<String, Object> properties) throws Exception { try{ MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); boolean sendStatus = barista.logoutput().send(msg); System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus); }catch (Exception e){ e.printStackTrace(); } return null; } }
7.3.5 consumer-application.yml
spring: cloud: stream: bindings: input_channel: destination: exchange-3 group: queue-3 binder: rabbit-cluster consumer: concurrency: 1 rabbit: bindings: input_channel: consumer: requeue-rejected: false # 是否支持重发 acknowledge-mode: MANUAL # 手动签收 recovery-interval: 3000 # 3s重连 durable-subscription: true # 是否启用持久化订阅 max-concurrency: 5 # 最大监听数 binders: rabbit-cluster: type: rabbit environment: spring: rabbitmq: addresses: 192.168.11.76:5672 username: guest password: guest virtual-host: /
7.3.6 定义通道
public interface Barista { String INPUT_CHANNEL = "input_channel"; // @Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL。 @Input(Barista.INPUT_CHANNEL) SubscribableChannel loginput(); }
7.3.7 消费消息
@EnableBinding(Barista.class) @Service public class RabbitmqReceiver { @StreamListener(Barista.INPUT_CHANNEL) public void receiver(Message message) throws Exception { Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL); Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); System.out.println("Input Stream 1 接受数据:" + message); channel.basicAck(deliveryTag, false); } }
1. 主备模式:实现高可用集群,一般在并发和数据量不高的情况下使用,也称Warren模式。 2. 远程模式:实现双活的模式,也称Shovel模式,消息进行不同数据中心的复制工作,可以跨地域的两个MQ集群互联。 3. 镜像模式:也称Mirror模式,保证100%数据不丢失,简单、用的多。 镜像队列:保证数据高可靠性方案,主要是实现数据同步
8.2.1 镜像模式
8.2.2 多活模式
业务:解决业务遇到的扩展性和容灾等需求,支撑业务的高速发展 通用性:架构形成统一解决方案,岸边各业务线接入使用