RbbitMQ消息队列及python实现

  一、简介

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。html

  全部主要的编程语言均有与代理接口通信的客户端库。官网:http://www.rabbitmq.com/python

    RabbidMQ是一个消息代理:它接受和转发消息。你能够把它想象成一个邮局:当你把你想要寄出的邮件放在一个邮箱里时,你能够肯定,邮递员先生或女士最终会把邮件交给你的收件人。linux

  在这个类比中,rabbitmq是一个邮箱、一个邮局和一个邮递员。编程

    帮助文档:http://www.rabbitmq.com/getstarted.html小程序

  二、安装

    Erlang与RabbitMQ,安装路径都应不含空格符。windows

  Erlang使用了环境变量HOMEDRIVE与HOMEPATH来访问配置文件.erlang.cookie,应注意这两个环境变量的有效性。须要设定环境变量ERLANG_HOME,并把%ERLANG_HOME%\bin加入到全局路径中。
  RabbitMQ使用本地computer name做为服务器的地址,所以须要注意其有效性,或者直接解析为127.0.0.1
  可能须要在本地网络防火墙打开相应的端口
  首先下载安装 Erlang:http://www.erlang.org/downloads
  下载 RabbitMQhttp://www.rabbitmq.com/download.html
  windows下安装完成后
   系统服务中有RabbitMQ服务,中止、启动、重启
  
 

三、测试

 3.一、启用管理插件

  

 

  进入安装路径D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.12下的sbin目录,启动命令行,而后输入: rabbitmq-plugins enable rabbitmq_management,以下:
  

   3.二、管理界面

     打开浏览器输入:http://127.0.0.1:15672/浏览器

    

 

    经过默认帐户 guest/guest 登陆,若是可以登陆,说明安装成功。bash

    

 

 

     添加Admin用户服务器

    

 

    添加用户角色cookie

    一、超级管理员(administrator)
    可登录管理控制台,可查看全部的信息,而且能够对用户,策略(policy)进行操做。
    二、监控者(monitoring)
    可登录管理控制台,同时能够查看rabbitmq节点的相关信息(进程数,内存使用状况,磁盘使用状况等)
    三、策略制定者(policymaker)
    可登录管理控制台, 同时能够对policy进行管理。但没法查看节点的相关信息(上图红框标识的部分)。
    四、普通管理者(management)
    仅可登录管理控制台,没法看到节点信息,也没法对策略进行管理。
    五、其余
    没法登录管理控制台,一般就是普通的生产者和消费者。
    

    添加角色testhost

    

      选中Admin用户,设置权限:

    

      看到权限已加:

 

      

 

  

 

 

 

    

 

四、测试实例

    安装pika模块,python使用rabbitmq服务,可使用现成的类库pika、txAMQP或者py-amqplib,这里选择了pika。在命令行中直接使用pip命令:pip install pika

    队列是位于rabbitmq中的邮箱的名称。尽管消息经过rabbitmq和在程序中传送,但它们只能存储在队列中。队列只受主机内存和磁盘限制的约束,

    它本质上是一个大的消息缓冲区。许多生产者能够向一个队列发送消息,许多消费者能够尝试从一个队列接收数据。

    请注意:生产者、消费者和代理没必要驻留在同一主机上;实际上,在大多数应用程序中,它们没必要驻留在同一主机上。应用程序也能够同时是生产者和消费者。

 

    在本文章中,使用python编写两个小程序:一个发送单个消息的生产者(发送者)和一个接收并打印消息的消费者(接收者)。这是一个信息传递的“你好世界”。

  在下图中,“P”是咱们的生产者,“C”是咱们的消费者。中间的框是一个队列-一个消息缓冲区,rabbitmq表明使用者保留该缓冲区。

    

    生产者将消息发送到“hello”队列。消费者从该队列接收消息。

 4.一、发送

    咱们的第一个程序send.py将向队列发送一条消息。咱们须要作的第一件事是与rabbitmq服务器创建链接。

     

 

import pika

  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  channel = connection.channel()

    咱们如今已经链接到本地机器上的一个代理,所以就是本地主机。若是咱们想链接到另外一台机器上的代理,咱们只需在这里指定它的名称或IP地址。

     接下来,在发送以前,咱们须要确保收件人队列存在。若是咱们向不存在的位置发送消息,rabbitmq将只删除该消息。让咱们建立一个Hello队列,将消息传递到该队列:

channel.queue_declare(queue='hello')

     此时,准备发送消息。第一条消息将只包含一个字符串hello world!把它发送到问候队列。

    在rabbitmq中,消息永远不能直接发送到队列,它老是须要经过一个交换。但不要被这些细节拖累。如今须要知道的只是如何使用由空字符串标识的默认交换。

  此交换是特殊的它容许咱们精确地指定消息应该进入哪一个队列。队列名称须要在路由routing_key参数中指定:

   

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
  print(" [x] Sent 'Hello World!'")

    在退出程序以前,须要确保网络缓冲区已刷新,而且消息实际上已传递到rabbitmq。此时能够轻轻地关闭链接。

connection.close()

    若是这是您第一次使用rabbitmq,而且您没有看到“已发送”消息,那么您可能会感到头疼,想知道可能出了什么问题。可能代理启动时没有足够的可用磁盘空间(默认状况下,它须要至少200 MB的可用空间),

  所以拒绝接受消息。检查代理日志文件以确认并在必要时下降限制。配置文件文档将向您展现如何设置。

 

4.二、接收

    

    我第二个程序receive.py将接收来自队列的消息,并将它们打印到屏幕上。一样,首先咱们须要链接到rabbitmq服务器。负责链接到Rabbit的代码与之前相同。

   下一步和前面同样,是确保队列存在。使用queue_declare建立队列是等幂的,咱们能够根据须要屡次运行该命令,而且只建立一个队列。

    

channel.queue_declare(queue='hello')

    您可能会问咱们为何要再次声明队列咱们已经在之前的代码中声明了队列。若是咱们肯定队列已经存在,就能够避免这种状况。例如,若是send.py程序之前运行过。但咱们还不肯定先运行哪一个程序。

  在这种状况下,最好在两个程序中重复声明队列。

    若是但愿看到rabbitmq有哪些队列以及其中有多少消息。可使用rabbitmqctl工具(做为特权用户)执行此操做:

    在linux上:sudo rabbitmqctl list_queues

     在windows上:rabbitmqctl.bat list_queues

    从队列接收消息更加复杂。它经过向队列订阅回调函数来工做。每当咱们收到一条消息,这个回调函数就会被PIKA库调用。在咱们的示例中,此函数将在屏幕上打印消息的内容。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

 

    接下来,咱们须要告诉rabbitmq这个特定的回调函数应该从hello队列接收消息:

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

    要使该命令成功,咱们必须确保要订阅的队列存在。幸运的是,咱们相信咱们已经使用上面queue declare建立了的队列。

    最后,咱们进入一个永不结束的循环,它等待数据并在必要时运行回调。

print(' [*] Waiting for messages. To exit press CTRL+C')
  channel.start_consuming()

    完整的send.py代码

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

    完整的receive.py代码

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

    如今咱们能够在终端上试用咱们的程序。首先,让咱们启动一个消费者,它将连续运行等待交付:

 python receive.py

send.py

相关文章
相关标签/搜索