SpringBoot2+Netty打造通俗简版RPC通讯框架

2019-07-19:完成基本RPC通讯!前端

2019-07-22:优化此框架,实现单一长链接!java

2019-07-24:继续优化此框架:一、增长服务提供注解(带版本号),而后利用Spring框架的在启动时马上保存提供服务的实现类。二、优化NettyConfig(区分消费者和提供者配置),由于一个项目可同时做为服务提供者和服务消费者,因此增长两个配置来区分是提供服务仍是消费服务,并且,由于若是都是本地启动着两个项目,那么IP一定是同样的,因此须要区分服务端口和消费端口。否则会有下面事故:先启动client,再启动server,可是他们一样依赖于netty包,因此client也启动了netty服务,只配置一个相同的端口会致使client的RPC通讯也是通道本身启动的Netty服务。。。git

2019-07-27:优化此框架:增长注册中心,使用Zookeeper做为注册中心。github

接下来:我会优化Netty方面的,例如增长心跳检测、业务处理统一使用自定义业务线程池、客户端或服务端异常断开处理等,而后会优化一下项目的结构和rpc通讯返回结果等,最后可能会考虑增长Redis做为注册中心。等完成全部的这些,就会对整个项目从新写一篇文章来介绍一下本身的总体思路,固然了,若是有同窗须要的,能够在下方留言,我能够提早写文章,对完成注册中心及以前的代码进行详细介绍,以后再补充其余新增的功能实现过程!~spring

2019-07-30:已完成所有功能。放上链接:完整版RPC通讯框架框架

下面的是2019-07-19写的文章,因此代码是没通过优化的,不过是核心代码,仍是须要阅读一下的,须要看完整代码的请到最下面的github地址,你们可根据标签拉到对应的代码,麻烦啦~而后还有,测试方法是HelloController的sayHello方法呢,也能够本身再捣鼓一些测试一下~ide

​​​​

前段时间,我花了两个星期的时间去从新学习Netty,由于以前老是看过一会就没看了,因此今次下定决心必定要所有看完,而后也思考作了一些的思考题,而且将简单的控制台版IM系统作出来了。虽然叫IM系统,可是是很简陋的,哈哈,只有登陆、单聊、建群、加群、退群、群聊等简单的功能。你们能够到我github上看看:Netty-IM学习

写完这个IM系统后,我是打算本身写一个网页版的,但是考虑到本身前端的技能好像都退化得差很少了,并且时间上可能没那么充裕,就不了了之了。而后有一天,忽然想起来以前使用的RPC框架->Dubbo,他的通讯底层就是使用Netty,那么我就想着要不本身先搞个简单版试试呗,由于最主要的是学习技能得实践一番,否则学了好像没学同样。。。测试

在开始动手前,本身屡了一下思路,也参考了两篇文章,决定先作一个简版的RPC框架,不带注册中心的那种。那么来了老弟,首先咱们看一下整个流程图是咋样的:优化

接下来重头戏来了,下面将会较详细得说一下流程:

先简单介绍一下项目结构:

simple-rpc-client:服务消费

simple-rpc-server:服务提供

simple-rpc-encapsulation:消费者和提供者公共接口

simple-rpc-netty:是关于Netty的东西,包括:自定义协议,序列化,通讯实体Packet,各类Handler等等。

客户端:         一、首先是两个注解,一个注解是:标识那些接口的调用会进行RPC通讯,即@NettyRPC注解。         另一个注解是:告诉程序哪些包下的类会使用RPC通讯,像@ComponentScan同样,即@EnableNettyRPC注解。

/**
 * @author Howinfun
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface NettyRPC {
}
/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableNettyRPC {
    //扫描的包名,若是为空,则根据启动类所在的包名扫描
    String[] basePackages() default {};
}

        二、由于咱们使用@NettyRPC的将是一些接口,若是项目里头没有实现类,那是调用失败的,那么咱们能够经过实现ImportBeanDefinitionRegistrar和自定义FactoryBean和InvocationHandler,利用动态代理使接口有实现,而且能动态注入Bean。ImportBeanDefinitionRegistrar接口能够详细说一下,由于这里是动态注入Bean,怎么注入规则是能够自定的,主要是靠ClassPathScanningCandidateComponentProvider这个类,它主要功能是扫描ClassPath下的全部类,而且根据isCandidateComponent方法来判断哪些类能够做为候选人,固然了,isCandidateComponent方法你能够重写,而后加上你本身的规则,我这里是必须是独立的而且是接口,才能成为候选人。而后ClassPathScanningCandidateComponentProvider还能添加过滤器,我这里主要添加的过滤器是注解过滤器,只要带有@NettyRPC注解的,其余的都不要。     不过须要注意一点的是:记得在有@Configuration注解的配置类上使用@Import导入实现ImportBeanDefinitionRegistrar的类,否则实现动态注入Bean的做用,这里咱们在客户端的启动类Import便可。

package com.hyf.rpc.netty.client.config;

import com.hyf.rpc.netty.anno.EnableNettyRPC;
import com.hyf.rpc.netty.anno.NettyRPC;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;


/**
 * 自定义注册带@NettyRPC注解的接口,利用动态代理使接口有实现
 * 而后在有@Configuration注解的配置类上使用@Import导入,否则不能注入这些实现@NettyRPC接口的BeanDefinition
 * @author Howinfun
 * @date 2019-07-18
 */
public class NettyRpcClientRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {

    private ClassLoader classLoader;

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

        ClassPathScanningCandidateComponentProvider scan = getScanner();

        //指定注解,相似于Feign注解,只扫描带@NettyRPC注解的接口
        scan.addIncludeFilter(new AnnotationTypeFilter(NettyRPC.class));

        Set<BeanDefinition> candidateComponents = new HashSet<>();
        for (String basePackage : getBasePackages(importingClassMetadata)) {
            candidateComponents.addAll(scan.findCandidateComponents(basePackage));
        }
        candidateComponents.stream().forEach(beanDefinition -> {
            if (!registry.containsBeanDefinition(beanDefinition.getBeanClassName())) {
                if (beanDefinition instanceof AnnotatedBeanDefinition) {
                    AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;
                    AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata();
                    Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(NettyRPC.class.getCanonicalName());

                    this.registerNettyRpcClient(registry, annotationMetadata,attributes);
                }
            }
        });
    }

    private void registerNettyRpcClient(BeanDefinitionRegistry registry,
                                        AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
        String className = annotationMetadata.getClassName();
        // 指定工厂,使用@NettyRPC注解的接口,当代码中注入时,是从指定工厂获取,而这里的工厂返回的是代理
        BeanDefinitionBuilder definition = BeanDefinitionBuilder
                .genericBeanDefinition(NettyClientFactoryBean.class);
        // @Autowrie:根据类型注入
        definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
        // 注定type属性
        definition.addPropertyValue("type", className);
        String name = attributes.get("name") == null ? "" :(String)(attributes.get("name"));
        // 别名
        String alias = name + "NettyRpcClient";
        AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
        beanDefinition.setPrimary(true);
        BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
                new String[] { alias });
        // 注册BeanDefinition
        BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    }



    protected ClassPathScanningCandidateComponentProvider getScanner() {
        return new ClassPathScanningCandidateComponentProvider(false) {
            @Override
            protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
                // 判断候选人的条件:必须是独立的,而后是接口
                if (beanDefinition.getMetadata().isIndependent() && beanDefinition.getMetadata().isInterface()){
                    return true;
                }
                return false;
            }
        };
    }

    /**
     * 获取指定扫描@NettyRPC注解的包路径
     * @param importingClassMetadata
     * @return
     */
    protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
        Map<String, Object> attributes = importingClassMetadata
                .getAnnotationAttributes(EnableNettyRPC.class.getCanonicalName());

        Set<String> basePackages = new HashSet<>();
        // 若是指定的包路径为空,则获取启动类当前路径
        if (basePackages.isEmpty()) {
            basePackages.add(
                    ClassUtils.getPackageName(importingClassMetadata.getClassName()));
        }else{
            for (String pkg : (String[]) attributes.get("basePackages")) {
                if (StringUtils.hasText(pkg)) {
                    basePackages.add(pkg);
                }
            }
        }
        return basePackages;
    }
}
package com.hyf.rpc.netty.client.config;

import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.stereotype.Component;

import java.lang.reflect.Proxy;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Component
public class NettyClientFactoryBean implements FactoryBean<Object> {

    private Class<?> type;

    @Override
    public Object getObject() throws Exception {
        // 这里的interfaces注意是就是type,由于咱们如今是给接口作代理,千万别写type.getInterfaces(),否则启动会报错
        return Proxy.newProxyInstance(type.getClassLoader(),new Class[]{type},new NettyRPCInvocationHandler(this.type));
    }

    @Override
    public Class<?> getObjectType() {
        return this.type;
    }
}

        三、在动态代理的invoke方法里头,咱们将启动Netty的一个客户端,带上接口调用的信息,而后等待Netty服务端返回结果结果再返回到前端便可。

package com.hyf.rpc.netty.client.config;

import com.hyf.rpc.netty.client.NettyClient;
import com.hyf.rpc.netty.packet.RPCRequestPacket;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@NoArgsConstructor
@Component
public class NettyRPCInvocationHandler implements InvocationHandler {

    private Class<?> type;

    public NettyRPCInvocationHandler(Class<?> type){
        this.type = type;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RPCRequestPacket requestPacket = new RPCRequestPacket();
        requestPacket.setClazz(type);
        requestPacket.setMethodName(method.getName());
        requestPacket.setParamTypes(method.getParameterTypes());
        requestPacket.setParams(args);
        Object result = NettyClient.callRPC(requestPacket);
        return result;
    }
}

    有一个坑是:当客户端接收到服务端的返回结果后,记得关闭通道[ctx.channel().close()],由于在客户端中RPC调用后是同步等待Channel关闭的,否则不能响应给前端。

服务端:服务端的流程稍微会简单不少         一、启动Netty服务端服务,而后接收客户端的连接请求,解析请求         二、而后根据接口调用信息,利用反射获取到实现类和对应的方法,最后调用方法获得结果,而后封装一下结果就能够相应给客户端了。

package com.hyf.rpc.netty.server.handler;

import com.hyf.rpc.netty.packet.RPCRequestPacket;
import com.hyf.rpc.netty.packet.RPCResponsePacket;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.reflections.Reflections;

import java.lang.reflect.Method;
import java.util.Set;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/16
 */
@ChannelHandler.Sharable
public class RPCRequestPacketHandler extends SimpleChannelInboundHandler<RPCRequestPacket> {

    public static final RPCRequestPacketHandler INSTANCE = new RPCRequestPacketHandler();
    private RPCRequestPacketHandler(){}

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RPCRequestPacket msg) throws Exception {
        RPCResponsePacket responsePacket = new RPCResponsePacket();
        // 获取rpc调用信息,利用反射执行方法,返回结果
        Class clazz = msg.getClazz();
        String methodName = msg.getMethodName();
        Object[] params = msg.getParams();
        Class[] paramTypes = msg.getParamTypes();
        // 扫面路径下全部元数据
        Reflections reflections = new Reflections("com.hyf.rpc.serviceImpl");
        Set<Class> subTypes = reflections.getSubTypesOf(clazz);
        if (subTypes.isEmpty()){
            responsePacket.setSuccess(false);
            responsePacket.setMsg("没有实现类");
        }else if (subTypes.size() > 1){
            responsePacket.setSuccess(false);
            responsePacket.setMsg("多个实现类,没法判断执行哪个");
        }else{
            Class subClass = subTypes.toArray(new Class[1])[0];
            Method method = subClass.getMethod(methodName,paramTypes);
            Object result = method.invoke(subClass.newInstance(),params);
            responsePacket.setSuccess(true);
            responsePacket.setResult(result);
        }
        ctx.channel().writeAndFlush(responsePacket);
    }
}

        三、这里的反射我推荐一个很好用的框架->Reflections。简单介绍一下我使用了哪些API,首先是根据路径扫描反射元数据,     而后根据接口获取它的全部实现类,而后就能够获取实现类的反射信息,获得方法执行结果了。

若是同窗们对此比较简陋的代码还略感兴趣,能够到个人码云上看看:Netty-RPC

相关文章
相关标签/搜索