最近在学习项目中的通用技术,其中一个是在项目中会常用的基于RabbitMQ实现的RPC。这里一共有三个点要学习,分别是:RPC是什么?RabbitMQ是什么?如何使用RabbitMQ实现RPC。奔着这三个目标,查阅了资料。作笔记记录。python
rpc的全称叫:远程过程调用,能够通俗的理解为经过网络调用另外一台电脑上的函数的业务处理思想。首先,咱们先看看本地的函数调用流程是怎样。编程
本地调用:json
def fun(a,b): sum = a + b return sum if __name__ = __main__ print "i use a function to sum " sum_main = fun(2,3) print sum_main
本地调用当执行到sum=fun(2,3)时,程序会在内存中查找函数指针fun,而后带着参数进入fun()函数中运算,最后返回给sum_main。若是是远程调用,则是从一个电脑A上调用另外一个电脑B上的函数。缓存
RPC思想的好处是:服务器
一、更符合编程思想。想要实现什么功能直接调用相应的函数,这是编程最直接的思想。网络
二、减小代码重复率。A想实现的功能若是B中已经实现了,那么A就直接调用B的函数,避免本身再重复实现。并发
RPC调用:框架
rpc多使用http传输请求,格式有xml,json等,这里是xml。以下是使用python中自带的RPC调用框架来实现的一个最简单的RPC调用。异步
client.py模块化
from xmlrpclib import ServerProxy #导入xmlrpclib的包 s = ServerProxy("http://172.171.5.205:8080") #定义xmlrpc客户端 print s.fun_add(2,3) #调用服务器端的函数
server.py
from SimpleXMLRPCServer import SimpleXMLRPCServer def fun_add(a,b): totle = a + b return totle if __name__ == '__main__': s = SimpleXMLRPCServer(('0.0.0.0', 8080)) #开启xmlrpcserver s.register_function(fun_add) #注册函数fun_add print "server is online..." s.serve_forever() #开启循环等待
先启动服务器端
后启动客户端
这样就完成了一次RPC调用。RPC的调用流程以下图所示。调用流程是:
RabbitMQ是实现了AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的软件。主要功能是
如下摘录自知乎:
对于初学者,举一个饭店的例子来解释这三个分别是什么吧。不是百分百恰当,可是应该足以解释这三者的区别。 RPC:假设你是一个饭店里的服务员,顾客向你点菜,可是你不会作菜,因此你采集了顾客要点什么以后告诉后厨去作顾客点的菜, 这叫RPC(remote procedure call),由于厨房的厨师相对于服务员而言是另一我的(在计算机的世界里就是remote的机器上的一个进程)。 厨师作好了的菜就是RPC的返回值。 任务队列和消息队列:本质都是队列,因此就只举一个任务队列的例子。假设这个饭店在高峰期顾客不少,而厨师只有不多的几个, 因此服务员们不得不把单子按下单顺序放在厨房的桌子上,供厨师们一个一个作,这一堆单子就是任务队列(固然,取决于问题的语境, 可能要把放订单的桌子也算在里面一块儿构成所谓的任务队列平台),厨师们每作完一个菜,就从桌子上的订单里再取出一个单子继续作菜。
简单消息队列:
最简单的消息队列,生产者-消费者模式。一端产生消息,发送到队列,另外一端消费者收取消息。
consume_simple.py
1 #coding:UTF-8
2
3 import pika 4 import time 5
6 # 创建实例
7 connection = pika.BlockingConnection(pika.ConnectionParameters( 8 'localhost')) 9 # 声明管道
10 channel = connection.channel() 11
14 channel.queue_declare(queue='hello') 15
16 def callback(ch, method, properties, body):
17
18 print "ch",ch 19 print "method",method 20 print "properties",properties 21 print "body",body
25 print(" [x] Received %r" % body) 27 # 消费消息
28 channel.basic_consume(
29 callback, # 若是收到消息,就调用callback函数来处理消息
30 queue='hello', # 你要从那个队列里收消息
33 ) 34
35 print(' [*] Waiting for messages. To exit press CTRL+C') 36 channel.start_consuming() # 开始消费消息
productor_simple.py
1 #coding:UTF-8
2 import pika 3
4 # 创建一个实例
5 connection = pika.BlockingConnection( 6 pika.ConnectionParameters('localhost')
7 ) 8 # 声明一个管道,在管道里发消息
9 channel = connection.channel() 10 # 在管道里声明queue
11 channel.queue_declare(queue='hello')
13 channel.basic_publish(exchange='', 14 routing_key='hello', # queue名字
15 body='Hello World!') # 消息内容
16 print(" [x] Sent 'Hello World!'") 17 connection.close() # 队列关闭
先运行消费者
在运行生产者
观察消费者获取的消息
RPC的要求是等待得到返回值,而RabbitMQ常出现的场景是异步等待。这就要求RabbitMQ能够当即返回结果。使用了两种技术:
1、为调用指明id,要求id和结果一块儿返回,使用id来判断是哪个函数的调用返回;
2、指明返回的队列名,返回结果时指明返回队列的名字确保会正确返回到调用者。
工做流程:
#coding:UTF-8
import pika import uuid import time class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, # 只要一收到消息就调用on_response no_ack=True, queue=self.callback_queue) # 收这个queue的消息 def on_response(self, ch, method, props, body): # 必须四个参数 # 若是收到的ID和本机生成的相同,则返回的结果就是我想要的指令返回的结果 if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None # 初始self.response为None self.corr_id = str(uuid.uuid4()) # 随机惟一字符串 self.channel.basic_publish( exchange='', routing_key='rpc_queue', # 发消息到rpc_queue properties=pika.BasicProperties( # 消息持久化 reply_to = self.callback_queue, # 让服务端命令结果返回到callback_queue correlation_id = self.corr_id, # 把随机uuid同时发给服务器 ), body=str(n) ) while self.response is None: # 当没有数据,就一直循环 # 启动后,on_response函数接到消息,self.response 值就不为空了 self.connection.process_data_events() # 非阻塞版的start_consuming() # print("no msg……") # time.sleep(0.5) # 收到消息就调用on_response return int(self.response) if __name__ == '__main__': fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(7)") response = fibonacci_rpc.call(7) print(" [.] Got %r" % response)
#coding:UTF-8
import pika import time def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish( exchange='', # 把执行结果发回给客户端 routing_key=props.reply_to, # 客户端要求返回想用的queue # 返回客户端发过来的correction_id 为了让客户端验证消息一致性 properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response) ) ch.basic_ack(delivery_tag = method.delivery_tag) # 任务完成,告诉客户端 if __name__ == '__main__': connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') # 声明一个rpc_queue , channel.basic_qos(prefetch_count=1) # 在rpc_queue里收消息,收到消息就调用on_request channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()