主要介绍了grpc在使用示例和原理,以及如何与consul结合
gRPC 也是基于如下理念:定义一个服务,指定其可以被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根可以像服务端同样的方法。
在 gRPC 里客户端应用能够像调用本地对象同样直接调用另外一台不一样的机器上服务端应用的方法,使得咱们可以更容易地建立分布式应用和服务。html
参考文档:gRPC Python Quickstartpython
开始前确保已经安装grpcio-tools和grpcio这两个包git
定义一个GRPC有以下三个步骤:windows
咱们以实现一个echo的grpc为例。安全
首先定义通讯双方(即客户端和服务端)交互的消息格式(protobuf消息的格式),而后定义该echo服务
以下:bash
syntax = "proto3"; // 声明使用 proto3 语法 // 定义客户端请求的protobuf格式,以下所示,包含一个字符串字段q message Req { string q = 1; } // 定义服务端相应的protobuf格式,以下所示,包含一个字符串字段a message Resp { string a = 1; } // 定义echo服务,以下所示,该服务包含一个名称为"echo"的rpc service Echoer{ rpc echo (Req) returns (Resp) {} }
使用如下命令编译:服务器
python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. ./Echoer.proto
生成两个py文件tcp
建立和运行 Echoer 服务能够分为两个部分:分布式
在当前目录,建立文件 Echoer_server.py,实现一个新的函数:ide
from concurrent import futures import time import grpc import Echoer_pb2 import Echoer_pb2_grpc _ONE_DAY_IN_SECONDS = 60 * 60 * 24 class Echoer(Echoer_pb2_grpc.EchoerServicer): # 工做函数 def SayHello(self, request, context): return Echoer_pb2.Resp(a="echo") def serve(): # gRPC 服务器 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server) server.add_insecure_port('[::]:50051') server.start() # start() 不会阻塞,若是运行时你的代码没有其它的事情可作,你可能须要循环等待。 try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve()
在当前目录,打开文件 Echoer_client.py,实现一个新的函数:
from __future__ import print_function import grpc import Echoer_pb2 import Echoer_pb2_grpc def run(): channel = grpc.insecure_channel('localhost:50051') # 建立信道 stub = Echoer_pb2_grpc.EchoerStub(channel) # 经过信道获取凭据,即Stub response = stub.echo(Echoer_pb2.Req(q='echo')) # 调用rpc,获取响应 print("Echoer client received: " + response.a) if __name__ == '__main__': run()
运行代码
首先运行服务端代码
python Echoer_server.py
复制代码
而后运行客户端代码
python Echoer_client.py # output Echoer client received: echo
点击查看参考博客
为了通讯安全起见,GRPC提供了TSlSSL的支持。
$ openssl genrsa -out server.key 2048 Generating RSA private key, 2048 bit long modulus (2 primes) ............................................................+++++ ................................................................................................................................+++++ e is 65537 (0x010001) $ openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 You are about to be asked to enter information that will be incorporated into your certificate request. What you are about to enter is what is called a Distinguished Name or a DN. There are quite a few fields but you can leave some blank For some fields there will be a default value, If you enter '.', the field will be left blank. ----- Country Name (2 letter code) [AU]: State or Province Name (full name) [Some-State]: Locality Name (eg, city) []: Organization Name (eg, company) [Internet Widgits Pty Ltd]: Organizational Unit Name (eg, section) []: Common Name (e.g. server FQDN or YOUR name) []:Echoer Email Address []:
生成了server.key和server.crt两个文件,服务端两个文件都须要,客户端只须要crt文件
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server) # 读取 key and certificate with open(os.path.join(os.path.split(__file__)[0], 'server.key')) as f: private_key = f.read().encode() with open(os.path.join(os.path.split(__file__)[0], 'server.crt')) as f: certificate_chain = f.read().encode() # 建立 server credentials server_creds = grpc.ssl_server_credentials(((private_key, certificate_chain,),)) # 调用add_secure_port方法,而不是add_insesure_port方法 server.add_secure_port('localhost:50051', server_creds)
# 读取证书 with open('server.crt') as f: trusted_certs = f.read().encode() # 建立 credentials credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs) # 调用secure_channel方法,而不是insecure_channel方法 channel = grpc.secure_channel('localhost:50051', credentials)
启动服务端后,启动客户端,会出现如下错误:
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "Connect Failed" debug_error_string = "{"created":"@1547552759.642000000","description":"Failed to create subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":2721,"referenced_errors":[{"created":"@1547552759.642000000","description":"Pick Cancelled","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":241,"referenced_errors":[{"created":"@1547552759.642000000","description":"Connect Failed","file":"src/core/ext/filters/client_channel/subchannel.cc","file_line":689,"grpc_status":14,"referenced_errors":[{"created":"@1547552759.642000000","description":"Peer name localhost is not in peer certificate","file":"src/core/lib/security/security_connector/security_connector.cc","file_line":880}]}]}]}" >
!!! 警告:
这是由于TSLSSL模式下,客户端是经过服务名称:port来获取服务的凭据,而不是ip:port, 因此对客户端作以下修改:
# 修改前 channel = grpc.secure_channel('localhost:50051', credentials) # 修改后 channel = grpc.secure_channel('Echoer:50051', credentials)
!!! 警告:
其次,在TSLSSL模式下,客户端对服务名称:port解析时候须要dns支持,目前不知道如何解决,只可以采起如下措施解决,经过修改windows的host文件,利用host将服务名称解析为IP地址,
打开windows的host文件,地址:C:\Windows\System32\drivers\etc\hosts
备份后修改以下,添加:
# 服务的IP地址 服务名称 127.0.0.1 Echoer
保存便可
修改后,再次运行,便可运行成功
注意事项:CA证书和私钥key都是配套的,不配套的CA证书和key是没法校验成功的
注意事项:确保consul已经正确启动,查看http://ip:port:8500/, 可查看consul的状态,确保已经安装python-consul这个库,不然没法操做consul
首先想象咱们以上的grpc示例程序之因此成功的有限制条件,
但在实际过程当中,通常是不可能确切知道服务的ip和端口的,因此consul就起了个中间桥梁的做用,具体以下:
服务注册,顾名思义,服务在启动以前,必须如今consul中注册。
服务端:当服务端启动以后,consul会利用服务注册时得到的ip和port同服务创建联系,其中最重要的就是health check即心跳检测。consul经过心跳检测来断定该服务是否正常。
客户端:客户端经过consul来查询所需服务的ip和port,若对应服务已经注册且心跳检测正常,则会返回给客户端对应的ip和port信息,而后客户端就能够利用这个来链接服务端了
服务注册示例代码以下:
def register(self, server_name, ip, port, consul_host=CONSUL_HOST): """ server_name: 服务名称 ip: 服务IP地址 port: 服务监听的端口 consul_host: 所链接的consul服务器的IP地址 """ c = consul.Consul(host=consul_host) # 获取与consul的链接 print(f"开始注册服务{server_name}") check = consul.Check.tcp(ip, port, "10s") # 设置心跳检测的超时时间和对应的ip和port端口 c.agent.service.register(server_name, f"{server_name}-{ip}-{port}", address=ip, port=port, check=check) # 注册
既然有服务注册,固然会有服务注销,示例代码以下:
def unregister(self, server_name, ip, port, consul_host=CONSUL_HOST): c = consul.Consul(host=consul_host) print(f"开始退出服务{server_name}") c.agent.service.deregister(f"{server_name}-{ip}-{port}")
客户端则须要在consul中查询对应服务的IP和port,但因为在TSL/SSL模式下,所需的只是服务名称和port,故而只须要查询port端口便可。
客户端服务查询采用的是DNS的查询方式,必须确保安装dnspython库,用于建立DNS查询
服务查询示例代码以下:
# 建立一个consul dns查询的 resolver consul_resolver = resolver.Resolver() consul_resolver.port = 8600 consul_resolver.nameservers = [consul_host] def get_host_port(self, server_name): try: dns_answer_srv = consul_resolver.query(f"{server_name}.service.consul", "SRV") # 查询对应服务的port, except DNSException as e: return None, None return server_name, dns_answer_srv[0].port # 返回服务名和端口
grpc总共提供了四种数据交互模式:
因为grpc对于消息有大小限制,diff数据过大会致使没法接收数据,咱们在使用过程当中,使用了流模式来解决了此类问题,
在此模式下,客户端传入的参数由具体的protobuf变为了protobuf的迭代器,客户端接收的响应也变为了迭代器,获取完整的响应则须要迭代获取。
服务端响应也变为了一个迭代器。
# 修改前 service Echoer{ rpc echo (Req) returns (Resp) {} } # 修改后 service Echoer{ rpc echo (stream Req) returns (stream Resp) {} }
从新编译
将工做函数修改成以下所示, 即工做函数变成了一个迭代器:
def echo(self, request_iterator, context): for i in range(10): yield Echoer_pb2.Resp(a="echo")
将echo的传入参数修改成迭代器:
def qq(): for i in range(10): yield Echoer_pb2.Req(q="echo") response = stub.echo(qq()) for resp in response: print("Echoer client received: " + response.a)
从新运行,接收结果以下:
$ python Echoer_client.py Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo Echoer client received: echo