RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。html
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。消 息传递指的是程序之间经过在消息中发送数据进行通讯,而不是经过直接调用彼此来通讯,直接调用一般是用于诸如远程过程调用的技术。排队指的是应用程序经过 队列来通讯。队列的使用除去了接收和发送应用程序同时执行的要求。python
RabbitMQ安装算法
安装配置epel源 # 64位源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm # 32位源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安装erlang(依赖包) $ yum -y install erlang 安装RabbitMQ $ yum -y install rabbitmq-server
默认端口为:5672
启动rabbitmq: service rabbitmq-server start/stopjson
安装API缓存
pip3 install pika or easy_install pika or 源码 https://pypi.python.org/pypi/pika
使用API操做RabbitMQ服务器
MQ是消费-生产者模型的一个典型的表明,一端往消息队列中不断写入消息,而另外一端则能够读取或者订阅队列中的消息。下面以消费-生产者模型为例:网络
RabbitMQ 的结构图:app
生产者-消费者模型的简单实例异步
对于RabbitMQ来讲,生产和消费再也不针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。ide
-----------------------------生产者----------------------------------- # /usr/bin/env python # -*- coding:utf8 -*- # auth rain import pika # 建立链接,链接到消息队列服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) # 建立通道 channel = connection.channel() # 建立任务队列 channel.queue_declare(queue='task_queue') # 发布消息 # exchange -- 它使咱们可以确切地指定消息应该到哪一个队列去。 # 向队列插入数值 routing_key是队列名 body 是要插入的内容 channel.basic_publish( exchange='', routing_key='task_queue', body='test rabbitMQ' ) print("[X] sent 'test rabbitMq'") # 缓冲区已经flush并且消息已经确认发送到了RabbitMQ中,关闭连接 connection.close() # [X] sent 'test rabbitMq'
----------------------------消费者----------------------------------- # /usr/bin/env python # -*- coding:utf8 -*- # auth rain import pika # 建立链接,链接到消息队列服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) # 建立通道 channel = connection.channel() # 若是生产者没有运行建立队列,那么消费者也许就找不到队列了。为了不这个问题因此消费者也建立这个队列 channel.queue_declare(queue='task_queue') # 接收消息须要使用callback这个函数来接收,他会被pika库来调用 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 从队列取数据 callback是回调函数 若是拿到数据 那么将执行callback函数 channel.basic_consume(callback, queue='task_queue', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') # 永远循环等待数据处理和callback处理的数据 channel.start_consuming() ''' [*] Waiting for messages. To exit press CTRL+C [x] Received b'test rabbitMQ' [x] Received b'test rabbitMQ' '''
import pika import sys import time # 建立链接,链接到消息队列服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.10.36.101')) # 建立通道 channel = connection.channel() # 建立任务队列 channel.queue_declare(queue='task_queue') # 发布消息 # exchange -- 它使咱们可以确切地指定消息应该到哪一个队列去。 # 向队列插入数值 routing_key是队列名 body 是要插入的内容 message = ' '.join(sys.argv[1:]) or "Hello World!" # 循环发送数据 for i in range(20): channel.basic_publish( exchange='', routing_key='task_queue', body=message ) time.sleep(0.5) print("[X] sent %s " % message) # 缓冲区已经flush并且消息已经确认发送到了RabbitMQ中,关闭连接 connection.close()
import pika import time # 建立链接,链接到消息队列服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) # 建立通道 channel = connection.channel() # 若是生产者没有运行建立队列,那么消费者也许就找不到队列了。为了不这个问题因此消费者也建立这个队列 channel.queue_declare(queue='task_queue') # 接收消息须要使用callback这个函数来接收,他会被pika库来调用 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # time.sleep(body.count('.')) # print('[x] Done') # 从队列取数据 callback是回调函数 若是拿到数据 那么将执行callback函数 channel.basic_consume(callback, queue='task_queue', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') # 永远循环等待数据处理和callback处理的数据 channel.start_consuming()
一、acknowledgment 消息不丢失
消息确认
当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)以后,立刻就会在内存中移除。这种状况,你只要把一个工做者(worker)中止,正在处理的消息就会丢失。同时,全部发送到这个工做者的尚未处理的消息都会丢失。
咱们不想丢失任何任务消息。若是一个工做者(worker)挂掉了,咱们但愿任务会从新发送给其余的工做者(worker)。
为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会经过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,而后RabbitMQ就会释放并删除这条消息。
若是消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被彻底处理,而后从新发送给其余消费者(consumer)。这样,及时工做者(workers)偶尔的挂掉,也不会丢失消息。
消息是没有超时这个概念的;当工做者与它断开连的时候,RabbitMQ会从新发送消息。这样在处理一个耗时很是长的消息任务的时候就不会出问题了。
消息响应默认是开启的。以前的例子中咱们可使用no_ack=True标识把它关闭。是时候移除这个标识了,当工做者(worker)完成了任务,就发送一个响应。
no-ack = False,若是消费者遇到状况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会从新将该任务添加到队列中。
消息响应默认是开启的。以前的例子中咱们可使用no_ack=True标识把它关闭。是时候移除这个标识了,当工做者(worker)完成了任务,就发送一个响应。
ch.basic_ack(delivery_tag=method.delivery_tag)
一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出以后就会从新发送,若是它不可以释放没响应的消息,RabbitMQ就会占用愈来愈多的内存。
为了排除这种错误,你可使用rabbitmqctl命令,输出messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
import pika import sys import time # 建立链接,链接到消息队列服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.10.36.101')) # 建立通道 channel = connection.channel() # 建立任务队列 channel.queue_declare(queue='task_queue') # 发布消息 # exchange -- 它使咱们可以确切地指定消息应该到哪一个队列去。 # 向队列插入数值 routing_key是队列名 body 是要插入的内容 message = ' '.join(sys.argv[1:]) or "Hello World!" # 循环发送数据 for i in range(20): channel.basic_publish( exchange='', routing_key='task_queue', body=message ) time.sleep(0.5) print("[X] sent %s " % message) # 缓冲区已经flush并且消息已经确认发送到了RabbitMQ中,关闭连接 connection.close()
import pika import time # 建立链接,链接到消息队列服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) # 建立通道 channel = connection.channel() # 若是生产者没有运行建立队列,那么消费者也许就找不到队列了。为了不这个问题因此消费者也建立这个队列 channel.queue_declare(queue='task_queue') # 接收消息须要使用callback这个函数来接收,他会被pika库来调用 def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.decode().count('...')) print(" [x] Done") # 消息不丢失 ch.basic_ack(delivery_tag=method.delivery_tag) # 从队列取数据 callback是回调函数 若是拿到数据 那么将执行callback函数 channel.basic_consume(callback, queue='task_queue', ) print(' [*] Waiting for messages. To exit press CTRL+C') # 永远循环等待数据处理和callback处理的数据 channel.start_consuming()
二、消息持久化
若是你没有特地告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失全部队列和消息。为了确保信息不会丢失,有两个事情是须要注意的:咱们必须把“队列”和“消息”设为持久化。
首先,为了避免让队列消失,须要把队列声明为持久化(durable):
channel.queue_declare(queue='hello', durable=True)
这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。
这时候,咱们就能够确保在RabbitMq重启以后queue_declare队列不会丢失。另外,咱们须要把咱们的消息也要设为持久化——将delivery_mode的属性设为2。
# 生产者端 channel.basic_publish(exchange='', routing_key='task_queue', properties=pika.BasicProperties( delivery_mode=2, # 即便服务端挂了, 消息也能持久化 )) # 消费者端 def callback(ch, method, properties, body): print(body) time.sleep(body.count()) ch.basic_ack(delivery_tag=method.delivery_tag)
注意:消息持久化
将消息设为持久化并不能彻底保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间仍是有一个很小的间隔时间。由于RabbitMq并非全部的消息都使用fsync(2)——它有可能只是保存到缓存中,并不必定会写到硬盘中。并不能保证真正的持久化,但已经足够应付咱们的简单工做队列。若是你必定要保证持久化,你须要改写你的代码来支持事务(transaction)。
三、公平调度
你应该已经发现,它仍旧没有按照咱们指望的那样进行分发。好比有两个工做者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。
这时由于RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有做出响应。它盲目的把第n-th条消息发给第n-th个消费者
咱们可使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工做者(worker),直到它已经处理了上一条消息而且做出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工做者(worker)。
channel.basic_qos(prefetch_count=1)
注意:关于队列大小
若是全部的工做者都处理繁忙状态,你的队列就会被填满。你须要留意这个问题,要么添加更多的工做者(workers),要么使用其余策略。
----------------------------------生产者-------------------------------------------- #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 队列持久化 durable=True channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # 消息持久化 )) print " [x] Sent %r" % (message,) connection.close()
----------------------------------消费者--------------------------------------------
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 队列持久化 durable=True channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
四、发布/订阅
分发一个消息给多个消费者(consumers)。这种模式被称为“发布/订阅”。
为了描述这种模式,咱们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。
在咱们的这个日志系统中,全部正在运行的接收方程序都会接受消息。咱们用其中一个接收者(receiver)把日志写入硬盘中,另一个接受者(receiver)把日志输出到屏幕上。
最终,日志消息被广播给全部的接受者(receivers)。
交换机(Exchanges)
RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只须要把消息发送给一个交换机(exchange)。交换机很是简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列仍是是多个队列,或者是直接忽略消息。这些规则是经过交换机类型(exchange type)来定义的。
交换机类型:
直连交换机(direct),
主题交换机(topic),
头交换机 (headers),
扇型交换机(fanout)。
channel.exchange_declare(exchange='logs', type='fanout')
扇型交换机(fanout)很简单,你可能从名字上就能猜想出来,它把消息发送给它所知道的全部队列。这正是咱们的日志系统所须要的。
匿名的交换器
前面的教程中咱们对交换机一无所知,但仍然可以发送消息到队列中。由于咱们使用了命名为空字符串("")默认的交换机。
回想咱们以前是如何发布一则消息:
channel.basic_publish(exchange='', routing_key='hello', body=message)exchange参数就是交换机的名称。空字符串表明默认或者匿名交换机:消息将会根据指定的routing_key分发到指定的队列。
如今,咱们就能够发送消息到一个具名交换机了:
channel.basic_publish(exchange='logs', routing_key='', body=message)
临时队列
第一步, 当咱们链接上RabbitMQ的时候,咱们须要一个全新的、空的队列。咱们能够手动建立一个随机的队列名,或者让服务器为咱们选择一个随机的队列名(推荐)。咱们只须要在调用queue_declare方法的时候,不提供queue参数就能够了:
result = channel.queue_declare()
这时候咱们能够经过result.method.queue得到已经生成的随机队列名。它多是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
第二步,当与消费者(consumer)断开链接的时候,这个队列应当被当即删除。exclusive标识符便可达到此目的。
result = channel.queue_declare(exclusive=True)
绑定(Bindings)
咱们已经建立了一个扇型交换机(fanout)和一个队列。如今咱们须要告诉交换机如何发送消息给咱们的队列。交换器和队列之间的联系咱们称之为绑定(binding)。
channel.queue_bind(exchange='logs', queue=result.method.queue)
如今,logs交换机将会把消息添加到咱们的队列中。
绑定(binding)列表
你可使用
rabbitmqctl list_bindings
列出全部现存的绑定。
一个发送日志的实例
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() # 指定exchange,其类型为fanout channel.exchange_declare(exchange='logs', type='fanout') message = ''.join(sys.argv[1:] or 'hello world') # 发送给指定的exchange channel.publish( exchange='logs', routing_key='', body='message' ) print(" [x] Sent %r" % (message,)) connection.close()
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() # 指定exchange,其类型为fanout channel.exchange_declare(exchange='logs', type='fanout') # 建立临时(queue)队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 将queue绑定到指定的exchange上 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % (body,)) channel.basic_consume(callback, queue=queue_name, no_ack=True ) channel.start_consuming()
绑定(binding)是指交换机(exchange)和队列(queue)的关系。能够简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。
绑定的时候能够带上一个额外的routing_key参数。为了不与basic_publish的参数混淆,咱们把它叫作绑定键(binding key)。如下是如何建立一个带绑定键的绑定。
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
绑定键的意义取决于交换机(exchange)的类型。咱们以前使用过的扇型交换机(fanout exchanges)会忽略这个值。
应用场景:
咱们的日志系统广播全部的消息给全部的消费者(consumers)。咱们打算扩展它,使其基于日志的严重程度进行消息过滤。例如咱们也许只是但愿将比较严重的错误(error)日志写入磁盘,以避免在警告(warning)或者信息(info)日志上浪费磁盘空间。
咱们使用的扇型交换机(fanout exchange)没有足够的灵活性 —— 它能作的仅仅是广播。
咱们将会使用直连交换机(direct exchange)来代替。路由的算法很简单 —— 交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而肯定消息该分发到哪一个队列。
下图可以很好的描述这个场景:
在这个场景中,咱们能够看到直连交换机 X和两个队列进行了绑定。第一个队列使用orange做为绑定键,第二个队列有两个绑定,一个使用black做为绑定键,另一个使用green。
这样以来,当路由键为orange的消息发布到交换机,就会被路由到队列Q1。路由键为black或者green的消息就会路由到Q2。其余的全部消息都将会被丢弃。
多个绑定(Multiple bindings)
多个队列使用相同的绑定键是合法的。这个例子中,咱们能够添加一个X和Q1之间的绑定,使用black绑定键。这样一来,直连交换机就和扇型交换机的行为同样,会将消息广播到全部匹配的队列。带有black路由键的消息会同时发送到Q1和Q2。
发送日志
咱们将会发送消息到一个直连交换机,把日志级别做为路由键。这样接收日志的脚本就能够根据严重级别来选择它想要处理的日志。咱们先看看发送日志。
severity = ['info','warning','error']
咱们须要建立一个交换机(exchange): channel.exchange_declare(exchange='direct_logs', type='direct') 而后咱们发送一则消息: channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) 咱们先假设“severity”的值是info、warning、error中的一个。
订阅 处理接收消息的方式和以前差很少,只有一个例外,咱们将会为咱们感兴趣的每一个严重级别分别建立一个新的绑定。 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
1.生产者:
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
2.消费者:
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
发送到主题交换机(topic exchange)的消息不能够携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.
分隔开的词语列表。这些单词随即是什么均可以,可是最好是跟携带它们的消息有关系的词汇。如下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数能够随意,可是不要超过255字节。
绑定键也必须拥有一样的格式。主题交换机背后的逻辑跟直连交换机很类似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。可是它的绑定键和路由键有两个特殊应用方式:
*
(星号) 用来表示一个单词.#
(井号) 用来表示任意数量(零个或多个)单词。
这个例子里,咱们发送的全部消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个 . 分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。因此它看起来是这样的: <celerity>.<colour>.<species> 。 咱们建立了三个绑定:Q1的绑定键为 *.orange.* ,Q2的绑定键为 *.*.rabbit 和 lazy.# 。 这三个绑定键被能够总结为: •Q1 对全部的桔黄色动物都感兴趣。 •Q2 则是对全部的兔子和全部懒惰的动物感兴趣。 一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant 的消息一样也会给两个队列都投递过去。另外一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即便它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。 若是咱们违反约定,发送了一个携带有一个单词或者四个单词( "orange" or "quick.orange.male.rabbit" )的消息时,发送的消息不会投递给任何一个队列,并且会丢失掉。 可是另外一方面,即便 "lazy.orange.male.rabbit" 有四个单词,他仍是会匹配最后一个绑定,而且被投递到第二个队列中。
主题交换机 主题交换机是很强大的,它能够表现出跟其余交换机相似的行为 当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收全部的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body,)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
执行下边命令 接收全部日志: python receive_logs_topic.py "#" 执行下边命令 接收来自”kern“设备的日志: python receive_logs_topic.py "kern.*" 执行下边命令 只接收严重程度为”critical“的日志: python receive_logs_topic.py "*.critical" 执行下边命令 创建多个绑定: python receive_logs_topic.py "kern.*" "*.critical" 执行下边命令 发送路由键为 "kern.critical" 的日志: python emit_log_topic.py "kern.critical" "A critical kernel error" 执行上边命令试试看效果吧。另外,上边代码不会对路由键和绑定键作任何假设,因此你能够在命令中使用超过两个路由键参数。
远程过程调用(RPC Remote Procedure Call)
使用RabbitMQ来构建一个RPC系统:包含一个客户端和一个RPC服务器。
AMQP协议给消息预约义了一系列的14个属性。大多数属性不多会用到,除了如下几个:
关联标识
咱们建议给每个RPC请求新建一个回调队列。这不是一个高效的作法,幸亏这儿有一个更好的办法 —— 咱们能够为每一个客户端只创建一个独立的回调队列。
这就带来一个新问题,当此队列接收到一个响应的时候它没法辨别出这个响应是属于哪一个请求的。correlation_id 就是为了解决这个问题而来的。咱们给每一个请求设置一个独一无二的值。稍后,当咱们从回调队列中接收到一个消息的时候,咱们就能够查看这条属性从而将响应和请求匹配起来。若是咱们接手到的消息的correlation_id是未知的,那就直接销毁掉它,由于它不属于咱们的任何一条请求。
你也许会问,为何咱们接收到未知消息的时候不抛出一个错误,而是要将它忽略掉?这是为了解决服务器端有可能发生的竞争状况。尽管可能性不大,但RPC服务器仍是有可能在已将应答发送给咱们但还未将确认消息发送给请求的状况下死掉。若是这种状况发生,RPC在重启后会从新处理请求。这就是为何咱们必须在客户端优雅的处理重复响应,同时RPC也须要尽量保持幂等性。
RPC工做流程:
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print("[.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id,), body=str(response) ) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika import uuid class FibonacciRpcClient: def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id,), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % (response,))
此处呈现的设计并非实现RPC服务的惟一方式,可是他有一些重要的优点:
咱们的代码依旧很是简单,并且没有试图去解决一些复杂(可是重要)的问题,如:
rabbitMQ中文文档: http://rabbitmq.mr-ping.com/tutorials_with_python/[1]Hello_World.html