工做队列中,每一个任务之分发给一个工做者。若是须要分发一个消息给多个消费者,这种模式被称为“发布/订阅”html
RabbitMQ完整的消息模型python
发布者(producer)是发布消息的应用程序git
队列(queue)用于消息存储的缓冲服务器
消费者(consumer)是接收消息的应用程序spa
RabbitMQ消息模型的核心理念是:.net
发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。日志
发布者(producer)只须要把消息发送给一个交换器(exchage),而后由它一边从发布者接收消息,一边把消息推入队列。交换器必须知道如何处理它接收到的消息,是应该推送到指定的队列仍是多个队列,或者直接忽略消息。这些规则经过exchange type来定义。code
一、directorm
处理路由键,须要将一个队列绑定到交换机上,要求该消息与一个特定的路由键彻底匹配。server
是完整的匹配,与routing_key对应。
二、topic
将路由键和某模式进行匹配。此时队列须要绑定在一个模式上。
三、headers
heads类型的Exchange不依赖于routing key与bingding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对,当消息发送到Exchange时,RabbitMQ会取到该消息的headers,对比其中的键值对是否彻底匹配Queue与Exchange绑定时指定的键值对,若是彻底匹配,则消息路由到该队列,不然不会路由。
四、fanout
不处理路由键,只须要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的全部队列上。相似子网广播,每台子网内的主机都得到一份复制的消息。
fanout交换机转发消息是最快的。
exchange_declare(self, exchange=None, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None, type=None) method of pika.adapters.blocking_connection.BlockingChannel instance This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class. If passive set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not and if the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found). :param exchange: The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon. :type exchange: str or unicode :param str exchange_type: The exchange type to use :param bool passive: Perform a declare or just check to see if it exists :param bool durable: Survive a reboot of RabbitMQ :param bool auto_delete: Remove when no more queues are bound to it :param bool internal: Can only be published to by other exchanges :param bool nowait: Do not expect an Exchange.DeclareOk response :param dict arguments: Custom key/value pair arguments for the exchange :param str type: The deprecated exchange type parameter
guosong@guosong:~$ rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic ...done.
以前的列子中指定exchange=‘’,命名为空字符串。
channel.basic_publish(exchange='', routing_key='hello', body=message)
exchange参数就是交换器的名称,空字符串表明匿名交换器,消息将会根据指定的routing_key分发到指定的队列。
对于共享同队列的需求,指定队列名称比较重要。
对于日志系统而言,打算接收全部的日志消息,关心的是最新的消息。
为了解决这个问题,须要作两件事情。
一、随机建立队列名
RabbitMQ能够为咱们选择一个随机的队列名(推荐),固然也能够指定。
#经过不指定queue参数值,实现RabbitMQ分配随机队列名 result = channel.queue_declare()
#能够经过以下方式获取消息队列名称
result.method.queue
二、当与消费者(consumer)断开链接的时候,这个队列应当被删除。可使用exclusive标识。
result = channel.queue_declare(exclusive=True)
已经建立一个fanout类型的交换器和一个队列。须要告诉交换器如何发送消息给咱们的队列。
交换器和队列之间的关系称之为绑定(binding)
#logs交换器将会把消息添加到队列中 #队列是上面说的服务器随机命名的 channel.queue_bind(exchange='logs', queue=result.method.queue)
绑定列表查看
guosong@guosong:~$ rabbitmqctl list_bindings Listing bindings ... exchange task_queue queue task_queue [] gs_test_exchange exchange task_queue queue task_queue [] ...done.
把消息发送给logs交换器,在发送的时候提供routing_key参数,可是它的值会被fanout交换器忽略
#!/usr/bin/env python #-*- coding:utf8 -*- import sys import pika import logging logging.basicConfig(format='%(levelname)s:$(message)s',level=logging.CRITICAL) def emit_log(): pika.connection.Parameters.DEFAULT_HOST = 'localhost' pika.connection.Parameters.DEFAULT_PORT = 5672 pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/' pika.connection.Parameters.DEFAULT_USERNAME = 'guosong' pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong' para = pika.connection.Parameters() connection = pika.BlockingConnection(para) channel = connection.channel() #声明一个logs交换器,类型为fanout,不容许发布消息到不存在的交换器 channel.exchange_declare(exchange='logs',type='fanout') message = '.'.join(sys.argv[1:]) or "info:Hello World!" #发送的时候指定routing_key为空,没有绑定队列到交换器上,消息将会丢失 #对于日志类消息,若是没有消费者监听的话,这些消息就会忽略 channel.basic_publish(exchange='logs',routing_key='',body=message) #%r也是string类型 print "[x] Sent %r" % (message,) connection.close() if __name__ == '__main__': emit_log()
一、两个临时队列
二、两个consumer输出
第二个consumer开启晚一些,所以其收到消息少一些,由于在发布的时候指定routing_key为空,exchange不会保留。
三、在consumer退出的时候,两个临时产生的队列也自动删除。
logs交换器把数据发送给两个系统命名的队列,符合指望需求。
一、http://zhanghua.1199.blog.163.com/blog/static/4644980720128732417654/
二、http://adamlu.net/rabbitmq/tutorial-three-python
三、http://blog.csdn.net/puncha/article/details/8449273
四、http://www.ostest.cn/archives/497