Python RabbitMQ

RabbitMQjava

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。python

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。消 息传递指的是程序之间经过在消息中发送数据进行通讯,而不是经过直接调用彼此来通讯,直接调用一般是用于诸如远程过程调用的技术。排队指的是应用程序经过 队列来通讯。队列的使用除去了接收和发送应用程序同时执行的要求。服务器

RabbitMQ安装ide

安装配置epel
  # rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm

安装erlang
 
# yum -y install erlang

安装RabbitMQ
 
# yum -y install rabbitmq-server

win下安装推荐http://blog.csdn.net/a__java___a/article/details/17614797fetch


启动/中止 service rabbitmq-server start/stopui


安装APIgoogle

    pip install pikaspa

    or.net

    easy_install pikaorm

    or

    源码

    https://pypi.python.org/pypi/pika


使用API操做RabbitMQ

基于Queue能够实现生产者消费者模型,而对于RabbitMQ来讲,生产和消费再也不针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

import pika

# ######################### 生产者 #########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='MQ1')

channel.basic_publish(exchange='',routing_key='MQ1',body='Hello World!')

print(" [x] Sent 'Hello World!'")

connection.close()

import pika

# ########################## 消费者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='MQ1')

def callback(ch, method, properties, body):
   
print(" [x] Received %r" % body)

channel.basic_consume(callback,queue='MQ1',no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


1. acknowledgment 消息不丢失

no-ack = False,若是生产者遇到状况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会从新将该任务添加到队列中。

import pika
# ########################## 消费者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='MQ1')

def callback(ch, method, properties, body):
   
print(" [x] Received %r" % body)
   
import time
   time.sleep(10)
   
print('ok')
   
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='MQ1',no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


2. durable   消息不丢失

生产者消息持久化

import pika
# ######################### 生产者 #########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 消息持久化
channel.queue_declare(queue='MQ2', durable=True)

channel.basic_publish(exchange='',routing_key='MQ2',body='Hello World!',
                     properties=pika.BasicProperties(
                         
delivery_mode=2, # 消息持久化
                     
))
print(" [x] Sent 'Hello World!'")
connection.close()

import pika
# ########################## 消费者 ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='MQ2', durable=True)  # 消息持久化

def callback(ch, method, properties, body):
   
print(" [x] Received %r" % body)
   
import time
   time.sleep(10)
   
print('ok')
   
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='MQ2',no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


3.  消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

若是Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,极可能出现,一个机器配置不高的消费者那里堆积了不少消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,能够在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,再也不按照奇偶数排列


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='MQ2',durable=True)  # 消息持久化

def callback(ch, method, properties, body):
   
print(" [x] Received %r" % body)
   
import time
   time.sleep(10)
   
print('ok')
   
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                     queue='hello',
                     no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


4. 发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给全部的订阅者,而消息队列中的数据被消费一次便消失。因此,RabbitMQ实现发布和订阅时,会为每个订阅者建立一个队列,而发布者发布消息时,会将消息放置在全部相关队列中。

 exchange type = fanout

import pika,sys

##------------------------发布者-----------------

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',routing_key='',body=message)

print(" [x] Sent %r" % message)
connection.close()

import pika

##------------------------订阅者-----------------
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue    #获取随机队列名

channel.queue_bind(exchange='logs',queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   
print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()



5.  关键字发送

 exchange type = direct

以前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 断定应该将数据发送至指定队列。

import pika

# ######################### 生产者 #########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',type='direct')

message = 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                     routing_key= 'mingyue',   #绑定的关键字
                     
body=message)
print(" [x] Sent %r" % (message))
connection.close()

import pika
# # ########################## 消费者 ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 绑定两个不一样的关键字
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key= 'shuoming')
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key= 'mingyue')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   
print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

6.  模糊匹配

 exchange type = topic

在topic类型下,可让队列绑定几个模糊的关键字,以后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示能够匹配 0 个 或 多个 单词

  • *  表示只能匹配 一个 单词

发送者路由值              队列中

www.google.com          www.*  -- 不匹配

www.baidu.com           www.#  -- 匹配

import pika

# ######################### 生产者 #########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',type='topic')


message = 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                     routing_key= 'www.baidu.com',   #匹配模糊关键字
                     
body=message)
print(" [x] Sent %r" % (message))
connection.close()

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 绑定两个不一样的关键字
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key= '
www.*')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   
print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()


Remote procedure call (RPC)

RPC 远程过程调用。客户端C,向服务端S请求一项服务,官网举了一个计算fibonacci值的例子。C 向S请求计算fib(x),S 则计算,算完以后发给,或者说反馈给C。C收到反馈信息以后才能想S 继续请求服务。

一个RPC流程是:

  • C 启动,而后建立一个匿名私有(exclusive=Ture)的反馈队列。

  • C 发送请求的时候,要附上reply_to(用户反馈的队列名)和correlation_id(反馈的***ID)。

  • 请求被发送到 rpc_queue 队列.

  • S 等待队列. 发现有消息到达则计算fib(x)而后经过反馈队列反馈给C.

  • C 等待反馈队列,发现有反馈信息到达,比对反馈***ID。符合,发送下一个计算请求。不符合,再等。

  • wKioL1byJDihA7kpAAA5qk8GcaI689.png

RPC Server

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
   
if n == 0:
       
return 0
   
elif n == 1:
       
return 1
   
else:
       
return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
   
n = int(body)

   
print(" [.] fib(%s)" % n)
   
response = fib(n)

   
ch.basic_publish(exchange='',
                    routing_key=props.reply_to,
                    properties=pika.BasicProperties(correlation_id = \
                                                        props.correlation_id),
                    body=str(response))
   
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()


RPC Client

import pika,uuid

class FibonacciRpcClient(object):
   
def __init__(self):
       
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
               
host='localhost'))

       
self.channel = self.connection.channel()

       
result = self.channel.queue_declare(exclusive=True)
       
self.callback_queue = result.method.queue

       self.channel.basic_consume(self.on_response, no_ack=True,
                                  queue=self.callback_queue)

   
def on_response(self, ch, method, props, body):
       
if self.corr_id == props.correlation_id:
           
self.response = body

   def call(self, n):
       
self.response = None
       
self.corr_id = str(uuid.uuid4())
       
self.channel.basic_publish(exchange='',
                                  routing_key='rpc_queue',
                                  properties=pika.BasicProperties(
                                       
reply_to = self.callback_queue,
                                        correlation_id = self.corr_id,
                                        ),
                                  body=str(n))
       
while self.response is None:
           
self.connection.process_data_events()
       
return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
相关文章
相关标签/搜索