GRPC协议的相关原理

      GRPC的Client与Server,均经过Netty Channel做为数据通讯,序列化、反序列化则使用Protobuf,每一个请求都将被封装成HTTP2的Stream,在整个生命周期中,客户端Channel应该保持长链接,而不是每次调用从新建立Channel、响应结束后关闭Channel(即短链接、交互式的RPC),目的就是达到连接的复用,进而提升交互效率。java

    一、Server端算法

    咱们一般使用NettyServerBuilder,即IO处理模型基于Netty,未来可能会支持其余的IO模型。Netty Server的IO模型简析:设计模式

    1)建立ServerBootstrap,设定BossGroup与workerGroup线程池网络

    2)注册childHandler,用来处理客户端连接中的请求成帧架构

    3)bind到指定的port,即内部初始化ServerSocketChannel等,开始侦听和接受客户端连接。并发

    4)BossGroup中的线程用于accept客户端连接,并转发(轮训)给workerGroup中的线程。app

    5)workerGroup中的特定线程用于初始化客户端连接,初始化pipeline和handler,并将其注册到worker线程的selector上(每一个worker线程持有一个selector,不共享)async

    6)selector上发生读写事件后,获取事件所属的连接句柄,而后执行handler(inbound),同时进行拆封package,handler执行完毕后,数据写入经过,由outbound handler处理(封包)经过连接发出。    注意每一个worker线程上的数据请求是队列化的。tcp

    GRPC而言,只是对Netty Server的简单封装,底层使用了PlaintextHandler、Http2ConnectionHandler的相关封装等。具体Framer、Stream方式请参考Http2相关文档。ide

    1)bossEventLoopGroup:若是没指定,默认为一个static共享的对象,即JVM内全部的NettyServer都使用同一个Group,默认线程池大小为1。

    2)workerEventLoopGroup:若是没指定,默认为一个static共享的对象,线程池大小为coreSize * 2。这两个对象采用默认值并不会带来问题;一般状况下,即便你的application中有多个GRPC Server,默认值也同样可以带来收益。不合适的线程池大小,有可能会是性能受限。

    3)channelType:默认为NioServerSocketChannel,一般咱们采用默认值;固然你也能够开发本身的类。若是此值为NioServerSocketChannel,则开启keepalive,同时设定SO_BACKLOG为128;BACKLOG就是系统底层已经创建引入连接可是还没有被accept的Socket队列的大小,在连接密集型(特别是短链接)时,若是队列超过此值,新的建立连接请求将会被拒绝(有可能你在压力测试时,会遇到这样的问题),keepalive和BACKLOG特性目前没法直接修改。

Java代码  
  1. [root@sh149 ~]# sysctl -a|grep tcp_keepalive  
  2. net.ipv4.tcp_keepalive_time = 60  ##单位:秒  
  3. net.ipv4.tcp_keepalive_probes = 9  
  4. net.ipv4.tcp_keepalive_intvl = 75 ##单位:秒  
  5. ##能够在/etc/sysctl.conf查看和修改相关值  
  6. ##tcp_keepalive_time:最后一个实际数据包发送完毕后,首个keepalive探测包发送的时间。  
  7. ##若是首个keepalive包探测成功,那么连接会被标记为keepalive(首先TCP开启了keepalive)  
  8. ##此后此参数将再也不生效,而是使用下述的2个参数继续探测  
  9. ##tcp_keepalive_intvl:此后,不管通道上是否发生数据交换,keepalive探测包发送的时间间隔  
  10. ##tcp_keepalive_probes:在判定连接失效以前,尝试发送探测包的次数;  
  11. ##若是都失败,则判定连接已关闭。  

    对于Server端,咱们须要关注上述keepalive的一些设置;若是Netty Client在空闲一段时间后,Server端会主动关闭连接,有可能Client仍然保持连接的句柄,将会致使RPC调用时发生异常。这也会致使GRPC客户端调用时偶尔发生错误的缘由之一。

    4)followControlWindow:流量控制的窗口大小,单位:字节,默认值为1M,HTTP2中的“Flow Control”特性;链接上,已经发送还没有ACK的数据帧大小,好比window大小为100K,且winow已满,每次向Client发送消息时,若是客户端反馈ACK(携带这次ACK数据的大小),window将会减掉此大小;每次向window中添加亟待发送的数据时,window增长;若是window中的数据已达到限定值,它将不能继续添加数据,只能等待Client端ACK。

    5)maxConcurrentCallPerConnection:每一个connection容许的最大并发请求数,默认值为Integer.MAX_VALUE;若是此链接上已经接受但还没有响应的streams个数达到此值,新的请求将会被拒绝。为了不TCP通道的过分拥堵,咱们能够适度调整此值,以便Server端平稳处理,毕竟buffer太多的streams会对server的内存形成巨大压力。

    6)maxMessageSize:每次调用容许发送的最大数据量,默认为100M。

    7)maxHeaderListSize:每次调用容许发送的header的最大条数,GRPC中默认为8192。

    对于其余的好比SSL/TSL等,能够参考其余文档。

    GRPC Server端,还有一个最终要的方法:addService。【以下文service代理模式】

    在此以前,咱们须要介绍一下bindService方法,每一个GRPC生成的service代码中都有此方法,它以硬编码的方式遍历此service的方法列表,将每一个方法的调用过程都与“被代理实例”绑定,这个模式有点相似于静态代理,好比调用sayHello方法时,其实内部直接调用“被代理实例”的sayHello方法(参见MethodHandler.invoke方法,每一个方法都有一个惟一的index,经过硬编码方式执行);bindService方法的最终目的是建立一个ServerServiceDefinition对象,这个对象内部位置一个map,key为此Service的方法的全名(fullname,{package}.{service}.{method}),value就是此方法的GRPC封装类(ServerMethodDefinition)。

Java代码 
  1. private static final int METHODID_SAY_HELLO = 0;  
  2. private static class MethodHandlers<Req, Resp> implements  
  3.       ... {  
  4.     private final TestRpcService serviceImpl;//实际被代理实例  
  5.     private final int methodId;  
  6.   
  7.     public MethodHandlers(TestRpcService serviceImpl, int methodId) {  
  8.       this.serviceImpl = serviceImpl;  
  9.       this.methodId = methodId;  
  10.     }  
  11.   
  12.     @java.lang.SuppressWarnings("unchecked")  
  13.     public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {  
  14.       switch (methodId) {  
  15.         case METHODID_SAY_HELLO:        //经过方法的index来断定具体须要代理那个方法  
  16.           serviceImpl.sayHello((com.test.grpc.service.model.TestModel.TestRequest) request,  
  17.               (io.grpc.stub.StreamObserver<com.test.grpc.service.model.TestModel.TestResponse>) responseObserver);  
  18.           break;  
  19.         default:  
  20.           throw new AssertionError();  
  21.       }  
  22.     }  
  23.     ....  
  24.   }  
  25.   
  26.   public static io.grpc.ServerServiceDefinition bindService(  
  27.       final TestRpcService serviceImpl) {  
  28.     return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)  
  29.         .addMethod(  
  30.           METHOD_SAY_HELLO,  
  31.           asyncUnaryCall(  
  32.             new MethodHandlers<  
  33.               com.test.grpc.service.model.TestModel.TestRequest,  
  34.               com.test.grpc.service.model.TestModel.TestResponse>(  
  35.                 serviceImpl, METHODID_SAY_HELLO)))  
  36.         .build();  
  37.   }  

    addService方法能够添加多个Service,即一个Netty Server能够为多个service服务,这并不违背设计模式和架构模式。addService方法将会把service保存在内部的一个map中,key为serviceName(即{package}.{service}),value就是上述bindService生成的对象。

    那么究竟Server端是如何解析RPC过程的?Client在调用时会将调用的service名称 + method信息保存在一个GRPC“保留”的header中,那么Server端便可经过获取这个特定的header信息,就能够得知此stream须要请求的service、以及其method,那么接下来只须要从上述提到的map中找到service,而后找到此method,直接代理调用便可。执行结果在Encoder以后发送给Client。(参见:NettyServerHandler)

    由于是map存储,因此咱们须要在定义.proto文件时,尽量的指定package信息,以免由于service过多致使名称可能重复的问题。

    二、Client端

    咱们使用ManagedChannelBuilder来建立客户端channel,ManagedChannelBuilder使用了provider机制,具体是建立了哪一种channel有provider决定,能够参看META-INF下同类名的文件中的注册信息。当前Channel有2种:NettyChannelBuilder与OkHttpChannelBuilder。本人的当前版本中为NettyChannelBuilder;咱们能够直接使用NettyChannelBuilder来构建channel。以下描述则针对NettyChannelBuilder:

    配置参数与NettyServerBuilder基本相似,再次再也不赘言。默认状况下,Client端默认的eventLoopGroup线程池也是static的,全局共享的,默认线程个数为coreSize * 2。合理的线程池个数能够提升客户端的吞吐能力。

    ManagedChannel是客户端最核心的类,它表示逻辑上的一个channel;底层持有一个物理的transport(TCP通道,参见NettyClientTransport),并负责维护此transport的活性;即在RPC调用的任什么时候机,若是检测到底层transport处于关闭状态(terminated),将会尝试重建transport。(参见TransportSet.obtainActiveTransport())

    一般状况下,咱们不须要在RPC调用结束后就关闭Channel,Channel能够被一直重用,直到Client再也不须要请求位置或者Channel没法真的异常中断而没法继续使用。固然,为了提升Client端application的总体并发能力,咱们可使用链接池模式,即建立多个ManagedChannel,而后使用轮训、随机等算法,在每次RPC请求时选择一个Channel便可。(备注,链接池特性,目前GRPC还没有提供,须要额外的开发)

    每一个Service客户端,都生成了2种stub:BlockingStub和FutureStub;这两个Stub内部调用过程几乎同样,惟一不一样的是BlockingStub的方法直接返回Response Model,而FutureStub返回一个Future对象。BlockingStub内部也是基于Future机制,只是封装了阻塞等待的过程:

Java代码 
  1. try {  
  2.         //也是基于Future  
  3.       ListenableFuture<RespT> responseFuture = futureUnaryCall(call, param);  
  4.       //阻塞过程  
  5.       while (!responseFuture.isDone()) {  
  6.         try {  
  7.           executor.waitAndDrain();  
  8.         } catch (InterruptedException e) {  
  9.           Thread.currentThread().interrupt();  
  10.           throw Status.CANCELLED.withCause(e).asRuntimeException();  
  11.         }  
  12.       }  
  13.       return getUnchecked(responseFuture);  
  14.     } catch (Throwable t) {  
  15.       call.cancel();  
  16.       throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);  
  17. }  

    建立一个Stub的成本是很是低的,咱们能够在每次请求时都经过channel建立新的stub,这并不会带来任何问题(只不过是建立了大量对象);其实更好的方式是,咱们应该使用一个Stub发送屡次请求,即Stub也是能够重用的;直到Stub上的状态异常而没法使用。最多见的异常,就是“io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED”,即表示DEADLINE时间过时,咱们能够为每一个Stub配置deadline时间,那么若是此stub被使用的时长超过此值(不是空闲的时间),将不能再发送请求,此时咱们应该建立新的Stub。不少人想尽办法来使用“withDeadlineAfter”方法来实现一些奇怪的事情,此参数的主要目的就是代表:此stub只能被使用X时长,此后将不能再进行请求,应该被释放。因此,它并不能实现相似于“keepAlive”的语义,即便咱们须要keepAlive,也应该在Channel级别,而不是在一个Stub上。

    若是你使用了链接池,那么其实链接池不该该关注DEADLINE的错误,只要Channel自己没有terminated便可;就把这个问题交给调用者处理。若是你也对Stub使用了对象池,那么你就可能须要关注这个状况了,你不该该向调用者返回一个“DEADLINE”的stub,或者若是调用者发现了DEADLINE,你的对象池应该可以移除它。

    1)实例化ManagedChannel,此channel能够被任意多个Stub实例引用;如上文说述,咱们能够经过建立Channel池,来提升application总体的吞吐能力。此Channel实例,不该该被shutdown,直到Client端中止服务;在任什么时候候,特别是建立Stub时,咱们应该断定Channel的状态。

Java代码   收藏代码
  1. synchronized (this) {  
  2.     if (channel.isShutdown() || channel.isTerminated()) {  
  3.         channel = ManagedChannelBuilder.forAddress(poolConfig.host, poolConfig.port).usePlaintext(true).build();  
  4.     }  
  5.     //new Stub  
  6. }  
  7.   
  8. //或者  
  9. ManagedChannel channel = (ManagedChannel)client.getChannel();  
  10. if(channel.isShutdown() || channel.isTerminated()) {  
  11.     client = createBlockStub();  
  12. }  
  13. client.sayHello(...)  

    由于Channel是能够多路复用,因此咱们用Pool机制(好比commons-pool)也能够实现链接池,只是这种池并不是彻底符合GRPC/HTTP2的设计语义,由于GRPC容许一个Channel上连续发送对个Requests(而后一次性接收多个Responses),而不是“交互式”的Request-Response模式,固然这么使用并不会有任何问题。

    2)对于批量调用的场景,咱们可使用FutureStub,对于普通的业务类型RPC,咱们应该使用BlockingStub。

    3)每一个RPC方法的调用,好比sayHello,调用开始后,将会为每一个调用请求建立一个ClientCall实例,其内部封装了调用的方法、配置选项(headers)等。此后将会建立Stream对象,每一个Stream都持有惟一的streamId,它是Transport用于分拣Response的凭证。最终调用的全部参数都会被封装在Stream中。

    4)检测DEADLINE,是否已通过期,若是过时,将使用FailingClientStream对象来模拟整个RPC过程,固然请求不会经过通道发出,直接通过异常流处理过程。

    5)而后获取transport,若是此时检测到transport已经中断,则重建transport。(自动重练机制,ClientCallImpl.start()方法)

    6)发送请求参数,即咱们Request实例。一次RPC调用,数据是分屡次发送,可是ClientCall在建立时已经绑定到了指定的线程上,因此数据发送老是经过一个线程进行(不会乱序)。

    7)将ClientCall实例置为halfClose,即半关闭,并非将底层Channel或者Transport半关闭,只是逻辑上限定此ClientCall实例上将不能继续发送任何stream信息,而是等待Response。

    8)Netty底层IO将会对reponse数据流进行解包(Http2ConnectionDecoder),并根据streamId分拣Response,同时唤醒响应的ClientCalls阻塞。(参见ClientCalls,GrpcFuture)

    9)若是是BlockingStub,则请求返回,若是响应中包含应用异常,则封装后抛出;若是是网络异常,则可能触发Channel重建、Stream重置等。

 

转载自:http://shift-alt-ctrl.iteye.com/blog/2292862

相关文章
相关标签/搜索