python学习之RabbitMQ-----消息队列

RabbitMQ队列

首先咱们在讲rabbitMQ以前咱们要说一下python里的queue:两者干的事情是同样的,都是队列,用于传递消息python

在python的queue中有两个一个是线程queue,一个是进程queue(multiprocessing中的queue)。线程queue不可以跨进程,用于多个线程之间进行数据同步交互;进程queue只是用于父进程与子进程,或者同属于赞成父进程下的多个子进程 进行交互。也就是说若是是两个彻底独立的程序,即便是python程序,也依然不可以用这个进程queue来通讯。那若是咱们有两个独立的python程序,分属于两个进程,或者是python和其余语言windows

安装:windows下服务器

首先须要安装 Erlang环境
官网: 
http://www.erlang.org/
Linux版:     使用yum安装
 
而后安装RabbitMQ了 
首先下载RabbitMQ 的Windows版本
下载地址:

安装pika:负载均衡

以前安装过了pip,直接打开cmd,运行pip install pika运维

安装完毕以后,实现一个最简单的队列通讯:socket

producer:函数

 1 import pika  2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  4 #声明一个管道
 5 channel = connection.channel()  6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')  9 #routing_key是queue的名字
10 channel.basic_publish(exchange='', 11                       routing_key='hello',#queue的名字 12                       body='Hello World!', 13  ) 14 print("[x] Send 'Hello World!'") 15 connection.close()

 

先创建一个基本的socket,而后创建一个管道,在管道中发消息,而后声明一个queue,起个队列的名字,以后真正的发消息(basic_publish)fetch

consumer:ui

 1 import pika  2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  3 channel = connection.channel()  4 
 5 channel.queue_declare(queue='hello')  6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties) 10     print(" [x] Received %r" % body) 11 
12 channel.basic_consume(callback,#若是收到消息,就调用callback来处理消息
13                       queue='hello', 14                       no_ack=True 15  ) 16 
17 print(' [*] Waiting for messages. To exit press CTRL+C') 18 channel.start_consuming()

 

 start_consuming()只要一启动就一直运行下去,他不止收一条,永远在这里卡住。url

在上面不论是produce仍是consume,里面都声明了一个queue,这个是为何呢?由于咱们不知道是消费者先开始运行仍是生产者先运行,这样若是没有声明的话就会报错。

下面咱们来看一下一对多,即一个生产者对应多个消费者:

首先咱们运行3个消费者,而后不断的用produce去发送数据,咱们能够看到消费者是经过一种轮询的方式进行不断的接受数据,每一个消费者消费一个。

那么假如咱们消费者收到了消息,而后处理这个消息须要30秒钟,在处理的过程当中,消费者断电了宕机了,那消费者尚未处理完,咱们设这个任务咱们必须处理完,那咱们应该有一个确认的信息,说这个任务完成了或者是没有完成,因此个人生产者要确认消费者是否把这个任务处理完了,消费者处理完以后要给这个生产者服务器端发送一个确认信息,生产者才会把这个任务从消息队列中删除。若是没有处理完,消费者宕机了,没有给生产者发送确认信息,那就表示没有处理完,那咱们看看rabbitMQ是怎么处理的

咱们能够在消费者的callback中添加一个time.sleep()进行模拟宕机。callback是一个回调函数,只要事件一触发就会调用这个函数。函数执行完了就表明消息处理完了,若是函数没有处理完,那就说明。。。。

咱们能够看到在消费者代码中的basic_consume()中有一个参数叫no_ack=True,这个意思是这条消息是否被处理完都不会发送确认信息,通常咱们不加这个参数,rabbitMQ默认就会给你设置成消息处理完了就自动发送确认,咱们如今把这个参数去掉,而且在callback中添加一句话运行:ch.basic_ack(delivery_tag=method.delivery_tag)(手动处理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties) #time.sleep(30)
    print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag)

 

运行的结果就是,我先运行一次生产者,数据被消费者1接收到了,可是我把消费者1宕机,中止运行,那么消费者2就接到了消息,即只要消费者没有发送确认信息,生产者就不会把信息删除。

RabbitMQ消息持久化:

咱们能够生成好多的消息队列,那咱们怎么查看消息队列的状况呢:rabbitmqctl.bat list_queues

如今的状况是,消息队列中还有消息,可是服务器宕机了,那这个消息就丢了,那我就须要这个消息强制的持久化:

channel.queue_declare(queue='hello2',durable=True)

 

在每次声明队列的时候加上一个durable参数(客户端和服务器端都要加上这个参数),

在这个状况下,咱们把rabbitMQ服务器重启,发现只有队列名留下了,可是队列中的消息没有了,这样咱们还须要在生产者basic_publish中添加一个参数:properties

producer:

import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #声明一个管道
channel = connection.channel() #声明queue
channel.queue_declare(queue = 'hello2',durable=True) #routing_key是queue的名字
channel.basic_publish(exchange='', routing_key='hello2', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2,#make message persistent  ) ) print("[x] Send 'Hello World!'") connection.close()

 

这样就可使得消息持久化

如今是一个生产者对应三个消费者,很公平的收发收发,可是实际的状况是,咱们机器的配置是不同的,有的配置是单核的有的配置是多核的,可能i7处理器处理4条消息的时候和其余的处理器处理1条消息的时间差很少,那差的处理器那里就会堆积消息,而好的处理器那里就会造成闲置,在现实中作运维的,咱们会在负载均衡中设置权重,谁的配置高权重高,任务就多一点,可是在rabbitMQ中,咱们只作了一个简单的处理就能够实现公平的消息分发,你有多大的能力就处理多少消息

即:server端给客户端发送消息的时候,先检查如今还有多少消息,若是当前消息没有处理完毕,就不会发送给这个消费者消息。若是当前的消费者没有消息就发送

这个只须要在消费者端进行修改加代码:

import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello2',durable=True) def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties) #time.sleep(30)
    print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,#若是收到消息,就调用callback来处理消息
                      queue='hello2', #no_ack=False
 ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()

 

 咱们在生成一个consume2,在callback中sleep20秒来模拟

我先启动两个produce,被consume接受,而后在启动一个,就被consumer2接受,可是由于consumer2中sleep20秒,处理慢,因此这时候在启动produce,就又给了consume进行处理

 

Publish\Subscrible(消息发布\订阅)

前面都是1对1的发送接收数据,那我想1对多,想广播同样,生产者发送一个消息,全部消费者都收到消息。那咱们怎么作呢?这个时候咱们就要用到exchange

exchange在一端收消息,在另外一端就把消息放进queue,exchange必须精确的知道收到的消息要干什么,是否应该发到一个特定的queue仍是发给许多queue,或者说把他丢弃,这些都被exchange的类型所定义

exchange在定义的时候是有类型的,以决定究竟是那些queue符合条件,能够接受消息:

fanout:全部bind到此exchange的queue均可以接受消息

direct:经过rounroutingKey和exchange决定的那个惟一的queue能够接收消息

topic:全部符合routingKey的routingKey所bind的queue能够接受消息

headers:经过headers来决定把消息发给哪些queue

消息publisher:

 1 import pika  2 import sys  3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  5 
 6 channel = connection.channel()  7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')  9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message) 12 print("[x] Send %r " % message) 13 connection.close()

 

这里的exchange以前是空的,如今赋值log;在这里也没有声明queue,广播不须要写queue

 消息subscriber:

 1 import pika  2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  3 channel = connection.channel()  4 channel.exchange_declare(exchange='logs',exchange_type='fanout')  5 
 6 #exclusive惟一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)  9 queue_name = result.method.queue 10 
11 channel.queue_bind(exchange='logs',queue=queue_name) 12 
13 print('[*] Waiting for logs,To exit press CTRL+C') 14 
15 def callback(ch,method,properties,body): 16     print("[X] %r" % body) 17 channel.basic_consume(callback,queue = queue_name,no_ack=True) 18 channel.start_consuming()

 

在消费者这里咱们有定义了一个queue,注意一下注释中的内容。可是咱们在发送端没有声明queue,为何发送端不须要接收端须要呢?在consume里有一个channel.queue_bind()函数,里面绑定了exchange转换器上,固然里面还须要一个queue_name

运行结果:

就至关于收音机同样,实时广播,打开三个消费者,生产者发送一条数据,而后3个消费者同时接收到

有选择的接收消息(exchange_type = direct)

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字断定应该将数据发送到指定的队列

publisher:

 1 import pika  2 import sys  3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  4 channel = connection.channel()  5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')  7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message) 11 
12 print("[X] Send %r:%r" %(severity,message)) 13 connection.close()

 

subscriber:

import pika import sys connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connect.channel() channel.exchange_declare(exchange='direct_logs',exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:]# if not severities: sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity) print('[*]Waiting for logs.To exit press CTRL+c') def callback(ch,method,properties,body): print("[x] %r:%r"%(method.routing_key,body)) channel.basic_consume(callback,queue = queue_name,no_ack=True) channel.start_consuming()

 

更加细致的过滤(exchange_type=topic)

 

publish:

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()

 

subscriber:

 1 import pika  2 import sys  3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  5 channel = connection.channel()  6 
 7 channel.exchange_declare(exchange='topic_logs',  8                          exchange_type='topic')  9 
10 result = channel.queue_declare(exclusive=True) 11 queue_name = result.method.queue 12 
13 binding_keys = sys.argv[1:] 14 if not binding_keys: 15     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 16     sys.exit(1) 17 
18 for binding_key in binding_keys: 19     channel.queue_bind(exchange='topic_logs', 20                        queue=queue_name, 21                        routing_key=binding_key) 22 
23 print(' [*] Waiting for logs. To exit press CTRL+C') 24 
25 
26 def callback(ch, method, properties, body): 27     print(" [x] %r:%r" % (method.routing_key, body)) 28 
29 
30 channel.basic_consume(callback, 31                       queue=queue_name, 32                       no_ack=True) 33 
34 channel.start_consuming()

 

 

以上都是服务器端发消息,客户端收消息,消息流是单向的,那若是咱们想要发一条命令给远程的客户端去执行,而后想让客户端执行的结果返回,则这种模式叫作rpc

RabbitMQ RPC

rpc server:

 1 import pika  2 import time  3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  4 channel = connection.channel()  5 
 6 channel.queue_declare(queue='rpc_queue')  7 def fib(n):  8     if n==0:  9         return 0 10     elif n==1: 11         return 1
12     else: 13         return fib(n-1)+fib(n-2) 14 
15 def on_request(ch,method,props,body): 16     n = int(body) 17     print("[.] fib(%s)" %n) 18     response = fib(n) 19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to, 21                      properties=pika.BasicProperties(correlation_id=props.correlation_id), 22                      body = str(response)) 23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue') 26 
27 print("[x] Awaiting rpc requests") 28 channel.start_consuming()

 

 

rpc client:

 1 import pika  2 import uuid,time  3 class FibonacciRpcClient(object):  4     def __init__(self):  5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  6 
 7         self.channel = self.connection.channel()  8 
 9         result = self.channel.queue_declare(exclusive=True) 10         self.callback_queue = result.method.queue 11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue) 14 
15     def on_response(self,ch,method,props,body): 16         if self.corr_id == props.correlation_id: 17             self.response = body 18 
19     def call(self,n): 20         self.response = None 21         self.corr_id = str(uuid.uuid4()) 22         self.channel.basic_publish(exchange='',routing_key='rpc_queue', 23                                    properties=pika.BasicProperties( 24                                        reply_to=self.callback_queue, 25                                        correlation_id=self.corr_id 26  ), 27                                    body=str(n),#传的消息,必须是字符串
28  ) 29         while self.response is None: 30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....") 32             time.sleep(0.5) 33         return int(self.response) 34 fibonacci_rpc = FibonacciRpcClient() 35 print("[x] Requesting fib(30)") 36 response = fibonacci_rpc.call(30) 37 print("[.] Got %r"%response)

 

以前的start_consuming是进入一个阻塞模式,没有消息就等待消息,有消息就收过来

self.connection.process_data_events()是一个非阻塞版的start_consuming,就是说发了一个东西给客户端,每过一点时间去检查有没有消息,若是没有消息,能够去干别的事情

reply_to = self.callback_queue是用来接收反应队列的名字

corr_id = str(uuid.uuid4()),correlation_id第一在客户端会经过uuid4生成,第二在服务器端返回执行结果的时候也会传过来一个,因此说若是服务器端发过来的correlation_id与本身的id相同 ,那么服务器端发出来的结果就确定是我刚刚客户端发过去的指令的执行结果。如今就一个服务器端一个客户端,无所谓缺人不确认。如今客户端是非阻塞版的,咱们能够不让它打印没有消息,而是执行新的指令,这样就两条消息,不必定按顺序完成,那咱们就须要去确认每一个返回的结果是哪一个命令的执行结果。

整体的模式是这样的:生产者发了一个命令给消费者,不知道客户端何时返回,仍是要去收结果的,可是它又不想进入阻塞模式,想每过一段时间看这个消息收回来没有,若是消息收回来了,就表明收完了。 

运行结果:

服务器端开启,而后在启动客户端,客户端先是等待消息的发送,而后作出反应,直到算出斐波那契

相关文章
相关标签/搜索