1. 工做队列:html
对于资源密集型任务,咱们等待其处理完成在不少状况下是不现实的,好比没法在http的短暂请求窗口中处理大量耗时任务,python
为了达到主线程无需等待,任务异步执行的要求,咱们能够将任务加入任务队列,如图,多个workers能够共享安全
同一个任务队列,同时对任务进行处理,主线程P将延后任务发送到队列以后便可返回,延迟任务由C1和C2处理完成;异步
2. 轮询调度:测试
队列会将消息轮询分发给worker,如上图两个worker,则首先发送消息到C1,而后发送消息到C2,而后在发送消息到C1,C2,C1...,fetch
队列按顺序发送,这样保证了每一个worker收到的消息是均等的,默认设置状况下,队列并不会考虑worker当前的负载状况。spa
3. 均衡调度:线程
如2中所说,好比如今有两个队列,奇数消息都须要队列作大量繁重的处理,而偶数消息则须要处理的逻辑很是少,这样就会形成某个队列code
任务繁重,等待处理任务过多,从而使消息处理不均衡,处理能力降低。面对这样的状况,RabbitMQ提供了均衡调度机制,指定workerhtm
只能接收一条消息,当worker处理完毕,队列收到消息确认(4中描述)的时候,才会派发给该worker一条新消息。由此,达到对消息和队列处理能力的均衡调度。
以下,咱们可使用basic_qos,并将perfetch_count设置为1,来告诉队列每次只发送一条消息给当前worker,直到收到完成确认才发下一条。
channel.basic_qos(prefetch_count=1)
4. 消息确认:
当不使用消息确认的状况下,队列将消息投递给worker以后,会当即将消息从队列内存中删除;此时,若是woker被停掉或者崩溃,
那么worker当前正在处理的消息和队列已经派发给worker的消息都会丢失。
RabbitMQ提供了消息确认机制,worker完成处理消息以后发送ack,队列确认消息已处理完毕,才将其从内存中删除。可是这个过程没有
超时,哪怕woker处理了很长时间也是没问题的。当worker挂掉,队列没有收到消息ack,若是有其余worker在运行,那么worker
会将未确认的消息派发给其余运行中的worker。ack确认机制默认是开启的,固然能够在channel中关闭。
注意,必定要确保在消息处理完成以后发送ack,不然队列内存将会随消息的增长而不断增长,甚至形成内存耗尽。
5. 消息持久化:
消息ack解决了worker挂掉时候消息的安全性,可是没法针对整个服务的重启或者挂掉,当RabbitMQ重启或者挂掉的时候,队列和消息都会消失,
为了不这种状况发生,咱们须要设置队列和消息持久化。
(1) 设置队列持久化:durable=True
channel.queue_declare(queue='task_queue', durable=True)
(2) 设置消息持久化:delivery_mode=2
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
上述设置虽然必定程度上保证了消息持久化,可是在收到消息和持久化消息之间仍然有时间窗口存在,且并非每条消息都会写一次磁盘,
因此这个时间窗口内仍然可能丢失消息,若是要确保持久化足够健壮,请参考 https://www.rabbitmq.com/confirms.html
6. 测试代码:
new_task.py--用于发送消息到队列
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 # 设置队列持久化 10 channel.queue_declare(queue='task_queue', durable=True) 11 12 message = ' '.join(sys.argv[1:]) or "Hello World!" 13 channel.basic_publish(exchange='', 14 routing_key='task_queue', 15 body=message, 16 properties=pika.BasicProperties( 17 delivery_mode = 2, # 设置消息持久化 18 )) 19 print(" [x] Sent %r" % message) 20 connection.close()
worker.py--用于接收队列消息并完成消息处理
1 #!/usr/bin/env python 2 import pika 3 import time 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 # 设置队列持久化 10 channel.queue_declare(queue='task_queue', durable=True) 11 print(' [*] Waiting for messages. To exit press CTRL+C') 12 13 def callback(ch, method, properties, body): 14 print(" [x] Received %r" % body) 15 time.sleep(body.count(b'.')) 16 print(" [x] Done") 17 # 完成消息处理,发送ack确认消息 18 ch.basic_ack(delivery_tag = method.delivery_tag) 19 20 # 最多同时接受一条消息 21 channel.basic_qos(prefetch_count=1) 22 channel.basic_consume(callback, 23 queue='task_queue') 24 25 channel.start_consuming()