RPC(Remote Procedure Call)—远程过程调用协议,它是一种经过网络从远程计算机程序上请求服务,而不须要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通讯程序之间携带信息数据。在OSI网络通讯模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。git
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,而后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器得到进程参数,计算结果,发送答复信息,而后等待下一个调用信息,最后,客户端调用进程接收答复信息,得到进程结果,而后调用执行继续进行。数组
1. Client端获取一个 RPC 代理对象 proxy安全
2. 调用 proxy 上的方法, 被 InvocationHandler 实现类 Invoker 的 invoke() 方法捕获服务器
3. invoke() 方法内将 RPC 请求封装成 Invocation 实例, 再向 Server 发送 RPC请求网络
4. Server端循环接收 RPC请求, 对每个请求都建立一个 Handler线程处理并发
5. Handler线程从输入流中反序列化出 Invocation实例, 再调用 Server端的实现方法分布式
6. 调用结束, 向 Client端返回调用结果ide
InvocationHandler 的实现类高并发
/** * InvocationHandler 接口的实现类 <br> * Client端代理对象的方法调用都会被 Invoker 的 invoke() 方法捕获 */ public class Invoker implements InvocationHandler { /** RPC协议接口的 Class对象 */ private Class<?> intface; /** Client 端 Socket */ private Socket client; /** 用于向 Server端发送 RPC请求的输出流 */ private ObjectOutputStream oos; /** 用于接收 Server端返回的 RPC请求结果的输入流 */ private ObjectInputStream ois; /** * 构造一个 Socket实例 client, 并链接到指定的 Server端地址, 端口 * * @param intface * RPC协议接口的 Class对象 * @param serverAdd * Server端地址 * @param serverPort * Server端监听的端口 */ public Invoker(Class<?> intface, String serverAdd, int serverPort) throws UnknownHostException, IOException { this.intface = intface; client = new Socket(serverAdd, serverPort); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { try { // 封装 RPC请求 Invocation invocation = new Invocation(intface, method.getName(), method.getParameterTypes(), args); // 打开 client 的输出流 oos = new ObjectOutputStream(client.getOutputStream()); // 序列化, 将 RPC请求写入到 client 的输出流中 oos.writeObject(invocation); oos.flush(); // 等待 Server端返回 RPC请求结果 // // 打开 client 的输入流 ois = new ObjectInputStream(client.getInputStream()); // 反序列化, 从输入流中读取 RPC请求结果 Object res = ois.readObject(); // 向 client 返回 RPC请求结果 return res; } finally { // 关闭资源 CloseUtil.closeAll(ois, oos); CloseUtil.closeAll(client); } } }
Serializable 的实现类, RPC请求的封装
/** * RPC调用的封装, 包括如下字段: <br> * methodName: 方法名 <br> * parameterTypes: 方法参数列表的 Class 对象数组 <br> * params: 方法参数列表 */ @SuppressWarnings("rawtypes") public class Invocation implements Serializable { private static final long serialVersionUID = -7311316339835834851L; /** RPC协议接口的 Class对象 */ private Class<?> intface; /** 方法名 */ private String methodName; /** 方法参数列表的 Class 对象数组 */ private Class[] parameterTypes; /** 方法的参数列表 */ private Object[] params; public Invocation() { } /** * 构造一个 RPC请求的封装 * * @param intface * RPC协议接口的 Class对象 * @param methodName * 方法名 * @param parameterTypes * 方法参数列表的 Class 对象数组 * @param params * 方法的参数列表 */ public Invocation(Class intface, String methodName, Class[] parameterTypes, Object[] params) { this.intface = intface; this.methodName = methodName; this.parameterTypes = parameterTypes; this.params = params; } public Class getIntface() { return intface; } public String getMethodName() { return methodName; } public Class[] getParameterTypes() { return parameterTypes; } public Object[] getParams() { return params; } }
构造 Client端代理对象, Server端实例
/** * 一个构造 Server 端实例与 Client 端代理对象的类 */ public class RPC { /** * 获取一个 Client 端的代理对象 * * @param intface * RPC协议接口, Client 与 Server 端共同遵照 * @param serverAdd * Server 端地址 * @param serverPort * Server 端监听的端口 * @return Client 端的代理对象 */ public static <T> Object getProxy(final Class<T> intface, String serverAdd, int serverPort) throws UnknownHostException, IOException { Object proxy = Proxy.newProxyInstance(intface.getClassLoader(), new Class[] { intface }, new Invoker(intface, serverAdd, serverPort)); return proxy; } /** * 获取 RPC 的 Server 端实例 * * @param intface * RPC协议接口 * @param intfaceImpl * Server 端 RPC协议接口的实现 * @param port * Server 端监听的端口 * @return RPCServer 实例 */ public static <T> RPCServer getRPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException { return new RPCServer(intface, intfaceImpl, port); } }
Server端接收 RPC请求, 处理请求
/** * RPC 的 Server端 */ public class RPCServer { /** Server端的 ServerSocket实例 */ private ServerSocket server; /** Server端 RPC协议接口的实现缓存, 一个接口对应一个实现类的实例 */ private static Map<Class<?>, Object> intfaceImpls = new HashMap<Class<?>, Object>(); /** * 构造一个 RPC 的 Server端实例 * * @param intface * RPC协议接口的 Class对象 * @param intfaceImpl * Server端 RPC协议接口的实现 * @param port * Server端监听的端口 */ public <T> RPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException { server = new ServerSocket(port); RPCServer.intfaceImpls.put(intface, intfaceImpl); } /** * 循环监听并接收 Client端链接, 处理 RPC请求, 向 Client端返回结果 */ public void start() { try { while (true) { // 接收 Client端链接, 建立一个 Handler线程, 处理 RPC请求 new Handler(server.accept()).start(); } } catch (IOException e) { e.printStackTrace(); } finally { // 关闭资源 CloseUtil.closeAll(server); } } /** * 向 RPC协议接口的实现缓存中添加缓存 * * @param intface * RPC协议接口的 Class对象 * @param intfaceImpl * Server端 RPC协议接口的实现 */ public static <T> void addIntfaceImpl(Class<T> intface, T intfaceImpl) { RPCServer.intfaceImpls.put(intface, intfaceImpl); } /** * 处理 RPC请求的线程类 */ private static class Handler extends Thread { /** Server端接收到的 Client端链接 */ private Socket client; /** 用于接收 client 的 RPC请求的输入流 */ private ObjectInputStream ois; /** 用于向 client 返回 RPC请求结果的输出流 */ private ObjectOutputStream oos; /** RPC请求的封装 */ private Invocation invocation; /** * 用 Client端链接构造 Handler线程 * * @param client */ public Handler(Socket client) { this.client = client; } @Override public void run() { try { // 打开 client 的输入流 ois = new ObjectInputStream(client.getInputStream()); // 反序列化, 从输入流中读取 RPC请求的封装 invocation = (Invocation) ois.readObject(); // 从 RPC协议接口的实现缓存中获取实现 Object intfaceImpl = intfaceImpls.get(invocation.getIntface()); // 获取 Server端 RPC协议接口的方法实现 Method method = intfaceImpl.getClass().getMethod(invocation.getMethodName(), invocation.getParameterTypes()); // 跳过安全检查 method.setAccessible(true); // 调用具体的实现方法, 用 res 接收方法返回结果 Object res = method.invoke(intfaceImpl, invocation.getParams()); // 打开 client 的输出流 oos = new ObjectOutputStream(client.getOutputStream()); // 序列化, 向输出流中写入 RPC请求的结果 oos.writeObject(res); oos.flush(); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭资源 CloseUtil.closeAll(ois, oos); CloseUtil.closeAll(client); } } } }
Login类, RPC协议接口
/** * RPC协议接口, Client 与 Server端共同遵照 */ public interface Login { /** * 抽象方法 login(), 模拟用户登陆传入两个String 类型的参数, 返回 String类型的结果 * * @param username * 用户名 * @param password * 密码 * @return 返回登陆结果 */ public String login(String username, String password); }
LoginImpl类, Server 端 RPC协议接口( Login )的实现类
/** * Server端 RPC协议接口( Login )的实现类 */ public class LoginImpl implements Login { /** * 实现 login()方法, 模拟用户登陆 * * @param username * 用户名 * @param password * 密码 * @return hello 用户名 */ @Override public String login(String username, String password) { return "hello " + username; } }
ClientTest类, Client端测试类
/** * Client端测试类 */ public class ClientTest { public static void main(String[] args) throws UnknownHostException, IOException { // 获取一个 Client端的代理对象 proxy Login proxy = (Login) RPC.getProxy(Login.class, "192.168.8.1", 8888); // 调用 proxy 的 login() 方法, 返回值为 res String res = proxy.login("rpc", "password"); // 输出 res System.out.println(res); } }
ServerTest类, Server端测试类
/** * Server端测试类 */ public class ServerTest { public static void main(String[] args) throws ClassNotFoundException, IOException { // 获取 RPC 的 Server 端实例 server RPCServer server = RPC.getRPCServer(Login.class, new LoginImpl(), 8888); // 循环监听并接收 Client 端链接, 处理 RPC 请求, 向 Client 端返回结果 server.start(); } }
运行 ServerTest, 控制台输出:
Starting Socket Handler for port 8888
运行 ClientTest, 控制台输出:
hello rpc
至此, 实现了基于 Proxy, Socket, IO 的简单版 RPC模型,
对于每个 RPC请求, Server端都开启一个 Handler线程处理该请求,
在高并发状况下, Server端是扛不住的, 改用 NIO应该表现更好