RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通信的世界里有不少公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),可是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),所以,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。html
RabbitMQ是由RabbitMQ
Technologies Ltd开发而且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。其实VMWare,Pivotal和EMC本质上是一家的。不一样的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,如今并无上市。算法
RabbitMQ的官网是http://www.rabbitmq.com架构
言归正传。RabbitMQ,或者说AMQP解决了什么问题,或者说它的应用场景是什么?并发
对于一个大型的软件系统来讲,它会有不少的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通讯?这和传统的IPC有很大的区别。传统的IPC不少都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);若是使用socket那么不一样的模块的确能够部署到不一样的机器上,可是仍是有不少问题须要解决。好比:app
1)信息的发送者和接收者如何维持这个链接,若是一方的链接中断,这期间的数据如何方式丢失?异步
2)如何下降发送者和接收者的耦合度?socket
3)如何让Priority高的接收者先接到数据?高并发
4)如何作到load balance?有效均衡接收者的负载?性能
5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不一样的数据,如何作有效的filter。设计
6)如何作到可扩展,甚至将这个通讯模块发到cluster上?
7)如何保证接收者接收到了完整,正确的数据?
AMDQ协议解决了以上的问题,而RabbitMQ实现了AMQP。
成为系统架构可能不太合适,可能叫应用场景的系统架构更合适。
这个系统架构图版权属于sunjun041640。
RabbitMQ Server: 也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是维护一条从Producer到Consumer的路线,保证数据可以按照指定的方式进行传输。可是这个保证也不是100%的保证,可是对于普通的应用来讲这已经足够了。固然对于商业系统来讲,能够再作一层数据一致性的guard,就能够完全保证系统的一致性了。
Client A & B: 也叫Producer,数据的发送方。createmessages and publish (send) them to a broker server (RabbitMQ).一个Message有两个部分:payload(有效载荷)和label(标签)。payload顾名思义就是传输的数据。label是exchange的名字或者说是一个tag,它描述了payload,并且RabbitMQ也是经过这个label来决定把这个Message发给哪一个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。
Client 1,2,3:也叫Consumer,数据的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比做是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。固然可能会把同一个Message发送给不少的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来讲,它是不知道谁发送的这个信息的。就是协议自己不支持。可是固然了若是Producer发送的payload包含了Producer的信息就另当别论了。
对于一个数据从Producer到Consumer的正确传递,还有三个概念须要明确:exchanges, queues and bindings。
Exchanges are where producers publish their messages.
Queuesare where the messages end up and are received by consumers
Bindings are how the messages get routed from the exchange to particular queues.
还有几个概念是上述图中没有标明的,那就是Connection(链接),Channel(通道,频道)。
Connection: 就是一个TCP的链接。Producer和Consumer都是经过TCP链接到RabbitMQ Server的。之后咱们能够看到,程序的起始处就是创建这个TCP链接。
Channels: 虚拟链接。它创建在上述的TCP链接中。数据流动都是在Channel中进行的。也就是说,通常状况是程序起始创建TCP链接,第二步就是创建这个Channel。
那么,为何使用Channel,而不是直接使用TCP链接?
对于OS来讲,创建和关闭TCP链接是有代价的,频繁的创建关闭TCP链接对于系统的性能有很大的影响,并且TCP的链接数也有限制,这也限制了系统处理高并发的能力。可是,在TCP链接中创建Channel是没有上述代价的。对于Producer或者Consumer来讲,能够并发的使用多个Channel进行Publish或者Receive。有实验代表,1s的数据能够Publish10K的数据包。固然对于不一样的硬件环境,不一样的数据包大小这个数据确定不同,可是我只想说明,对于普通的Consumer或者Producer来讲,这已经足够了。若是不够用,你考虑的应该是如何细化split你的设计。
默认状况下,若是Message 已经被某个Consumer正确的接收到了,那么该Message就会被从queue中移除。固然也可让同一个Message发送到不少的Consumer。
若是一个queue没被任何的Consumer Subscribe(订阅),那么,若是这个queue有数据到达,那么这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被当即发送到这个Consumer,这个数据被Consumer正确收到时,这个数据就被从queue中删除。
那么什么是正确收到呢?经过ack。每一个Message都要被acknowledged(确认,ack)。咱们能够显示的在程序中去ack,也能够自动的ack。若是有数据没有被ack,那么:
RabbitMQ Server会把这个信息发送到下一个Consumer。
若是这个app有bug,忘记了ack,那么RabbitMQ Server不会再发送数据给它,由于Server认为这个Consumer处理能力有限。
并且ack的机制能够起到限流的做用(Benefitto throttling):在Consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance Consumer的load。
固然对于实际的例子,好比咱们可能会对某些数据进行merge,好比merge 4s内的数据,而后sleep 4s后再获取数据。特别是在监听系统的state,咱们不但愿全部的state实时的传递上去,而是但愿有必定的延时。这样能够减小某些IO,并且终端用户也不会感受到。
有两种方式,第一种的Reject可让RabbitMQ Server将该Message 发送到下一个Consumer。第二种是从queue中当即删除该Message。
Consumer和Procuder均可以经过 queue.declare 建立queue。对于某个Channel来讲,Consumer不能declare一个queue,却订阅其余的queue。固然也能够建立私有的queue。这样只有app自己才可使用这个queue。queue也能够自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么若是是建立一个已经存在的queue呢?那么不会有任何的影响。须要注意的是没有任何的影响,也就是说第二次建立若是参数和第一次不同,那么该操做虽然成功,可是queue的属性并不会被修改。
那么谁应该负责建立这个queue呢?是Consumer,仍是Producer?
若是queue不存在,固然Consumer不会获得任何的Message。可是若是queue不存在,那么Producer Publish的Message会被丢弃。因此,仍是为了数据不丢失,Consumer和Producer都try to create the queue!反正无论怎么样,这个接口都不会出问题。
queue对load balance的处理是完美的。对于多个Consumer来讲,RabbitMQ 使用循环的方式(round-robin)的方式均衡的发送给不一样的Consumer。
从架构图能够看出,Procuder Publish的Message进入了Exchange。接着经过“routing keys”, RabbitMQ会找到应该把这个Message放到哪一个queue里。queue也是经过这个routing keys来作的绑定。
有三种类型的Exchanges:direct, fanout,topic。 每一个实现了不一样的路由算法(routing algorithm)。
· Direct exchange: 若是 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue建立时,它会自动的以queue的名字做为routing key来绑定那个exchange。
· Fanout exchange: 会向响应的queue广播。
· Topic exchange: 对key进行模式匹配,好比ab*能够传递到全部ab*的queue。
每一个virtual host本质上都是一个RabbitMQ Server,拥有它本身的queue,exchagne,和bings rule等等。这保证了你能够在多个不一样的application中使用RabbitMQ。
接下来我会使用Python来讲明RabbitMQ的使用方法。
原文地址:www.liuhaihua.cn/archives/48…