Python rabbitmq的使用(二)

上一篇介绍了rabbitmq的安装和经典的hello world!实例。这里将对工做队列(Work Queues)作一个了解。由于是接上一篇说明的,因此若是没看过上一篇,看这篇可能会比较难理解。上一篇的地址是:ubuntu安装rabbitmq和python的使用实现python

消息也能够理解为任务,消息发送者能够理解为任务分配者,消息接收者能够理解为工做者,当工做者接收到一个任务,还没完成的时候,任务分配者又发一个任务过来,那就忙不过来了,因而就须要多个工做者来共同处理这些任务,这些工做者,就称为工做队列。结构图以下:ubuntu

rabbitmq的python实例工做队列

rabbitmq的python实例工做队列并发

准备工做(Preparation)app

在实例程序中,用new_task.py来模拟任务分配者, worker.py来模拟工做者。函数

修改send.py,从命令行参数里接收信息,并发送fetch

1spa

2命令行

3code

4cdn

5

6

7

import sys

 

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',

                      routing_key='hello',

                      body=message)

print " [x] Sent %r" % (message,)

修改receive.py的回调函数。

1

2

3

4

5

6

import time

 

def callback(ch, method, properties, body):

    print " [x] Received %r" % (body,)

    time.sleep( body.count('.') )

    print " [x] Done"

这边先打开两个终端,都运行worker.py,处于监听状态,这边就至关于两个工做者。打开第三个终端,运行new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

观察worker.py接收到任务,其中一个工做者接收到3个任务 :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

另一个工做者接收到2个任务 :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

从上面来看,每一个工做者,都会依次分配到任务。那么若是一个工做者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其余工做者处理。因此应当有一种机制,当一个工做者完成任务时,会反馈消息。

消息确认(Message acknowledgment)

消息确认就是当工做者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

1

2

3

4

5

def callback(ch, method, properties, body):

    print " [x] Received %r" % (body,)

    time.sleep(5)

    print " [x] Done"

    ch.basic_ack(delivery_tag = method.delivery_tag)

这边停顿5秒,能够方便ctrl+c退出。

去除no_ack=True参数或者设置为False也能够。

1

channel.basic_consume(callback, queue='hello', no_ack=False)

用这个代码运行,即便其中一个工做者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务从新分配给其余工做者。

消息持久化存储(Message durability)

虽然有了消息反馈机制,可是若是rabbitmq自身挂掉的话,那么任务仍是会丢失。因此须要将任务持久化存储起来。声明持久化存储:

1

channel.queue_declare(queue='hello', durable=True)

可是这个程序会执行错误,由于hello这个队列已经存在,而且是非持久化的,rabbitmq不容许使用不一样的参数来从新定义存在的队列。从新定义一个队列:

1

channel.queue_declare(queue='task_queue', durable=True)

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

1

2

3

4

5

6

channel.basic_publish(exchange='',

                      routing_key="task_queue",

                      body=message,

                      properties=pika.BasicProperties(

                         delivery_mode = 2# make message persistent

                      ))

公平调度(Fair dispatch)

上面实例中,虽然每一个工做者是依次分配到任务,可是每一个任务不必定同样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。若是能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工做者分配多个任务,即只有工做者完成任务以后,才会再次接收到任务。

1

channel.basic_qos(prefetch_count=1)

new_task.py完整代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

#!/usr/bin/env python

import pika

import sys

 

connection = pika.BlockingConnection(pika.ConnectionParameters(

        host='localhost'))

channel = connection.channel()

 

channel.queue_declare(queue='task_queue', durable=True)

 

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',

                      routing_key='task_queue',

                      body=message,

                      properties=pika.BasicProperties(

                         delivery_mode = 2# make message persistent

                      ))

print " [x] Sent %r" % (message,)

connection.close()

worker.py完整代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

#!/usr/bin/env python

import pika

import time

 

connection = pika.BlockingConnection(pika.ConnectionParameters(

        host='localhost'))

channel = connection.channel()

 

channel.queue_declare(queue='task_queue', durable=True)

print ' [*] Waiting for messages. To exit press CTRL+C'

 

def callback(ch, method, properties, body):

    print " [x] Received %r" % (body,)

    time.sleep( body.count('.') )

    print " [x] Done"

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,

                      queue='task_queue')

 

channel.start_consuming()

相关文章
相关标签/搜索