RabbitMQ(python实现)学习之二:Producer发送消息至多个消息队列queue(广播消息)

1.1本部份内容简介python

这部分咱们将要发送一个消息到多个Consumer,这部分称之为“publish/subscribe”缓存

咱们实现的方式就是发送端,发送一个消息,与此同时,多个接收端将同时接收到消息并打印在屏幕上面。函数

1.2exchange简介测试

在前面的博文中,咱们的讲解是:发送端发送消息至消息队列,接收端从消息队列获取消息。如今咱们来介绍一下rabbitmq的完整消息传送模型。spa

>Producer:用来发送消息的应用程序命令行

>queue:用来存储消息的缓存3d

>Consumer:用来接收消息的应用程序code

消息传送模型的核心是,Producer从不会直接将消息传送给queue,而是,将消息传送给exchange,exchange是个很简单的东西,在一侧,他接收来自Producer的消息,另外一侧将消息传送给queue。exchange将消息传送给你个queue,仍是传送给多个queue,这主要是由exchange的type决定。模型图以下:blog

        

 

exchange有不少type可用,以下:direct、topic、headers、fanout。本博客针对fanout讲解,后续博文对其余类型有所讲解,让咱们建立一个exchange,type为fanout,名字为logs,代码以下:rabbitmq

channel.exchange(exchange='logs',type='fanout')

对于type为fanout的exchange,理解起来很是简单,它将接收到的消息,广播给他所知道的全部的queue,即全部和他创建链接的queue。前面的博文降到了命令行查看list_exchanges的命令以下:

 $ :sudo rabbitmqctl list_exchanges
 Listing exchanges ...
 logs      fanout
 amq.direct      direct
 amq.topic       topic
 amq.fanout      fanout
 amq.headers     headers
 ...done.

对于上图中,你会看到不少amq.*的exchange,这些是系统默认创建的,在你不创建exchange时,系统默认创建上面几个。

对于消息的发布函数basic_publish()也随之变为:

channel.basic_publish(exchange='logs',routing_key='',body=message)

1.3临时queue

 正如你前面学到的,对于一个queue,会有本身的名字(hello什么的),

首先:

result = channel.queue_declare()

而后经过result.method.queue,系统会随机给queue命名。

若是咱们想Producer与Consumer断开链接时,队列queue删除,那么须要改为下面的代码:

result = channel.queue_declare(exclusive=True)

1.4Bingings(将queue与exchange绑定)

模型图以下:

        

咱们已经建立了一个type为fanout的exchange,如今,咱们要告诉exchange,将消息发送给咱们本身定义的queue,在exchange与queue之间创建链接的是binding,代码以下:

channel.queue_bind(exchange='logs',queue=result.method.queue)

在命令行查看binding的列表,命令以下:

$: sudo rabbitmqctl list_bindings

 1.5最终代码

最终的模型以下:

           

send.py代码以下:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"  #若是键盘有输入,message为键盘输入,若是键盘没有输入,消息message="info: Hello World!";
 channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()

 

receive.py代码

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

1.6代码测试

开启一个命令行窗口,运行send.py:

$: python send.py   #(此时你传送的内容为info: Hello World!)或者  

$: python send.py message
#message为你想发送的内容

开启两个命令行窗口,分别运行receive.py,两个窗口你会看到有相同的消息输出:

$: python receive.py
相关文章
相关标签/搜索