python中RabbitMQ的使用(工做队列)

消息能够理解为任务,消息发送者能够当作任务派送者(sender),消息接收者能够当作工做者(worker)。dom

当工做者接收到一个任务,还没完任务时分配者又发一个任务,此时须要多个工做者来共同处理这些任务。post

任务分派结构图以下:fetch

注:此时有一个任务派送人P,两个工做接收者C1和C2。this

如今咱们来模拟该状况:spa

1.首先打开三个终端:code

2.分别在前两个终端运行receive1.pyblog

 3.在第三个终端屡次运行send1.pyrabbitmq

 此时将会轮流向worker1和worker2分派任务。it

问题:io

在以上任务分配和完成状况中,有几个问题将会产生:

1.工做者任务是否完成?

2.工做者挂掉后,如何防止未完成的任务丢失,而且如何处理这些任务?

3.RabbitMQ自身出现问题,此时如何防止任务丢失?

4.任务有轻重之分,如何实现公平调度?

方案:

1.消息确认(Message acknowledgment)

当任务完成后,工做者(receiver)将消息反馈给RabbitMQ:

复制代码
1 def callback(ch, method, properties, body):
2     print " [x] Received %r" % (body,)
3     #停顿5秒,方便ctrl+c退出
4     time.sleep(5)
5     print " [x] Done"
6     #当工做者完成任务后,会反馈给rabbitmq
7     ch.basic_ack(delivery_tag=method.delivery_tag)
复制代码

2.保留任务(no_ack=False)

当工做者挂掉后,防止任务丢失:

# 去除no_ack=True参数或者设置为False后能够实现
# 一个工做者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务从新分配给其余工做者。
channel.basic_consume(callback, queue='task_queue', no_ack=False)

3.消息持久化存储(Message durability)

声明持久化存储:

# durable=True即声明持久化存储
channel.queue_declare(queue='task_queue', durable=True)

在发送任务时,用delivery_mode=2来标记任务为持久化存储:

复制代码
1 # 用delivery_mode=2来标记任务为持久化存储:
2 channel.basic_publish(exchange='',
3                       routing_key='task_queue',
4                       body=message,
5                       properties=pika.BasicProperties(
6                           delivery_mode=2,
7                       ))
复制代码

4.公平调度(Fair dispatch)

使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工做者分配多个任务,即只有工做者完成任务以后,才会再次接收到任务

channel.basic_qos(prefetch_count=1)

完整代码以下:

receive1.py

import time

import pika
# 建立链接
hostname="localhost"
parameters=pika.ConnectionParameters(hostname)
connection=pika.BlockingConnection(parameters)

# 建立通道
channel=connection.channel()

'''
queue_declare(self, 
                queue, 
                passive=False, 
                durable=False(是否将任务持久化存储), 
                exclusive=False, 
                auto_delete=False, 
                arguments=None
                )
'''
channel.queue_declare(queue="task_queue",durable=True)

def callback(ch,method,properties,body):
    print (" [x] Received %r" % body)
    time.sleep(5)
    print(" [x] Done")
    # ch.basic_ack为当工做者完成任务后,会反馈给rabbitmq
    ch.basic_ack(delivery_tag=method.delivery_tag)

# basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工做者分配多个任务,
# 即只有工做者完成任务以后,才会再次接收到任务。
channel.basic_qos(prefetch_count=1)

# 去除no_ack=True参数或者设置为False后能够实现   ack:acknowledgment(确认)
# 一个工做者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务从新分配给其余工做者。
channel.basic_consume('task_queue',callback,auto_ack=False)

# 开始接收信息,按ctrl+c退出
print (' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

send1.py

import pika
import random
connector=pika.BlockingConnection(pika.ConnectionParameters("localhost"))

channel=connector.channel()
# 若是rabbitmq自身挂掉的话,那么任务会丢失。因此须要将任务持久化存储起来,声明持久化存储:
channel.queue_declare(queue="task_queue",durable=True)

number=random.randint(1,1000)
messge="hello word:%s" %number
# 在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=messge,
    properties=pika.BasicProperties(
        delivery_mode=2
    )
)
print( " [x] Sent %s" % messge)
connector.close()

示例以下:

首先启动三个终端,两个先执行receive1.py,第三个屡次执行send1.py:

终端3:

此时分配三个任务,33分配给worker1,170分配给worker2,262分配给worker1

 

终端1:

worker1完成任务33后,开始任务262,咱们在任务完成前使用(CRTL+C)使worker1挂掉

 

终端2:

worker2完成任务170,原本没有任务,可是worker1挂掉,此时接收他的任务262

相关文章
相关标签/搜索