RabbitMQ消息队列(二):”Hello, World“

本文将使用Python(pika 0.9.8)实现从Producer到Consumer传递数据”Hello, World“。html

首先复习一下上篇所学:RabbitMQ实现了AMQP定义的消息队列。它实现的功能”很是简单“:从Producer接收数据而后传递到Consumer。它能保证多并发,数据安全传递,可扩展。python

和任何的Hello world同样,它们都不复杂。咱们将会设计两个程序,一个发送Hello world,另外一个接收这个数据而且打印到屏幕。
总体的设计以下图:git

RabbitMQ消息队列(二):”Hello, World“

1. 环境配置

RabbitMQ 实现了AMQP。所以,咱们须要安装AMPQ的library。幸运的是对于多种编程语言都有实现。咱们可使用如下lib的任何一个:github

在这里咱们将使用pika. 能够经过
pip
包管理工具来安装:编程

$ sudo pip install pika==0.9.8复制代码

这个安装依赖于pip和git-core。安全

  • On Ubuntu:
    $ sudo apt-get install python-pip git-core
    复制代码
  • On Debian:
    $ sudo apt-get install python-setuptools git-core
    $ sudo easy_install pip
    复制代码
  • On Windows:To install easy_install, run the MS Windows Installer for
    setuptools

    > easy_install pip
    > pip install pika==0.9.8
    复制代码

2. Sending

RabbitMQ消息队列(二):”Hello, World“

第一个program send.py:发送Hello world 到queue。正如咱们在上篇文章提到的,你程序的第一句话就是创建链接,第二句话就是建立channel:bash

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()复制代码

建立链接传入的参数就是RabbitMQ Server的ip或者name。架构

关于谁建立queue,上篇文章也讨论过:Producer和Consumer都应该去建立。并发

接下来咱们建立名字为hello的queue:编程语言

channel.queue_declare(queue='hello')复制代码

建立了channel,咱们能够经过相应的命令来list queue:

$ sudo rabbitmqctl list_queues
Listing queues ...
hello    0
...done.复制代码

如今咱们已经准备好了发送了。
从架构图能够看出,Producer只能发送到exchange,它是不能直接发送到queue的。如今咱们使用默认的exchange(名字是空字符)。这个默认的exchange容许咱们发送给指定的queue。routing_key就是指定的queue名字。

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
复制代码

退出前别忘了关闭connection。

connection.close()复制代码

3. Receiving

RabbitMQ消息队列(二):”Hello, World“

第二个program receive.py 将从queue中获取Message而且打印到屏幕。

第一步仍是建立connection。第二步建立channel。第三步建立queue,name = hello:

channel.queue_declare(queue='hello')复制代码

接下来要subscribe了。在这以前,须要声明一个回调函数来处理接收到的数据。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)复制代码

subscribe:

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)复制代码

最后,准备好无限循环监听吧:

print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()复制代码

4. 最终版本

send.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()复制代码

receive.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()复制代码

5. 最终运行

先运行 send.py program:

$ python send.py
 [x] Sent 'Hello World!'复制代码

send.py 每次运行完都会中止。注意:如今数据已经存到queue里了。接收它:

$ python receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'复制代码

接下来,就要奉上更接近实际环境的例子。取决与个人课余时间啊。。。

参考文献:

1. http://www.rabbitmq.com/tutorials/tutorial-one-python.html

相关文章
相关标签/搜索