rabbitmq中文教程python版 - 工做队列

源码:https://github.com/ltoddy/rabbitmq-tutorialpython

工做队列

image

(using the Pika Python client)git

本章节教程重点介绍的内容

在第一篇教程中,咱们编写了用于从命名队列发送和接收消息的程序。在这一个中,咱们将建立一个工做队列,用于在多个工做人员之间分配耗时的任务。github

工做队列(又名:任务队列)背后的主要思想是避免当即执行资源密集型任务,而且必须等待它完成。相反,咱们安排稍后完成任务。咱们将任务封装 为消息并将其发送到队列。
在后台运行的工做进程将弹出任务并最终执行做业。当你运行许多工人时,任务将在他们之间共享。服务器

这个概念在Web应用程序中特别有用,由于在短的HTTP请求窗口中没法处理复杂的任务。函数

在本教程的前一部分中,咱们发送了一条包含“Hello World!”的消息。如今咱们将发送表明复杂任务的字符串。
咱们没有真实世界的任务,好比要调整大小的图像或要渲染的PDF文件,因此让咱们伪装咱们很忙 - 使用 time.sleep() 函数来假装它。
咱们将把字符串中的点(".")数做为复杂度; 每个点都会占用一秒的“工做”。例如,Hello ... 描述的假任务将须要三秒钟。fetch

咱们稍微修改前面例子中的send.py代码,以容许从命令行发送任意消息。这个程序将把任务安排到咱们的工做队列中,因此让咱们把它命名为new_task.pyspa

import sys

message = ' '.join(sys.argv[1:]) or 'Hello World'

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % message)

咱们的旧版receive.py脚本也须要进行一些更改:它须要为邮件正文中的每一个点伪造第二个工做。它会从队列中弹出消息并执行任务,因此咱们称之为worker.py命令行

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

循环调度

使用任务队列的优势之一是能够轻松地平行工做。若是咱们正在积累积压的工做,咱们能够增长更多的工做人员,而且这种方式很容易扩展。code

首先,咱们试着同时运行两个worker.py脚本。他们都会从队列中获取消息,但具体究竟是什么?让咱们来看看。教程

您须要打开三个控制台。两个将运行worker.py脚本。这些控制台将成为咱们的两个消费者 - C1和C2。

image

默认状况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。平均而言,每一个消费者将得到相同数量的消息。这种分配消息的方式称为循环法。请尝试与三名或更多的工人。

消息确认

作任务可能须要几秒钟的时间。你可能想知道若是其中一个消费者开始一项长期任务而且只是部分完成而死亡会发生什么。
用咱们目前的代码,一旦RabbitMQ将消息传递给客户,它当即将其标记为删除。在这种状况下,若是你杀了一个工人,咱们将失去刚刚处理的信息。
咱们也会失去全部派发给这个特定工做人员但还没有处理的消息。

但咱们不想失去任何任务。若是一名工人死亡,咱们但愿将任务交付给另外一名工人。

为了确保消息永不丢失,RabbitMQ支持消息确认。消费者发回ack(请求)告诉RabbitMQ已经收到,处理了特定的消息,而且RabbitMQ能够自由删除它。

若是消费者死亡(其通道关闭,链接关闭或TCP链接丢失),RabbitMQ将理解消息未被彻底处理,并将从新排队。若是有其余消费者同时在线,它会迅速将其从新发送给另外一位消费者。
这样,即便工做人员偶尔死亡,也能够确保没有任何信息丢失。

没有任何消息超时; 当消费者死亡时,RabbitMQ将从新传递消息。即便处理消息须要很是很长的时间也不要紧。

消息确认默认是被打开的。在前面的例子中,咱们经过 no_ack = True 标志明确地将它们关闭。一旦咱们完成了一项任务,如今是时候清除这个标志而且发送工人的正确确认。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)


channel.basic_consume(callback,
                      queue='hello')

使用这段代码,咱们能够肯定,即便在处理消息时使用CTRL + C来杀死一个工做者,也不会丢失任何东西。工人死后不久,全部未确认的消息将被从新发送。

消息持久性

咱们已经学会了如何确保即便消费者死亡,任务也不会丢失。可是若是RabbitMQ服务器中止,咱们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非您告诉它不要。须要作两件事来确保消息不会丢失:咱们须要将队列和消息标记为持久。

首先,咱们须要确保RabbitMQ永远不会失去咱们的队列。为了作到这一点,咱们须要宣布它是持久的:

channel.queue_declare(queue='hello', durable=True)

虽然这个命令自己是正确的,但它在咱们的设置中不起做用。那是由于咱们已经定义了一个名为hello的队列 ,这个队列并不"耐用"。
RabbitMQ不容许您使用不一样的参数从新定义现有的队列,并会向任何试图执行该操做的程序返回错误。
可是有一个快速的解决方法 - 让咱们声明一个具备不一样名称的队列,例如task_queue

channel.queue_declare(queue='task_queue', durable=True)

queue_declare更改须要应用于生产者和消费者代码。

此时咱们确信,即便RabbitMQ从新启动,task_queue队列也不会丢失。如今咱们须要将消息标记为持久 - 经过提供值为2的delivery_mode属性。

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 确保消息是持久的
                      ))

公平派遣

您可能已经注意到调度仍然没法彻底按照咱们的要求工做。例如,在有两名工人的状况下,当全部奇怪的信息都很重,甚至信息不多时,一名工做人员会一直很忙,
另外一名工做人员几乎不会作任何工做。那么,RabbitMQ不知道任何有关这一点,并仍将均匀地发送消息。

发生这种状况是由于RabbitMQ只在消息进入队列时调度消息。它没有考虑消费者未确认消息的数量。它只是盲目地将第n条消息分发给第n位消费者。

image

为了解决这个问题,咱们可使用basic.qos方法和设置prefetch_count = 1。这告诉RabbitMQ一次不要向工做人员发送多个消息。
或者换句话说,不要向工做人员发送新消息,直到它处理并确认了前一个消息。相反,它会将其分派给不是仍然忙碌的下一个工做人员。

channel.basic_qos(prefetch_count=1)

把它放在一块儿

咱们的new_task.py脚本的最终代码:

#!/usr/bin/env python
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or 'Hello World'

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 确保消息是持久的
                      ))
print(" [x] Sent %r" % message)
connection.close()

而咱们的工人 worker.py

#!/usr/bin/env python
import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback,
                      queue='hello')
channel.basic_qos(prefetch_count=1)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

使用消息确认和prefetch_count,您能够设置一个工做队列。即便RabbitMQ从新启动,持久性选项也可以让任务继续存在。

相关文章
相关标签/搜索