上一篇介绍了rabbitmq的安装和经典的hello world!实例。这里将对工做队列(Work Queues)作一个了解。由于是接上一篇说明的,因此若是没看过上一篇,看这篇可能会比较难理解。上一篇的地址是:ubuntu安装rabbitmq和python的使用实现python
消息也能够理解为任务,消息发送者能够理解为任务分配者,消息接收者能够理解为工做者,当工做者接收到一个任务,还没完成的时候,任务分配者又发一个任务过来,那就忙不过来了,因而就须要多个工做者来共同处理这些任务,这些工做者,就称为工做队列。结构图以下:ubuntu
rabbitmq的python实例工做队列并发
准备工做(Preparation)app
在实例程序中,用new_task.py来模拟任务分配者, worker.py来模拟工做者。函数
修改send.py,从命令行参数里接收信息,并发送fetch
1spa 2命令行 3code 4cdn 5 6 7 |
|
修改receive.py的回调函数。
1 2 3 4 5 6 |
|
这边先打开两个终端,都运行worker.py,处于监听状态,这边就至关于两个工做者。打开第三个终端,运行new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
观察worker.py接收到任务,其中一个工做者接收到3个任务 :
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
另一个工做者接收到2个任务 :
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
从上面来看,每一个工做者,都会依次分配到任务。那么若是一个工做者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其余工做者处理。因此应当有一种机制,当一个工做者完成任务时,会反馈消息。
消息确认(Message acknowledgment)
消息确认就是当工做者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:
1 2 3 4 5 |
|
这边停顿5秒,能够方便ctrl+c退出。
去除no_ack=True参数或者设置为False也能够。
1 |
|
用这个代码运行,即便其中一个工做者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务从新分配给其余工做者。
消息持久化存储(Message durability)
虽然有了消息反馈机制,可是若是rabbitmq自身挂掉的话,那么任务仍是会丢失。因此须要将任务持久化存储起来。声明持久化存储:
1 |
|
可是这个程序会执行错误,由于hello这个队列已经存在,而且是非持久化的,rabbitmq不容许使用不一样的参数来从新定义存在的队列。从新定义一个队列:
1 |
|
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
1 2 3 4 5 6 |
|
公平调度(Fair dispatch)
上面实例中,虽然每一个工做者是依次分配到任务,可是每一个任务不必定同样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。若是能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工做者分配多个任务,即只有工做者完成任务以后,才会再次接收到任务。
1 |
|
new_task.py完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
|
worker.py完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
|