2019-07-19:完成基本RPC通讯!前端
2019-07-22:优化此框架,实现单一长链接!java
2019-07-24:继续优化此框架:一、增长服务提供注解(带版本号),而后利用Spring框架的在启动时马上保存提供服务的实现类。二、优化NettyConfig(区分消费者和提供者配置),由于一个项目可同时做为服务提供者和服务消费者,因此增长两个配置来区分是提供服务仍是消费服务,并且,由于若是都是本地启动着两个项目,那么IP一定是同样的,因此须要区分服务端口和消费端口。否则会有下面事故:先启动client,再启动server,可是他们一样依赖于netty包,因此client也启动了netty服务,只配置一个相同的端口会致使client的RPC通讯也是通道本身启动的Netty服务。。。git
2019-07-27:优化此框架:增长注册中心,使用Zookeeper做为注册中心。github
接下来:我会优化Netty方面的,例如增长心跳检测、业务处理统一使用自定义业务线程池、客户端或服务端异常断开处理等,而后会优化一下项目的结构和rpc通讯返回结果等,最后可能会考虑增长Redis做为注册中心。等完成全部的这些,就会对整个项目从新写一篇文章来介绍一下本身的总体思路,固然了,若是有同窗须要的,能够在下方留言,我能够提早写文章,对完成注册中心及以前的代码进行详细介绍,以后再补充其余新增的功能实现过程!~spring
2019-07-30:已完成所有功能。放上链接:完整版RPC通讯框架框架
下面的是2019-07-19写的文章,因此代码是没通过优化的,不过是核心代码,仍是须要阅读一下的,须要看完整代码的请到最下面的github地址,你们可根据标签拉到对应的代码,麻烦啦~而后还有,测试方法是HelloController的sayHello方法呢,也能够本身再捣鼓一些测试一下~ide
前段时间,我花了两个星期的时间去从新学习Netty,由于以前老是看过一会就没看了,因此今次下定决心必定要所有看完,而后也思考作了一些的思考题,而且将简单的控制台版IM系统作出来了。虽然叫IM系统,可是是很简陋的,哈哈,只有登陆、单聊、建群、加群、退群、群聊等简单的功能。你们能够到我github上看看:Netty-IM学习
写完这个IM系统后,我是打算本身写一个网页版的,但是考虑到本身前端的技能好像都退化得差很少了,并且时间上可能没那么充裕,就不了了之了。而后有一天,忽然想起来以前使用的RPC框架->Dubbo,他的通讯底层就是使用Netty,那么我就想着要不本身先搞个简单版试试呗,由于最主要的是学习技能得实践一番,否则学了好像没学同样。。。测试
在开始动手前,本身屡了一下思路,也参考了两篇文章,决定先作一个简版的RPC框架,不带注册中心的那种。那么来了老弟,首先咱们看一下整个流程图是咋样的:优化
接下来重头戏来了,下面将会较详细得说一下流程:
先简单介绍一下项目结构:
simple-rpc-client:服务消费
simple-rpc-server:服务提供
simple-rpc-encapsulation:消费者和提供者公共接口
simple-rpc-netty:是关于Netty的东西,包括:自定义协议,序列化,通讯实体Packet,各类Handler等等。
客户端: 一、首先是两个注解,一个注解是:标识那些接口的调用会进行RPC通讯,即@NettyRPC注解。 另一个注解是:告诉程序哪些包下的类会使用RPC通讯,像@ComponentScan同样,即@EnableNettyRPC注解。
/** * @author Howinfun */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface NettyRPC { } /** * @author Howinfun * @desc * @date 2019/7/15 */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface EnableNettyRPC { //扫描的包名,若是为空,则根据启动类所在的包名扫描 String[] basePackages() default {}; }
二、由于咱们使用@NettyRPC的将是一些接口,若是项目里头没有实现类,那是调用失败的,那么咱们能够经过实现ImportBeanDefinitionRegistrar和自定义FactoryBean和InvocationHandler,利用动态代理使接口有实现,而且能动态注入Bean。ImportBeanDefinitionRegistrar接口能够详细说一下,由于这里是动态注入Bean,怎么注入规则是能够自定的,主要是靠ClassPathScanningCandidateComponentProvider这个类,它主要功能是扫描ClassPath下的全部类,而且根据isCandidateComponent方法来判断哪些类能够做为候选人,固然了,isCandidateComponent方法你能够重写,而后加上你本身的规则,我这里是必须是独立的而且是接口,才能成为候选人。而后ClassPathScanningCandidateComponentProvider还能添加过滤器,我这里主要添加的过滤器是注解过滤器,只要带有@NettyRPC注解的,其余的都不要。 不过须要注意一点的是:记得在有@Configuration注解的配置类上使用@Import导入实现ImportBeanDefinitionRegistrar的类,否则实现动态注入Bean的做用,这里咱们在客户端的启动类Import便可。
package com.hyf.rpc.netty.client.config; import com.hyf.rpc.netty.anno.EnableNettyRPC; import com.hyf.rpc.netty.anno.NettyRPC; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider; import org.springframework.context.annotation.ImportBeanDefinitionRegistrar; import org.springframework.core.type.AnnotationMetadata; import org.springframework.core.type.filter.AnnotationTypeFilter; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils; import java.util.HashSet; import java.util.Map; import java.util.Set; /** * 自定义注册带@NettyRPC注解的接口,利用动态代理使接口有实现 * 而后在有@Configuration注解的配置类上使用@Import导入,否则不能注入这些实现@NettyRPC接口的BeanDefinition * @author Howinfun * @date 2019-07-18 */ public class NettyRpcClientRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware { private ClassLoader classLoader; @Override public void setBeanClassLoader(ClassLoader classLoader) { this.classLoader = classLoader; } @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { ClassPathScanningCandidateComponentProvider scan = getScanner(); //指定注解,相似于Feign注解,只扫描带@NettyRPC注解的接口 scan.addIncludeFilter(new AnnotationTypeFilter(NettyRPC.class)); Set<BeanDefinition> candidateComponents = new HashSet<>(); for (String basePackage : getBasePackages(importingClassMetadata)) { candidateComponents.addAll(scan.findCandidateComponents(basePackage)); } candidateComponents.stream().forEach(beanDefinition -> { if (!registry.containsBeanDefinition(beanDefinition.getBeanClassName())) { if (beanDefinition instanceof AnnotatedBeanDefinition) { AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition; AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata(); Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(NettyRPC.class.getCanonicalName()); this.registerNettyRpcClient(registry, annotationMetadata,attributes); } } }); } private void registerNettyRpcClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) { String className = annotationMetadata.getClassName(); // 指定工厂,使用@NettyRPC注解的接口,当代码中注入时,是从指定工厂获取,而这里的工厂返回的是代理 BeanDefinitionBuilder definition = BeanDefinitionBuilder .genericBeanDefinition(NettyClientFactoryBean.class); // @Autowrie:根据类型注入 definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); // 注定type属性 definition.addPropertyValue("type", className); String name = attributes.get("name") == null ? "" :(String)(attributes.get("name")); // 别名 String alias = name + "NettyRpcClient"; AbstractBeanDefinition beanDefinition = definition.getBeanDefinition(); beanDefinition.setPrimary(true); BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[] { alias }); // 注册BeanDefinition BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry); } protected ClassPathScanningCandidateComponentProvider getScanner() { return new ClassPathScanningCandidateComponentProvider(false) { @Override protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) { // 判断候选人的条件:必须是独立的,而后是接口 if (beanDefinition.getMetadata().isIndependent() && beanDefinition.getMetadata().isInterface()){ return true; } return false; } }; } /** * 获取指定扫描@NettyRPC注解的包路径 * @param importingClassMetadata * @return */ protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) { Map<String, Object> attributes = importingClassMetadata .getAnnotationAttributes(EnableNettyRPC.class.getCanonicalName()); Set<String> basePackages = new HashSet<>(); // 若是指定的包路径为空,则获取启动类当前路径 if (basePackages.isEmpty()) { basePackages.add( ClassUtils.getPackageName(importingClassMetadata.getClassName())); }else{ for (String pkg : (String[]) attributes.get("basePackages")) { if (StringUtils.hasText(pkg)) { basePackages.add(pkg); } } } return basePackages; } }
package com.hyf.rpc.netty.client.config; import lombok.Data; import lombok.EqualsAndHashCode; import org.springframework.beans.factory.FactoryBean; import org.springframework.stereotype.Component; import java.lang.reflect.Proxy; /** * @author Howinfun * @desc * @date 2019/7/15 */ @Data @EqualsAndHashCode(callSuper = false) @Component public class NettyClientFactoryBean implements FactoryBean<Object> { private Class<?> type; @Override public Object getObject() throws Exception { // 这里的interfaces注意是就是type,由于咱们如今是给接口作代理,千万别写type.getInterfaces(),否则启动会报错 return Proxy.newProxyInstance(type.getClassLoader(),new Class[]{type},new NettyRPCInvocationHandler(this.type)); } @Override public Class<?> getObjectType() { return this.type; } }
三、在动态代理的invoke方法里头,咱们将启动Netty的一个客户端,带上接口调用的信息,而后等待Netty服务端返回结果结果再返回到前端便可。
package com.hyf.rpc.netty.client.config; import com.hyf.rpc.netty.client.NettyClient; import com.hyf.rpc.netty.packet.RPCRequestPacket; import lombok.NoArgsConstructor; import org.springframework.stereotype.Component; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; /** * @author Howinfun * @desc * @date 2019/7/15 */ @NoArgsConstructor @Component public class NettyRPCInvocationHandler implements InvocationHandler { private Class<?> type; public NettyRPCInvocationHandler(Class<?> type){ this.type = type; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RPCRequestPacket requestPacket = new RPCRequestPacket(); requestPacket.setClazz(type); requestPacket.setMethodName(method.getName()); requestPacket.setParamTypes(method.getParameterTypes()); requestPacket.setParams(args); Object result = NettyClient.callRPC(requestPacket); return result; } }
有一个坑是:当客户端接收到服务端的返回结果后,记得关闭通道[ctx.channel().close()],由于在客户端中RPC调用后是同步等待Channel关闭的,否则不能响应给前端。
服务端:服务端的流程稍微会简单不少 一、启动Netty服务端服务,而后接收客户端的连接请求,解析请求 二、而后根据接口调用信息,利用反射获取到实现类和对应的方法,最后调用方法获得结果,而后封装一下结果就能够相应给客户端了。
package com.hyf.rpc.netty.server.handler; import com.hyf.rpc.netty.packet.RPCRequestPacket; import com.hyf.rpc.netty.packet.RPCResponsePacket; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.reflections.Reflections; import java.lang.reflect.Method; import java.util.Set; /** * @author Howinfun * @desc * @date 2019/7/16 */ @ChannelHandler.Sharable public class RPCRequestPacketHandler extends SimpleChannelInboundHandler<RPCRequestPacket> { public static final RPCRequestPacketHandler INSTANCE = new RPCRequestPacketHandler(); private RPCRequestPacketHandler(){} @Override protected void channelRead0(ChannelHandlerContext ctx, RPCRequestPacket msg) throws Exception { RPCResponsePacket responsePacket = new RPCResponsePacket(); // 获取rpc调用信息,利用反射执行方法,返回结果 Class clazz = msg.getClazz(); String methodName = msg.getMethodName(); Object[] params = msg.getParams(); Class[] paramTypes = msg.getParamTypes(); // 扫面路径下全部元数据 Reflections reflections = new Reflections("com.hyf.rpc.serviceImpl"); Set<Class> subTypes = reflections.getSubTypesOf(clazz); if (subTypes.isEmpty()){ responsePacket.setSuccess(false); responsePacket.setMsg("没有实现类"); }else if (subTypes.size() > 1){ responsePacket.setSuccess(false); responsePacket.setMsg("多个实现类,没法判断执行哪个"); }else{ Class subClass = subTypes.toArray(new Class[1])[0]; Method method = subClass.getMethod(methodName,paramTypes); Object result = method.invoke(subClass.newInstance(),params); responsePacket.setSuccess(true); responsePacket.setResult(result); } ctx.channel().writeAndFlush(responsePacket); } }
三、这里的反射我推荐一个很好用的框架->Reflections。简单介绍一下我使用了哪些API,首先是根据路径扫描反射元数据, 而后根据接口获取它的全部实现类,而后就能够获取实现类的反射信息,获得方法执行结果了。
若是同窗们对此比较简陋的代码还略感兴趣,能够到个人码云上看看: