本文主要是对勇哥的simpleRpc进行了简单的剖析,用来学习rpc,加深对rpc的理解!java
源码地址:http://git.oschina.net/huangyong/rpcnode
勇哥博客:https://my.oschina.net/huangyong/blog/361751git
rpc(Remote Procedure Call)主要是将远程服务调用包装成为本地服务调用,使用起来比较直观。之前,项目中有使用过将定时任务模块剥离出来,单独部署,定时任务模块使用rmi调用主系统的服务,使用起来和rpc应该是相似。spring
首先是down下源码,部署在本地IDE,具体过程参见gitee介绍吧!bootstrap
下面是eclipse的工程截图:api
先试用下:服务器
1:启动本地zk,使用默认配置(工程中的默认配置就是ZK的默认配置,基本不用改)app
2:启动rpc-sample-server工程的RpcBootstrap.javaeclipse
启动日志:ide
start server connect zookeeper create address node: /registry/com.xxx.rpc.sample.api.HelloService-sample.hello2/address-0000000003 register service: com.xxx.rpc.sample.api.HelloService-sample.hello2 => 127.0.0.1:8000 create address node: /registry/com.xxx.rpc.sample.api.HelloService/address-0000000003 register service: com.xxx.rpc.sample.api.HelloService => 127.0.0.1:8000 server started on port 8000
3:启动rpc-sample-client工程的HelloClient.java
connect zookeeper get only address node: address-0000000003 discover service: com.xxx.rpc.sample.api.HelloService => 127.0.0.1:8000 time: 364ms Hello! World connect zookeeper get only address node: address-0000000003 discover service: com.xxx.rpc.sample.api.HelloService-sample.hello2 => 127.0.0.1:8000 time: 19ms 你好! 世界
试用成功!
下面来进行分析
服务端启动时序图:
1:启动main方法,spring开始初始化
2:初始化zk注册类,链接zk,返回zkClient
3:初始化server,扫描application,将被@RpcService标注的bean提取并存储
4:启动netty服务端,并将rpc服务类及对应的服务地址注册到zk中
5:获取请求并利用反射调用请求处理类,返回结果
流程很清晰,反推一下:
要提供服务,必需要启动一个服务端,这里使用netty实现,有了服务端就要考虑对外提供什么服务,这里的服务被统一注册到了zk,因此只须要去zk里面查询便可,也就意味着必需要有一个zk连接的过程,有了连接,就须要注册服务了,那么就须要辨识哪些服务是须要被注册的,这里是经过@RpcService标注的,同时也是spring的bean,那么当获取到请求后,利用反射调用真正的服务提供类,处理完毕以后返回结果,这样就基本实现了上述的流程。
简单看下源码:
启动spring
new ClassPathXmlApplicationContext("spring.xml");
spring.xml配置
<context:component-scan base-package="com.xxx.rpc.sample.server"/> <context:property-placeholder location="classpath:rpc.properties"/> <bean id="serviceRegistry" class="com.xxx.rpc.registry.zookeeper.ZooKeeperServiceRegistry"> <constructor-arg name="zkAddress" value="${rpc.registry_address}"/> </bean> <bean id="rpcServer" class="com.xxx.rpc.server.RpcServer"> <constructor-arg name="serviceAddress" value="${rpc.service_address}"/> <constructor-arg name="serviceRegistry" ref="serviceRegistry"/> </bean>
看到首先是初始化zk注册类ZooKeeperServiceRegistry
基本就是建立zk连接:
public ZooKeeperServiceRegistry(String zkAddress) { // 建立 ZooKeeper 客户端 zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT, Constant.ZK_CONNECTION_TIMEOUT); LOGGER.debug("connect zookeeper"); }
紧接着就初始化RpcSserver类,它依赖zk注册类以及提供rpc服务的服务器address
这里的rpc.properties:
rpc.service_address=127.0.0.1:8000
rpc.registry_address=127.0.0.1:2181
RpcServer类实现了ApplicationContextAware, InitializingBean接口
在重写setApplicationContext时,实现了服务类辨识及存储:
// 扫描带有 RpcService 注解的类并初始化 handlerMap 对象 Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); if (MapUtils.isNotEmpty(serviceBeanMap)) { for (Object serviceBean : serviceBeanMap.values()) { RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class); String serviceName = rpcService.value().getName(); String serviceVersion = rpcService.version(); if (StringUtil.isNotEmpty(serviceVersion)) { serviceName += "-" + serviceVersion; } handlerMap.put(serviceName, serviceBean); } }
在重写afterPropertiesSet时实现了Netty服务的启动:
@Override public void afterPropertiesSet() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 建立并初始化 Netty 服务端 Bootstrap 对象 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new RpcDecoder(RpcRequest.class)); // 解码 RPC 请求 pipeline.addLast(new RpcEncoder(RpcResponse.class)); // 编码 RPC 响应 pipeline.addLast(new RpcServerHandler(handlerMap)); // 处理 RPC 请求 } }); bootstrap.option(ChannelOption.SO_BACKLOG, 1024); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 获取 RPC 服务器的 IP 地址与端口号 String[] addressArray = StringUtil.split(serviceAddress, ":"); String ip = addressArray[0]; int port = Integer.parseInt(addressArray[1]); // 启动 RPC 服务器 ChannelFuture future = bootstrap.bind(ip, port).sync(); // 注册 RPC 服务地址 if (serviceRegistry != null) { for (String interfaceName : handlerMap.keySet()) { serviceRegistry.register(interfaceName, serviceAddress); LOGGER.debug("register service: {} => {}", interfaceName, serviceAddress); } } LOGGER.debug("server started on port {}", port); // 关闭 RPC 服务器 future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
标准的启动Netty服务端,同时使用依赖的zk注册类去zk中注册服务,上一步已经获得了rpc服务类,提供服务类的地址也配置在文件中,那么就能够开始注册了
@Override public void register(String serviceName, String serviceAddress) { // 建立 registry 节点(持久) String registryPath = Constant.ZK_REGISTRY_PATH; if (!zkClient.exists(registryPath)) { zkClient.createPersistent(registryPath); LOGGER.debug("create registry node: {}", registryPath); } // 建立 service 节点(持久) String servicePath = registryPath + "/" + serviceName; if (!zkClient.exists(servicePath)) { zkClient.createPersistent(servicePath); LOGGER.debug("create service node: {}", servicePath); } // 建立 address 节点(临时) String addressPath = servicePath + "/address-"; String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress); LOGGER.debug("create address node: {}", addressNode); }
注册完成后,zk信息以下:
能够看到已经注册了两个类了,这两个类就位于rpc-sample-server工程中
那么如何处理请求呢?因为启动的是Netty服务端,那么请求到来时,确定是netty获取到,在启动服务端时,配置了一个RpcServerHandler,请求的反射处理就是在这里进行的:
@Override public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception { // 建立并初始化 RPC 响应对象 RpcResponse response = new RpcResponse(); response.setRequestId(request.getRequestId()); try { Object result = handle(request); response.setResult(result); } catch (Exception e) { LOGGER.error("handle result failure", e); response.setException(e); } // 写入 RPC 响应对象并自动关闭链接 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private Object handle(RpcRequest request) throws Exception { // 获取服务对象 String serviceName = request.getInterfaceName(); String serviceVersion = request.getServiceVersion(); if (StringUtil.isNotEmpty(serviceVersion)) { serviceName += "-" + serviceVersion; } Object serviceBean = handlerMap.get(serviceName); if (serviceBean == null) { throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName)); } // 获取反射调用所需的参数 Class<?> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); // 执行反射调用 // Method method = serviceClass.getMethod(methodName, parameterTypes); // method.setAccessible(true); // return method.invoke(serviceBean, parameters); // 使用 CGLib 执行反射调用 FastClass serviceFastClass = FastClass.create(serviceClass); FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); return serviceFastMethod.invoke(serviceBean, parameters); }
这里的请求流程基本是Netty的标准处理流程,在处理时,从请求中获取到指定的请求参数,再去handlerMap中匹配对应的服务类bean,而后利用反射调用此bean对应的处理方法,并获得返回结果
这样一来,服务端的基本功能就完毕了!
看看主要的涉及面有哪些:
利用zk作服务注册
利用@RpcService标识远程服务类
利用Netty实现服务请求接收
利用反射实现真实服务调用
利用Protostuff实现序列化
这些应该是rpc基础的涉及点吧,代码量很少,理解起来相对容易,看来,完成rpc功能相对简单,可是考虑其余的因素就复杂了,慢慢学习理解吧!