简述RPC原理实现

 

 

 

前言

架构的改变,每每是由于业务规模的扩张。java

随着业务规模的扩张,为了知足业务对技术的要求,技术架构须要从单体应用架构升级到分布式服务架构,来下降公司的技术成本,更好的适应业务的发展。git

分布式服务架构的诸多优点,这里就不一一列举了,今天围绕的话题是服务框架,为了推行服务化,必然须要一套易用的服务框架,来支撑业务技术架构升级。 github

 

服务框架

服务架构的核心是服务调用,分布式服务架构中的服务分布在不一样主机的不一样进程上,服务的调用跟单体应用进程内方法调用的本质区别就是须要借助网络来进行通讯。算法

 

 

RPC Demo实现思路

原做者梁飞,在此记录下他很是简洁的rpc实现思路。apache

 

核心框架类

  /*
   * Copyright 2011 Alibaba.com All right reserved. This software is the
   * confidential and proprietary information of Alibaba.com ("Confidential
   * Information"). You shall not disclose such Confidential Information and shall
   * use it only in accordance with the terms of the license agreement you entered
   * into with Alibaba.com.
   */
  package com.alibaba.study.rpc.framework;
  ​
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import java.lang.reflect.InvocationHandler;
  import java.lang.reflect.Method;
  import java.lang.reflect.Proxy;
  import java.net.ServerSocket;
  import java.net.Socket;
  ​
  /**
   * RpcFramework
   * 
   * @author william.liangf
   */
  public class RpcFramework {
  ​
      /**
       * 暴露服务
       * 
       * @param service 服务实现
       * @param port 服务端口
       * @throws Exception
       */
      public static void export(final Object service, int port) throws Exception {
          if (service == null)
              throw new IllegalArgumentException("service instance == null");
          if (port <= 0 || port > 65535)
              throw new IllegalArgumentException("Invalid port " + port);
          System.out.println("Export service " + service.getClass().getName() + " on port " + port);
          ServerSocket server = new ServerSocket(port);
          for(;;) {
              try {
                  final Socket socket = server.accept();
                  new Thread(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              try {
                                  ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                  try {
                                      String methodName = input.readUTF();
                                      Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                      Object[] arguments = (Object[])input.readObject();
                                      ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                                      try {
                                          Method method = service.getClass().getMethod(methodName, parameterTypes);
                                          Object result = method.invoke(service, arguments);
                                          output.writeObject(result);
                                      } catch (Throwable t) {
                                          output.writeObject(t);
                                      } finally {
                                          output.close();
                                      }
                                  } finally {
                                      input.close();
                                  }
                              } finally {
                                  socket.close();
                              }
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                      }
                  }).start();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
  ​
      /**
       * 引用服务
       * 
       * @param <T> 接口泛型
       * @param interfaceClass 接口类型
       * @param host 服务器主机名
       * @param port 服务器端口
       * @return 远程服务
       * @throws Exception
       */
      @SuppressWarnings("unchecked")
      public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
          if (interfaceClass == null)
              throw new IllegalArgumentException("Interface class == null");
          if (! interfaceClass.isInterface())
              throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
          if (host == null || host.length() == 0)
              throw new IllegalArgumentException("Host == null!");
          if (port <= 0 || port > 65535)
              throw new IllegalArgumentException("Invalid port " + port);
          System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
          return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {
              public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                  Socket socket = new Socket(host, port);
                  try {
                      ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                      try {
                          output.writeUTF(method.getName());
                          output.writeObject(method.getParameterTypes());
                          output.writeObject(arguments);
                          ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                          try {
                              Object result = input.readObject();
                              if (result instanceof Throwable) {
                                  throw (Throwable) result;
                              }
                              return result;
                          } finally {
                              input.close();
                          }
                      } finally {
                          output.close();
                      }
                  } finally {
                      socket.close();  
                }  
            }  
        });  
    }  
​  
}

 

 

定义服务接口

 /*
   * Copyright 2011 Alibaba.com All right reserved. This software is the
   * confidential and proprietary information of Alibaba.com ("Confidential
   * Information"). You shall not disclose such Confidential Information and shall
   * use it only in accordance with the terms of the license agreement you entered
   * into with Alibaba.com.
   */
  package com.alibaba.study.rpc.test;
  ​
  /**
   * HelloService
   * 
   * @author william.liangf
   */
  public interface HelloService {
  ​
      String hello(String name);
  ​
  }

 

 

实现服务

 1   /*
 2    * Copyright 2011 Alibaba.com All right reserved. This software is the
 3    * confidential and proprietary information of Alibaba.com ("Confidential
 4    * Information"). You shall not disclose such Confidential Information and shall
 5    * use it only in accordance with the terms of the license agreement you entered
 6    * into with Alibaba.com.
 7    */
 8   package com.alibaba.study.rpc.test;
 9 10   /**
11    * HelloServiceImpl
12    * 
13    * @author william.liangf
14    */
15   public class HelloServiceImpl implements HelloService {
16 17       public String hello(String name) {
18           return "Hello " + name;
19       }
20 21   }

 

 

暴露服务

 /*
   * Copyright 2011 Alibaba.com All right reserved. This software is the
   * confidential and proprietary information of Alibaba.com ("Confidential
   * Information"). You shall not disclose such Confidential Information and shall
   * use it only in accordance with the terms of the license agreement you entered
   * into with Alibaba.com.
   */
  package com.alibaba.study.rpc.test;
  ​
  import com.alibaba.study.rpc.framework.RpcFramework;
  ​
  /**
   * RpcProvider
   * 
   * @author william.liangf
   */
  public class RpcProvider {
  ​
      public static void main(String[] args) throws Exception {
          HelloService service = new HelloServiceImpl();
          RpcFramework.export(service, 1234);
      }
  ​
  }

 

 

引用服务

 1   /*
 2    * Copyright 2011 Alibaba.com All right reserved. This software is the
 3    * confidential and proprietary information of Alibaba.com ("Confidential
 4    * Information"). You shall not disclose such Confidential Information and shall
 5    * use it only in accordance with the terms of the license agreement you entered
 6    * into with Alibaba.com.
 7    */
 8   package com.alibaba.study.rpc.test;
 9 10   import com.alibaba.study.rpc.framework.RpcFramework;
11 12   /**
13    * RpcConsumer
14    * 
15    * @author william.liangf
16    */
17   public class RpcConsumer {
18       
19       public static void main(String[] args) throws Exception {
20           HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
21           for (int i = 0; i < Integer.MAX_VALUE; i ++) {
22               String hello = service.hello("World" + i);
23               System.out.println(hello);
24               Thread.sleep(1000);
25           }
26       }
27       
28   }

 

 

小结

梁飞大大的博客使用原生的jdk api就展示给各位读者一个生动形象的rpc demo,实在是强。api

这个简单的例子的实现思路是:数组

  • 使用阻塞的socket IO流来进行server和client的通讯,也就是rpc应用中服务提供方和服务消费方。而且是端对端的,用端口号来直接进行通讯 安全

  • 方法的远程调用使用的是jdk的动态代理服务器

  • 参数的序列化也是使用的最简单的objectStream网络

 


 

服务框架

服务框架的核心是服务调用,分布式服务架构中的服务分布在不一样主机的不一样进程上,服务的调用跟单体应用进程内方法调用的本质区别就是须要借助网络来进行通讯。

下图是服务框架的架构图,主流的服务框架的实现都是这套架构,如 Dubbo、SpringCloud 等。

 

 

  • Invoker 是服务的调用方

  • Provider 是服务的提供方

  • Registry 是服务的注册中心

  • Monitor 是服务的监控模块

Invoker 和 Provider 分别做为服务的调用和被调用方,这点很明确。

可是仅有这二者仍是不够的,由于做为调用方须要知道服务部署在哪,去哪调用服务,因此有了 Registry 模块,它的功能是给服务提供方注册服务,给服务调用方发现服务。

Monitor 做为服务的监控模块,负责服务的调用统计以及链路分析功能,也是服务治理重要的一环。

 

核心模块

下图是服务框架的流程图,咱们分服务注册、发现、调用三个方面来进行流程分解。

 

 

服务注册是服务提供方向注册中心注册服务信息;当提供服务应用下线时,负责将服务注册信息从注册中心删去。

服务发现是服务调用方从注册中心订阅服务,获取服务提供方的相关信息;当服务注册信息有变动时,注册中心负责通知到服务调用方。

服务调用是服务调用方经过从注册中心拿到服务提供方的信息,向服务提供方发起服务调用,获取调用结果。

对照上述流程图,咱们按照请求的具体过程进行分析。

做为服务调用方 Invoker 的具体流程是:

  1. Request 从下往上,因为服务调用方只能拿到服务提供方提供的 API 接口或者 API 接口的 JAR 包,因此服务调用方须要通过一层代理 Proxy 来假装服务的实现;

  2. 通过代理 Proxy 以后,会通过路由 Router、负载均衡 LoadBalance 模块,目的是从一堆从注册中心拿到的服务提供方信息中选出最合适的服务提供方机器进行调用。另外,还会通过 Monitor 监控等模块;

  3. 接着会通过服务编码 Codec 模块,这个模块的目的是由于请求在网络传输前须要按照通讯协议以及对象的序列化方式,对传输的请求进行编解码;

  4. 最终会通过网络通讯 Transporter 模块,这个模块将 Codec 编码好的请求进行传输。

 

做为服务提供方 Provider 的具体流程是:

  1. Request 从上往下,通过网络通讯 Transporter 模块,获取到的是由调用方发送的Request字节数组。

  2. 接着通过服务编码 Codec 模块,根据通讯协议解出一个完整的请求包,而后使用具体的序列化方式反序列化成请求对象。

  3. 紧接着会通过监控、限流、鉴权等模块。

  4. 最终会执行服务的真正业务实现 ServiceImpl,执行完后,结果按原路返回。

     

按照上述流程分解一个服务框架的相关工做,再去看一些开源的服务框架也就不难理解了。

通常服务框架的核心模块应该有注册中心、网络通讯、服务编码(通讯协议、序列化)、服务路由、负载均衡,服务鉴权,可用性保障(服务降级、服务限流、服务隔离)、服务监控(Metrics、Trace)、配置中心、服务治理平台等。

 

注册中心

注册中心是用来注册和发现服务的,须要具有的基本功能有注册服务、下线服务、发现服务、通知服务变动等。

当前使用比较多的开源注册中心有 ZookeeperETCDEureka 等。

Zookeeper 与 ETCD 在总体架构上都比较相似,使用方式很是便捷,应用比较普遍。

这两套系统按照 CAP 理论,属于 CP 系统,可用性会差一点,可是做为中小规模服务注册中心,仍是游刃有余,并无某些人说的那么差劲。 Eureka 是 Spring Cloud Netflix 微服务套件中的一部分,很不幸的是 Eureka 2.0 开源工做宣告中止。

 

网络通讯

服务的调用方和提供方都来自不一样的主机的不一样的进程,因此要进行调用,必然少不了网络通讯。能够说网络通讯是分布式系统的重中之重,网络通讯框架的好坏直接影响服务框架的性能。从零实现一套性能高,稳定性强的通讯框架仍是很是难的,好在目前已经有不少开源的高性能的网络通讯框架。 针对 Java 生态有 Mina、Netty 等,目前使用最普遍的也当属 Netty。Netty 使用的是 per thread one eventloop 线程模型,这点与 Nginx 等其余高性能网络框架相似。另外,Netty 很是易用,因此网络通讯选择 Netty 框架天然是毫无疑问的。

 

Netty实践学习案例,是Netty初学者及核心技术巩固的最佳实践。
能够见个人netty学习工程:
https://github.com/sanshengshui/netty-learning-example

 

服务编码

内存对象要通过网络传输前须要作两件事:第一是肯定好通讯协议,第二序列化。

 

通讯协议

通讯协议说白了在发送数据前按照必定的格式来处理数据,而后进行发送,保证接收方拿到数据知道按照什么样的格式进行处理。

有些同窗可能不理解,为何须要通讯协议,不是有 TCP、UDP 协议了吗?这里说的不是传输层的通讯协议,应该是应用层的协议相似 HTTP。

由于的 TCP 协议虽然已经保证了可靠有序的传输,可是若是没有一套应用层的协议,就不知道发过来的字节数据是否是一个完整的数据请求,或者说是多个请求的字节数据都在一块儿,没法拆分,这就是是所谓的粘包,须要按照协议进行拆包,拆成一个个完整的请求包进行处理。

协议的实现上通常大厂或者开源的服务框架选择自建协议,更偏向服务领域。如 Dubbo,固然也有些框架直接使用 HTTP,HTTP/2,好比 GRPC 使用的就是 HTTP/2。

序列化

因为向网络层发送的数据必须是字节数据,不可能直接将一个对象发送到网络,因此在发送对象数据前,通常须要将对象序列化成字节数据,而后进行传输。

在服务方收到网络的字节数据时,须要通过反序列化拿到相关的对象。

序列化的实现目前现成比较多,如 Hessian、JSON、Thrift、ProtoBuf 等。Thrift 和 ProtoBuf 能支持跨语言,性能比较好,不过使用时须要编写 IDL 文件,有点麻烦。Hessian、JSON 使用起来比较友好,可是性能会差一点。


 

服务路由

服务路由指的是向服务提供方发起调用时,须要根据必定的算法从注册中心拿到的服务方地址信息中选择其中的一批机器进行调用。

路由的算法通常是根据场景来进行选择的,好比有些公司实施两地三中心这种高可用部署,可是因为两地的网络延时比较大,那这时就能够实施同地区路由策略,好比上海的调用方请求会优先选择上海的服务进行调用,来下降网络延时致使的服务端到端的调用耗时。

还有些框架支持脚本配置来进行定向路由策略。


 

负载均衡

负载均衡是紧接着服务路由的模块,负载均衡负责将发送请求均匀合理的发送到服务提供方的节点上,而备选机器,通常就是通过路由模块选择出来的。

负载均衡的算法有不少,如 RoundRobin、Random、LeastActive、ConsistentHash 等。

并且这些算法通常都是基于权重的加强版本,由于须要根据权重来调节每台服务节点的流量。


 

服务鉴权

服务鉴权是服务安全调用的基础,虽然绝大部分服务都是公司内部服务,可是对于敏感度较高的数据仍是须要进行鉴权的。

鉴权的服务须要对服务的调用方进行受权,未经受权的调用方是不可以调用该服务的。

关于服务鉴权的实现大都是基于 token 的认证方案,如 JWT(JSON Web Token) 认证。


 

可用性保障

可用性保障模块是服务高可用的一个重要保证。

服务在交互中主要分红调用方和提供方两种角色,做为服务调用方,能够经过服务降级提高可用性。做为服务提供方,能够经过服务限流、服务隔离来保证可用性。

服务降级

服务降级指的是当依赖的服务不可用时,使用预设的值来替代服务调用。

试想一下,假设调用一个非关键路径上的服务(也就是说该调用获取的结果是否实时,是否正确不是特别重要)出现问题,致使调用超时、失败等,在没有降级措施的状况下,会直接应用服务调用方业务。

所以,有些非关键路径上服务调用,能够经过服务降级实现有损服务,柔性可用。 开源的降级组件有 Netflix 的 Hystrix,Hystrix 使用比较普遍。

服务限流

服务降级保护的是服务的调用方,也就是服务的依赖方。而服务的提供方呢,如何保证服务的可用性呢? 服务限流指的是对服务调用流量的限制,限制其调用频次,来保护服务。

在高并发的场景中,很容易出现流量太高,致使服务被打垮。这里就须要限流来保证服务自身的稳定运行。 Hystrix 也是能够用来限流的,可是用的比较多的有 guava 的 RateLimiter,其使用的是令牌桶算法,可以保证平滑限流。

服务隔离

除了服务限流对服务提供方进行保护,就够了吗? 可能还不够,考虑一下这样的场景,假设某一个有问题的方法出现问题,处理很是耗时,这样会堵住整个服务处理线程,致使正常的服务方法也不可以正常调用。所以还须要服务隔离。 服务隔离指的是对服务执行的方法进行线程池隔离,保证异常耗时方法不会对正常的方法调用产生干扰,进而保护服务的稳定运行,提高可用性。


 

服务监控

服务监控是高可用系统不可或缺的重要支撑。

服务监控不只包括服务调用等业务统计信息 Metrics,还包括分布式链路追踪 Trace。

分布式系统监控比单体应用要复杂的多,须要将大量的监控信息进行聚合展现,尤为是在分布式链路追踪方面,因为服务调用过程当中涉及到多个分布在不一样机器上的服务,须要一个调用链路展现系统方便查看调用链路中耗时和出问题的环节。

Metrics

Metrics 监控主要是服务调用的一些统计报表,包括服务调用次数、成功数、失败数,以及服务方法的调用耗时,如平均耗时,耗时99线,999线等。全方位展现服务的可用性以及性能等信息。

目前开源的 Metrics 监控有美团点评的 Cat、SoundCloud 的 Prometheus 以及基于 OpenTracking 的 SkyWalking。

Trace

Trace 监控是对分布式服务调用过程当中的总体链路展现和分析。方便查看链路上各个环境的性能问题。

分布式链路追踪的原理大都是基于 Google 的论文 Dapper, a Large-Scale Distributed Systems Tracing Infrastructure。 开源的分布式链路追踪系统有美团点评的 Cat,基于 OpenTracking 的SkyWalking、Twitter 的 ZipKin。


 

配置中心

配置中心不光是常见的系统须要,服务框架也须要,它可以对系统中使用的配置进行管理,也可以针对修改配置动态通知到应用系统。 一套完善的服务框架,必然少不了配置,如一些动态开关、降级配置、限流配置、鉴权配置等。

开源的配置中心有阿里的 Diamond,携程的 Apollo。


 

治理平台

治理平台指的是对服务进行管理的平台。

微服务微了以后,必然会致使服务数量的上升,若是没有一个完善的治理平台,服务规模扩大以后,很难去维护,也必然致使故障频频,而且极度影响开发效率。

治理平台主要是服务功能的相关操做平台,包括服务权重修改、服务下线、鉴权降级等配置修改等。 治理平台跟服务框架的耦合比较强,因此开源的比较少。

其余

关于RPC原理实现详解到这里就结束了。

原创不易,若是感受不错,但愿给个推荐!您的支持是我写做的最大动力!

版权声明:

做者:穆书伟

博客园出处:www.cnblogs.com/sanshengshu…

github出处:github.com/sanshengshu…    

我的博客出处:sanshengshui.github.io/

相关文章
相关标签/搜索