源码:https://github.com/ltoddy/rabbitmq-tutorialpython
(using the Pika Python client)git
在第二篇教程中,咱们学习了如何使用工做队列在多个工做人员之间分配耗时的任务。程序员
可是若是咱们须要在远程计算机上运行某个功能并等待结果呢?那么,这是一个不一样的事情。
这种模式一般称为远程过程调用(RPC)。github
在本教程中,咱们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。
因为咱们没有任何值得分发的耗时任务,咱们将建立一个返回斐波那契数字的虚拟RPC服务。json
为了说明如何使用RPC服务,咱们将建立一个简单的客户端类。它将公开一个名为call的方法 ,
它发送一个RPC请求并阻塞,直到收到答案:服务器
fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print("fib(4) is %r" % result)
*有关RPC的说明* 虽然RPC是计算中很常见的模式,但它常常被吹毛求疵。当程序员不知道函数调用是本地的仍是 慢速的RPC时会出现这些问题。像这样的混乱致使不可预知的问题,并增长了调试的没必要要的复杂性, 而不是咱们想要的简化软件。 铭记这一点,请考虑如下建议: * 确保显而易见哪一个函数调用是本地的,哪一个是远程的。 * 记录您的系统。清楚组件之间的依赖关系。 * 处理错误状况。当RPC服务器长时间关闭时,客户端应该如何反应? 有疑问时避免RPC。若是能够的话,你应该使用异步管道 - 而不是相似于RPC的阻塞, 其结果被异步推送到下一个计算阶段。
通常来讲,经过RabbitMQ来执行RPC是很容易的。客户端发送请求消息,服务器回复响应消息。
为了接收响应,客户端须要发送一个“回调”队列地址和请求。让咱们试试看:app
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request)
消息属性 AMQP 0-9-1协议预约义了一组包含14个属性的消息。大多数属性不多使用,但如下状况除外: delivery_mode:将消息标记为持久(值为2)或瞬态(任何其余值)。你可能会记得第二篇教程中的这个属性。 content_type:用于描述编码的MIME类型。例如,对于常用的JSON编码,将此属性设置为application/json是一种很好的作法。 reply_to:一般用于命名回调队列。 correlation_id:用于将RPC响应与请求关联起来。
在上面介绍的方法中,咱们建议为每一个RPC请求建立一个回调队列。这是很是低效的,
但幸运的是有一个更好的方法 - 让咱们为每一个客户端建立一个回调队列。异步
这引起了一个新问题,在该队列中收到回复后,不清楚回复属于哪一个请求。那是何时使用correlation_id属性。
咱们将把它设置为每一个请求的惟一值。稍后,当咱们在回调队列中收到消息时,咱们将查看此属性,
并基于此属性,咱们将可以将响应与请求进行匹配。若是咱们看到未知的correlation_id值,
咱们能够放心地丢弃该消息 - 它不属于咱们的请求。函数
您可能会问,为何咱们应该忽略回调队列中的未知消息,而不是抛出错误?
这是因为服务器端可能出现竞争情况。虽然不太可能,但在发送给咱们答案以后,但在发送请求的确认消息以前,
RPC服务器可能会死亡。若是发生这种状况,从新启动的RPC服务器将再次处理该请求。
这就是为何在客户端,咱们必须优雅地处理重复的响应,理想状况下RPC应该是等幂的。学习
咱们的RPC会像这样工做:
rpc_server.py的代码:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_size=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
服务器代码很是简单:
rpc_client.py的代码:
#!/usr/bin/env python import pika import uuid class FibonacciRpcClient: def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True) def on_response(self, ch, method, props, body): if self.corr_id == props.corrrelation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
客户端代码稍有涉及: