RabbitMQ能作些什么?
消息系统容许软件、应用相互链接和扩展.这些应用能够相互连接起来组成一个更大的应用,或者将用户设备和数据进行链接.消息系统经过将消息的发送和接收分离来实现应用程序的异步和解偶.html
或许你正在考虑进行数据投递,非阻塞操做或推送通知。或许你想要实现发布/订阅,异步处理,或者工做队列。全部这些均可以经过消息系统实现。python
RabbitMQ是一个消息代理 - 一个消息系统的媒介。它能够为你的应用提供一个通用的消息发送和接收平台,而且保证消息在传输过程当中的安全。git
技术亮点
可靠性
RabbitMQ提供了多种技术可让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证明和高可用性机制。github
灵活的路由
消息在到达队列前是经过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。若是你有更复杂的路由需求,能够将这些交换机组合起来使用,你甚至能够实现本身的交换机类型,而且当作RabbitMQ的插件来使用。shell
集群
在相同局域网中的多个RabbitMQ服务器能够聚合在一块儿,做为一个独立的逻辑代理来使用。编程
联合
对于服务器来讲,它比集群须要更多的松散和非可靠连接。为此RabbitMQ提供了联合模型。小程序
高可用的队列
在同一个集群里,队列能够被镜像到多个机器中,以确保当其中某些硬件出现故障后,你的消息仍然安全。缓存
多协议
RabbitMQ 支持多种消息协议的消息传递。安全
普遍的客户端
只要是你能想到的编程语言几乎都有与其相适配的RabbitMQ客户端。bash
可视化管理工具
RabbitMQ附带了一个易于使用的可视化管理工具,它能够帮助你监控消息代理的每个环节。
追踪
若是你的消息系统有异常行为,RabbitMQ还提供了追踪的支持,让你可以发现问题所在。
插件系统
RabbitMQ附带了各类各样的插件来对本身进行扩展。你甚至也能够写本身的插件来使用。
AMQP协议
AMQP是什么
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通讯。
消息代理和他们所扮演的角色
消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。
因为AMQP是一个网络协议,因此这个过程当中的发布者,消费者,消息代理 能够存在于不一样的设备上。
介绍
RabbitMQ是一个消息代理。它的工做就是接收和转发消息。你能够把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ就扮演着邮箱、邮局以及邮递员的角色。
RabbitMQ和邮局的主要区别在于,它处理纸张,而是接收、存储和发送消息(message)这种二进制数据。
下面是RabbitMQ和消息所涉及到的一些术语。
-
生产(Producing)的意思就是发送。发送消息的程序就是一个生产者(producer)。咱们通常用"P"来表示:
-
队列(queue)就是存在于RabbitMQ中邮箱的名称。虽然消息的传输通过了RabbitMQ和你的应用程序,可是它只能被存储于队列当中。实质上队列就是个巨大的消息缓冲区,它的大小只受主机内存和硬盘限制。多个生产者(producers)能够把消息发送给同一个队列,一样,多个消费者(consumers)也可以从同一个队列(queue)中获取数据。队列能够绘制成这样(图上是队列的名称):
-
在这里,消费(Consuming)和接收(receiving)是同一个意思。一个消费者(consumer)就是一个等待获取消息的程序。咱们把它绘制为"C":
须要指出的是生产者、消费者、代理需不要待在同一个设备上;事实上大多数应用也确实不在会将他们放在一台机器上。
Hello World!
(使用pika 0.10.0 Python客户端)
接下来咱们用Python写两个小程序。一个发送单条消息的生产者(producer)和一个接收消息并将其输出的消费者(consumer)。传递的消息是"Hello World"。
下图中,“P”表明生产者,“C”表明消费者,中间的盒子表明为消费者保留的消息缓冲区,也就是咱们的队列。
生产者(producer)把消息发送到一个名为“hello”的队列中。消费者(consumer)从这个队列中获取消息。
RabbitMQ库
RabbitMQ使用的是AMQP 0.9.1协议。这是一个用于消息传递的开放、通用的协议。针对不一样编程语言有大量的RabbitMQ客户端可用。在这个系列教程中,RabbitMQ团队推荐使用Pika这个Python客户端。你们能够经过pip这个包管理工具进行安装:
发送
咱们第一个程序send.py
会发送一个消息到队列中。首先要作的事情就是创建一个到RabbitMQ服务器的链接。
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()
如今咱们已经跟本地机器的代理创建了链接。若是你想链接到其余机器的代理上,须要把表明本地的localhost
改成指定的名字或IP地址。
接下来,在发送消息以前,咱们须要确认服务于消费者的队列已经存在。若是将消息发送给一个不存在的队列,RabbitMQ会将消息丢弃掉。下面咱们建立一个名为"hello"的队列用来将消息投递进去。
channel.queue_declare(queue='hello')
这时候咱们就能够发送消息了,咱们第一条消息只包含了Hello World!字符串,咱们打算把它发送到hello队列。
在RabbitMQ中,消息是不能直接发送到队列中的,这个过程须要经过交换机(exchange)来进行。可是为了避免让细节拖累咱们的进度,这里咱们只须要知道如何使用由空字符串表示的默认交换机便可。若是你想要详细了解交换机,能够查看咱们教程的第三部分来获取更多细节。默认交换机比较特别,它容许咱们指定消息究竟须要投递到哪一个具体的队列中,队列名字须要在routing_key
参数中指定。
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'")
在退出程序以前,咱们须要确认网络缓冲已经被刷写、消息已经投递到RabbitMQ。经过安全关闭链接能够作到这一点。
connection.close()
发送不成功!
若是这是你第一次使用RabbitMQ,而且没有看到“Sent”消息出如今屏幕上,你可能会抓耳挠腮不知因此。这也许是由于没有足够的磁盘空间给代理使用所形成的(代理默认须要200MB的空闲空间),因此它才会拒绝接收消息。查看一下代理的日志文件进行确认,若是须要的话也能够减小限制。配置文件文档会告诉你如何更改磁盘空间限制(disk_free_limit)。
接收
咱们的第二个程序receive.py
,将会从队列中获取消息并将其打印到屏幕上。
此次咱们仍是须要要先链接到RabbitMQ服务器。链接服务器的代码和以前是同样的。
下一步也和以前同样,咱们须要确认队列是存在的。咱们能够屡次使用queue_declare
命令来建立同一个队列,可是只有一个队列会被真正的建立。
channel.queue_declare(queue='hello')
你也许要问: 为何要重复声明队列呢 —— 咱们已经在前面的代码中声明过它了。若是咱们肯定了队列是已经存在的,那么咱们能够不这么作,好比此前预先运行了send.py程序。但是咱们并不肯定哪一个程序会首先运行。这种状况下,在程序中重复将队列重复声明一下是种值得推荐的作法。
列出全部队列
你也许但愿查看RabbitMQ中有哪些队列、有多少消息在队列中。此时你可使用rabbitmqctl工具(使用有权限的用户):
sudo rabbitmqctl list_queues(在Windows中不须要sudo命令)
rabbitmqctl list_queues
从队列中获取消息相对来讲稍显复杂。须要为队列定义一个回调(callback)函数。当咱们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。
def callback(ch, method, properties, body): print(" [x] Received %r" % body)
下一步,咱们须要告诉RabbitMQ这个回调函数将会从名为"hello"的队列中接收消息:
channel.basic_consume(callback,
queue='hello', no_ack=True)
要成功运行这些命令,咱们必须保证队列是存在的,咱们的确能够确保它的存在——由于咱们以前已经使用queue_declare
将其声明过了。
no_ack
参数稍后会进行介绍。
最后,咱们运行一个用来等待消息数据而且在须要的时候运行回调函数的无限循环。
print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
将代码整合到一块儿
send.py的完整代码:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive.py的完整代码:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
如今咱们能够在终端中尝试一下咱们的程序了。
首先咱们启动一个消费者,它会持续的运行来等待投递到达。
python receive.py
# => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Hello World!'
而后启动生产者,生产者程序每次执行后都会中止运行。
python send.py
# => [x] Sent 'Hello World!'
成功了!咱们已经经过RabbitMQ发送第一条消息。你也许已经注意到了,receive.py程序并无退出。它一直在准备获取消息,你能够经过Ctrl-C来停止它。
试下在新的终端中再次运行send.py
。
咱们已经学会如何发送消息到一个已知队列中并接收消息。是时候移步到第二部分了,咱们将会创建一个简单的工做队列(work queue)。
工做队列
(使用pika 0.9.5 Python客户端)
在第一篇教程中,咱们已经写了一个从已知队列中发送和获取消息的程序。在这篇教程中,咱们将建立一个工做队列(Work Queue),它会发送一些耗时的任务给多个工做者(Worker)。
工做队列(又称:任务队列——Task Queues)是为了不等待一些占用大量资源、时间的操做。当咱们把任务(Task)看成消息发送到队列中,一个运行在后台的工做者(worker)进程就会取出任务而后处理。当你运行多个工做者(workers),任务就会在它们之间共享。
这个概念在网络应用中是很是有用的,它能够在短暂的HTTP请求中处理一些复杂的任务。
准备
以前的教程中,咱们发送了一个包含“Hello World!”的字符串消息。如今,咱们将发送一些字符串,把这些字符串看成复杂的任务。咱们没有真实的例子,例如图片缩放、pdf文件转换。因此使用time.sleep()函数来模拟这种状况。咱们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。好比"Hello..."就会耗时3秒钟。
咱们对以前教程的send.py作些简单的调整,以即可以发送随意的消息。这个程序会按照计划发送任务到咱们的工做队列中。咱们把它命名为new_task.py:
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
咱们的旧脚本(receive.py)一样须要作一些改动:它须要为消息体中每个点号(.)模拟1秒钟的操做。它会从队列中获取消息并执行,咱们把它命名为worker.py:
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
循环调度:
使用工做队列的一个好处就是它可以并行的处理队列。若是堆积了不少任务,咱们只须要添加更多的工做者(workers)就能够了,扩展很简单。
首先,咱们先同时运行两个worker.py脚本,它们都会从队列中获取消息,究竟是不是这样呢?咱们看看。
你须要打开三个终端,两个用来运行worker.py脚本,这两个终端就是咱们的两个消费者(consumers)—— C1 和 C2。
shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C
第三个终端,咱们用来发布新任务。你能够发送一些消息给消费者(consumers):
shell3$ python new_task.py First message. shell3$ python new_task.py Second message.. shell3$ python new_task.py Third message... shell3$ python new_task.py Fourth message.... shell3$ python new_task.py Fifth message.....
看看到底发送了什么给咱们的工做者(workers):
shell1$ python worker.py
[*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
shell2$ python worker.py
[*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默认来讲,RabbitMQ会按顺序得把消息发送给每一个消费者(consumer)。平均每一个消费者都会收到同等数量得消息。这种发送消息得方式叫作——轮询(round-robin)。试着添加三个或更多得工做者(workers)。
消息确认
当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)以后,立刻就会在内存中移除。这种状况,你只要把一个工做者(worker)中止,正在处理的消息就会丢失。同时,全部发送到这个工做者的尚未处理的消息都会丢失。
咱们不想丢失任何任务消息。若是一个工做者(worker)挂掉了,咱们但愿任务会从新发送给其余的工做者(worker)。
为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会经过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,而后RabbitMQ就会释放并删除这条消息。
若是消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被彻底处理,而后从新发送给其余消费者(consumer)。这样,及时工做者(workers)偶尔的挂掉,也不会丢失消息。
消息是没有超时这个概念的;当工做者与它断开连的时候,RabbitMQ会从新发送消息。这样在处理一个耗时很是长的消息任务的时候就不会出问题了。
消息响应默认是开启的。以前的例子中咱们可使用no_ack=True标识把它关闭。是时候移除这个标识了,当工做者(worker)完成了任务,就发送一个响应。
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello')
运行上面的代码,咱们发现即便使用CTRL+C杀掉了一个工做者(worker)进程,消息也不会丢失。当工做者(worker)挂掉这后,全部没有响应的消息都会从新发送。
忘记确认
一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出以后就会从新发送,若是它不可以释放没响应的消息,RabbitMQ就会占用愈来愈多的内存。
为了排除这种错误,你可使用rabbitmqctl命令,输出messages_unacknowledged字段:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.
消息持久化
若是你没有特地告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失全部队列和消息。为了确保信息不会丢失,有两个事情是须要注意的:咱们必须把“队列”和“消息”设为持久化。
首先,为了避免让队列消失,须要把队列声明为持久化(durable):
channel.queue_declare(queue='hello', durable=True)
尽管这行代码自己是正确的,可是仍然不会正确运行。由于咱们已经定义过一个叫hello的非持久化队列。RabbitMq不容许你使用不一样的参数从新定义一个队列,它会返回一个错误。但咱们如今使用一个快捷的解决方法——用不一样的名字,例如task_queue。
channel.queue_declare(queue='task_queue', durable=True)
这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。
这时候,咱们就能够确保在RabbitMq重启以后queue_declare队列不会丢失。另外,咱们须要把咱们的消息也要设为持久化——将delivery_mode的属性设为2。
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
注意:消息持久化
将消息设为持久化并不能彻底保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间仍是有一个很小的间隔时间。由于RabbitMq并非全部的消息都使用fsync(2)——它有可能只是保存到缓存中,并不必定会写到硬盘中。并不能保证真正的持久化,但已经足够应付咱们的简单工做队列。若是你必定要保证持久化,你须要改写你的代码来支持事务(transaction)。
公平调度
你应该已经发现,它仍旧没有按照咱们指望的那样进行分发。好比有两个工做者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。
这时由于RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有做出响应。它盲目的把第n-th条消息发给第n-th个消费者。
咱们可使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工做者(worker),直到它已经处理了上一条消息而且做出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工做者(worker)。
channel.basic_qos(prefetch_count=1)
关于队列大小
若是全部的工做者都处理繁忙状态,你的队列就会被填满。你须要留意这个问题,要么添加更多的工做者(workers),要么使用其余策略。
整合代码
new_task.py的完整代码:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close()
(new_task.py源码)
咱们的worker:
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
(worker.py source)
使用消息响应和prefetch_count你就能够搭建起一个工做队列了。这些持久化的选项使得在RabbitMQ重启以后仍然可以恢复。