RabbitMQ基础概念详细介绍 rabbitmq 实现原理

http://www.diggerplus.org/archives/3110html

 

本文对rabbitmq基础介绍,彻底是为了下一篇rabbitmq性能测试作准备,让读者去了解咱们须要测试的是什么样一个“东西”。java

 

引言

你是否遇到过两个(多个)系统间须要经过定时任务来同步某些数据?你是否在为异构系统的不一样进程间相互调用、通信的问题而苦恼、挣扎?若是是,那么恭喜你,消息服务让你能够很轻松地解决这些问题。
消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通信)问题,你也能够把它用于系统间服务的相互调用(RPC)。本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一。python

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。web

ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket连接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是咱们与RabbitMQ打交道的最重要的一个接口,咱们大部分的业务操做是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。正则表达式

Queue

Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。
RabbitMQ基础概念详细介绍shell

RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)能够从Queue中获取消息并消费。浏览器

RabbitMQ基础概念详细介绍

多个消费者能够订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每一个消费者都收到全部的消息并处理。
RabbitMQ基础概念详细介绍安全

Message acknowledgment

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其余意外)的状况,这种状况下就可能会致使消息丢失。为了不这种状况发生,咱们能够要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;若是RabbitMQ没有收到回执并检测到消费者的RabbitMQ链接断开,则RabbitMQ会将该消息发送给其余消费者(若是存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会致使该消息被发送给其余消费者,除非它的RabbitMQ链接断开。
这里会产生另一个问题,若是咱们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会致使严重的bug——Queue中堆积的消息会愈来愈多;消费者重启后会重复消费这些消息并重复执行业务逻辑…服务器

Message durability

若是咱们但愿即便在RabbitMQ服务重启的状况下,也不会丢失消息,咱们能够将Queue与Message都设置为可持久化的(durable),这样能够保证绝大部分状况下咱们的RabbitMQ消息不会丢失。但依然解决不了小几率丢失事件的发生(好比RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),若是咱们须要对这种小几率事件也要管理起来,那么咱们要用到事务。因为这里仅为RabbitMQ的简单介绍,因此这里将不讲解RabbitMQ相关的事务。架构

Prefetch count

前面咱们讲到若是有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时若是每一个消息的处理时间不一样,就有可能会致使某些消费者一直在忙,而另一些消费者很快就处理完手头工做并一直空闲的状况。咱们能够经过设置prefetchCount来限制Queue每次发送给每一个消费者的消息数,好比咱们设置prefetchCount=1,则Queue每次给每一个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

RabbitMQ基础概念详细介绍

Exchange

在上一节咱们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的状况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。

RabbitMQ基础概念详细介绍

Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。
RabbitMQ中的Exchange有四种类型,不一样的类型有着不一样的路由策略,这将在Exchange Types一节介绍。

routing key

生产者在将消息发送给Exchange的时候,通常会指定一个routing key,来指定这个消息的路由规则,而这个routing key须要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的状况下(在正常使用时通常这些内容都是固定配置好的),咱们的生产者就能够在发送消息给Exchange时,经过指定routing key来决定消息流向哪里。
RabbitMQ为routing key设定的长度限制为255 bytes。

Binding

RabbitMQ中经过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
RabbitMQ基础概念详细介绍

Binding key

在绑定(Binding)Exchange与Queue的同时,通常会指定一个binding key;消费者将消息发送给Exchange时,通常会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。
在绑定多个Queue到同一个Exchange的时候,这些Binding容许使用相同的binding key。
binding key 并非在全部状况下都生效,它依赖于Exchange Type,好比fanout类型的Exchange就会无视binding key,而是将消息路由到全部绑定到该Exchange的Queue。

Exchange Types

RabbitMQ经常使用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。

fanout

fanout类型的Exchange路由规则很是简单,它会把全部发送到该Exchange的消息路由到全部与它绑定的Queue中。
RabbitMQ基础概念详细介绍

上图中,生产者(P)发送到Exchange(X)的全部消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。

direct

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key彻底匹配的Queue中。
RabbitMQ基础概念详细介绍

以上图的配置为例,咱们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);若是咱们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。若是咱们以其余routingKey发送消息,则消息不会路由到这两个Queue中。

topic

前面讲到direct类型的Exchange路由规则是彻底匹配binding key与routing key,但这种严格的匹配方式在不少状况下不能知足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage类似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不一样,它约定:

  • routing key为一个句点号“. ”分隔的字符串(咱们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key与routing key同样也是句点号“. ”分隔的字符串
  • binding key中能够存在两种特殊字符“*”与“#”,用于作模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(能够是零个)

RabbitMQ基础概念详细介绍

以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,由于它们没有匹配任何bindingKey。

headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否彻底匹配Queue与Exchange绑定时指定的键值对;若是彻底匹配则消息会路由到该Queue,不然不会路由到该Queue。
该类型的Exchange没有用到过(不过也应该颇有用武之地),因此不作介绍。

RPC

MQ自己是基于异步的消息处理,前面的示例中全部的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,咱们极可能须要一些同步处理,须要同步等待服务端将个人消息处理完成后再进行下一步处理。这至关于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ基础概念详细介绍
RabbitMQ中实现RPC的机制是:

  • 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一块儿发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知个人消息发送到这个Queue中)和correlationId(这次请求的标识号,服务器处理完成后须要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
  • 服务器端收到消息并处理
  • 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
  • 客户端以前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理

总结

本文介绍了RabbitMQ中我的认为最重要的概念,充分利用RabbitMQ提供的这些功能就能够处理咱们绝大部分的异步业务了。
本篇的基本概念可能很难理解并消化,结合实际的应用代码应该会比较容易吸取。因此接下来要写的文章例中会包含实际的业务应用场景分析,为何使用RabbitMQ来实现,如何用RabbitMQ来实现。

 

 

http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/tutorials_with_python/[6]RPC/#_5 

 

 

 

http://blog.csdn.net/anzhsoft/article/details/19563091

 

一个客户端能够和多个exchange相连

consumer指定的时候只要指定QueueName就OK了    

 

这个系统架构图版权属于sunjun041640。

    RabbitMQ Server: 也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是维护一条从Producer到Consumer的路线,保证数据可以按照指定的方式进行传输。可是这个保证也不是100%的保证,可是对于普通的应用来讲这已经足够了。固然对于商业系统来讲,能够再作一层数据一致性的guard,就能够完全保证系统的一致性了。

    Client A & B: 也叫Producer,数据的发送方。createmessages and publish (send) them to a broker server (RabbitMQ).一个Message有两个部分:payload(有效载荷)和label(标签)。payload顾名思义就是传输的数据。label是exchange的名字或者说是一个tag,它描述了payload,并且RabbitMQ也是经过这个label来决定把这个Message发给哪一个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。

    Client 1,2,3:也叫Consumer,数据的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比做是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。固然可能会把同一个Message发送给不少的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来讲,它是不知道谁发送的这个信息的。就是协议自己不支持。可是固然了若是Producer发送的payload包含了Producer的信息就另当别论了。

     对于一个数据从Producer到Consumer的正确传递,还有三个概念须要明确:exchanges, queues and bindings。

        Exchanges are where producers publish their messages.

        Queuesare where the messages end up and are received by consumers

        Bindings are how the messages get routed from the exchange to particular queues.

   还有几个概念是上述图中没有标明的,那就是Connection(链接),Channel(通道,频道)。

 

   Connection: 就是一个TCP的链接。Producer和Consumer都是经过TCP链接到RabbitMQ Server的。之后咱们能够看到,程序的起始处就是创建这个TCP链接。

   Channels: 虚拟链接。它创建在上述的TCP链接中。数据流动都是在Channel中进行的。也就是说,通常状况是程序起始创建TCP链接,第二步就是创建这个Channel。

    那么,为何使用Channel,而不是直接使用TCP链接?

    对于OS来讲,创建和关闭TCP链接是有代价的,频繁的创建关闭TCP链接对于系统的性能有很大的影响,并且TCP的链接数也有限制,这也限制了系统处理高并发的能力。可是,在TCP链接中创建Channel是没有上述代价的。对于Producer或者Consumer来讲,能够并发的使用多个Channel进行Publish或者Receive。有实验代表,1s的数据能够Publish10K的数据包。固然对于不一样的硬件环境,不一样的数据包大小这个数据确定不同,可是我只想说明,对于普通的Consumer或者Producer来讲,这已经足够了。若是不够用,你考虑的应该是如何细化split你的设计。

 

 

对于API的使用能够参考官方的java文档,(里面的思想不管是java、cpp仍是python都类似)

https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.1/rabbitmq-java-client-javadoc-3.1.1/com/rabbitmq/client/Channel.html#basicConsume(java.lang.String, boolean, java.lang.String, com.rabbitmq.client.Consumer)

 

 

http://m.oschina.net/blog/186202

从AMQP协议能够看出,MessageQueue、Exchange和Binding构成了AMQP协议的核心,下面咱们就围绕这三个主要组件    从应用使用的角度全面的介绍如何利用Rabbit MQ构建消息队列以及使用过程当中的注意事项。

RabbitMQ系列二(构建消息队列) - 网易杭研后台技术中心 - 网易杭研后台技术中心的博客

 

 

  • 1. 声明MessageQueue

      在Rabbit MQ中,不管是生产者发送消息仍是消费者接受消息,都首先须要声明一个MessageQueue。这就存在一个问题,是生产者声明仍是消费者声明呢?要解决这个问题,首先须要明确:

a)消费者是没法订阅或者获取不存在的MessageQueue中信息。

b)消息被Exchange接受之后,若是没有匹配的Queue,则会被丢弃。

在明白了上述两点之后,就容易理解若是是消费者去声明Queue,就有可能会出如今声明Queue以前,生产者已发送的消息被丢弃的隐患。若是应用可以经过消息重发的机制容许消息丢失,则使用此方案没有任何问题。可是若是不能接受该方案,这就须要不管是生产者仍是消费者,在发送或者接受消息前,都须要去尝试创建消息队列。这里有一点须要明确,若是客户端尝试创建一个已经存在的消息队列,Rabbit MQ不会作任何事情,并返回客户端创建成功的。

       若是一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不容许该消费者在同一个channel去声明其余队列的。Rabbit MQ中,能够经过queue.declare命令声明一个队列,能够设置该队列如下属性:

a) Exclusive:排他队列,若是一个队列被声明为排他队列,该队列仅对首次声明它的链接可见,并在链接断开时自动删除。这里须要注意三点:其一,排他队列是基于链接可见的,同一链接的不一样信道是能够同时访问同一个链接建立的排他队列的。其二,“首次”,若是一个链接已经声明了一个排他队列,其余链接是不容许创建同名的排他队列的,这个与普通队列不一样。其三,即便该队列是持久化的,一旦链接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。

b)   Auto-delete:自动删除,若是该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

 c)   Durable:持久化,这个会在后面做为专门一个章节讨论。

d)  其余选项,例如若是用户仅仅想查询某一个队列是否已存在,若是不存在,不想创建该队列,仍然能够调用queue.declare,只不过须要将参数passive设为true,传给queue.declare,若是该队列已存在,则会返回true;若是不存在,则会返回Error,可是不会建立新的队列。

  • 2. 生产者发送消息

        在AMQP模型中,Exchange是接受生产者消息并将消息路由到消息队列的关键组件。ExchangeType和Binding决定了消息的路由规则。因此生产者想要发送消息,首先必需要声明一个Exchange和该Exchange对应的Binding。能够经过 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,声明一个Exchange须要三个参数:ExchangeName,ExchangeType和Durable。ExchangeName是该Exchange的名字,该属性在建立Binding和生产者经过publish推送消息时须要指定。ExchangeType,指Exchange的类型,在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic,不一样的Exchange会表现出不一样路由行为。Durable是该Exchange的持久化属性,这个会在消息持久化章节讨论。声明一个Binding须要提供一个QueueName,ExchangeName和BindingKey。下面咱们就分析一下不一样的ExchangeType表现出的不一样路由规则。

        生产者在发送消息时,都须要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey之后,会判断该ExchangeType:

                         a) 若是是Direct类型,则会将消息中的RoutingKey与该Exchange关联的全部Binding中的BindingKey进行比较,若是相等,则发送到该Binding对应的Queue中。

RabbitMQ系列二(构建消息队列) - 网易杭研后台技术中心 - 网易杭研后台技术中心的博客 

                  b)   若是是  Fanout  类型,则会将消息发送给全部与该  Exchange  定义过  Binding  的全部  Queues  中去,实际上是一种广播行为。
           

RabbitMQ系列二(构建消息队列) - 网易杭研后台技术中心 - 网易杭研后台技术中心的博客 

        c)若是是Topic类型,则会按照正则表达式,对RoutingKey与BindingKey进行匹配,若是匹配成功,则发送到对应的Queue中。

             RabbitMQ系列二(构建消息队列) - 网易杭研后台技术中心 - 网易杭研后台技术中心的博客

  • 3. 消费者订阅消息    

    在RabbitMQ中消费者有2种方式获取队列中的消息:

       a)  一种是经过basic.consume命令,订阅某一个队列中的消息,channel会自动在处理完上一条消息以后,接收下一条消息。(同一个channel消息处理是串行的)。除非关闭channel或者取消订阅,不然客户端将会一直接收队列的消息。

             b)  另一种方式是经过basic.get命令主动获取队列中的消息,可是绝对不能够经过循环调用basic.get来代替basic.consume,这是由于basic.get RabbitMQ在实际执行的时候,是首先consume某一个队列,而后检索第一条消息,而后再取消订阅。若是是高吞吐率的消费者,最好仍是建议使用basic.consume。

      若是有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息的,每一条消息只能被一个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA回复服务器收到响应,服务器删除MessageA;再有一条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,而后收到ClientB的确认后,删除MessageB;等到再下一条消息时,服务器会再将消息发送给ClientA。

       这里咱们能够看出,消费者再接到消息之后,都须要给服务器发送一条确认命令,这个便可以在handleDelivery里显示的调用basic.ack实现,也能够在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器能够安全的删除该消息,而不是通知生产者,与RPC不一样。 若是消费者在接到消息之后还没来得及返回ACK就断开了链接,消息服务器会重传该消息给下一个订阅者,若是没有订阅者就会存储该消息。

        既然RabbitMQ提供了ACK某一个消息的命令,固然也提供了Reject某一个消息的命令。当客户端发生错误,调用basic.reject命令拒绝某一个消息时,能够设置一个requeue的属性,若是为true,则消息服务器会重传该消息给下一个订阅者;若是为false,则会直接删除该消息。固然,也能够经过ack,让消息服务器直接删除该消息而且不会重传。

  • 4. 持久化:

        Rabbit MQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,全部已声明的队列,Exchange,Binding以及队列中的消息都会丢失。经过设置Exchange和MessageQueue的durable属性为true,可使得队列和Exchange持久化,可是这还不能使得队列中的消息持久化,这须要生产者在发送消息的时候,将delivery mode设置为2,只有这3个所有设置完成后,才能保证服务器重启不会对现有的队列形成影响。这里须要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,不然在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能形成比较大的影响,可能会降低10倍不止。

  • 5. 事务:

     对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,由于consume命令自己没有任何Response返回,因此即便服务器崩溃,没有持久化该消息,生产者也没法获知该消息已经丢失。若是此时使用事务,即经过txSelect()开启一个事务,而后发送消息给服务器,而后经过txCommit()提交该事务,便可以保证,若是txCommit()提交了,则该消息必定会持久化,若是txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。固然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。

  • 6. Confirm机制:

      使用事务当然能够保证只有提交的事务,才会被服务器执行。可是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者能够感知服务器消息是否已被路由到正确的队列中——Confirm。若是设置channel为confirm状态,则经过该channel发送的消息都会被分配一个惟一的ID,而后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优势在于异步,生产者在发送消息之后,便可继续执行其余任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。若是消息服务器发生异常,致使该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就能够经过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越不少。可是Confirm机制,没法进行回滚,就是一旦服务器崩溃,生产者没法获得Confirm信息,生产者其实自己也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,可是若是原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统须要支持去重。

 

  • 其余:

Broker:简单来讲就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪一个队列。
Queue:消息队列载体,每一个消息都会被投入到一个或多个队列。
Binding:绑定,它的做用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里能够开设多个vhost,用做不一样用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务。

消息队列的使用过程大概以下:

(1)客户端链接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间创建好绑定关系。
(5)客户端投递消息到exchange。

 

 

 

Linux下安装RabbitMQ

标签: rabbitmq消息队列

1. 安装erlang:

下载所需的源码:wget http://erlang.org/download/otp_src_R13B04.tar.gz wget http://erlang.org/download/otp_src_R13B04.tar.gz

而后./configure && make && make install

 

注:在configure以后发现有如下提示信息:

(1) odbc : ODBC library – link check failed

须要yum -y install unixODBC unixODBC-devel

(2)“ wxWidgets not found, wx will NOT be usable”及“fop is missing.”这两个能够忽略。

 

 

安装完毕,在命令行键入erl,将会出现以下命令行:

Erlang R13B04 (erts-5.7.5) [source] [smp:2:2] [rq:2][async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.5 

(abort with ^G)
1>

 

2.安装RabbitMQ

RabbitMQ的安装有不少版本,咱们使用Generic Unix版本。

http://www.rabbitmq.com/install-generic-unix.html中下载http://www.rabbitmq.com/releases/rabbitmq-server/v3.4.3/rabbitmq-server-generic-unix-3.4.3.tar.gz并解压到Linux中的任意目录(/mysoftware/rabbitmq_server-3.4.3/),解压后便可使用。

 

3.RabbitMQ开启与关闭

进入rabbitmq安装目录下的sbin目录,分别执行如下命令:

  1. 开启RabbitMQ服务:.  
  2. ./rabbitmq-server -detached (-detached为可选参数,表示后台开启)  
  3.   
  4. 开启RabbitMQ管理工具,经过浏览器访问http://localhost:15672使用:.  
  5. ./rabbitmq-plugins enable rabbitmq_management (3.3.1版本之后默认不容许用guest远程管理。能够手动建立管理员账号来远程管理,具体参见4.4)  
  6.   
  7. 关闭RabbitMQ服务:  
  8. ./rabbitmqctl stop  
开启RabbitMQ服务:.
./rabbitmq-server -detached (-detached为可选参数,表示后台开启)

开启RabbitMQ管理工具,经过浏览器访问http://localhost:15672使用:.
./rabbitmq-plugins enable rabbitmq_management (3.3.1版本之后默认不容许用guest远程管理。能够手动建立管理员账号来远程管理,具体参见4.4)

关闭RabbitMQ服务:
./rabbitmqctl stop
 
 
注意:若是是登陆远程管理界面(好比物理机访问虚拟机的管理界面时),除了关闭防火墙开通相应容许的端口之外,也不能用guest/guest访问,而应该新建立用户并赋予管理员权限才可访问
增长用户:rabbitmqctl add_user ddd ddd
设置管理员权限: rabbitmqctl set_user_tags diego administrator 这样才能够访问web页面提供的管理页面
./rabbitmqctl  set_permissions -p / diego '.*' '.*' '.*'  给用户diego加上有访问虚拟空间/的权限
 

4.设置RabbitMQ管理权限

 

Rabbitmq服务器的主要经过rabbitmqctl和rabbimq-plugins两个工具来管理,如下是一些经常使用功能。

1). 服务器启动与关闭

      启动: rabbitmq-server –detached

      关闭:rabbitmqctl stop

      若单机有多个实例,则在rabbitmqctlh后加–n 指定名称

2). 插件管理

      开启某个插件:rabbitmq-pluginsenable xxx

      关闭某个插件:rabbitmq-pluginsdisablexxx

      注意:重启服务器后生效。

3).virtual_host管理

      新建virtual_host: rabbitmqctladd_vhost  xxx

      撤销virtual_host:rabbitmqctl  delete_vhost xxx

4). 用户管理

      新建用户:rabbitmqctl add_user xxx pwd

      删除用户:   rabbitmqctl delete_user xxx

      改密码: rabbimqctl change_password {username} {newpassword}

      设置用户角色:rabbitmqctl set_user_tags {username} {tag ...}

              Tag能够为 administrator,monitoring, management

5). 权限管理

      权限设置:set_permissions [-pvhostpath] {user} {conf} {write} {read}

               Vhostpath

               Vhost路径

               user

     用户名

              Conf

      一个正则表达式match哪些配置资源可以被该用户访问。

              Write

      一个正则表达式match哪些配置资源可以被该用户读。

               Read

      一个正则表达式match哪些配置资源可以被该用户访问。

6). 获取服务器状态信息

       服务器状态:rabbitmqctl status

       队列信息:rabbitmqctl list_queues[-p vhostpath] [queueinfoitem ...]

                Queueinfoitem能够为:name,durable,auto_delete,arguments,messages_ready,

                messages_unacknowledged,messages,consumers,memory

       Exchange信息:rabbitmqctllist_exchanges[-p vhostpath] [exchangeinfoitem ...]

                 Exchangeinfoitem有:name,type,durable,auto_delete,internal,arguments.

       Binding信息:rabbitmqctllist_bindings[-p vhostpath] [bindinginfoitem ...]       

                 Bindinginfoitem有:source_name,source_kind,destination_name,destination_kind,routing_key,arguments

       Connection信息:rabbitmqctllist_connections [connectioninfoitem ...]

       Connectioninfoitem有:recv_oct,recv_cnt,send_oct,send_cnt,send_pend等。

       Channel信息:rabbitmqctl  list_channels[channelinfoitem ...]

      Channelinfoitem有consumer_count,messages_unacknowledged,messages_uncommitted,acks_uncommitted,messages_unconfirmed,prefetch_count,client_flow_blocked

 

 

 

rabbitmq 实现原理

AMQP(高级消息队列协议 Advanced Message Queue Protocol)

AMQP当中有四个概念很是重要: 虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。一个虚拟主机持有一组交换机、队列和绑定。为何须要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。所以,若是须要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创 建一个虚拟主机。每个RabbitMQ服务器都有一个默认的虚拟主机“/”。

Producer 要产生消息必需要建立一个 Exchange ,Exchange 用于转发消息,可是它不会作存储,若是没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息,固然若是消息老是发送过去就被直接丢弃那就没有什么意思了,一个 Consumer 想要接受消息的话,就要建立一个 Queue ,并把这个 Queue bind 到指定的 Exchange 上,而后 Exchange 会把消息转发到 Queue 那里,Queue 会负责存储消息,Consumer 能够经过主动 Pop 或者是 Subscribe 以后被动回调的方式来从 Queue 中取得消息。

 

Exchange,Queue,RoutingKey

 

蓝色-- Client(相对于Rabbitmq Server来讲)

绿色--Exchange

红色—Queue

     - 交换器(Exchange),它是发送消息的实体。

     - 队列(Queue),这是接收消息的实体。

     - 绑定器(Bind),将交换器和队列链接起来,而且封装消息的路由信息。

 

Exchange指向Queue的黑色线—RoutingKey,能够将它简单的理解为一条链接Exchange和Queue的路线

 

Exchange和Queue都须要经过channel来进行定义,而RoutingKey则只须要在binding时取个名字就好了。

 

这一块的理解是不正确的,

Exchange Queue RoutingKey关系说明:

 

Exchange Name

Queue Name

Routing Key

 

test.queue     

test.queue

 

test.queue2    

test.queue2

test.exchange  

test.queue     

test.routingkey

test.exchange  

test.queue2    

test.routingkey

test.exchange  

test.queue     

test.routingkey2

test.exchange1

test.queue     

test.routingkey

 

由结果能够看出,由Exchange,Queue,RoutingKey三个才能决定一个从Exchange到Queue的惟一的线路。

 

 

 

 

左边的Client向右边的Client发送消息,流程:

 

1,  获取Conection

 

2,  获取Channel

 

3,  定义Exchange,Queue

 

4,  使用一个RoutingKey将Queue Binding到一个Exchange上

 

5,  经过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上,

 

6,  接收方在接收时也是获取connection,接着获取channel,而后指定一个Queue直接到它关心的Queue上取消息,它对Exchange,RoutingKey及如何binding都不关心,到对应的Queue上去取消息就OK了(这个是重点,接收方,只关心queue的名字便可,其余的都不关心!!!!!)

 

换句话说:

发送放关心:ExchangName, QueueName,BindingKey

接收方关心:QuueuName

 

一个Client发送消息,哪些Client能够收到消息,其核心就在于Exchange,RoutingKey,Queue的关系上。

 

 

Exchange

RoutingKey

Queue

1

E1

R1

Q1

2

 

R2

Q2

3

E2

R3

Q1

4

 

R4

Q2

5

E1

R5

Q1

6

E2

R6

Q1

 

咱们能够这样理解,RoutingKey就像是个中间表,将两个表的数据进行多对多关联,只不过对于相同的Exchange和Queue,可使用不一样的RoutingKey重复关联屡次。

相关文章
相关标签/搜索