杂项之rabbitmq
本节内容
- rabbitmq简介
- AMQP协议
- rabbitmq使用
- 应用举例
rabbitmq简介
介绍rabbitmq以前,先介绍一下AMQP协议,由于rabbitmq就是基于AMQP协议实现的一个服务程序。(目前为止应该也是惟一实现了AMQP协议的服务)html
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通讯。前端
arbbitmq使用erlang语言实现AMQP协议,因为erlang语言是被设计用于电话交换机的编程语言,很适合被用来开发消息队列,因此,用erlang语言实现的rabbitmq的效率惊人的高。python
AMQP协议
消息代理和他们所扮演的角色
消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。web
因为AMQP是一个网络协议,因此这个过程当中的发布者,消费者,消息代理 能够存在于不一样的设备上。算法
目前最新的AMQP协议版本为AMQP 0-9-1数据库
AMQP 0-9-1模型简介
AMQP 0-9-1的工做过程以下图:消息(message)被发布者(publisher)发送给交换机(exchange),交换机经常被比喻成邮局或者邮箱。而后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。编程

发布者(publisher)发布消息时能够给消息指定各类消息属性(message meta-data)。有些属性有可能会被消息代理(brokers)使用,然而其余的属性则是彻底不透明的,它们只能被接收消息的应用所使用。ubuntu
从安全角度考虑,网络是不可靠的,接收消息的应用也有可能在处理消息的时候失败。基于此缘由,AMQP模块包含了一个消息确认(message acknowledgements)的概念:当一个消息从队列中投递给消费者后(consumer),消费者会通知一下消息代理(broker),这个能够是自动的也能够由处理消息的应用的开发者执行。当“消息确认”被启用的时候,消息代理不会彻底将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。数组
在某些状况下,例如当一个消息没法被成功路由时,消息或许会被返回给发布者并被丢弃。或者,若是消息代理执行了延期操做,消息会被放入一个所谓的死信队列中。此时,消息发布者能够选择某些参数来处理这些特殊状况。浏览器
队列,交换机和绑定统称为AMQP实体(AMQP entities)。
AMQP是一个可编程的协议
AMQP 0-9-1是一个可编程协议,某种意义上说AMQP的实体和路由规则是由应用自己定义的,而不是由消息代理定义。包括像声明队列和交换机,定义他们之间的绑定,订阅队列等等关于协议自己的操做。
这虽然能让开发人员自由发挥,但也须要他们注意潜在的定义冲突。固然这在实践中不多会发生,若是发生,会以配置错误(misconfiguration)的形式表现出来。
应用程序(Applications)声明AMQP实体,定义须要的路由方案,或者删除再也不须要的AMQP实体。
交换机和交换机类型
交换机是用来发送消息的AMQP实体。交换机拿到一个消息以后将它路由给一个或零个队列。它使用哪一种路由算法是由交换机类型和被称做绑定(bindings)的规则所决定的。AMQP 0-9-1的代理提供了四种交换机:
- Direct exchange(直连交换机)
- 预声明的默认名称: (Empty string) and amq.direct
- Fanout exchange(扇型交换机)
- Topic exchange(主题交换机)
- Headers exchange(头交换机)
- 预声明的默认名称: amq.match (and amq.headers in RabbitMQ)
除交换机类型外,在声明交换机时还能够附带许多其余的属性,其中最重要的几个分别是:
- Name (交换机的名字)
- type (交换机类型)
- passive (被动模式,布尔值)
- Durable (消息代理重启后,交换机是否还存在,布尔值)
- Auto_delete (当全部与之绑定的消息队列都完成了对此交换机的使用后,删掉它,布尔值)
- Arguments(依赖代理自己)
交换机能够有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们须要在代理再次上线后从新被声明)。然而并非全部的应用场景都须要持久化的交换机。
直连交换机(默认交换机)
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也能够处理多播路由)。下边介绍它是如何工做的:
- 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
- 当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值一样为R的队列。
直连交换机常常用来循环分发任务给多个工做者(workers)。当这样作的时候,咱们须要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

默认交换机(default exchange)其实是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每一个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
举个例子:
当你声明了一个名为”search-indexing-online”的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为”search-indexing-online”。所以,当携带着名为”search-indexing-online”的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为”search-indexing-online”的队列中。换句话说,默认交换机看起来貌似可以直接将消息投递给队列,尽管技术上并无作相关的操做。
扇型交换机
扇型交换机(funout exchange)将消息路由给绑定到它身上的全部队列,而不理会绑定的路由键。若是N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这全部的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
由于扇型交换机投递消息的拷贝到全部绑定到它的队列,因此他的应用案例都极其类似:
- 大规模多用户在线(MMO)游戏可使用它来处理排行榜更新等全局事件
- 体育新闻网站能够用它来近乎实时地将比分更新分发给移动客户端
- 分发系统使用它来广播各类状态和配置更新
- 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,所以XMPP可能会是个更好的选择)

主题交换机
主题交换机(topic exchanges)经过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机常常用来实现各类分发/订阅模式及其变种。主题交换机一般用来实现消息的多播路由(multicast routing)。
主题交换机拥有很是普遍的用户案例。不管什么时候,当一个问题涉及到那些想要有针对性的选择须要接收消息的 多消费者/多应用(multiple consumers/applications) 的时候,主题交换机均可以被列入考虑范围。
使用案例:
- 分发有关于特定地理位置的数据,例如销售点
- 由多个工做者(workers)完成的后台任务,每一个工做者负责处理某些特定的任务
- 股票价格更新(以及其余类型的金融数据更新)
- 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
- 云端的不一样种类服务的协调
- 分布式架构/基于系统的软件封装,其中每一个构建者仅能处理一个特定的架构或者系统。
头交换机
有时消息的路由操做会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键创建路由规则。经过判断消息头的值可否与指定的绑定相匹配来确立路由规则。
咱们能够绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它须要考虑某条消息(message)是须要部分匹配仍是所有匹配。上边说的“更多一段消息”就是”x-match”参数。当”x-match”设置为“any”时,消息头的任意一个值被匹配就能够知足条件,而当”x-match”设置为“all”的时候,就须要消息头的全部值都匹配成功。
头交换机能够视为直连交换机的另外一种表现形式。头交换机可以像直连交换机同样工做,不一样之处在于头交换机的路由规则是创建在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至能够是整数或者哈希值(字典)等。

队列
AMQP中的队列(queue)跟其余消息队列或任务队列中的队列是很类似的:它们存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,可是队列也有一些另外的属性。
- Name (队列名称)
- Durable(消息代理重启后,队列依旧存在)
- Exclusive(只被一个链接(connection)使用,并且当链接关闭后队列即被删除)
- Auto-delete(当最后一个消费者退订后即被删除)
- Arguments(一些消息代理用他来完成相似与TTL的某些额外功能)
队列在声明(declare)后才能被使用。若是一个队列尚不存在,声明一个队列会建立它。若是声明的队列已经存在,而且属性彻底相同,那么这次声明不会对原有队列产生任何影响。若是声明中的属性与已存在队列的属性有差别,那么一个错误代码为406的通道级异常就会被抛出。
队列名称
队列的名字能够由应用(application)来取,也可让消息代理(broker)直接生成一个。队列的名字能够是最多255字节的一个utf-8字符串。若但愿AMQP消息代理生成队列名,须要给队列的name参数赋值一个空字符串:在同一个通道(channel)的后续的方法(method)中,咱们可使用空字符串来表示以前生成的队列名称。之因此以后的方法能够获取正确的队列名是由于通道能够默默地记住消息代理最后一次生成的队列名称。
以”amq.”开始的队列名称被预留作消息代理内部使用。若是试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出。
队列持久化
持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称做暂存队列(Transient queues)。并非全部的场景和案例都须要将队列持久化。
持久化的队列并不会使得路由到它的消息也具备持久性。假若消息代理挂掉了,从新启动,那么在重启的过程当中持久化队列会被从新声明,不管怎样,只有通过持久化的消息才能被从新恢复。
绑定
绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。若是要指示交换机“E”将消息路由给队列“Q”,那么“Q”就须要与“E”进行绑定。绑定操做须要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
一个比喻:
- queue=(队列名称) 队列(queue)是咱们想要去的位于纽约的目的地
- exchange=(交换机名称) 交换机(exchange)是JFK机场
- routing_key=(路由键) 绑定(binding)就是JFK机场到目的地的路线。可以到达目的地的路线能够是一条或者多条
拥有了交换机这个中间层,不少由发布者直接到队列难以实现的路由方案可以得以实现,而且避免了应用开发者的许多重复劳动。
若是AMQP的消息没法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。
消费者
消息若是只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才可以体现。在AMQP 0-9-1 模型中,有两种途径能够达到此目的:
- 将消息投递给应用 (“push API”)
- 应用根据须要主动获取消息 (“pull API”)
使用push API,应用(application)须要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,咱们能够说应用注册了一个消费者,或者说订阅了一个队列。一个队列能够注册多个消费者,也能够注册一个独享的消费者(当独享消费者存在时,其余消费者即被排除在外)。
每一个消费者(订阅者)都有一个叫作消费者标签的标识符。它能够被用来退订消息。消费者标签其实是一个字符串。
消息确认
消费者应用(Consumer applications) 用来接受和处理消息的应用 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。并且网络缘由也有可能引发各类问题。这就给咱们出了个难题,AMQP代理在何时删除消息才是正确的?AMQP 0-9-1 规范给咱们两种建议:
- 当消息代理(broker)将消息发送给应用后当即删除。(使用AMQP方法:basic.deliver或basic.get-ok)
- 待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用AMQP方法:basic.ack)
前者被称做自动确认模式(automatic acknowledgement model),后者被称做显式确认模式(explicit acknowledgement model)。在显式模式下,由消费者应用来选择何时发送确认回执(acknowledgement)。应用能够在收到消息后当即发送,或将未处理的消息存储后发送,或等到消息被处理完毕后再发送确认回执(例如,成功获取一个网页内容并将其存储以后)。
若是一个消费者在还没有发送确认回执的状况下挂掉了,那AMQP代理会将消息从新投递给另外一个消费者。若是当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,而后再次尝试投递。
拒绝消息
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用能够向消息代理代表,本条消息因为“拒绝消息(Rejecting Messages)”的缘由处理失败了(或者未能在此时完成)。当拒绝某条消息时,应用能够告诉消息代理如何处理这条消息——销毁它或者从新放入队列。当此队列只有一个消费者时,请确认不要因为拒绝消息而且选择了从新放入队列的行为而引发消息在同一个消费者身上无限循环的状况发生。
Negative Acknowledgements
在AMQP中,basic.reject方法用来执行拒绝消息的操做。但basic.reject有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。可是若是你使用的是RabbitMQ,那么你可使用被称做negative acknowledgements(也叫nacks)的AMQP 0-9-1扩展来解决这个问题。
预取消息
在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每一个消费者一次能够接受多少条消息是很是有用的。这能够在试图批量发布消息的时候起到简单的负载均衡和提升消息吞吐量的做用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,若是生产应用每分钟才发送一条消息,这说明处理工做尚在运行。)
注意,RabbitMQ只支持通道级的预取计数,而不是链接级的或者基于大小的预取。
消息属性和有效载荷(消息主体)
AMQP模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以致于AMQP 0-9-1 明确的定义了它们,而且应用开发者们无需费心思思考这些属性名字所表明的具体含义。例如:
- Content type(内容类型)
- Content encoding(内容编码)
- Routing key(路由键)
- Delivery mode (persistent or not)
- 投递模式(持久化 或 非持久化)
- Message priority(消息优先权)
- Message publishing timestamp(消息发布的时间戳)
- Expiration period(消息有效期)
- Publisher application id(发布应用的ID)
有些属性是被AMQP代理所使用的,可是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称做消息头(headers)。他们跟HTTP协议的X-Headers很类似。消息属性须要在消息被发布的时候定义。
AMQP的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被AMQP代理看成不透明的字节数组来对待。消息代理不会检查或者修改有效载荷。消息能够只包含属性而不携带有效载荷。它一般会使用相似JSON这种序列化的格式数据,为了节省,协议缓冲器和MessagePack将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP及其同行者们一般使用”content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工做,但这仅仅是基于约定而已。
消息可以以持久化的方式发布,AMQP代理会将此消息存储在磁盘上。若是服务器重启,系统会确认收到的持久化消息未丢失。简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具备持久化性质:它彻底取决与消息自己的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能形成必定的影响(就像数据库操做同样,健壮性的存在一定形成一些性能牺牲)。
消息确认
因为网络的不肯定性和应用失败的可能性,处理确认回执(acknowledgement)就变的十分重要。有时咱们确认消费者收到消息就能够了,有时确认回执意味着消息已被验证而且处理完毕,例如对某些数据已经验证完毕而且进行了数据存储或者索引操做。
这种情形很常见,因此 AMQP 0-9-1 内置了一个功能叫作 消息确认(message acknowledgements),消费者用它来确认消息已经被接收或者处理。若是一个应用崩溃掉(此时链接会断掉,因此AMQP代理亦会得知),并且消息的确认回执功能已经被开启,可是消息代理还没有得到确认回执,那么消息会被重新放入队列(而且在还有还有其余消费者存在于此队列的前提下,当即投递给另一个消费者)。
协议内置的消息确认功能将帮助开发者创建强大的软件。
链接
AMQP链接一般是长链接。AMQP是一个使用TCP提供可靠投递的应用层协议。AMQP使用认证机制而且提供TLS(SSL)保护。当一个应用再也不须要链接到AMQP代理的时候,须要优雅的释放掉AMQP链接,而不是直接将TCP链接关闭。
通道
有些应用须要与AMQP代理创建多个链接。不管怎样,同时开启多个TCP链接都是不合适的,由于这样作会消耗掉过多的系统资源而且使得防火墙的配置更加困难。AMQP 0-9-1提供了通道(channels)来处理多链接,能够把通道理解成共享一个TCP链接的多个轻量化链接。
在涉及多线程/进程的应用中,为每一个线程/进程开启一个通道(channel)是很常见的,而且这些通道不能被线程/进程共享。
一个特定通道上的通信与其余通道上的通信是彻底隔离的,所以每一个AMQP方法都须要携带一个通道号,这样客户端就能够指定此方法是为哪一个通道准备的。
虚拟主机
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟Web servers虚拟主机概念很是类似,这为AMQP实体提供了彻底隔离的环境。当链接被创建的时候,AMQP客户端来指定使用哪一个虚拟主机。
AMQP是可扩展的
AMQP 0-9-1 拥有多个扩展点:
- 定制化交换机类型 可让开发者们实现一些开箱即用的交换机类型还没有很好覆盖的路由方案。例如 geodata-based routing。
- 交换机和队列的声明中能够包含一些消息代理可以用到的额外属性。例如RabbitMQ中的per-queue message TTL便是使用该方式实现。
- 特定消息代理的协议扩展。例如RabbitMQ所实现的扩展。
- 新的 AMQP 0-9-1 方法类可被引入。
- 消息代理能够被其余的插件扩展,例如RabbitMQ的管理前端 和 已经被插件化的HTTP API。
这些特性使得AMQP 0-9-1模型更加灵活,而且可以适用于解决更加宽泛的问题。
AMQP 0-9-1 客户端生态系统
AMQP 0-9-1 拥有众多的适用于各类流行语言和框架的客户端。其中一部分严格遵循AMQP规范,提供AMQP方法的实现。另外一部分提供了额外的技术,方便使用的方法和抽象。有些客户端是异步的(非阻塞的),有些是同步的(阻塞的),有些将这二者同时实现。有些客户端支持“供应商的特定扩展”(例如RabbitMQ的特定扩展)。
由于AMQP的主要目标之一就是实现交互性,因此对于开发者来说,了解协议的操做方法而不是只停留在弄懂特定客户端的库就显得十分重要。这样一来,开发者使用不一样类型的库与协议进行沟通时就会容易的多。
rabbitmq使用
rabbitmq安装
在ubuntu中rabbitmq安装步骤:
- 使用
sudo apt-get install rabbitmq-server
安装rabbitmq,以后开启rabbitmq的web管理页面
- 使用
sudo /usr/sbin/rabbitmq-plugins enable rabbitmq_management
开启rabbitmq的web管理页面插件
- 使用
sudo /etc/init.d/rabbitmq-server start
启动rabbitmq服务
- 打开浏览器访问服务器的web页面:http://ipaddr:15672,默认登陆帐号/密码:guest/guest(为安全起见,建议生产环境不使用该帐号)
- rabbitmq的链接端口默认是5672端口,可使用
sudo netstat -ntlp
查看,若是开启了web管理页面的话,应该能看到5672,15672,25672几个端口被打开了
rabbitmq用户管理
- 用户管理
- 新增用户:
rabbitmqctl add_user Username Password
- 删除用户:
rabbitmqctl delete_user Username
- 修改用户密码:
rabbitmqctl change_password Username Newpassword
- 查看用户列表:
rabbitmqctl list_users
- 用户角色
- 超级管理员(administrator)可登录管理控制台(启用management plugin的状况下),可查看全部的信息,而且能够对用户,策略(policy)进行操做。
- 监控者(monitoring)可登录管理控制台(启用management plugin的状况下),同时能够查看rabbitmq节点的相关信息(进程数,内存使用状况,磁盘使用状况等)
- 策略制定者(policymaker)可登录管理控制台(启用management plugin的状况下), 同时能够对policy进行管理。但没法查看节点的相关信息。
- 普通管理者(management)仅可登录管理控制台(启用management plugin的状况下),没法看到节点信息,也没法对策略进行管理。
- 其余 没法登录管理控制台,一般就是普通的生产者和消费者。
- 设置用户角色:
rabbitmqctl set_user_tags User Tag
User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其余自定义名称)。
- 可给用户设置多个角色:
rabbitmqctl set_user_tags huxianglin monitoring policymaker
- 用户权限设置
- 用户权限指的是用户对exchange,queue的操做权限,包括配置权限,读写权限。配置权限会影响到exchange,queue的声明和删除。读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操做。例如: 将queue绑定到某exchange上,须要具备queue的可写权限,以及exchange的可读权限;向exchange发送消息须要具备exchange的可写权限;从queue里取数据须要具备queue的可读权限。
- 设置用户权限:
rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP
举例:rabbitmqctl set_permissions -p / huxianglin ".*" ".*" ".*"
给用户huxianglin设置vhost为/的全部队列和交换机全部权限
- 查看(指定hostpath)全部用户的权限信息:
rabbitmqctl list_permissions [-p VHostPath]
- 查看指定用户的权限信息:
rabbitmqctl list_user_permissions User
- 清除用户的权限信息:
rabbitmqctl clear_permissions [-p VHostPath] User
rabbitmq监控
- 队列信息查看:
rabbitmqctl list_queues [-p <vhostpath>] [<queueinfoitem> ...]
- 交换机查看:
rabbitmqctl list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
- 绑定信息查看:
rabbitmqctl list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
- 链接信息查看:
rabbitmqctl list_connections [<connectioninfoitem> ...]
- 链接channel查看:
rabbitmqctl list_channels [<channelinfoitem> ...]
- 链接用户查看:
rabbitmqctl list_consumers [-p <vhostpath>]
- 查看rabbitmq-server状态:
rabbitmqctl status
应用举例
python链接rabbitmq须要安装一个包,pika,安装命令:pip3 install pika
。
例子1:direct交换机demo
rabbit_direct_publish.py
import pika
import sys
credentials=pika.PlainCredentials("huxianglin","huxianglin")
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.184.128',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
serverity=sys.argv[1] if len(sys.argv)>1 else "info"
message = ' '.join(sys.argv[2:]) or "info: Hello World!"
channel.basic_publish(exchange='direct_logs',
routing_key=serverity,
body=message)
print(" [x] Sent %r:%r" % (serverity,message))
connection.close()
rabbit_direct_consumer.py
import pika
import sys
credentials=pika.PlainCredentials("huxianglin","huxianglin")
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.184.128',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
serverities=sys.argv[1:]
if not serverities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" %sys.argv[0])
sys.exit(1)
for serverity in serverities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=serverity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key,body))
channel.basic_consume(callback,
queue=queue_name,)
channel.start_consuming()
例子2:扇形交换机demo:
rabbit_fanout_publish.py
import pika
import sys
credentials=pika.PlainCredentials("huxianglin","huxianglin")
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.184.128',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
rabbit_fanout_consumer.py
import pika
credentials=pika.PlainCredentials("huxianglin","huxianglin")
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.184.128',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
例子3:topic交换机demo:
rabbit_topic_publish.py
import pika
import sys
credentials=pika.PlainCredentials("huxianglin","huxianglin")
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.184.128',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key=sys.argv[1] if len(sys.argv)>1 else "anonymous.info"
message = ' '.join(sys.argv[2:]) or "Hello World!"
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key,message))
connection.close()
rabbit_topic_consumer.py
import pika
import sys
credentials=pika.PlainCredentials("huxianglin","huxianglin")
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.184.128',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys=sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]\n" %sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key,body))
channel.basic_consume(callback,
queue=queue_name,)
channel.start_consuming()
例子4:rpc调用demo:
rpc_client.py
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
'''Will make sure that data events are processed. Dispatches timer and channel callbacks if not called from the scope of BlockingConnection or BlockingChannel callback. Your app can block on this method. :param float time_limit:suggested upper bound on processing time in seconds. The actual blocking time depends on the granularity of the underlying ioloop. Zero means return as soon as possible. None means there is no limit on processing time and the function will block until I/O produces actionalable events. Defaults to 0 for backward compatibility. This parameter is NEW in pika 0.10.0. 该方法能够传递参数time_limit=0,默认为0,即为不阻塞的检测channel是否有消息回来,若是有消息接收到,则执行回调函数,当time_limit不为0时,将每次检测阻塞time_limit秒。该方法在pika0.10.0中新加。 '''
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
rpc_server.py
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
本文引用了以下资料来源:
amqp协议介绍以及rabbitmq官网