本文将使用Python(pika 0.9.8)实现从Producer到Consumer传递数据”Hello, World“。html
首先复习一下上篇所学:RabbitMQ实现了AMQP定义的消息队列。它实现的功能”很是简单“:从Producer接收数据而后传递到Consumer。它能保证多并发,数据安全传递,可扩展。python
和任何的Hello world同样,它们都不复杂。咱们将会设计两个程序,一个发送Hello world,另外一个接收这个数据而且打印到屏幕。
总体的设计以下图:git
RabbitMQ 实现了AMQP。所以,咱们须要安装AMPQ的library。幸运的是对于多种编程语言都有实现。咱们可使用如下lib的任何一个:github
在这里咱们将使用pika. 能够经过
pip 包管理工具来安装:编程$ sudo pip install pika==0.9.8复制代码
这个安装依赖于pip和git-core。安全
- On Ubuntu:
$ sudo apt-get install python-pip git-core 复制代码
- On Debian:
$ sudo apt-get install python-setuptools git-core $ sudo easy_install pip 复制代码
- On Windows:To install easy_install, run the MS Windows Installer for
setuptools> easy_install pip > pip install pika==0.9.8 复制代码
第一个program send.py:发送Hello world 到queue。正如咱们在上篇文章提到的,你程序的第一句话就是创建链接,第二句话就是建立channel:bash
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()复制代码
建立链接传入的参数就是RabbitMQ Server的ip或者name。架构
关于谁建立queue,上篇文章也讨论过:Producer和Consumer都应该去建立。并发
接下来咱们建立名字为hello的queue:编程语言
channel.queue_declare(queue='hello')复制代码
建立了channel,咱们能够经过相应的命令来list queue:
$ sudo rabbitmqctl list_queues
Listing queues ...
hello 0
...done.复制代码
如今咱们已经准备好了发送了。
从架构图能够看出,Producer只能发送到exchange,它是不能直接发送到queue的。如今咱们使用默认的exchange(名字是空字符)。这个默认的exchange容许咱们发送给指定的queue。routing_key就是指定的queue名字。
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
复制代码
退出前别忘了关闭connection。
connection.close()复制代码
第二个program receive.py 将从queue中获取Message而且打印到屏幕。
第一步仍是建立connection。第二步建立channel。第三步建立queue,name = hello:
channel.queue_declare(queue='hello')复制代码
接下来要subscribe了。在这以前,须要声明一个回调函数来处理接收到的数据。
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)复制代码
subscribe:
channel.basic_consume(callback,
queue='hello',
no_ack=True)复制代码
最后,准备好无限循环监听吧:
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()复制代码
send.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()复制代码
receive.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()复制代码
先运行 send.py program:
$ python send.py
[x] Sent 'Hello World!'复制代码
send.py 每次运行完都会中止。注意:如今数据已经存到queue里了。接收它:
$ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'复制代码
接下来,就要奉上更接近实际环境的例子。取决与个人课余时间啊。。。
参考文献:
1. http://www.rabbitmq.com/tutorials/tutorial-one-python.html