在文章RabbitMQ入门(一)之Hello World,咱们编写程序经过指定的队列来发送和接受消息。在本文中,咱们将会建立工做队列
(Work Queue),经过多个workers来分配耗时任务。
工做队列(Work Queue,也被成为Task Queue,任务队列)的中心思想是,避免当即执行一个资源消耗巨大且必须等待其完成的任务。相反地,咱们调度好队列能够安排该任务稍后执行。咱们将一个任务(task)封装成一个消息,将它发送至队列。一个在后台运行的work进程将会抛出该任务,并最终执行该任务。当你运行多个workers的时候,任务将会在它们之中共享。
这个概念在web开发中颇有用,由于经过一个短的HTTP请求不可能处理复杂的任务。
在以前的文章中,咱们发送了一个包含“Hello World!”的消息。如今咱们将会发送表明复杂任务的字串符。咱们并无实际上的任务,好比从新调整图片的尺寸或者渲染PDF,咱们伪装有这样的复杂任务,经过使用time.sleep()
函数。咱们将会用字符串中的点(.)来表明复杂度;每个点表明一秒中的任务。举例来讲,字符串Hello...
须要花费三秒。
咱们须要稍微修改下sent.py
中的代码,容许在命令中输入任意字符串。该程序会调度任务至工做队列,所以命名为new_task.py
:python
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print(" [x] Sent %r" % message)
咱们原先的receive.py
也须要改动:它须要在消息体中将字符串的每个点表明1秒钟的任务。它会从队列中抛出消息并执行该消息,所以命名为task.py
:mysql
import time def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done")
使用工做队列的一个好处就是它可以轻松实现并行工做。若是咱们建立了一项积压的工做,那么咱们能够增长更多的worker来使它的扩展性更好。
首先,咱们同时运行两个worker.py
脚本。他们都可以从队列中获取消息,可是具体是怎么实现的呢?让咱们接着阅读。
你须要打开三个终端查看。两个终端用于运行worker.py
脚本。这两个终端将会成为两个消费者——C1和C2。web
# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C
在第三个终端中,咱们将会产生新的任务。一旦你启动了这些消费者,你就能够发送一些消息了:sql
# shell 3 python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message.... python new_task.py Fifth message.....
让咱们看看这两个workers传递了什么:shell
# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
# shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
RabbitMQ默认会将每一个消息依次地发送给下一个消费者。所以总的来讲,每一个消费者将会一样数量的消息。这种消息分配的方法叫Round-Robin
。你能够尝试三个或者更多的worker。数据库
执行一项任务须要花费几秒钟。你也许会好奇,若是其中一个消费者执行一项耗时很长的任务,而且在执行了一部分的时候挂掉了,将会发生什么?根据咱们如今的代码,一旦RabbitMQ将消息传送至消费者,那么RabbitMQ就会标志它为删除状态。在这种状况下,若是咱们杀死某个worker,咱们将会失去他正在处理的消息。咱们也会失去全部分配至这个worker的消息,固然,这些消息还未被处理。
可是,咱们不但愿失去任何一项任务。若是有一个worker挂掉了,咱们但愿这些任务可以被传送至另外一个worker。
为了确保消息不丢失,RabbitMQ支持消息确认
。一个ack(nowledgement)是由消费者发送回来的,用于告诉RabbitMQ,这个特定的消息已经被接受,被处理,能够被删除了。
若是一个消费者挂了(它的channel关闭了,链接关闭了,或者TCP链接丢失)可是没有发送一个ack,RabbitMQ就会知道这个消息并未被彻底处理,会将它从新塞进队列。若是同时还存在着其余在线消费者,RabbbitMQ将会将这个消息从新传送给另外一个消费者。用这种方式能够确保没有消息丢失,即便workers偶尔会刮掉。
并不存在消息超时;若是消费者挂了,RabbitMQ将会从新传送消息。这样即便处理一个消息须要消耗很长很长的时间,也是能够的。
默认的消息确认
方式为人工消息确认
。在咱们以前的例子中,咱们清晰地将它关闭了,使用了auto_ack=True
这个命令。当咱们完成一项任务的时候,根据须要,移除这个标志,从worker中发送一个合适的确认。json
def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep( body.count('.') ) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(queue='hello', on_message_callback=callback)
使用上述代码,咱们能够确保,即便咱们使用CTRL+C
命令杀死了一个正在处理消息的woker,也不会丢失什么。这个worker挂掉后不久,全部未确认的消息将会被从新传送。
消息确认
必须在同一个传输消息的channel中发送。尝试着在不一样的channel中进行消息确认将会引起channel-level protocol exception。bash
咱们已经学习了如何在消费者挂掉的状况下,任务不会丢失。可是,当RabbitMQ server中止时,咱们的任务仍然会丢失。
当RabbitMQ中止或崩溃时,它将会忘记全部的队列和消息,除非你告诉它不这么作。在这种状况下,须要作两个事情确保消息不会丢失:咱们须要将队列和消息都设置为持久化。
首先,咱们须要确保RabbitMQ不会丢失队列。为了实现这个,咱们须要将队列声明为持久化:app
channel.queue_declare(queue='hello', durable=True)
尽管这个命令是正确的,但他仍会不会起做用。这是由于,咱们已经建立了一个叫为hello
的非持久化队列。RabbitMQ不容许你从新定义一个已经存在的队列而参数不同,全部这样作的程序只会引起错误。可是有一个快速的应变办法——咱们能够建立一个不一样名称的队列,好比task_queue
:dom
channel.queue_declare(queue='task_queue', durable=True)
queue_declare
须要同时应用于生产者和消费者。
在这点上咱们能够确保task_queue
队列不会丢失消息即便RabbitMQ重启。如今,咱们须要声明消息为持久化——将delivery_mode
这个参数设置为2。
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
你也许注意到了,刚才的消息分发机制并不会严格地按照咱们所但愿的方式进行。举这样一个例子,设想有两个worker,而全部的奇数消息都很重而偶数消息都是轻量级的,这样其中一个worker就会一直很忙而另外一个worker几乎不作什么工做。然而,RabbitMQ对此一无所知,它仍然会平均分配消息。
这种状况的发生是由于RabbitMQ仅仅是当消息进入队列的时候就会分发这个消息。它并不会注意消费者所接收的未确认的消息数量。它盲目地将第n个消息发送至第n个消费者。
为了克服这种状况,咱们能够在basic.qos
方法中设置prefetch_count=1
。这就告诉RabbitMQ一次不要将多于一个的消息发送给一个worker。换句话说,不要分发一个新的消息给worker除非这个worker已经处理好以前的消息而且进行了消息确认。也就说,RabbitMQ将会将这个消息分发给下一个不是很忙的worker。
channel.basic_qos(prefetch_count=1)
为了对上面的例子有一个好的理解,咱们须要写代码进行实际操练一下。
生产者new_task.py
的代码以下:
# -*- coding: utf-8 -*- import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消费者worker.py
的完整代码以下:
# -*- coding: utf-8 -*- import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()
开启三个终端,消息的发送和接收状况以下:
若是咱们停掉其中一个worker,那么消息的接收状况以下:
能够看到,如今全部发送的消息都会被这个仍在工做的worker接收到。
接下来,咱们将会使用RabbitMQ的这种工做队列的方式往MySQL数据库中的表插入数据。
数据库为orm_test,表格为exam_user,表结构以下:
接下来,咱们须要往这张表中插入随机建立的数据。若是咱们利用Python的第三方模块pymysql,每一次插入一条记录,那么一分钟插入53237条记录。
利用RabbitMQ,咱们的生产者代码以下:
# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:23 import pika from random import choice names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark', 'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David', 'Ella', 'Lake', 'Moon', 'Nake', 'Zoo'] places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao'] types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008', 'DG009', 'DG010', 'DG020'] connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) for id in range(1, 20000001): name = choice(names) place = choice(places) type2 = choice(types) message = "insert into exam_users values(%s, '%s', '%s', '%s');" % (id, name, place, type2) channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消费者代码以下:
# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:28 # -*- coding: utf-8 -*- # author: Jclian91 # place: Sanya Hainan # time: 2020-01-12 13:45 import pika import time import pymysql # 打开数据库链接 db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test") # 使用 cursor() 方法建立一个游标对象 cursor cursor = db.cursor() connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) cursor.execute(body) db.commit() print(" [x] Insert successfully!") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()
咱们开启9个终端,其中8个消费者1个生产者,先启动消费者,而后生产者,按照上面的数据导入方式,一分钟插入了133084条记录,是普通方式的2.50倍,效率有大幅度提高!
让咱们稍微修改下生产者和消费者的代码,一次提交插入多条记录,减小每提交一次就插入一条记录的消耗时间。新的生产者代码以下:
# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:23 import pika from random import choice import json names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark', 'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David', 'Ella', 'Lake', 'Moon', 'Nake', 'Zoo'] places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao'] types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008', 'DG009', 'DG010', 'DG020'] connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) for _ in range(1, 200001): values = [] for i in range(100): name = choice(names) place = choice(places) type2 = choice(types) values.append([100*_+i+1, name, place, type2]) message = json.dumps(values) channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
新的消费者的代码以下:
# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:28 # -*- coding: utf-8 -*- # author: Jclian91 # place: Sanya Hainan # time: 2020-01-12 13:45 import pika import json import time import pymysql # 打开数据库链接 db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test") # 使用 cursor() 方法建立一个游标对象 cursor cursor = db.cursor() connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) sql = 'insert into exam_users values(%s, %s, %s, %s)' cursor.executemany(sql, json.loads(body)) db.commit() print(" [x] Insert successfully!") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()
跟刚才同样,咱们开启9个终端,其中8个消费者1个生产者,先启动消费者,而后生产者,按照上面的数据导入方式,一分钟插入了3170600条记录,是普通方式的59.56倍,是先前一次只提交一条记录的插入方式的23.82倍。这样的提速无疑是很是惊人的!
固然还有更高效的数据插入方法,本文的方法仅仅是为了演示RabbitMQ的工做队列以及在插入数据方面的提速。
本次分享到此结束,感谢你们阅读~