上一篇关于 WSGI 的硬核长文,不知道有多少同窗,可以从头看到尾的,无论大家有没有看得很过瘾,反正我是写得很爽,总有一种将同样知识吃透了的错觉。python
今天我又给本身挖坑了,打算将 rpc 远程调用的知识,好好地梳理一下,花了周末整整两天的时间。git
什么是RPC呢?github
百度百科给出的解释是这样的:“RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种经过网络从远程计算机程序上请求服务,而不须要了解底层网络技术的协议”。这个概念听起来仍是比较抽象,不要紧,继续日后看,后面概念性的东西,我会讲得足够清楚,让你彻底掌握 RPC 的基础内容。在后面的篇章中还会结合其在 OpenStack 中实际应用,一步一步揭开 rpc 的神秘面纱。web
有的读者,可能会问,为啥我举的例子总是 OpenStack 里的东西呢?正则表达式
由于每一个人的业务中接触的框架都不同(我主要接触的就是 OpenStack 框架),我没法为每一个人去定制写一篇文章,但其技术原理都是同样的。即便如此,我也会尽力将文章写得通用,不会由于你没接触过 OpenStack 而成为你理解 rpc 的瓶颈。算法
在 OpenStack 里的进程间通讯方式主要有两种,一种是基于HTTP协议的RESTFul API方式,另外一种则是RPC调用。shell
那么这两种方式在应用场景上有何区别呢?json
有使用经验的人,就会知道:服务器
关于OpenStack中基于RESTful API的通讯方式主要是应用了WSGI,这个知识点,我在前一篇文章中,有深刻地讲解过,你能够点击查看。网络
对于不熟悉 OpenStack 的人,也别担忧听不懂,这样吧,我给你提两个问题:
第一个问题:RPC 和 REST 区别是什么?
你必定会以为这个问题很奇怪,是的,包括我,可是你在网络上一搜,会发现相似对比的文章比比皆是,我在想可能不少初学者因为基础不牢固,才会将不相干的两者拿出来对比吧。既然是这样,那为了让你更加了解陌生的RPC,就从你熟悉得不能再熟悉的 REST 入手吧。
0一、所属类别不一样
REST,是Representational State Transfer 的简写,中文描述表述性状态传递(是指某个瞬间状态的资源数据的快照,包括资源数据的内容、表述格式(XML、JSON)等信息。)
REST 是一种软件架构风格。 这种风格的典型应用,就是HTTP。其由于简单、扩展性强的特色而广受开发者的青睐。
而RPC 呢,是 Remote Procedure Call Protocol 的简写,中文描述是远程过程调用,它能够实现客户端像调用本地服务(方法)同样调用服务器的服务(方法)。
RPC 是一种基于 TCP 的通讯协议,按理说它和REST不是一个层面上的东西,不该该放在一块儿讨论,可是谁让REST这么流行呢,它是目前最流行的一套互联网应用程序的API设计标准,某种意义下,咱们说 REST 能够其实就是指代 HTTP 协议。
0二、使用方式不一样
从使用上来看,HTTP 接口只关注服务提供方,对于客户端怎么调用并不关心。接口只要保证有客户端调用时,返回对应的数据就好了。而RPC则要求客户端接口保持和服务端的一致。
0三、面向对象不一样
从设计上来看,RPC,所谓的远程过程调用 ,是面向方法的 ,REST:所谓的 Representational state transfer ,是面向资源的,除此以外,还有一种叫作 SOA,所谓的面向服务的架构,它是面向消息的,这个接触很少,就很少说了。
0四、序列化协议不一样
接口调用一般包含两个部分,序列化和通讯协议。
通讯协议,上面已经说起了,REST 是 基于 HTTP 协议,而 RPC 能够基于 TCP/UDP,也能够基于 HTTP 协议进行传输的。
常见的序列化协议,有:json、xml、hession、protobuf、thrift、text、bytes等,REST 一般使用的是 JSON或者XML,而 RPC 使用的是 JSON-RPC,或者 XML-RPC。
经过以上几点,咱们知道了 REST 和 RPC 之间有很明显的差别。
第二个问题:为何要采用RPC呢?
那到底为什么要使用 RPC,单纯的依靠RESTful API不能够吗?为何要搞这么多复杂的协议,渣渣表示真的学不过来了。
关于这一点,如下几点仅是个人我的猜测,仅供交流哈:
说了这么多,咱们该如何选择这二者呢?我总结了以下两点,供你参考:
“远程调用”意思就是:被调用方法的具体实现不在程序运行本地,而是在别的某个地方(分布到各个服务器),调用者只想要函数运算的结果,却不须要实现函数的具体细节。
0一、基于 xml-rpc
Python实现 rpc,可使用标准库里的 SimpleXMLRPCServer,它是基于XML-RPC 协议的。
有了这个模块,开启一个 rpc server,就变得至关简单了。执行如下代码:
import SimpleXMLRPCServer
class calculate:
def add(self, x, y):
return x + y
def multiply(self, x, y):
return x * y
def subtract(self, x, y):
return abs(x-y)
def divide(self, x, y):
return x/y
obj = calculate()
server = SimpleXMLRPCServer.SimpleXMLRPCServer(("localhost", 8088))
# 将实例注册给rpc server
server.register_instance(obj)
print "Listening on port 8088"
server.serve_forever()
复制代码
有了 rpc server,接下来就是 rpc client,因为咱们上面使用的是 XML-RPC,因此 rpc clinet 须要使用xmlrpclib 这个库。
import xmlrpclib
server = xmlrpclib.ServerProxy("http://localhost:8088")
复制代码
而后,咱们经过 server_proxy 对象就能够远程调用以前的rpc server的函数了。
>> server.add(2, 3)
5
>>> server.multiply(2, 3)
6
>>> server.subtract(2, 3)
1
>>> server.divide(2, 3)
0
复制代码
SimpleXMLRPCServer是一个单线程的服务器。这意味着,若是几个客户端同时发出多个请求,其它的请求就必须等待第一个请求完成之后才能继续。
若非要使用 SimpleXMLRPCServer 实现多线程并发,其实也不难。只要将代码改为以下便可。
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SocketServer import ThreadingMixIn
class ThreadXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):pass
class MyObject:
def hello(self):
return "hello xmlprc"
obj = MyObject()
server = ThreadXMLRPCServer(("localhost", 8088), allow_none=True)
server.register_instance(obj)
print "Listening on port 8088"
server.serve_forever()
复制代码
0二、基于json-rpc
SimpleXMLRPCServer 是基于 xml-rpc 实现的远程调用,上面咱们也提到 除了 xml-rpc 以外,还有 json-rpc 协议。
那 python 如何实现基于 json-rpc 协议呢?
答案是不少,不少web框架其自身都本身实现了json-rpc,但咱们要独立这些框架以外,要寻求一种较为干净的解决方案,我查找到的选择有两种
第一种是 jsonrpclib
pip install jsonrpclib -i https://pypi.douban.com/simple
复制代码
第二种是 python-jsonrpc
pip install python-jsonrpc -i https://pypi.douban.com/simple
复制代码
先来看第一种 jsonrpclib
它与 Python 标准库的 SimpleXMLRPCServer 很相似(由于它的类名就叫作 SimpleJSONRPCServer ,不明真相的人真觉得它们是亲兄弟)。或许能够说,jsonrpclib 就是仿照 SimpleXMLRPCServer 标准库来进行编写的。
它的导入与 SimpleXMLRPCServer 略有不一样,由于SimpleJSONRPCServer分布在jsonrpclib库中。
服务端
from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
server = SimpleJSONRPCServer(('localhost', 8080))
server.register_function(lambda x,y: x+y, 'add')
server.serve_forever()
复制代码
客户端
import jsonrpclib
server = jsonrpclib.Server("http://localhost:8080")
复制代码
再来看第二种python-jsonrpc,写起来貌似有些复杂。
服务端
import pyjsonrpc
class RequestHandler(pyjsonrpc.HttpRequestHandler):
@pyjsonrpc.rpcmethod
def add(self, a, b):
"""Test method"""
return a + b
http_server = pyjsonrpc.ThreadingHttpServer(
server_address=('localhost', 8080),
RequestHandlerClass=RequestHandler
)
print "Starting HTTP server ..."
print "URL: http://localhost:8080"
http_server.serve_forever()
复制代码
客户端
import pyjsonrpc
http_client = pyjsonrpc.HttpClient(
url="http://localhost:8080/jsonrpc"
)
复制代码
还记得上面我提到过的 zabbix API,由于我有接触过,因此也拎出来说讲。zabbix API 也是基于 json-rpc 2.0协议实现的。
由于内容较多,这里只带你们打个,zabbix 是如何调用的:直接指明要调用 zabbix server 的哪一个方法,要传给这个方法的参数有哪些。
0三、基于 zerorpc
以上介绍的两种rpc远程调用方式,若是你足够细心,能够发现他们都是http+rpc 两种协议结合实现的。
接下来,咱们要介绍的这种(zerorpc),就再也不使用走 http 了。
zerorpc 这个第三方库,它是基于TCP协议、 ZeroMQ 和 MessagePack的,速度相对快,响应时间短,并发高。zerorpc 和 pyjsonrpc 同样,须要额外安装,虽然SimpleXMLRPCServer不须要额外安装,可是SimpleXMLRPCServer性能相对差一些。
pip install zerorpc -i https://pypi.douban.com/simple
复制代码
服务端代码
import zerorpc
class caculate(object):
def hello(self, name):
return 'hello, {}'.format(name)
def add(self, x, y):
return x + y
def multiply(self, x, y):
return x * y
def subtract(self, x, y):
return abs(x-y)
def divide(self, x, y):
return x/y
s = zerorpc.Server(caculate())
s.bind("tcp://0.0.0.0:4242")
s.run()
复制代码
客户端
import zerorpc
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
复制代码
客户端除了可使用zerorpc框架实现代码调用以外,它还支持使用“命令行”的方式调用。
客户端可使用命令行,那服务端是否是也能够呢?
是的,经过 Github 上的文档几个 demo 能够体验到这个第三方库作真的是优秀。
好比咱们能够用下面这个命令,建立一个rpc server,后面这个 time
Python 标准库中的 time 模块,zerorpc 会将 time 注册绑定以供client调用。
zerorpc --server --bind tcp://127.0.0.1:1234 time
复制代码
在客户端,就能够用这条命令来远程调用这个 time 函数。
zerorpc --client --connect tcp://127.0.0.1:1234 strftime %Y/%m/%d
复制代码
通过了上面的学习,咱们已经学会了如何使用多种方式实现rpc远程调用。
经过对比,zerorpc 能够说是脱颖而出,一支独秀。
但为什么在 OpenStack 中,rpc client 不直接 rpc 调用 rpc server ,而是先把 rpc 调用请求发给 RabbitMQ ,再由订阅者(rpc server)来取消息,最终实现远程调用呢?
为此,我也作了一番思考:
OpenStack 组件繁多,在一个较大的集群内部每一个组件内部经过rpc通讯频繁,若是都采用rpc直连调用的方式,链接数会很是地多,开销大,如有些 server 是单线程的模式,超时会很是的严重。
OpenStack 是复杂的分布式集群架构,会有多个 rpc server 同时工做,假设有 server01,server02,server03 三个server,当 rpc client 要发出rpc请求时,发给哪一个好呢?这是问题一。
你可能会说轮循或者随机,这样对你们都公平。这样的话还会引出另外一个问题,假若请求恰好发到server01,而server01恰好不凑巧,可能因为机器或者其余由于致使服务没在工做,那这个rpc消息可就直接失败了呀。要知道作为一个集群,高可用是基本要求,若是出现刚刚那样的状况实际上是很尴尬的。这是问题二。
集群有可能根据实际须要扩充节点数量,若是使用直接调用,耦合度过高,不利于部署和生产。这是问题三。
引入消息中间件,能够很好的解决这些问题。
解决问题一:消息只有一份,接收者由AMQP的负载算法决定,默认为在全部Receiver中均匀发送(round robin)。
解决问题二:有了消息中间件作缓冲站,client 能够任性随意的发,server 都挂掉了?没有关系,等 server 正常工做后,本身来消息中间件取就好了。
解决问题三:不管有多少节点,它们只要认识消息中间件这一个中介就足够了。
因为后面,我将实例讲解 OpenStack 中如何将 rpc 和 mq broker 结合使用。
而在此以前,你必须对消息队列的一些基本知识有个概念。
首先,RPC只是定义了一个通讯接口,其底层的实现能够各不相同,能够是 socket,也能够是今天要讲的 AMQP。
AMQP(Advanced Message Queuing Protocol)是一种基于队列的可靠消息服务协议,做为一种通讯协议,AMQP一样存在多个实现,如Apache Qpid,RabbitMQ等。
如下是 AMQP 中的几个必知的概念:
Publisher:消息发布者
Receiver:消息接收者,在RabbitMQ中叫订阅者:Subscriber。
Queue:用来保存消息的存储空间,消息没有被receiver前,保存在队列中。
Exchange:用来接收Publisher发出的消息,根据Routing key 转发消息到对应的Message Queue中,至于转到哪一个队列里,这个路由算法又由exchange type决定的。
exchange type:主要四种描述exchange的类型。
direct:消息路由到知足此条件的队列中(queue,能够有多个): routing key = binding key
topic:消息路由到知足此条件的队列中(queue,能够有多个):routing key 匹配 binding pattern. binding pattern是相似正则表达式的字符串,能够知足复杂的路由条件。
fanout:消息路由到多有绑定到该exchange的队列中。
binding :binding是用来描述exchange和queue之间的关系的概念,一个exchang能够绑定多个队列,这些关系由binding创建。前面说的binding key /binding pattern也是在binding中给出。
在网上找了个图,能够很清晰地描述几个名词的关系。
关于AMQP,有几下几点值得注意:
前面铺垫了那么久,终于到了讲真实应用的场景。在生产中RPC是如何应用的呢?
其余模型我不太清楚,在 OpenStack 中的应用模型是这样的
至于为何要如此设计,前面我已经给出了本身的观点。
接下来,就是源码解读 OpenStack ,看看其是如何经过rpc进行远程调用的。如若你对此没有兴趣(我知道不少人对此都没有兴趣,因此不浪费你们时间),能够直接跳过这一节,进入下一节。
目前Openstack中有两种RPC实现,一种是在oslo messaging,一种是在openstack.common.rpc。
openstack.common.rpc是旧的实现,oslo messaging是对openstack.common.rpc的重构。openstack.common.rpc在每一个项目中都存在一份拷贝,oslo messaging即将这些公共代码抽取出来,造成一个新的项目。oslo messaging也对RPC API 进行了从新设计,对多种 transport 作了进一步封装,底层也是用到了kombu这个AMQP库。(注:Kombu 是Python中的messaging库。Kombu旨在经过为AMQ协议提供惯用的高级接口,使Python中的消息传递尽量简单,并为常见的消息传递问题提供通过验证和测试的解决方案。)
关于oslo_messaging库,主要提供了两种独立的API:
由于 notify 实现是太简单了,因此这里我就很少说了,若是有人想要看这方面内容,能够收藏个人博客(python-online.cn) ,我会更新补充 notify 的内容。
OpenStack RPC 模块提供了 rpc.call,rpc.cast, rpc.fanout_cast 三种 RPC 调用方法,发送和接收 RPC 请求。
rpc.call 和 rpc.rpc.cast 从实现代码上看,他们的区别很小,就是call调用时候会带有wait_for_reply=True参数,而cast不带。
要了解 rpc 的调用机制呢,首先要知道 oslo_messaging 的几个概念
transport:RPC功能的底层实现方法,这里是rabbitmq的消息队列的访问路径
transport 就是定义你如何访链接消息中间件,好比你使用的是 Rabbitmq,那在 nova.conf中应该有一行transport_url
的配置,能够很清楚地看出指定了 rabbitmq 为消息中间件,并配置了链接rabbitmq的user,passwd,主机,端口。
transport_url=rabbit://user:passwd@host:5672
复制代码
def get_transport(conf, url=None, allowed_remote_exmods=None):
return _get_transport(conf, url, allowed_remote_exmods,
transport_cls=RPCTransport)
复制代码
target:指定RPC topic交换机的匹配信息和绑定主机。
target用来表述 RPC 服务器监听topic,server名称和server监听的exchange,是否广播fanout。
class Target(object):
def __init__(self, exchange=None, topic=None, namespace=None, version=None, server=None, fanout=None, legacy_namespaces=None):
self.exchange = exchange
self.topic = topic
self.namespace = namespace
self.version = version
self.server = server
self.fanout = fanout
self.accepted_namespaces = [namespace] + (legacy_namespaces or [])
复制代码
rpc server 要获取消息,须要定义target,就像一个门牌号同样。
rpc client 要发送消息,也须要有target,说明消息要发到哪去。
endpoints:是可供别人远程调用的对象
RPC服务器暴露出endpoint,每一个 endpoint 包涵一系列的可被远程客户端经过 transport 调用的方法。直观理解,能够参考nova-conductor建立rpc server的代码,这边的endpoints就是 nova/manager.py:ConductorManager()
dispatcher:分发器,这是 rpc server 才有的概念
在client端,是这样指定要调用哪一个方法的。
而在server端,是如何知道要执行这个方法的呢?这就是dispatcher 要干的事,它从 endpoint 里找到这个方法,而后执行,最后返回。
Serializer:在 python 对象和message(notification) 之间数据作序列化或是反序列化的基类。
主要方法有四个:
executor:服务的运行方式,单线程或者多线程
每一个notification listener都和一个executor绑定,来控制收到的notification如何分配。默认状况下,使用的是blocking executor(具体特性参加executor一节)
oslo_messaging.get_notification_listener(transport, targets, endpoints, executor=’blocking’, serializer=None, allow_requeue=False, pool=None)
复制代码
rpc server 和rpc client 的四个重要方法
reset()
:Reset service.start()
:该方法调用后,server开始poll,从transport中接收message,而后转发给dispatcher.该message处理过程一直进行,直到stop方法被调用。executor决定server的IO处理策略。可能会是用一个新进程、新协程来作poll操做,或是直接简单的在一个循环中注册一个回调。一样,executor也决定分配message的方式,是在一个新线程中dispatch或是..... *stop()
:当调用stop以后,新的message不会被处理。可是,server可能还在处理一些以前没有处理完的message,而且底层driver资源也还一直没有释放。wait()
:在stop调用以后,可能还有message正在被处理,使用wait方法来阻塞当前进程,直到全部的message都处理完成。以后,底层的driver资源会释放。模仿是一种很高效的学习方法,我这里根据 OpenStack 的调用方式,抽取出核心内容,写成一个简单的 demo,有对 OpenStack 感兴趣的能够了解一下,大部分人也能够直接跳过这章节。
如下代码不能直接运行,你还须要配置 rabbitmq 的链接方式,你能够写在配置文件中,经过 get_transport 从cfg.CONF 中读取,也能够直接将其写成url的格式作成参数,传给 get_transport 。
简单的 rpc client
#coding=utf-8
import oslo_messaging
from oslo_config import cfg
# 建立 rpc client
transport = oslo_messaging.get_transport(cfg.CONF, url="")
target = oslo_messaging.Target(topic='test', version='2.0')
client = oslo_messaging.RPCClient(transport, target)
# rpc同步调用
client.call(ctxt, 'test', arg=arg)
复制代码
简单的 rpc server
#coding=utf-8
from oslo_config import cfg
import oslo_messaging
import time
# 定义endpoint类
class ServerControlEndpoint(object):
target = oslo_messaging.Target(namespace='control',
version='2.0')
def __init__(self, server):
self.server = server
def stop(self, ctx):
if self.server:
self.server.stop()
class TestEndpoint(object):
def test(self, ctx, arg):
return arg
# 建立rpc server
transport = oslo_messaging.get_transport(cfg.CONF, url="")
target = oslo_messaging.Target(topic='test', server='server1')
endpoints = [
ServerControlEndpoint(None),
TestEndpoint(),
]
server = oslo_messaging.get_rpc_server(transport, target,endpoints,executor='blocking')
try:
server.start()
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping server")
server.stop()
server.wait()
复制代码
以上,就是本期推送的所有内容,周末两天没有出门,都在写这篇文章。真的快掏空了我本身,不过写完后真的很畅快。