RabbitMQ-python应用


 

介绍

官方文档:https://www.rabbitmq.com/tutorials/tutorial-one-python.htmlhtml

RabbitMQ是一个基于AMQP协议的消息代理。它的工做就是接收和转发消息。你能够把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ就扮演着邮箱、邮局以及邮递员的角色。python

了解AMQP:https://www.cnblogs.com/Xuuuuuu/p/10880640.html算法

RabbitMQ和邮局的主要区别在于,它处理纸张,而是接收、存储和发送消息(message)这种二进制数据。shell

Hello World!

(使用pika 1.0.1 Python客户端)小程序

接下来咱们用Python写两个小程序。一个发送单条消息的生产者(producer)和一个接收消息并将其输出的消费者(consumer)。传递的消息是"Hello World"。缓存

下图中,“P”表明生产者,“C”表明消费者,中间的盒子表明为消费者保留的消息缓冲区,也就是咱们的队列。安全

生产者(producer)把消息发送到一个名为“hello”的队列中。消费者(consumer)从这个队列中获取消息。bash

发送

咱们第一个程序send.py会发送一个消息到队列中。首先要作的事情就是创建一个到RabbitMQ服务器的链接。服务器

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.233'))
channel = connection.channel()

如今咱们已经跟本地机器的代理创建了链接。若是你想链接到其余机器的代理上,须要把表明本地的localhost改成指定的名字或IP地址。网络

接下来,在发送消息以前,咱们须要确认服务于消费者的队列已经存在。若是将消息发送给一个不存在的队列,RabbitMQ会将消息丢弃掉。下面咱们建立一个名为"hello"的队列用来将消息投递进去。

channel.queue_declare(queue='hello')

在RabbitMQ中,消息是不能直接发送到队列中的,这个过程须要经过交换机(exchange)来进行。可是为了避免让细节拖累咱们的进度,这里咱们只须要知道如何使用由空字符串表示的默认交换机便可。默认交换机比较特别,它容许咱们指定消息究竟须要投递到哪一个具体的队列中,队列名字须要在routing_key参数中指定。

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

在退出程序以前,咱们须要确认网络缓冲已经被刷写、消息已经投递到RabbitMQ。经过安全关闭链接能够作到这一点。

connection.close()

接收

咱们的第二个程序receive.py,将会从队列中获取消息并将其打印到屏幕上。

此次咱们仍是须要要先链接到RabbitMQ服务器。链接服务器的代码和以前是同样的。

下一步也和以前同样,咱们须要确认队列是存在的。咱们能够屡次使用queue_declare命令来建立同一个队列,可是只有一个队列会被真正的建立。

channel.queue_declare(queue='hello')

你也许要问: 为何要重复声明队列呢 —— 咱们已经在前面的代码中声明过它了。若是咱们肯定了队列是已经存在的,那么咱们能够不这么作,好比此前预先运行了send.py程序。但是咱们并不肯定哪一个程序会首先运行。这种状况下,在程序中重复将队列重复声明一下是种值得推荐的作法。

列出全部队列

你也许但愿查看RabbitMQ中有哪些队列、有多少消息在队列中。此时你可使用rabbitmqctl工具(使用有权限的用户):

sudo rabbitmqctl list_queues

(在Windows中不须要sudo命令)

rabbitmqctl list_queues

从队列中获取消息相对来讲稍显复杂。须要为队列定义一个回调(callback)函数。当咱们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

下一步,咱们须要告诉RabbitMQ这个回调函数将会从名为"hello"的队列中接收消息:

channel.basic_consume(queue='hello',
                      auto_ack=True, on_message_callback=callback)

要成功运行这些命令,咱们必须保证队列是存在的,咱们的确能够确保它的存在——由于咱们以前已经使用queue_declare将其声明过了。

no_ack参数稍后会进行介绍。

最后,咱们运行一个用来等待消息数据而且在须要的时候运行回调函数的无限循环。

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

输出:

python receive.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Hello World!'

 

工做队列

在第一篇教程中,咱们已经写了一个从已知队列中发送和获取消息的程序。在这篇教程中,咱们将建立一个工做队列(work queue),它会发送一些耗时的任务给多个工做者(Worker)

工做队列(又称:任务队列)是为了不等待一些占用大量资源、时间的操做。当咱们把任务发送到队列中,一个运行在后台的工做者进程就会取出任务而后处理。当你运行多个工做者,任务就会在它们之间共享。

准备

以前的教程中,咱们发送了一个包含“Hello World!”的字符串消息。如今,咱们将发送一些字符串,把这些字符串看成复杂的任务。咱们没有真实的例子,例如图片缩放、pdf文件转换。因此使用time.sleep()函数来模拟这种状况。咱们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。好比"Hello..."就会耗时3秒钟。

咱们对以前教程的send.py作些简单的调整,以即可以发送随意的消息。这个程序会按照计划发送任务到咱们的工做队列中。咱们把它命名为new_task.py:

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print " [x] Sent %r" % (message,)

咱们的旧脚本(receive.py)一样须要作一些改动:它须要为消息体中每个点号(.)模拟1秒钟的操做。它会从队列中获取消息并执行,咱们把它命名为worker.py:

import time

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.decode("utf-8").count('.') )
    print " [x] Done"

循环调度:

使用工做队列的一个好处就是它可以并行的处理队列。若是堆积了不少任务,咱们只须要添加更多的工做者(workers)就能够了,扩展很简单。

首先,咱们先同时运行两个worker.py脚本,它们都会从队列中获取消息,究竟是不是这样呢?咱们看看。

你须要打开三个终端,两个用来运行worker.py脚本,这两个终端就是咱们的两个消费者(consumers)—— C1 和 C2。

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

第三个终端,咱们用来发布新任务。你能够发送一些消息给消费者(consumers):

shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

看看到底发送了什么给咱们的工做者(workers):

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

默认来讲,RabbitMQ会按顺序得把消息发送给每一个消费者(consumer)。平均每一个消费者都会收到同等数量得消息。这种发送消息得方式叫作——轮询(round-robin)。试着添加三个或更多得工做者(workers)。

消息确认

当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)以后,立刻就会在内存中移除。这种状况,你只要把一个工做者(worker)中止,正在处理的消息就会丢失。同时,全部发送到这个工做者的尚未处理的消息都会丢失。

咱们不想丢失任何任务消息。若是一个工做者(worker)挂掉了,咱们但愿任务会从新发送给其余的工做者(worker)。

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会经过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,而后RabbitMQ就会释放并删除这条消息。

若是消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被彻底处理,而后从新发送给其余消费者(consumer)。这样,及时工做者(workers)偶尔的挂掉,也不会丢失消息。

消息是没有超时这个概念的;当工做者与它断开连的时候,RabbitMQ会从新发送消息。这样在处理一个耗时很是长的消息任务的时候就不会出问题了。

消息响应默认是开启的。以前的例子中咱们可使用no_ack=True标识把它关闭。是时候移除这个标识了,当工做者(worker)完成了任务,就发送一个响应。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.decode('utf-8').count('.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(queue='hello',
                      on_message_callback=callback)

运行上面的代码,咱们发现即便使用CTRL+C杀掉了一个工做者(worker)进程,消息也不会丢失。当工做者(worker)挂掉这后,全部没有响应的消息都会从新发送

 忘记确认

一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出以后就会从新发送,若是它不可以释放没响应的消息,RabbitMQ就会占用愈来愈多的内存。

为了排除这种错误,你可使用rabbitmqctl命令,输出messages_unacknowledged字段:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

消息持久化

若是你没有特地告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失全部队列和消息。为了确保信息不会丢失,有两个事情是须要注意的:咱们必须把“队列”和“消息”设为持久化。

首先,为了避免让队列消失,须要把队列声明为持久化(durable):

channel.queue_declare(queue='hello', durable=True)

尽管这行代码自己是正确的,可是仍然不会正确运行。由于咱们已经定义过一个叫hello的非持久化队列。RabbitMq不容许你使用不一样的参数从新定义一个队列,它会返回一个错误。但咱们如今使用一个快捷的解决方法——用不一样的名字,例如task_queue。

channel.queue_declare(queue='task_queue', durable=True)

这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。

这时候,咱们就能够确保在RabbitMq重启以后queue_declare队列不会丢失。另外,咱们须要把咱们的消息也要设为持久化——将delivery_mode的属性设为2。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

将消息设为持久化并不能彻底保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间仍是有一个很小的间隔时间。由于RabbitMq并非全部的消息都使用fsync(2)——它有可能只是保存到缓存中,并不必定会写到硬盘中。并不能保证真正的持久化,但已经足够应付咱们的简单工做队列。若是你必定要保证持久化,你须要改写你的代码来支持事务(transaction)。

公平调度

你应该已经发现,它仍旧没有按照咱们指望的那样进行分发。好比有两个工做者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。

这时由于RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有做出响应。它盲目的把第n-th条消息发给第n-th个消费者。

咱们可使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工做者(worker),直到它已经处理了上一条消息而且做出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工做者(worker)。

channel.basic_qos(prefetch_count=1)

整合代码

new_task.py的完整代码:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()
View Code

咱们的worker:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.decode("utf-8").count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
View Code

 

发布/订阅

在上篇教程中,咱们搭建了一个工做队列,每一个任务只分发给一个工做者(worker)。在本篇教程中,咱们要作的跟以前彻底不同 —— 分发一个消息给多个消费者(consumers)。这种模式被称为“发布/订阅”。

为了描述这种模式,咱们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。

在咱们的这个日志系统中,全部正在运行的接收方程序都会接受消息。咱们用其中一个接收者(receiver)把日志写入硬盘中,另一个接受者(receiver)把日志输出到屏幕上。

最终,日志消息被广播给全部的接受者(receivers)。

交换机(Exchanges)

前面的教程中,咱们发送消息到队列并从中取出消息。如今是时候介绍RabbitMQ中完整的消息模型了。

让咱们简单的归纳一下以前的教程:

  • 发布者(producer)是发布消息的应用程序。
  • 队列(queue)用于消息存储的缓冲。
  • 消费者(consumer)是接收消息的应用程序。

RabbitMQ消息模型的核心理念是:发布者不会直接发送任何消息给队列。事实上,发布者甚至不知道消息是否已经被投递到队列。

发布者只须要把消息发送给一个交换机。交换机很是简单,它一边从发布者接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列仍是多个丢列,或则是直接忽略信息。这些规则是经过交换机类型来定义的。

有几个可供选择的交换机类型:直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)。咱们在这里主要说明最后一个 —— 扇型交换机(fanout)。先建立一个fanout类型的交换机,命名为logs:

channel.exchange_declare(exchange='logs',
                         type='fanout')

扇型交换机(fanout)很简单,你可能从名字上就能猜想出来,它把消息发送给它所知道的全部队列。这正是咱们的日志系统所须要的。

交换器列表

rabbitmqctl可以列出服务器上全部的交换器:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

这个列表中有一些叫作amq.*的交换器。这些都是默认建立的,不过这时候你还不须要使用他们。

匿名的交换器

前面的教程中咱们对交换机一无所知,但仍然可以发送消息到队列中。由于咱们使用了命名为空字符串("")默认的交换机。

回想咱们以前是如何发布一则消息:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

exchange参数就是交换机的名称。空字符串表明默认或者匿名交换机:消息将会根据指定的routing_key分发到指定的队列。

如今,咱们就能够发送消息到一个具名交换机了:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

临时队列

你还记得以前咱们使用的队列名吗( hello和task_queue)?给一个队列命名是很重要的——咱们须要把工做者(workers)指向正确的队列。若是你打算在发布者(producers)和消费者(consumers)之间共享同队列的话,给队列命名是十分重要的。

可是这并不适用于咱们的日志系统。咱们打算接收全部的日志消息,而不只仅是一小部分。咱们关心的是最新的消息而不是旧的。为了解决这个问题,咱们须要作两件事情。

首先,当咱们链接上RabbitMQ的时候,咱们须要一个全新的、空的队列。咱们能够手动建立一个随机的队列名,或者让服务器为咱们选择一个随机的队列名(推荐)。咱们只须要在调用queue_declare方法的时候,不提供queue参数就能够了:

result = channel.queue_declare()

这时候咱们能够经过result.method.queue得到已经生成的随机队列名。它多是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。

第二步,当与消费者(consumer)断开链接的时候,这个队列应当被当即删除。exclusive标识符便可达到此目的。

result = channel.queue_declare(exclusive=True)

绑定(Bindings)

咱们已经建立了一个扇型交换机(fanout)和一个队列。如今咱们须要告诉交换机如何发送消息给咱们的队列。交换器和队列之间的联系咱们称之为绑定(binding)。

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

如今,logs交换机将会把消息添加到咱们的队列中。

 绑定(binding)列表

你可使用rabbitmqctl list_bindings 列出全部现存的绑定。

代码整合

 

发布日志消息的程序看起来和以前的没有太大区别。最重要的改变就是咱们把消息发送给logs交换机而不是匿名交换机。在发送的时候咱们须要提供routing_key参数,可是它的值会被扇型交换机(fanout exchange)忽略。如下是emit_log.py脚本:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.233"))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type="fanout")

message = " ".join(sys.argv[1:]) or "info: hello world!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % (message,))
connection.close()

正如你看到的那样,在链接成功以后,咱们声明了一个交换器,这一个是很重要的,由于不容许发布消息到不存在的交换器。

若是没有绑定队列到交换器,消息将会丢失。但这个没有所谓,若是没有消费者监听,那么消息就会被忽略。

receive_logs.py的代码:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.233"))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type="fanout")

message = " ".join(sys.argv[1:]) or "info: hello world!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % (message,))
connection.close()
View Code

正如你看到的那样,在链接成功以后,咱们声明了一个交换器,这一个是很重要的,由于不容许发布消息到不存在的交换器。

若是没有绑定队列到交换器,消息将会丢失。但这个没有所谓,若是没有消费者监听,那么消息就会被忽略。

receive_logs.py的代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.233"))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type="fanout")

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % (body,))

channel.basic_consume(auto_ack=True,
                      queue=queue_name,
                      on_message_callback=callback)

channel.start_consuming()
View Code

这样咱们就完成了。若是你想把日志保存到文件中,只须要打开控制台输入:

$ python receive_logs.py > logs_from_rabbit.log

若是你想在屏幕中查看日志,那么打开一个新的终端而后运行:

$ python receive_logs.py

固然还要发送日志:

$ python emit_log.py

使用rabbitmqctl list_bindings你可确认已经建立的队列绑定。你能够看到运行中的两个receive_logs.py程序:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
 ...
logs    amq.gen-TJWkez28YpImbWdRKMa8sg==                []
logs    amq.gen-x0kymA4yPzAT6BoC/YP+zw==                []
...done.

 

路由(Routing)

在前面的教程中,咱们实现了一个简单的日志系统。能够把日志消息广播给多个接收者。

本篇教程中咱们打算新增一个功能 —— 使得它可以只订阅消息的一个字集。例如,咱们只须要把严重的错误日志信息写入日志文件(存储到磁盘),但同时仍然把全部的日志信息输出到控制台中

绑定(Bindings)

前面的例子,咱们已经建立过绑定(bindings),代码以下:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

绑定是指交换机和队列的关系。能够简单理解为:这个队列对这个交换机的消息感兴趣。

绑定的时候能够带上一个额外的routing_key参数。为了不与basic_publish的参数混淆,咱们把它叫作绑定键(binding key)。如下是如何建立一个带绑定键的绑定。

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

绑定键的意义取决于交换机(exchange)的类型。咱们以前使用过的扇型交换机(fanout exchanges)会忽略这个值。

直连交换机(Direct exchange)

咱们的日志系统广播全部的消息给全部的消费者(consumers)。咱们打算扩展它,使其基于日志的严重程度进行消息过滤。例如咱们也许只是但愿将比较严重的错误(error)日志写入磁盘,以避免在警告(warning)或者信息(info)日志上浪费磁盘空间。

咱们使用的扇型交换机(fanout exchange)没有足够的灵活性 —— 它能作的仅仅是广播。

咱们将会使用直连交换机(direct exchange)来代替。路由的算法很简单 —— 交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而肯定消息该分发到哪一个队列。

下图可以很好的描述这个场景:

在这个场景中,咱们能够看到直连交换机 X和两个队列进行了绑定。第一个队列使用orange做为绑定键,第二个队列有两个绑定,一个使用black做为绑定键,另一个使用green。

这样以来,当路由键为orange的消息发布到交换机,就会被路由到队列Q1。路由键为black或者green的消息就会路由到Q2。其余的全部消息都将会被丢弃。

发送日志

咱们将会发送消息到一个直连交换机,把日志级别做为路由键。这样接收日志的脚本就能够根据严重级别来选择它想要处理的日志。咱们先看看发送日志。

咱们须要建立一个交换机(exchange):

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

而后咱们发送一则消息:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

咱们先假设“severity”的值是info、warning、error中的一个。

订阅

处理接收消息的方式和以前差很少,只有一个例外,咱们将会为咱们感兴趣的每一个严重级别分别建立一个新的绑定。

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

代码整合

emit_log_direct.py的代码:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
View Code

receive_logs_direct.py的代码:

$ python receive_logs_direct.py warning error > logs_from_rabbit.log

若是你但愿全部的日志信息都输出到屏幕中,打开一个新的终端,而后输入:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
View Code

若是你但愿只是保存warning和error级别的日志到磁盘,只须要打开控制台并输入

$ python receive_logs_direct.py info warning error
 [*] Waiting for logs. To exit press CTRL+C

若是要触发一个error级别的日志,只须要输入:

$ python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

 

主题交换机

发送到主题交换机(topic exchange)的消息不能够携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.分隔开的词语列表。这些单词随即是什么均可以,可是最好是跟携带它们的消息有关系的词汇。如下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数能够随意,可是不要超过255字节。

绑定键也必须拥有一样的格式。主题交换机背后的逻辑跟直连交换机很类似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。可是它的绑定键和路由键有两个特殊应用方式:

  • * (星号) 用来表示一个单词.
  • # (井号) 用来表示任意数量(零个或多个)单词。

这个例子里,咱们发送的全部消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。因此它看起来是这样的: <celerity>.<colour>.<species>

咱们建立了三个绑定:Q1的绑定键为 *.orange.*,Q2的绑定键为 *.*.rabbit 和 lazy.# 。

这三个绑定键被能够总结为:

  • Q1 对全部的桔黄色动物都感兴趣。
  • Q2 则是对全部的兔子全部懒惰的动物感兴趣。

一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant 的消息一样也会给两个队列都投递过去。另外一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即便它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。

若是咱们违反约定,发送了一个携带有一个单词或者四个单词("orange" or "quick.orange.male.rabbit")的消息时,发送的消息不会投递给任何一个队列,并且会丢失掉。

可是另外一方面,即便 "lazy.orange.male.rabbit" 有四个单词,他仍是会匹配最后一个绑定,而且被投递到第二个队列中。

主题交换机

主题交换机是很强大的,它能够表现出跟其余交换机相似的行为

当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收全部的消息。

当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

组合在一块儿

接下来咱们会将主题交换机应用到咱们的日志系统中。在开始工做前,咱们假设日志的路由键由两个单词组成,路由键看起来是这样的:<facility>.<severity>

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
View Code

receive_logs_topic.py的代码:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
View Code

执行下边命令 接收全部日志:

python receive_logs_topic.py "#"

执行下边命令 接收来自”kern“设备的日志:

python receive_logs_topic.py "kern.*"

执行下边命令 只接收严重程度为”critical“的日志:

python receive_logs_topic.py "*.critical"

执行下边命令 创建多个绑定:

python receive_logs_topic.py "kern.*" "*.critical"

执行下边命令 发送路由键为 "kern.critical" 的日志:

python emit_log_topic.py "kern.critical" "A critical kernel error"

 

远程过程调用(RPC)

相关文章
相关标签/搜索