Python介绍RabbitMQ使用篇二WorkQueue

1. RabbitMQ WorkQueue基本工做模式介绍

上一篇咱们使用C#语言讲解了单个消费者从消息队列中处理消息的模型,这一篇咱们使用Python语言来说解多个消费者同时工做从一个Queue处理消息的模型。网络

工做队列(又称:任务队列——Task Queues)是为了不等待一些占用大量资源、时间的操做。当咱们把任务(Task)看成消息发送到队列中,一个运行在后台的工做者(worker)进程就会取出任务而后处理。当你运行多个工做者(workers),任务就会在它们之间共享。这个概念在网络应用中是很是有用的,它能够在短暂的HTTP请求中处理一些复杂的任务,我么能够将耗时的请求放在任务队列,而后立马返回响应,接下来由多个worker去处理复杂的业务操做。(这种架构叫作"分布式异步队列",有时候用来方式D-DOS攻击,12306网站就是采用这种模式)架构

用Python操做Python模块首先要到如pika这个包,利用pip install pika去安装。并发

咱们首先写一个new_task.py用来向任务队列中写入任务,已备用。异步

import pika
import sys


with pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost")) as connection:
    channel = connection.channel()
    channel.queue_declare(queue = "hello")
    for index in range(0,10):
        channel.basic_publish(exchange="",
                            routing_key="hello",
                            body="["+str(index)+"]" + "Hello World")
    connection.close()

接下来编写works.py程序,咱们须要在works.py中建立消费者,让消费者从任务队列中提取任务去执行。分布式

import pika
import sys
import time

connection = pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue = "hello")
channel.basic_ack()

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body.decode('utf-8'),))
    time.sleep(3)  # 咱们在这里利用线程休息来模拟一个比较耗时的任务处理
    print(" [x] Done")
   

channel.basic_consume(callback,
                      queue='hello',
                      no_ack= True)  # 咱们把no_ack标记为true用来屏蔽消息确认

channel.start_consuming()
connection.close()

在callback函数中让当前线程休息5秒用来模拟一个耗时的任务。ide

接下来首先打开两个Terminal窗口同时去运行works.py程序,而后运行new_task.py程序来查看效果。注意:在这里为了说明多个work可以同时分享任务队列中的队列,咱们必定要先运行works.py,后运行new_task.py程序。具体缘由后面在说明。函数

默认来讲,RabbitMQ会按顺序得把消息发送给两个消费者(consumer),平均每一个消费者都会收到同等数量得消息,这种发送消息的方式叫作——轮询(round-robin)。这样作的好处就是咱们在处理相同数量的task所用的时间成倍的减小了。work越多,咱们处理任务队列所用的时间就越少,这在高并发系统中会很是有用。高并发

2.消息确认

当前的代码中,当消息被RabbitMQ发送给消费者(consumers)以后,立刻就会在内存中移除。这种状况之下,假如其中一个工做者挂掉了,那么它正在处理的消息就会丢失,而且与此同时,后面全部发送到这个工做者的还没来得及处理的消息也都会丢失。这显然不是咱们想看到的结果。咱们不想丢失任何消息,若是一个工做者(worker)挂掉了,咱们但愿任务会从新发送给其余的工做者(worker)。fetch

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会经过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,而后RabbitMQ才会释放并删除这条消息,而不是这条消息一发送出去立刻就从内存中删除。网站

若是消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被彻底处理,而后从新发送给其余消费者(consumer)。这样,及时工做者(workers)偶尔的挂掉,也不会丢失消息。消息是没有超时这个概念的;当工做者与它断开连的时候,RabbitMQ会从新发送消息。这样在处理一个耗时很是长的消息任务的时候就不会出问题了。消息响应默认是开启的。以前的例子中咱们可使用no_ack=True标识把它关闭。接下来咱们移除这个标识,当工做者(worker)完成了任务,就发送一个响应。

对咱们的workers.py稍微进行一下改动:

 1 import pika
 2 import sys
 3 import time
 4 
 5 connection = pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost"))
 6 channel = connection.channel()
 7 channel.queue_declare(queue = "hello")
 8 channel.basic_ack()
 9 
10 def callback(ch, method, properties, body):
11     print(" [x] Received %r" % (body.decode('utf-8'),))
12     time.sleep(3)
13     print(" [x] Done")
14     ch.basic_ack(delivery_tag = method.delivery_tag)   #  2. channel.basic_ack()方法用来执行消息确认操做
15 
16 channel.basic_consume(callback,
17                       queue='hello',
18                       no_ack= False)  # 1. no_ack告诉RabbitMQ开启消息确认机制,也就是说消息须要被确认
19 
20 channel.start_consuming()
21 connection.close()

先开启两个Terinmal窗口执行workers.py而后执行new_task.py,当执行一半是利用ctrl+c关掉其中一个worker。能够看到RabbitMQ将已经关掉的worker的没来得及处理的消息,再一次发给worker2。以此保证消息不会丢失。

必定必定不要忘记消息确认

在回调方法中必定要记得调用channel.basic_ack()方法用来确认消息。缘由很容易理解,消息若是不确认,任务就算是被callback函数处理成功了,RabbitMQ在内存中也不会删除这条任务,这条任务还会停留在内存中。这样无疑会带来一个比较大的bug。

3.消息持久化

RabbittMQ若是意外崩溃的话,就会丢失全部的“队列”和“消息”。所以为了确保信息不会丢失,有两个事情是须要注意的:咱们必须把“队列”和“消息”设为持久化。下面的代码分别演示了如何进行队列持久化和消息持久化。

 1 import pika
 2 import sys
 3 
 4 
 5 with pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost")) as connection:
 6     channel = connection.channel()
 7     channel.queue_declare(queue = "hello",durable=True)   # 1.Queue持久化
 8     for index in range(0,10):
 9         channel.basic_publish(exchange="",
10                             routing_key="hello",
11                             body="["+str(index)+"]" + "Hello World",
12                             properties= pika.BasicProperties(    # 2.消息持久化
13                                 delivery_mode= 2
14                             ))
15     connection.close()
View Code

4.公平调度/多劳多得

在实际生产中咱们不必定全部的任务处理时都消耗一样多的时间,有的任务须要更长的时间,有的任务须要比较少的时间。这样就形成有的工做者比较繁忙,有的工做者比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。这样无疑会形成资源的浪费。

这时由于RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有做出响应。它盲目的把第n-th条消息发给第n-th个消费者。

咱们可使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工做者(worker),直到它已经处理了上一条消息而且做出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工做者(worker)。这样能保证消息是一个一个发出去的,而且是一个处理完成了再发另外一个,而不是一次性所有发分出去了。这样尽量的保证了每一个worker的工做时间相同(公平调度),而且在相同时间执行效率高的worker会分享到更多的消息(多劳多得)。

  channe.basic_qos(prefetch_count=1)

固然,若是全部的worker都长时间处于繁忙状态,没有时间接收下一条消息,那么任务队列就有可能满了。咱们能够增长worker的数量,或者想其余办法。

代码整合

 1 import pika
 2 import sys
 3 
 4 
 5 with pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost")) as connection:
 6     channel = connection.channel()
 7     channel.queue_declare(queue = "hello",durable = True)   # 1.Queue持久化
 8     for index in range(0,10):
 9         channel.basic_publish(exchange="",
10                             routing_key="hello",
11                             body="["+str(index)+"]" + "Hello World",
12                             properties= pika.BasicProperties(    # 2.消息持久化
13                                 delivery_mode= 2
14                             ))
15     connection.close()
new_task.py
 1 import pika
 2 import sys
 3 import time
 4 
 5 connection = pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost"))
 6 channel = connection.channel()
 7 channel.queue_declare(queue = "hello",durable = True)  # 队列持久化
 8 
 9 def callback(ch, method, properties, body):
10     print(" [x] Received %r" % (body.decode('utf-8'),))
11     time.sleep(5)
12     print(" [x] Done----%r" % time.strftime("%Y-%m-%d %X",time.localtime()))
13     ch.basic_ack(delivery_tag = method.delivery_tag)
14 
15 channel.basic_qos(prefetch_count = 1)  # 用来告诉每一个worker一次只能接受一条消息
16 channel.basic_consume(callback,
17                       queue='hello',
18                       no_ack = False)
19 channel.start_consuming()
20 connection.close()
workers_busy
 1 import pika
 2 import sys
 3 import time
 4 
 5 connection = pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost"))
 6 channel = connection.channel()
 7 channel.queue_declare(queue = "hello")
 8 
 9 def callback(ch, method, properties, body):
10     print(" [x] Received %r" % (body.decode('utf-8'),))
11     time.sleep(1)
12     print(" [x] Done----%r" % time.strftime("%Y-%m-%d %X",time.localtime()))
13     ch.basic_ack(delivery_tag = method.delivery_tag)
14    
15 channel.basic_qos(prefetch_count = 1)
16 channel.basic_consume(callback,
17                       queue='hello',
18                       no_ack = False)
19 channel.start_consuming()
20 connection.close()
workers_relax.py
相关文章
相关标签/搜索