源码:https://github.com/ltoddy/rabbitmq-tutorialpython
(using the Pika Python client)git
在上一篇教程中,咱们建立了工做队列。工做队列背后的假设是每一个任务只能传递给一个工做人员。
在这一部分,咱们将作一些彻底不一样的事情 - 咱们会向多个消费者传递信息。这种模式被称为“发布/订阅”。github
为了说明这种模式,咱们将创建一个简单的日志系统。它将包含两个程序 - 第一个将发送日志消息,第二个将接收并打印它们。服务器
在咱们的日志系统中,接收程序的每一个运行副本都会收到消息。这样咱们就能够运行一个接收器并将日志指向磁盘; 同时咱们将可以运行另外一个接收器并在屏幕上查看日志。spa
通常来讲,发布的日志消息将以广播的形式发给全部的接收者。日志
在本教程的前几部分中,咱们发送消息并从队列中接收消息。如今是时候在rabbitmq中引入完整的消息传递模型。code
让咱们快速回顾一下前面教程中的内容:教程
RabbitMQ中的消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列中。实际上,生产者一般甚至不知道邮件是否会被传送到任何队列中。rabbitmq
相反,生产者只能发送消息给交易所。交换是一件很是简单的事情。一方面它接收来自生产者的消息,另外一方则推进他们排队。
交易所必须知道如何处理收到的消息。是否应该附加到特定队列?它应该附加到许多队列中吗?或者它应该被丢弃。这些规则由交换类型定义 (exchange type)。队列
有几种可用的交换类型: direct, topic, header 和 fanout。咱们将关注最后一个 - fanout。让咱们建立该类型的交换,并将其称为logs:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
fanout交换很是简单。正如你可能从名字中猜出的那样,它只是将收到的全部消息广播到它所知道的全部队列中。这正是咱们logger所须要的。
如今,咱们能够发布到咱们的指定交易所:
channel.basic_publish(exchange='logs', routing_key='', body=message)
正如你之前可能记得咱们正在使用具备指定名称的队列(还记得hello和task_queue吗?)。可以命名队列对咱们相当重要 - 咱们须要将工做人员指向同一队列。
当你想在生产者和消费者之间分享队列时,给队列一个名字是很重要的。
可是,咱们的记录器并不是如此。咱们但愿听到全部日志消息,而不单单是其中的一部分。咱们也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,咱们须要作两件事。
首先,每当咱们链接到rabbitmq,咱们须要一个新的,空的队列。要作到这一点,咱们能够建立一个随机名称的队列,或者甚至更好 - 让服务器为咱们选择一个随机队列名称。
咱们能够经过不将队列参数提供给queue_declare来实现这一点:
result = channel.queue_declare()
此时,result.method.queue包含一个随机队列名称。例如,它可能看起来像amq.gen-i94oCE_tj3LyWsy-94KXHg。
其次,一旦消费者链接关闭,队列应该被删除。这是一个专有标志:
result = channel.queue_declare(exclusive=True)
咱们已经建立了一个fanout交换和一个队列。如今咱们须要告诉交换所将消息发送到咱们的队列。交换和队列之间的关系称为绑定。
channel.queue_bind(exchange='logs', queue=result.method.queue)
从如今起,logs 交易所会将消息附加到咱们的队列中。
发出日志消息的生产者程序与以前的教程没有多大区别。最重要的变化是咱们如今想发布消息到咱们的logs交易所,而不是无名字的消息。发送时咱们须要提供一个routing_key,可是对于fanout交换,它的值将被忽略。这里是emit_log.py脚本的代码 :
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello world!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
如你所见,创建链接后,咱们宣布交易所。这一步是必要的,由于发布到不存在的交易所是被禁止的。
若是没有队列绑定到交换机上,这些消息将会丢失,但这对咱们来讲没问题; 若是没有消费者正在收听,咱们能够放心地丢弃消息。
receive_logs.py的代码:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
咱们完成了。若是您想将日志保存到文件中,只需打开一个控制台并输入:
python receive_logs.py > logs_from_rabbit.log
若是你想在屏幕上看到日志,打开一个新的终端并运行:
python receive_logs.py
固然,
python emit_log.py
使用rabbitmqctl list_bindings,你能够验证代码是否真正建立了绑定和队列。当有两个receive_logs.py程序正在运行,你应该看到以下所示:
root@921edcb46341:/# rabbitmqctl list_bindings Listing bindings for vhost /... exchange amq.gen-6YXn7BycIwtI7kFuUrTbaA queue amq.gen-6YXn7BycIwtI7kFuUrTbaA [] exchange amq.gen-JhFL-rbMAoricMu5Dyo-hA queue amq.gen-JhFL-rbMAoricMu5Dyo-hA [] logs exchange amq.gen-6YXn7BycIwtI7kFuUrTbaA queue amq.gen-6YXn7BycIwtI7kFuUrTbaA [] logs exchange amq.gen-JhFL-rbMAoricMu5Dyo-hA queue amq.gen-JhFL-rbMAoricMu5Dyo-hA []