一文带你实现RPC框架

想要获取更多文章能够访问个人博客 -  代码无止境

如今大部分的互联网公司都会采用微服务架构,但具体实现微服务架构的方式有所不一样,主流上分为两种,一种是基于Http协议的远程调用,另一种是基于RPC方式的调用。两种方式都有本身的表明框架,前者是著名的Spring Cloud,后者则是有阿里巴巴开源的Dubbo,两者都被普遍的采用。今天这篇文章,咱们就一块儿来了解一下RPC,而且和你们一块儿动手实现一个简单的RPC框架的Demo。java

什么是RPC

RPC是一种远程调用过程,是一种经过网络远程调用其余服务的协议。通俗的说就是,A经过打电话的方式让B帮忙办一件事,B办完过后将结果告知A。 咱们下面经过一张图来大概了解一下在一个完整的RPC框架中存在的角色以及整个远程调用的过程。git

Dubbo框架

经过上面的图能够看出来,在RPC框架中主要有如下4个角色:github

  • registry - 注册中心,当服务提供者启动时会向注册中心注册,而后注册中心会告知全部的消费者有新的服务提供者。
  • provider - 服务提供者,远程调用过程当中的被消费方。
  • consumer - 服务消费者,远程调用过程当中的消费方。
  • monitor - 监视器,它主要负责统计服务的消费和调用状况。

启动服务提供者后,服务提供者会以异步的方式向注册中心注册。而后启动服务消费者,它会订阅注册中心中服务提供者列表,当有服务提供者的信息发生改变时,注册中心会通知全部的消费者。当消费者发起远程调用时,会经过动态代理将须要请求的参数以及方法签名等信息经过Netty发送给服务提供者,服务提供者收到调用的信息后调用对应的方法并将产生的结果返回给消费者,这样就完成了一个完整的远程调用。固然了这个过程当中可能还会将调用信息异步发送给monitor用于监控和统计。web

阅读过上面的内容后,你应该对RPC框架有了一个大概的认识。为了更好更深刻的了解RPC框架的原理,下面咱们就一块儿来动手实现一个简单的RPC框架吧。spring

框架核心部分

首先咱们要实现的是整个RPC框架的核心部分,这部分的主要包含如下内容:bootstrap

  1. RPC服务的注解的实现。
  2. 服务提供者初始化、注册、以及响应远程调用的实现。
  3. 服务消费者订阅注册中心、监听服务提供者的变化的实现。
  4. 动态代理的实现。

整个核心部分将以一个Spring Boot Starter的形式实现,这样咱们能够很方便的在Spring Boot项目中使用它。api

注解

咱们须要使用一个注解来标识服务提供者所提供服务的实现类,方便在初始化的时候将其交由Spring管理,也只有这样咱们才能够在远程调用发生时能够找到它们。数组

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {

    Class<?> value();

}

value属性用来标记这个服务的实现类对应的接口,RPC框架中服务提供者和消费者之间会共同引用一个服务接口的包,当咱们须要远程调用的时候实际上只须要调用接口中定义的方法便可。
除了一个标识服务实现类的注解以外,咱们还须要一个标识服务消费者注入服务实现的注解@RpcConsumer,被其修饰的属性在初始化的时候都会被咱们设置上动态代理,这一点在后面会详细讲到,咱们先来看下它的具体实现吧。浏览器

@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcConsumer {

    /**
     * 服务名称
     * @return
     */
    String providerName();

}

服务提供者

服务提供者启动的时候,咱们RPC框架须要作如下几件事情:服务器

  1. 扫描服务提供者中全部提供服务的类(被@RpcService修饰的类),并将其交由BeanFactory管理。
  2. 启动Netty服务端,用来收到消费者的调用消息,而且返回调用结果。
  3. 向注册中心注册,本例中使用的注册中心是Zookeeper。

这部分咱们定义了一个ProviderAutoConfiguration类来实现这几个步骤,

@PostConstruct
public void  init() {
    logger.info("rpc server start scanning provider service...");
    Map<String, Object> beanMap = this.applicationContext.getBeansWithAnnotation(RpcService.class);
    if (null != beanMap && !beanMap.isEmpty()) {
        beanMap.entrySet().forEach(one -> {
            initProviderBean(one.getKey(), one.getValue());
        });
    }
    logger.info("rpc server scan over...");
    // 若是有服务的话才启动netty server
    if (!beanMap.isEmpty()) {
        startNetty(rpcProperties.getPort());
    }
}

看上面的代码,首先咱们获取到了全部被@RpcService注解修饰的实体,而且调用了initProviderBean方法逐一对其处理,而后咱们启动了Netty。那么咱们须要在initProviderBean方法中作些什么呢?其实很简单,就是逐一将其交由BeanFactory管理。

private void initProviderBean(String beanName, Object bean) {
    RpcService rpcService = this.applicationContext
                .findAnnotationOnBean(beanName, RpcService.class);
    BeanFactory.addBean(rpcService.value(), bean);
}

将服务实现类交由Spring管理以后,咱们还须要启动Netty用来接收远程调用信息,启动Netty的代码在这里我就不所有粘出来了,你们能够在源码中查看。在Netty启动成功以后,其实咱们还执行了下面的代码,用来向ZK注册。

new RegistryServer(rpcProperties.getRegisterAddress(),
                    rpcProperties.getTimeout(), rpcProperties.getServerName(),
                    rpcProperties.getHost(), port)
                    .register();

整个注册的过程也很是容易理解,首先是建立了一个ZK链接,而后是判断是否有/rpc的根节点,若是没有的话就建立一个,最后就是在根节点下建立一个EPHEMERAL_SEQUENTIAL类型的节点,这种类型的节点在ZK重启以后会自动清除,这样能够保证注册中心重启后会自动清除服务提供者的信息。而在节点中会存储服务提供者的名称,IP地址以及端口号的信息,这样RPC框架就能够根据这些信息顺利的定位到服务提供者。

public void register() throws ZkConnectException {
    try {
        // 获取zk链接
        ZooKeeper zooKeeper = new ZooKeeper(addr, timeout, event -> {
            logger.info("registry zk connect success...");
        });
        if (zooKeeper.exists(Constants.ZK_ROOT_DIR, false) == null) {
            zooKeeper.create(Constants.ZK_ROOT_DIR, Constants.ZK_ROOT_DIR.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
        zooKeeper.create(Constants.ZK_ROOT_DIR + "/" + serverName,
                (serverName + ","+ host + ":" + port).getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("provider register success {}", serverName);
    } catch (Exception e) {
        throw new ZkConnectException("register to zk exception," + e.getMessage(), e.getCaus());
    }
}

就这样咱们RPC框架与服务提供者相关的内容就完成了,接下来要完成的是服务消费者部分。

服务消费者

对于服务消费者,咱们框架须要对它的处理就是,为全部的RPC服务(被@RpcConsumer修饰的属性)设置上动态代理。具体的设置代码以下所示(PS:这段代码写在ConsumerAutoConfiguration类中哦):

@Bean
public BeanPostProcessor beanPostProcessor() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName)
                throws BeansException {
            Class<?> objClz = bean.getClass();
            for (Field field : objClz.getDeclaredFields()) {
                RpcConsumer rpcConsumer = field.getAnnotation(RpcConsumer.class);
                if (null != rpcConsumer) {
                    Class<?> type = field.getType();
                    field.setAccessible(true);
                    try {
                        field.set(bean, rpcProxy.create(type, rpcConsumer.providerName()));
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } finally {
                        field.setAccessible(false);
                    }
                }
            }
            return bean;
        }
    };
}

BeanPostProcessor也称为Bean后置处理器,它是Spring中定义的接口,在Spring容器的建立过程当中(具体为Bean初始化先后)会回调BeanPostProcessor中定义的两个方法。上面实现的postProcessBeforeInitialization是在Bean初始化以前调用的,还有一个postProcessAfterInitialization方法是在Bean初始化以后调用的。
如上面代码所示,咱们会在每个带有@RpcConsumer的实例初始化以前利用反射机制为其设置一个RpcProxy的代理,能够看到咱们在建立这个动态代理的时候还须要服务提供者的名称,这是由于在动态代理的实现里面须要使用服务提供者的名称来查询服务提供者的地址信息。那么这个动态代理的实现又是怎样的呢?这就是咱们下一步须要作的事情。

动态代理

在这个RPC框架里面动态代理主要实现的内容就是,当服务消费者调用服务提供者提供的接口时,将调用信息经过Netty发送给对应的服务调用者,而后由服务提供者完成相关的处理而且将处理结果返回给服务消费者。下面咱们就一块儿来看一下RpcProxy的是如何实现这部分功能的。

@Component
public class RpcProxy {

    @Autowired
    private ServiceDiscovery serviceDiscovery;

    public <T> T create(Class<?> interfaceClass, String providerName) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass},
                (proxy, method, args) -> {
            // 经过netty向Rpc服务发送请求。
            // 构建一个请求。
            RpcRequest request = new RpcRequest();
            request.setRequestId(UUID.randomUUID().toString())
                    .setClassName(method.getDeclaringClass().getName())
                    .setMethodName(method.getName())
                    .setParamTypes(method.getParameterTypes())
                    .setParams(args);
            // 获取一个服务提供者。
            ProviderInfo providerInfo = serviceDiscovery.discover(providerName);
            // 解析服务提供者的地址信息,数组第一个元素为ip地址,第二个元素为端口号。
           String[] addrInfo = providerInfo.getAddr().split(":");
            String host = addrInfo[0];
            int port = Integer.parseInt(addrInfo[1]);
            RpcClient rpcClient = new RpcClient(host, port);
            // 使用Netty向服务提供者发送调用消息,并接收请求结果。
            RpcResponse response = rpcClient.send(request);
            if (response.isError()) {
                throw response.getError();
            } else {
                return response.getResult();
            }
        });
    }
}

其实在代理里面首先咱们会构造请求信息实体,而后会根据服务提供者的名称获取一个服务提供者的地址,最后再将请求信息发送给服务提供者并接收调用结果。获取服务提供者的方法会在后面消费者和提供者的通用配置里面讲解。咱们在这里重点来看一下发送调用信息并接收调用结果的实现。

public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
    
    ... 此处省略对象属性信息,可查看源码。

    public RpcResponse send(RpcRequest request){
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ... 此处省略Netty相关配置,可查看源码。
            // 链接服务器
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.channel().writeAndFlush(request).sync();
            future = new CompletableFuture<>();
            future.get();
            if (response != null) {
                // 关闭netty链接。
                channelFuture.channel().closeFuture().sync();
            }
            return response;
        } catch (Exception e) {
            logger.error("client send msg error,", e);
            return null;
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                RpcResponse rpcResponse) throws Exception {
        logger.info("client get request result,{}", rpcResponse);
        this.response = rpcResponse;
        future.complete("");
    }
}

经过上面的代码能够看出向服务提供者发送消息是异步的,咱们经过CompletableFutureget()方法阻塞当前线程,直到接收到调用结果(PS:咱们在channelRead0方法中收到返回结果后会将其设置成完成状态)。看到这里,你可能会问服务提供者收到调用请求信息后如何处理的呢?具体的处理逻辑咱们写在了ServerHandler这个类中,能够看出在channelRead0方法收到一条调用信息以后,调用handle方法来处理具体的调用过程,在handle方法中会使用反射机制找到所调用方法的具体实现,而后执行调用过程并获取结果,最后再使用Netty将结果返回给消费者服务。

public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                RpcRequest request) throws Exception {
        logger.info("provider accept request,{}", request);
        // 返回的对象。
        RpcResponse rpcResponse = new RpcResponse();
        // 将请求id原路带回
        rpcResponse.setRequestId(request.getRequestId());
        try {
            Object result = handle(request);
            rpcResponse.setResult(result);
        } catch (Exception e) {
            rpcResponse.setError(e);
        }
        channelHandlerContext.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
    }

    private Object handle(RpcRequest request) throws Exception {
        String className = request.getClassName();
        Class<?> objClz = Class.forName(className);
        Object o = BeanFactory.getBean(objClz);
        // 获取调用的方法名称。
        String methodName = request.getMethodName();
        // 参数类型
        Class<?>[] paramsTypes = request.getParamTypes();
        // 具体参数。
        Object[] params = request.getParams();
        // 调用实现类的指定的方法并返回结果。
        Method method = objClz.getMethod(methodName, paramsTypes);
        Object res = method.invoke(o, params);
        return res;
    }
}

消费者和提供者的通用配置

除了ProviderAutoConfigurationConsumerAutoConfiguration两个配置类,咱们还定义了一个RpcAutoConfiguration类来配置一些其余的东西,以下所示。

public class RpcAutoConfiguration {
    ...

    @Bean
    @ConditionalOnMissingBean
    public ServiceDiscovery serviceDiscovery() {
        ServiceDiscovery serviceDiscovery =
                null;
        try {
            serviceDiscovery = new ServiceDiscovery(rpcProperties.getRegisterAddress());
        } catch (ZkConnectException e) {
            logger.error("zk connect failed:", e);
        }
        return serviceDiscovery;
    }

    @Bean
    @ConditionalOnMissingBean
    public RpcProxy rpcProxy() {
        RpcProxy rpcProxy = new RpcProxy();
        rpcProxy.setServiceDiscovery(serviceDiscovery());
        return rpcProxy;
    }
}

在这个配置类里面,主要初始化了一个ServiceDiscovery的对象以及一个RpcProxy的对象。其中RpcProxy是动态代理,在上面咱们已经详细了解过了。那么这里就来着重了解一下ServiceDiscovery是干啥的吧。
你们还记得咱们在文章开始的时候贴出来的那张图片吗?在服务消费者初始化的时候会去订阅服务提供者内容的变化,ServiceDiscovery的主要功能就是这个,其主要代码以下所示(若是你须要完整的代码,能够查看本文源码)。

public class ServiceDiscovery {

    // 存储服务提供者的信息。
    private volatile List<ProviderInfo> dataList = new ArrayList<>();

    public ServiceDiscovery(String registoryAddress) throws ZkConnectException {
        try {
            // 获取zk链接。
            ZooKeeper zooKeeper = new ZooKeeper(registoryAddress, 2000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    logger.info("consumer connect zk success!");
                }
            });
            watchNode(zooKeeper);
        } catch (Exception e) {
            throw new ZkConnectException("connect to zk exception," + e.getMessage(), e.getCause());
        }
    }

    /**
     * 监听服务提供者的变化
     */
    public void watchNode(final ZooKeeper zk) {
        ...
    }

    /**
     * 获取一个服务提供者
     */
    public ProviderInfo discover(String providerName) {
        ....
    }
}

在这个类的构造方法里面,咱们和ZK注册中心创建了一个链接,而且在watchNode方法中监听服务提供者节点的变化,当有服务提供者信息有变化时会去修改dataList里的内容,这样能够保证在服务本地维持一份可用的服务提供者的信息。而在远程调用发生的时候咱们会经过discover方法(PS:前面有见到过哦)去dataList里面寻找一个可用的服务提供者来提供服务。

Starter的配置

咱们还须要在resources目录下新建一个META-INF目录,而后在该目录下新建一个spring.factories文件,里面的内容以下面代码所示。它主要是用来指定在Spring Boot项目启动的时候须要加载的其余配置。若是你有不明白的地方能够查询一下Spring Boot自定义Stater的相关内容。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.itweknow.sbrpccorestarter.config.RpcAutoConfiguration,\
cn.itweknow.sbrpccorestarter.config.ProviderAutoConfiguration,\
cn.itweknow.sbrpccorestarter.config.ConsumerAutoConfiguration

到这一步咱们框架的核心部分就完成了,它将会以一个Spring Boot Stater的形式提供给服务提供者和服务消费者使用,接下来咱们就将分别定义一个服务提供者和一个消费者来测试咱们本身实现的RPC框架。

建立服务提供者

在建立服务提供者以前,咱们须要新建一个与服务消费者之间共享的服务接口。由于前面提到过,在服务消费者眼里的远程调用实际上就是调用本地的接口方法而已。在这个项目里咱们就建立了一个HelloRpcService.java的接口,以下所示:

public interface HelloRpcService {
    String sayHello();
}

在接口定义完成以后,咱们就来建立咱们的服务提供者,而且实现上面定义的HelloRpcService接口。在服务提供者服务里还须要依赖RPC框架的核心Starter以及服务接口包,咱们须要在pom.xml中添加下面的依赖。

<dependency>
    <groupId>cn.itweknow</groupId>
    <artifactId>sb-rpc-core-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>cn.itweknow</groupId>
    <artifactId>sb-rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

添加完依赖后,咱们就来看下HelloRpcService的具体实现吧:

@RpcService(HelloRpcService.class)
public class HelloRpcServiceImpl implements HelloRpcService {
    
    @Override
    public String sayHello() {
        return "Hello RPC!";
    }
}

其实现很简单,主要是要须要在实现类上加上@RpcService注解,这样在项目启动的时候RPC框架才会扫描到它,并将其交给BeanFactory管理。接下来还须要配置的是一些RPC框架须要的配置项,包括服务名称,ZK的地址以及Netty启动的端口等信息。这些信息在框架是经过RpcProperties这个配置类来读取的,有兴趣的同窗能够在源码中找到它。

spring.rpc.host=localhost
# netty服务的端口号
spring.rpc.port=21810
# zk地址
spring.rpc.register-address=localhost:2181
spring.rpc.server-name=provider
# 链接zk的超时时间
spring.rpc.timeout=2000

建立服务消费者

服务消费者一样也须要RPC核心框架的Starter以及服务接口的依赖,和RPC框架的一些基础配置项,和服务提供者相似,这里就不粘出来了。这里须要说明的一点是,为了方便测试,服务消费者是一个Web服务,因此它还添加了spring-boot-starter-web的依赖。下面咱们就一块儿来看下服务消费者是如何调用远程服务的吧。

@RestController
@RequestMapping("/hello-rpc")
public class HelloRpcController {


    @RpcConsumer(providerName = "provider")
    private HelloRpcService helloRpcService;

    @GetMapping("/hello")
    public String hello() {
        return helloRpcService.sayHello();
    }
}

咱们在消费者服务中写了一个hello的接口,在接口里面调用了HelloRpcService接口里的sayHello()方法,看过前面内容的同窗应该知道,被@RpcConsumer修饰的helloRpcService属性在初始化的时候会为其设置一个动态代理,当咱们调用这个接口里面的方法时,会经过Netty向服务提供者发送调用信息,而后由服务提供者调用相应方法并返回结果。
到这一步,咱们能够说完成了一个简单的RPC框架以及其使用,下面咱们就一块儿来验证一下结果吧。

测试

在测试以前咱们须要在本身本地电脑上安装Zookeeper,具体的安装方式很是简单。能够参考这篇文章。
安装好Zookeeper后,咱们须要完成如下几个步骤:

  1. 启动Zookeeper。
  2. 启动服务提供者。
  3. 启动服务消费者。

第一次启动服务消费者的过程当中,你的控制台能够能会报一个找不到/rpc节点的错误,产生这个错误的缘由是咱们在第一次启动的时候ZK里面并不存在/rpc这个节点,可是若是你仔细研究源码的话,会发现当这个节点不存在的时候,咱们会建立一个。因此直接忽略这个异常便可。完成以上几步以后,咱们只须要在浏览器中访问http://127.0.0.1:8080/hello-rpc/hello,若是你看到了下面的结果,那么恭喜你,整个RPC框架完美的运行成功了。

远程调用结果

结束语

本文的主要内容是和你们一块儿完成了一个Demo版的RPC框架,其主要目的是让你们更深入的理解RPC的原理以及其调用过程。固然因为文章篇幅的缘由,不少代码没有直接在文中给出,您能够在Github上找到完整的实现。若是您有什么问题能够在Github上提交Issue或者发送邮件到个人邮箱(gancy.programmer@gmail.com),若是您以为这篇文章写的还行的话,但愿您能给我个Star,这是对我最好的鼓励。

PS:学习不止,码不停蹄!若是您喜欢个人文章,就关注我吧!

扫码关注“代码无止境”

相关文章
相关标签/搜索