MQ - RabbitMQ - 架构及工做原理

参考网址:

RabbitMQ的应用场景以及基本原理介绍
javascript

RabbitMQ使用详解html

RabbitMQ的Java应用(1) -- Rabbit Java Client使用java

1. 系统架构

几个概念说明: 
Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, 
Exchange:消息交换机,它指定消息按什么规则,路由到哪一个队列。 
Queue:消息的载体,每一个消息都会被投到一个或多个队列。 
Binding:绑定,它的做用就是把exchange和queue按照路由规则绑定起来. 
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 
vhost:虚拟主机,一个broker里能够有多个vhost,用做不一样用户的权限分离。 
Producer:消息生产者,就是投递消息的程序. 
Consumer:消息消费者,就是接受消息的程序. 
算法

Channel:消息通道,在客户端的每一个链接里,可创建多个channel.编程

从示意图能够看出消息生产者并无直接将消息发送给消息队列,而是经过创建与Exchange的Channel,将消息发送给Exchange,Exchange根据规则,将消息转发给指定的消息队列。消费者经过创建与消息队列相连的Channel,从消息队列中获取消息。安全

这里谈到的Channel能够理解为创建在生产者/消费者和RabbitMQ服务器之间的TCP链接上的虚拟链接,一个TCP链接上能够创建多个Channel。 RabbitMQ服务器的Exchange对象能够理解为生产者发送消息的邮局,消息队列能够理解为消费者的邮箱。Exchange对象根据它定义的规则和消息包含的routing key以及header信息将消息转发到消息队列。服务器

根据转发消息的规则不一样,RabbitMQ服务器中使用的Exchange对象有四种,Direct Exchange, Fanout Exchange, Topic Exchange, Header Exchange,若是定义Exchange时没有指定类型和名称, RabbitMQ将会为每一个消息队列设定一个Default Exchange,它的Routing Key是消息队列名称。架构

2.任务分发机制

2.1Round-robin dispathching循环分发

RabbbitMQ的分发机制很是适合扩展,并且它是专门为并发程序设计的,若是如今load加剧,那么只须要建立更多的Consumer来进行任务处理。并发

2.2Message acknowledgment消息确认

为了保证数据不被丢失,RabbitMQ支持消息确认机制,为了保证数据能被正确处理而不只仅是被Consumer收到,那么咱们不能采用no-ack,而应该是在处理完数据以后发送ack. 
在处理完数据以后发送ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ能够安全的删除它了. 
若是Consumer退出了可是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer,这样就保证在Consumer异常退出状况下数据也不会丢失. 
RabbitMQ它没有用到超时机制.RabbitMQ仅仅经过Consumer的链接中断来确认该Message并无正确处理,也就是说RabbitMQ给了Consumer足够长的时间作数据处理。 
若是忘记ack,那么当Consumer退出时,Mesage会从新分发,而后RabbitMQ会占用愈来愈多的内存.app

3.Message durability消息持久化

要持久化队列queue的持久化须要在声明时指定durable=True; 
这里要注意,队列的名字必定要是Broker中不存在的,否则不能改变此队列的任何属性. 
队列和交换机有一个建立时候指定的标志durable,durable的惟一含义就是具备这个标志的队列和交换机会在重启以后从新创建,它不表示说在队列中的消息会在重启后恢复 
消息持久化包括3部分 
1. exchange持久化,在声明时指定durable => true

hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
  • 1

2.queue持久化,在声明时指定durable => true

channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
  • 1

3.消息持久化,在投递时指定delivery_mode => 2(1是非持久化).

channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); 
  • 1

若是exchange和queue都是持久化的,那么它们之间的binding也是持久化的,若是exchange和queue二者之间有一个持久化,一个非持久化,则不容许创建绑定. 
注意:一旦建立了队列和交换机,就不能修改其标志了,例如,建立了一个non-durable的队列,而后想把它改变成durable的,惟一的办法就是删除这个队列而后重现建立。

4.Fair dispath 公平分发

你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。n是取余后的,它无论Consumer是否还有unacked Message,只是按照这个默认的机制进行分发. 
那么若是有个Consumer工做比较重,那么就会致使有的Consumer基本没事可作,有的Consumer却毫无休息的机会,那么,Rabbit是如何处理这种问题呢? 
这里写图片描述 
经过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每一个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它

channel.basic_qos(prefetch_count=1) 
  • 1

注意,这种方法可能会致使queue满。固然,这种状况下你可能须要添加更多的Consumer,或者建立更多的virtualHost来细化你的设计。

5.分发到多个Consumer

5.1Exchange

先来温习如下交换机路由的几种类型: 
Direct Exchange:直接匹配,经过Exchange名称+RountingKey来发送与接收消息. 
Fanout Exchange:广播订阅,向全部的消费者发布消息,可是只有消费者将队列绑定到该路由器才能收到消息,忽略Routing Key. 
Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey能够采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息; 
Headers Exchange:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,而后消费者接收消息同时须要定义相似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey. 
默认的exchange:若是用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,咱们建立一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去

channel.BasicPublish("", "TaskQueue", properties, bytes);
  • 1

由于在第一个参数选择了默认的exchange,而咱们申明的队列叫TaskQueue,因此默认的,它在新建一个也叫TaskQueue的routingKey,并绑定在默认的exchange上,致使了咱们能够在第二个参数routingKey中写TaskQueue,这样它就会找到定义的同名的queue,并把消息放进去。 
若是有两个接收程序都是用了同一个的queue和相同的routingKey去绑定direct exchange的话,分发的行为是负载均衡的,也就是说第一个是程序1收到,第二个是程序2收到,以此类推。 
若是有两个接收程序用了各自的queue,但使用相同的routingKey去绑定direct exchange的话,分发的行为是复制的,也就是说每一个程序都会收到这个消息的副本。行为至关于fanout类型的exchange。 
下面详细来讲:

5.2 Bindings 绑定

绑定其实就是关联了exchange和queue,或者这么说:queue对exchange的内容感兴趣,exchange要把它的Message deliver到queue。

5.3Direct exchange

Driect exchange的路由算法很是简单:经过bindingkey的彻底匹配,能够用下图来讲明. 
这里写图片描述 
Exchange和两个队列绑定在一块儿,Q1的bindingkey是orange,Q2的binding key是black和green. 
当Producer publish key是orange时,exchange会把它放到Q1上,若是是black或green就会到Q2上,其他的Message被丢弃.

5.4 Multiple bindings

多个queue绑定同一个key也是能够的,对于下图的例子,Q1和Q2都绑定了black,对于routing key是black的Message,会被deliver到Q1和Q2,其他的Message都会被丢弃. 
这里写图片描述

5.5 Topic exchange

对于Message的routing_key是有限制的,不能使任意的。格式是以点号“.”分割的字符表。好比:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。你能够听任意的key在routing_key中,固然最长不能超过255 bytes。 
对于routing_key,有两个特殊字符

  • *(星号)表明任意一个单词
  • #(hash)0个或多个单词 
    这里写图片描述 
    Producer发送消息时须要设置routing_key,routing_key包含三个单词和连个点号o,第一个key描述了celerity(灵巧),第二个是color(色彩),第三个是物种: 
    在这里咱们建立了两个绑定: Q1 的binding key 是”.orange.“; Q2 是 “..rabbit” 和 “lazy.#”:

    • Q1感兴趣全部orange颜色的动物
    • Q2感兴趣全部rabbits和全部的lazy的. 
      例子:rounting_key 为 “quick.orange.rabbit”将会发送到Q1和Q2中 
      rounting_key 为”lazy.orange.rabbit.hujj.ddd”会被投递到Q2中,#匹配0个或多个单词。

6.消息序列化

RabbitMQ使用ProtoBuf序列化消息,它可做为RabbitMQ的Message的数据格式进行传输,因为是结构化的数据,这样就极大的方便了Consumer的数据高效处理,固然也可使用XML,与XML相比,ProtoBuf有如下优点: 
1.简单 
2.size小了3-10倍 
3.速度快了20-100倍 
4.易于编程 
6.减小了语义的歧义. 
,ProtoBuf具备速度和空间的优点,使得它如今应用很是普遍

7. 工做原理

核心官网有介绍,说的connecnton,channel之类的,到底怎么样,who care? 

整体来看,咱们关注业务实现是:1)消息怎么投递的。2)消费者怎么消费消息。3)消息是不是可靠投递。4)消息投递方式。5)消息的生命周期。6)消息队列生命周期

3.2  消息是怎么投递的?(记住一点,生产者消息投递都是面向交换机的)

RabbitMQ 是面向交换机投递消息的。交换机可能绑定有许多队列,交换机如何将消息投递给这些队列呢?

首先说一下面向交换机的设计的优点:

1)这明显借助了数据链路层那个交换机的设计思想。除了层级分明之外,还能从分提升链路利用率(可能有点抽像)。                    

2)从代码层面来看:若是没有交换机,你至少得维护一个十分庞大的路由表,而后从路由表正确投递消息,有了交互机,这里路由表就会被拆分到多个交换机里面,效果没必要多说。                      

3)而后就是高度的解耦,不一样的交换机可有不一样的路由规则,要是没有交换机。。。。。。

在RabbitMQ,交换机有4种投递方式,就是枚举类BuiltinExchangeType的4个枚举变量:

DIRECT:会将全部消息先取消息的ROUTE_KEY,而后投递到与ROUTE_KEY绑定的队列里面(if(msg.routekey.equals(queue.routekey)))。

FANOUT:此种模式下,根本不检查消息的ROUTE_KEY,直接投送到交换机所拥有的全部队列里面。

TOPIC,HEADERS自行看一下官网怎么说的,不想码字了^_^||

总结起来就一个函数就把消息发出去了:channel.basicPublish(excange_name,route_key,false,bs,"test".getBytes());能够去官网查一下这个API

3.3 消费者怎么消费消息(记住一点,消费者消费消息是面向消息队列的,这与生成者有点不同)

还不是就是TCP长链接心跳的那些事,就是这么一个API:channel.basicConsume(QUEUE_AUTODELETE, true, consumer);consumer是Consumer类的一个实例,你直接去处理回调接口就ok了

3.4 消息传递是否可靠

很明显是可靠的,除非你将消息队列,声明成非持久模式,这事你又重启了机器。这会丢失消息的。还有就是他有应答机制,你能够经过设置消费者消费消息的模式,去手动应答。channel.basicConsume(?,autoACk,?)的autoAck参数设置

3.5 消息的生命周期

一旦受到消费者应答,标识消息已被消费,则消息被回收掉。

3.6 队列生命周期

channel.queueDeclare(QUEUE_NAME,false,false,true,null);

第二个参数设置为true,会将消息持久化到磁盘,第四个参数设置为true表示没有消息而且没有链接则删除改队列,详情能够查一下API