rabbitmq官方文档:http://www.rabbitmq.com/tutorials/tutorial-two-python.htmlhtml
rabbitmq中文文档:http://rabbitmq.mr-ping.com/tutorials_with_python/[2]Work_Queues.htmlpython
RabbitMQ是一款基于AMQP协议的消息中间件,它可以在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。并且使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。并且两端可使用不一样的语言编写,大大提供了灵活性。简单来讲,RabbitMQ就是一个消息代理 - 一个消息系统的媒介。它能够为你的应用提供一个通用的消息发送和接收平台,而且保证消息在传输过程当中的安全。git
在项目中,将一些无需即时返回且耗时的操做提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提升了系统的吞吐量。程序员
RabbitMQ库github
RabbitMQ使用的是AMQP协议。要使用她你就必须须要一个使用一样协议的库。几乎全部的编程语言都有可选择的库。python也是同样,能够从如下几个库中选择:算法
pip install pika
大体的设计是这样的:json
生产者(producer)把消息发送到一个名为“hello”的队列中。消费者(consumer)从这个队列中获取消息。windows
# #!/usr/bin/env/ python # # -*-coding:utf-8 -*- import pika #创建一个到RabbitMQ服务器的链接。 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost')) #建立一个通道 channel=connection.channel() # 声明一个名为hello的队列 channel.queue_declare(queue='hello') #n RabbitMQ a message can never sent directly to the queue,it always need to #go through exchange channel.basic_publish(exchange='', routing_key='hello',#要发送到的queue名字 body='Hello world') print("[x] Sent 'Hell World'") #退出程序前,经过关闭链接保证消息已经投递到RabbitMq connection.close()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 声明一个管道 channel = connection.channel() #you may ask why declare the queue again - we have already declared in our previous code.We cloud avoid that if we were sure that the queue already exists .For example if send.py program was run before .But we're not yet sure which program to run first(send or receive are possible) .In such cases it's a good practice to repeat declaring the queue in both programs # channel.queue_declare(queue='hello') channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(ch, method, properties) print(" [x] Received %r" %body) channel.basic_consume( callback,#若是收到消息就调用callback函数处理消息 queue='hello',#queue名 no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
Work Queues(工做队列)(又称:任务队列——Task Queues)是为了不等待一些占用大量资源、时间的操做。当咱们把任务(Task)看成消息发送到队列中,一个运行在后台的工做者(worker)进程就会取出任务而后处理。当你运行多个工做者(workers),任务就会在它们之间共享。缓存
这个概念在网络应用中是很是有用的,它能够在短暂的HTTP请求中处理一些复杂的任务。
默认来讲,RabbitMQ会按顺序得把消息发送给每一个消费者(consumer)。平均每一个消费者都会收到同等数量得消息。这种发送消息得方式叫作——轮询(round-robin)
在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差很少,使用工做队列的一个好处就是它可以并行的处理队列。若是堆积了不少任务,咱们只须要添加更多的工做者(workers)就能够了,扩展很简单。
# #!/usr/bin/env/ python # # -*-coding:utf-8 -*- import pika import sys #创建一个到RabbitMQ服务器的链接。 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost')) #建立一个通道 channel=connection.channel() # 声明一个名为hello的队列 channel.queue_declare(queue='hello') #n RabbitMQ a message can never sent directly to the queue,it always need to #go through exchange message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print(" [x] Sent %r" % (message,)) connection.close()
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 声明一个管道 channel = connection.channel() #声明一个队列 channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print (" [x] Received %r" % (body,)) time.sleep( body.count('.'.encode('utf-8')) ) print(" [x] Done") channel.basic_consume( callback,#若是收到消息就调用callback函数处理消息 queue='hello',#queue名 no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)以后,立刻就会在内存中移除。这种状况,你只要把一个工做者(worker)中止,正在处理的消息就会丢失。同时,全部发送到这个工做者的尚未处理的消息都会丢失。
咱们不想丢失任何任务消息。若是一个工做者(worker)挂掉了,咱们但愿任务会从新发送给其余的工做者(worker)。
为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会经过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,而后RabbitMQ就会释放并删除这条消息。
若是消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被彻底处理,而后从新发送给其余消费者(consumer)。这样,及时工做者(workers)偶尔的挂掉,也不会丢失消息。
消息是没有超时这个概念的;当工做者与它断开连的时候,RabbitMQ会从新发送消息。这样在处理一个耗时很是长的消息任务的时候就不会出问题了。
消息响应默认是开启的。以前的例子中咱们可使用no_ack=True(True表示不管这条消息发完与否都不会给服务端发消息)标识把它关闭。是时候移除这个标识了。当工做者(worker)完成了任务,就发送一个响应。
# #!/usr/bin/env/ python # # -*-coding:utf-8 -*- import pika import sys #创建一个到RabbitMQ服务器的链接。 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost')) #建立一个通道 channel=connection.channel() # 声明一个名为hello的队列 channel.queue_declare(queue='hello') #n RabbitMQ a message can never sent directly to the queue,it always need to #go through exchange message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print(" [x] Sent %r" % (message,)) connection.close()
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 声明一个管道 channel = connection.channel() #声明一个队列 channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print (" [x] Received %r" % (body,)) time.sleep( body.count('.'.encode('utf-8')) ) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag)#当工做者(worker)完成了任务,就发送一个响应。 channel.basic_consume( callback,#若是收到消息就调用callback函数处理消息 queue='hello',#queue名 ) print(' [*] Waiting for messages. To exit press CTRL+C')
忘记确认
一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出以后就会从新发送,若是它不可以释放没响应的消息,RabbitMQ就会占用愈来愈多的内存。
为了排除这种错误,你可使用rabbitmqctl命令,输出messages_unacknowledged字段:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
若是windows环境下的话,去掉sudo
若是你没有特地告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失全部队列和消息。为了确保信息不会丢失,有两个事情是须要注意的:咱们必须把“队列”和“消息”设为持久化。
首先,为了避免让队列消失,须要把队列声明为持久化(durable):
channel.queue_declare(queue='hello', durable=True)#durable=True队列持久化,注意:task和workers端均要写上
这时候,咱们只是确保在RabbitMq重启以后queue_declare队列不会丢失。另外,咱们须要把咱们的消息也要设为持久化——将delivery_mode的属性设为2。
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
注意:消息持久化将消息设为持久化并不能彻底保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间仍是有一个很小的间隔时间。由于RabbitMq并非全部的消息都使用fsync(2)——它有可能只是保存到缓存中,并不必定会写到硬盘中。并不能保证真正的持久化,但已经足够应付咱们的简单工做队列。若是你必定要保证持久化,你须要改写你的代码来支持事务(transaction)。
你应该已经发现,它仍旧没有按照咱们指望的那样进行分发。好比有两个工做者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。
这时由于RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有做出响应。它盲目的把第n-th条消息发给第n-th个消费者。那么问题就来了,Rabbit这样只按顺序把消息发到各个消费者身上,不考虑消费者负载的话,极可能出现,一个机器配置不高的消费者那里堆积了不少消息处理不完,同时配置高的消费者却一直很轻松
咱们可使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工做者(worker),直到它已经处理了上一条消息而且做出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工做者(worker)。
channel.basic_qos(prefetch_count=1)
关于队列大小
不过,这样有可能会出现这样的问题:若是全部的工做者都处理繁忙状态,你的队列就会被填满。你须要留意这个问题,要么添加更多的工做者(workers),要么使用其余策略。
带消息持久化+公平调度的完整代码:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
前面,咱们搭建了一个工做队列,每一个任务只分发给一个工做者(worker)。但如今咱们要作的跟以前彻底不同 —— 分发一个消息给多个消费者(consumers)。这种模式被称为“发布/订阅”。
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给全部的订阅者,而消息队列中的数据被消费一次便消失。因此,RabbitMQ实现发布和订阅时,会为每个订阅者建立一个队列,而发布者发布消息时,会将消息放置在全部相关队列中。
为了描述这种模式,咱们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。
在咱们的这个日志系统中,全部正在运行的接收方程序都会接受消息。咱们用其中一个接收者(receiver)把日志写入硬盘中,另一个接受者(receiver)把日志输出到屏幕上。
最终,日志消息被广播给全部的接受者(receivers)。
要到达这种效果,这时候exchanges就派上用场了
RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只须要把消息发送给一个交换机(exchange)。交换机很是简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列仍是是多个队列,或者是直接忽略消息。这些规则是经过交换机类型(exchange type)来定义的。
有几个可供选择的交换机类型:
direct(直连交换机):经过routingKey和exchange决定的那个惟一的queue能够接收消息
topic(主题交换机):全部符合routingKey(此时能够是一个表达式)的routingKey所bind的queue能够接收消息
表达式符号说明:#表明一个或多个字符,*表明任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候至关于使用fanout
headers(头交换机):经过headers 来决定把消息发给哪些queue
fanout(扇形交换机):全部bind到此exchange的queue均可以接收消息
咱们在这里主要说明最后一个 —— 扇型交换机(fanout)。先建立一个fanout类型的交换机,命名为logs:
channel.exchange_declare(exchange='logs', type='fanout')
扇型交换机(fanout)很简单,你可能从名字上就能猜想出来,它把消息发送给它所知道的全部队列。这正是咱们的日志系统所须要的。
交换器列表:
#rabbitmqctl可以列出服务器上全部的交换器: $ sudo rabbitmqctl list_exchanges Listing exchanges ... logs fanout amq.direct direct amq.topic topic amq.fanout fanout amq.headers headers ...done. #这个列表中有一些叫作amq.*的交换器。这些都是默认建立的,不过这时候你还不须要使用他们。
匿名的交换器:
前面,咱们对交换机一无所知,但仍然可以发送消息到队列中。由于咱们使用了命名为空字符串("")默认的交换机。
回想咱们以前是这样发布一则消息:
channel.basic_publish(exchange='', routing_key='hello', body=message) #exchange参数就是交换机的名称。空字符串表明默认或者匿名交换机:消息将会根据指定的routing_key分发到指定的队列。
如今,咱们就能够发送消息到一个具名交换机了:
channel.basic_publish(exchange='logs', routing_key='',#广播,故不用指定queue名 body=message)
你还记得以前咱们使用的队列名吗( hello和task_queue)?给一个队列命名是很重要的——咱们须要把工做者(workers)指向正确的队列。若是你打算在发布者(producers)和消费者(consumers)之间共享同队列的话,给队列命名是十分重要的。
可是这并不适用于咱们的日志系统。咱们打算接收全部的日志消息,而不只仅是一小部分。咱们关心的是最新的消息而不是旧的。为了解决这个问题,咱们须要作两件事情。
首先,当咱们链接上RabbitMQ的时候,咱们须要一个全新的、空的队列。咱们能够手动建立一个随机的队列名,或者让服务器为咱们选择一个随机的队列名(推荐)。咱们只须要在调用queue_declare方法的时候,不提供queue参数就能够了:
result = channel.queue_declare()
这时候咱们能够经过result.method.queue得到已经生成的随机队列名。它多是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
第二步,当与消费者(consumer)断开链接的时候,这个队列应当被当即删除。exclusive标识符便可达到此目的。
注意:由于广播是即时连通的,消费者是没法接收在其开启前生产者所发送的消息的,这就跟收音机的原理相似。
result = channel.queue_declare(exclusive=True)#exclusive排他的,惟一的,不指定queue名,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name=result.method.queue #得到已经生成的随机队列名
咱们已经建立了一个扇型交换机(fanout)和一个队列。这时候咱们须要告诉交换机如何发送消息给咱们的队列。交换器和队列之间的联系咱们称之为绑定(binding)。
channel.queue_bind(exchange='logs', queue=result.method.queue)
如今,logs交换机将会把消息添加到咱们的队列中
rabbitmqctl list_bindings #列出全部现存的绑定。
代码整合:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 断定应该将数据发送至指定队列。
在前面中,咱们使用的扇型交换机(fanout exchange)没有足够的灵活性 —— 它能作的仅仅是广播。这时候咱们将会使用直连交换机(direct exchange)来代替。路由的算法很简单 —— 交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而肯定消息该分发到哪一个队列。
在这个场景中,咱们能够看到直连交换机 X和两个队列进行了绑定。第一个队列使用error做为绑定键,第二个队列有三个绑定,一个使用info做为绑定键,第二个使用error,第三个使用warning。
这样一来,当路由键为error的消息发布到交换机,就会被路由到队列Q1和Q2。路由键为info或者warning的消息就会路由到Q2。其余的全部消息都将会被丢弃。
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
尽管直连交换机可以改善咱们的系统,可是它也有它的限制 —— 没办法基于多个标准执行路由操做。
发送到主题交换机(topic exchange)的消息不能够携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.
分隔开的词语列表。这些单词随即是什么均可以,可是最好是跟携带它们的消息有关系的词汇。如下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数能够随意,可是不要超过255字节。
绑定键也必须拥有一样的格式。主题交换机背后的逻辑跟直连交换机很类似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。可是它的绑定键和路由键有两个特殊应用方式:
*
(星号) 用来表示一个单词.#
(井号) 用来表示任意数量(零个或多个)单词。下边用图说明:
这个例子里,咱们发送的全部消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.
分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。因此它看起来是这样的: <celerity>.<colour>.<species>
。
咱们建立了三个绑定:Q1的绑定键为 *.orange.*
,Q2的绑定键为 *.*.rabbit
和 lazy.#
。
这三个绑定键被能够总结为:
一个携带有 quick.orange.rabbit
的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant
的消息一样也会给两个队列都投递过去。另外一方面携带有 quick.orange.fox
的消息会投递给第一个队列,携带有 lazy.brown.fox
的消息会投递给第二个队列。携带有 lazy.pink.rabbit
的消息只会被投递给第二个队列一次,即便它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox
的消息不会投递给任何一个队列。
若是咱们违反约定,发送了一个携带有一个单词或者四个单词("orange"
or"quick.orange.male.rabbit"
)的消息时,发送的消息不会投递给任何一个队列,并且会丢失掉。
可是另外一方面,即便 "lazy.orange.male.rabbit"
有四个单词,他仍是会匹配最后一个绑定,而且被投递到第二个队列中。
主题交换机 主题交换机是很强大的,它能够表现出跟其余交换机相似的行为 当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收全部的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
接下来咱们会将主题交换机应用到咱们的日志系统中。在开始工做前,咱们假设日志的路由键由两个单词组成,路由键看起来是这样的:<facility>.<severity>
整合:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
#执行下边命令 接收全部日志: python receive_logs_topic.py "#" #执行下边命令 接收来自”kern“设备的日志: python receive_logs_topic.py "kern.*" #执行下边命令 只接收严重程度为”critical“的日志: python receive_logs_topic.py "*.critical" #执行下边命令 创建多个绑定: python receive_logs_topic.py "kern.*" "*.critical" #执行下边命令 发送路由键为 "kern.critical" 的日志: python emit_log_topic.py "kern.critical" "A critical kernel error" #另外,上边代码不会对路由键和绑定键作任何假设,因此你能够在命令中使用超过两个路由键参数。
RPC系统:包含一个客户端和一个RPC服务器
客户端接口
为了展现RPC服务如何使用,咱们建立了一个简单的客户端类。它会暴露出一个名为“call”的方法用来发送一个RPC请求,而且在收到回应前保持阻塞。
fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print "fib(4) is %r" % (result,)
关于RPC的注意事项:
尽管RPC在计算领域是一个经常使用模式,但它也常常被诟病。当一个问题被抛出的时候,程序员每每意识不到这究竟是由本地调用仍是由较慢的RPC调用引发的。一样的困惑还来自于系统的不可预测性和给调试工做带来的没必要要的复杂性。跟软件精简不一样的是,滥用RPC会致使不可维护的面条代码.
考虑到这一点,牢记如下建议:
确保可以明确的搞清楚哪一个函数是本地调用的,哪一个函数是远程调用的。给你的系统编写文档。保持各个组件间的依赖明确。处理错误案例。明了客户端改如何处理RPC服务器的宕机和长时间无响应状况。
当对避免使用RPC有疑问的时候。若是能够的话,你应该尽可能使用异步管道来代替RPC类的阻塞。结果被异步地推送到下一个计算场景。
通常来讲经过RabbitMQ来实现RPC是很容易的。一个客户端发送请求信息,服务器端将其应用到一个回复信息中。为了接收到回复信息,客户端须要在发送请求的时候同时发送一个回调队列(callback queue)的地址:
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request) # ... and some code to read a response message from the callback_queue ...
消息属性:
AMQP协议给消息预约义了一系列的14个属性。大多数属性不多会用到,除了如下几个:
在上面咱们给每个RPC请求新建一个回调队列。但这不是一个高效的作法,而更好的办法 —— 咱们能够为每一个客户端只创建一个独立的回调队列。可是这样的话,问题就来了,队列接收到一个响应的时候它没法辨别出这个响应是属于哪一个请求的。这时候correlation_id 就能够为咱们解决这个问题了。咱们给每一个请求设置一个独一无二的值。稍后,当咱们从回调队列中接收到一个消息的时候,咱们就能够查看这条属性从而将响应和请求匹配起来。若是咱们接手到的消息的correlation_id是未知的,那就直接销毁掉它,由于它不属于咱们的任何一条请求。
你也许会问,为何咱们接收到未知消息的时候不抛出一个错误,而是要将它忽略掉?这是为了解决服务器端有可能发生的竞争状况。尽管可能性不大,但RPC服务器仍是有可能在已将应答发送给咱们但还未将确认消息发送给请求的状况下死掉。若是这种状况发生,RPC在重启后会从新处理请求。这就是为何咱们必须在客户端优雅的处理重复响应,同时RPC也须要尽量保持幂等性。
咱们的RPC如此工做:
建立一个模拟RPC服务来返回斐波那契数列:
服务器端:
#!/usr/bin/env/ python # -*-coding:utf-8 -*- import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to,#客户端指定返回消息的queue properties=pika.BasicProperties(correlation_id= \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
客户端:
#!/usr/bin/env/ python # -*-coding:utf-8 -*- import pika import uuid #uuid 通用惟一标识符 ( Universally Unique Identifier ),经过MAC地址、时间戳、命名空间、随机数、伪随机数来保证生成ID的惟一性 class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue #声明一个consumer,用于处理请求 self.channel.basic_consume(self.on_response, #只要一收到消息,就调用on_response no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4())#uuid4()——基于随机数 self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue,#让服务器端执行完命令后,把消息返回到这个queue里 correlation_id=self.corr_id, ), body=str(n)) # 接收返回的数据 while self.response is None: self.connection.process_data_events() #非阻塞版的start_consuming(),没有消息不阻塞 return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)