在以前的文章RabbitMQ入门(二)工做队列中,咱们建立了一个工做队列。工做队列背后的假设是每一项任务都被准确地传送至一个worker。在本文中,咱们将会作一些不一样的事情——咱们将会把一个消息发送至许多消费者中。这种模式被称为订阅模式(publish/subscribe)
。
为了解释这种模式,咱们将会构建一个简单的日志系统。它包含两个程序——第一个将会产生消息,第二个将会接收并输出这些消息。
在咱们的日志系统中,每个正在运行的接收程序都会收到消息。在这种方式下,咱们能够运行一个接收程序来接收并将日志保存至硬盘;同时,咱们还能运行另外一个接收程序,在屏幕上观察到日志的输出。
特别地,发送的这些消息都会被广播到全部的接收程序。python
在以前的文章中,咱们向队列发送消息,从队列中接受消息。如今是时候介绍RabbitMQ中的所有消息转发模式。
让咱们快速地浏览下以前文章中讲了些什么:缓存
RabbitMQ中消息传输模式的核心思想是生产者毫不会直接向队列发送任何消息。实际上,一般状况下生产者甚至都不会知道消息是否会被发送至队列。
生产者会将消息发送至交换(exchange)
。交换
并不复杂。一方面它从生产者中接受消息,另外一方面将消息推送至队列。交换
必须知道,当它接受一个消息时,它该怎么作。是否这个消息会附加至一个特殊的队列?是否它会附加至许多队列?或者它会被丢弃。这个规则用交换类型(exchange type)
来定义。bash
有一些可用的交换类型
:直接分发(direct)
,通配分发(topic)
,headers
和复制分发(fanout)
。咱们将会集中讲最后一个——fanout。咱们建立一个交换
,类型为fanout,并取名为logs:spa
channel.exchange_declare(exchange='logs', exchange_type='fanout')
fanout交换很是简单。顾名思义,它会将全部它知道的接收队列的消息都广播出去。而这也正是咱们的日志系统所须要的。
如今,咱们能够发布已经命名好的队列了:.net
channel.basic_publish(exchange='logs', routing_key='', body=message)
你也许还记得在以前的文章中,咱们须要给队列取名。可是呢,给队列命名太麻烦了——咱们须要将workers指定到同一个队列。当你须要在生产者和消费者之间共享队列的时候,给队列命名又是很重要的。
这种情形并不适合咱们的日志系统。咱们想要监听全部的消息,而不是部分消息。同时,咱们仅对当前的流动消息感兴趣,而不是以前的消息。为了解决这个问题,咱们须要作两件事情。
首先,不管什么时候咱们链接到RabbitMQ,咱们须要一个新的空队列。为此,咱们建立一个随机命名的队列,或者更好的是,让RabbitMQ Server来给咱们建立一个随机命名的队列。所以,咱们能够利用queue_declare
命令,设置queuq
参数为空:日志
result = channel.queue_declare(queue='')
此时,result.method.queue
会包含一个随机命名的队列,好比说,它会和amq.gen-JzTY20BRgKO-HjmUJj0wLg
相似。
其次,一旦消息者的链接关闭,咱们须要删除队列。这能够用exclusive
参数搞定:code
result = channel.queue_declare(queue='', exclusive=True)
咱们已经建立了一个fanout 交换和队列。如今咱们须要告诉交换,将消息发送至队列。交换与队列之间的关系叫作绑定(Bindings)
。blog
channel.queue_bind(exchange='logs', queue=result.method.queue)
从如今开始,logs
交换将会在咱们的队列后追加消息。rabbitmq
生产者代码(emit_log.py):队列
# -*- coding: utf-8 -*- import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
消费者代码(receive_log.py):
# -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
开启四个终端,其中一个用于保存日志:
python3 receive_log.py > logs_from_rabbit.log
另外一个用于观察日志输出:
python3 receive_log.py
日志产生:
python3 emit_log.py
监听绑定:
sudo rabbitmqctl list_bindings
运行截图以下:
本次分享到此结束,感谢你们阅读~