###前言java
- RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
- 自己支持不少的协议:AMQP, XMPP, SMTP, STONP
- RabbitMQ服务器是用Erlang语言编写的,实现了代理(Broker)架构,意味着消息在发送到客户端以前能够在中央节点上排队。 此特性使得RabbitMQ易于使用和部署,适宜于不少场景如路由、负载均衡或消息持久化等
###主要特性服务器
- 消息集群(Clustering): RabbitMQ支持分布式部署,多个RabbitMQ服务器组成一个集群造成一个逻辑Broker
- 可靠性(Reliability): RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
- 高可用(Highly Available Queues): 队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。
###基本组成网络
Producer: 消息发送者 Message :消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成 RabbitMQ: - Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。默认"/" - Exchange: 路由器,接收消息,根据RoutingKey分发消息,有四种类型: - headers:消息头类型 路由器,内部应用 - direct:精准匹配类型 路由器 - topic:主题匹配类型 路由器,支持正则 模糊匹配 - fanout:广播类型 路由器,RoutingKey无效 - RoutingKey: 路由规则 - Queue: 队列,用于存储消息,保存消息直到发送给消费者。它是消息的容器,也是消息的终点(消息的目的地) - Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连 - Connection:网络链接,好比一个TCP链接。 - Channel:信道:创建和销毁 TCP 都是很是昂贵的开销,因此引入了信道的概念,以复用一条 TCP 链接。 Consumer: 消息消费者
###RabbitMQ 消息确认策略架构
Confrim: 简单说就是直接传送消息 client > mq, 接收到 mq > ack, mq 在异步的完成接下来的事情 发送线程不会当即获得MQ反馈结果,发送后经过callback确认成功失败 Transaction: client 请求开启事务 > 发送message > client 提交事务,整个过程是同步的,mq必须完成消息持久化、消息同步等 发送线程会当即获得MQ反馈结果,同一线程中,多个发送阻塞进行
###消息消费的两种模式负载均衡
1. 无需反馈: 消费者从消息队列获取消息后,服务端就认为该消息已经成功消费 2. 消息消费完给服务器返回确认状态,表示该消息已被消费 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
###持久化异步
RabbitMQ中消息的持久化须要保证Exchange、Queue、Message都进行持久化操做。 Exchange的持久化 @param exchange the name of the exchange @param type the exchange type @param durable true if we are declaring a durable exchange exchangeDeclare(String exchange, String type, boolean durable) Queue的持久化 @param queue the name of the queue @param durable true if we are declaring a durable queue (the queue will survive a server restart) @param exclusive true if we are declaring an exclusive queue (restricted to this connection) @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) @param arguments other properties (construction arguments) for the queue queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) autoDelete:代表该队列是否自动删除。 自动删除,若是该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。 Message的持久化 @param exchange the exchange to publish the message to @param routingKey the routing key @param props other properties for the message - routing headers etc @param body the message body basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) exchange:表示该消息发送到哪一个Exchange routingKey:表示该消息的Routing Key props:表示该消息的属性 BasicProperties.PERSISTENT_TEXT_PLAIN BasicProperties.PERSISTENT_BASIC body:消息实体
###存活时间 TTL分布式
Queue TTL(Queue上没有Consumer) Map<String, Object> queueArgs = new HashMap<>(); // 设置1分钟过时 queueArgs.put("x-expires", 60000); channel.queueDeclare("queue", false, false, false, queueArgs); Message TTL(消息在队列中的存活时间,超过该时间消息将被删除或者不能传递给消费者) // 设置消息属性-TTL为30s BasicProperties properties = new BasicProperties.Builder().expiration("30000").build(); channel.basicPublish("exchange", "kanyuxia", properties,"hello".getBytes(StandardCharsets.UTF_8));