https://github.com/wephone/MeiZhuoRPC/tree/1.0java
在上一博文中 跟你们讲了RPC的实现思路 思路毕竟只是思路 那么这篇就带着源码给你们讲解下实现过程当中的各个具体问题git
/**
*调用端代码及spring配置
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"file:src/test/java/rpcTest/ClientContext.xml"})
public class Client {
@Test
public void start(){
Service service= (Service) RPC.call(Service.class);
System.out.println("测试Integer,Double类型传参与返回String对象:"+service.stringMethodIntegerArgsTest(233,666.66));
//输出string233666.66
}
}
/**
*Service抽象及其实现
*调用与实现端共同依赖Service
*/
public interface Service {
String stringMethodIntegerArgsTest(Integer a,Double b);
}
/**
* ServiceImpl实现端对接口的具体实现
*/
public class ServiceImpl implements Service {
@Override
public String stringMethodIntegerArgsTest(Integer a, Double b) {
return "String"+a+b;
}
}
复制代码
1.0版本分3个包github
调用端只需如此调用 定义接口 传入接口类类型 后面调用的接口内的方法 所有是由实现端实现spring
Service service= (Service) RPC.call(Service.class);
复制代码
这句的做用其实就是生成调用端的动态代理json
/**
* 暴露调用端使用的静态方法 为抽象接口生成动态代理对象
* TODO 考虑后面优化不在使用时仍需强转
* @param cls 抽象接口的类类型
* @return 接口生成的动态代理对象
*/
public static Object call(Class cls){
RPCProxyHandler handler=new RPCProxyHandler();
Object proxyObj=Proxy.newProxyInstance(cls.getClassLoader(),new Class<?>[]{cls},handler);
return proxyObj;
}
复制代码
RPCProxyHandler为动态代理的方法被调用后的回调方法 每一个方法被调用时都会执行这个invoke数组
/**
* 代理抽象接口调用的方法
* 发送方法信息给服务端 加锁等待服务端返回
* @param proxy
* @param method
* @param args
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RPCRequest request=new RPCRequest();
request.setRequestID(buildRequestID(method.getName()));
request.setClassName(method.getDeclaringClass().getName());//返回表示声明由此 Method 对象表示的方法的类或接口的Class对象
request.setMethodName(method.getName());
// request.setParameterTypes(method.getParameterTypes());//返回形参类型
request.setParameters(args);//输入的实参
RPCRequestNet.requestLockMap.put(request.getRequestID(),request);
RPCRequestNet.connect().send(request);
//调用用结束后移除对应的condition映射关系
RPCRequestNet.requestLockMap.remove(request.getRequestID());
return request.getResult();//目标方法的返回结果
}
复制代码
也就是收集对应调用的接口的信息 而后send给实现端 那么这个requestLockMap又是做何做用的呢bash
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String responseJson= (String) msg;
RPCResponse response= (RPCResponse) RPC.responseDecode(responseJson);
synchronized (RPCRequestNet.requestLockMap.get(response.getRequestID())) {
//唤醒在该对象锁上wait的线程
RPCRequest request= (RPCRequest) RPCRequestNet.requestLockMap.get(response.getRequestID());
request.setResult(response.getResult());
request.notifyAll();
}
}
复制代码
接下来是RPCRequestNet.connect().send(request);方法 connect方法实际上是单例模式返回RPCRequestNet实例 RPCRequestNet构造方法是使用netty对实现端进行TCP连接 send方法以下服务器
try {
//判断链接是否已完成 只在链接启动时会产生阻塞
if (RPCRequestHandler.channelCtx==null){
connectlock.lock();
//挂起等待链接成功
System.out.println("正在等待链接实现端");
connectCondition.await();
connectlock.unlock();
}
//编解码对象为json 发送请求
String requestJson= null;
try {
requestJson = RPC.requestEncode(request);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes());
RPCRequestHandler.channelCtx.writeAndFlush(requestBuf);
System.out.println("调用"+request.getRequestID()+"已发送");
//挂起等待实现端处理完毕返回 TODO 后续配置超时时间
synchronized (request) {
//放弃对象锁 并阻塞等待notify
request.wait();
}
System.out.println("调用"+request.getRequestID()+"接收完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
复制代码
condition和lock一样是为了同步等待异步IO返回用的 send方法基本是编解码json后发送给实现端网络
/**
*实现端代码及spring配置
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"file:src/test/java/rpcTest/ServerContext.xml"})
public class Server {
@Test
public void start(){
//启动spring后才可启动 防止容器还没有加载完毕
RPC.start();
}
}
复制代码
出了配置spring以外 实现端就一句 RPC.start() 其实就是启动netty服务器 服务端的处理客户端信息回调以下app
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
String requestJson= (String) msg;
System.out.println("receive request:"+requestJson);
RPCRequest request= RPC.requestDeocde(requestJson);
Object result=InvokeServiceUtil.invoke(request);
//netty的write方法并无直接写入通道(为避免屡次唤醒多路复用选择器)
//而是把待发送的消息放到缓冲数组中,flush方法再所有写到通道中
// ctx.write(resp);
//记得加分隔符 否则客户端一直不会处理
RPCResponse response=new RPCResponse();
response.setRequestID(request.getRequestID());
response.setResult(result);
String respStr=RPC.responseEncode(response);
ByteBuf responseBuf= Unpooled.copiedBuffer(respStr.getBytes());
ctx.writeAndFlush(responseBuf);
}
复制代码
主要是编解码json 反射对应的方法 咱们看看反射的工具类
/**
* 反射调用相应实现类并结果
* @param request
* @return
*/
public static Object invoke(RPCRequest request){
Object result=null;//内部变量必须赋值 全局变量才不用
//实现类名
String implClassName= RPC.getServerConfig().getServerImplMap().get(request.getClassName());
try {
Class implClass=Class.forName(implClassName);
Object[] parameters=request.getParameters();
int parameterNums=request.getParameters().length;
Class[] parameterTypes=new Class[parameterNums];
for (int i = 0; i <parameterNums ; i++) {
parameterTypes[i]=parameters[i].getClass();
}
Method method=implClass.getDeclaredMethod(request.getMethodName(),parameterTypes);
Object implObj=implClass.newInstance();
result=method.invoke(implObj,parameters);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
return result;
}
复制代码
解析Parameters getClass获取他们的类类型 反射调用对应的方法
最后是借助spring配置基础配置 我写了两个类 ServerConfig ClientConfig 做为调用端和服务端的配置 只需在spring中配置这两个bean 并启动IOC容器便可
调用端
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean class="org.meizhuo.rpc.client.ClientConfig">
<property name="host" value="127.0.0.1"></property>
<property name="port" value="9999"></property>
</bean>
</beans>
复制代码
实现端
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean class="org.meizhuo.rpc.server.ServerConfig">
<property name="port" value="9999"></property>
<property name="serverImplMap">
<map>
<!--配置对应的抽象接口及其实现-->
<entry key="rpcTest.Service" value="rpcTest.ServiceImpl"></entry>
</map>
</property>
</bean>
</beans>
复制代码
咱们的框架是做为一个依赖包引入的 咱们不可能在咱们的框架中读取对应的spring xml 这样彻底是去了框架的灵活性 那咱们怎么在运行过程当中得到咱们所处于的IOC容器 已得到咱们的正确配置信息呢 答案是spring提供的ApplicationContextAware接口
/**
* Created by wephone on 17-12-26.
*/
public class ClientConfig implements ApplicationContextAware {
private String host;
private int port;
//调用超时时间
private long overtime;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public long getOvertime() {
return overtime;
}
public void setOvertime(long overtime) {
this.overtime = overtime;
}
/**
* 加载Spring配置文件时,若是Spring配置文件中所定义的Bean类
* 若是该类实现了ApplicationContextAware接口
* 那么在加载Spring配置文件时,会自动调用ApplicationContextAware接口中的
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RPC.clientContext=applicationContext;
}
}
复制代码
这样咱们在RPC类内部就维护了一个静态IOC容器的context 只需如此获取配置 RPC.getServerConfig().getPort()
public static ServerConfig getServerConfig(){
return serverContext.getBean(ServerConfig.class);
}
复制代码
本例程仅为1.0版本 后续博客中 会加入异常处理 zookeeper支持 负载均衡策略等 博客:zookeeper支持 欢迎持续关注 欢迎star 提issue