所谓RPC就是远程方法调用(Remote Process Call ),简单的来讲就是经过MQ,TCP,HTTP或者本身写的网络协议来传输我要调用对方的什么接口,对方处理以后再把结果返回给我.就这么简单的一个过程.在一个大型的项目以后基本上各模块都是分开的,以提供服务的方式进行相互调用.若是可以提供智能负载均衡,可选择的java对象编码解码协议,网络传输协议,服务监控,服务版本控制等不少功能的话就是一个SOA架构了.java
前两天实现了一个基于java Socket 实现的阻塞的RPC.其原理很是简单数组
客户端用一个TransportMessage类去包装须要调用的接口,调用的方法,调用方法的参数类型,调用方法的参数值.服务器
客户端用Socet链接服务端,序列化TransportMessage,传输给服务端.网络
服务端循环接收请求,一旦受到请求就起一个线程扔到线程池去执行,执行的内容就是反序列化TransportMessage类,在servicePool池中获取接口实现类,经过调用方法参数类型数组获取Method对象.而后经过method.invoke去调用方法.架构
服务器端序列化结果,而后经过socket传输给客户端.并发
客户端收到结果,反序列化结果对象.负载均衡
具体代码实现,(为了节省篇幅,setter,getter就不放进来了):
socket
1.远程调用信息封装 TransportMessage.java函数
/** * @author Lubby * @date 2015年4月22日 下午1:06:18 * 远程调用信息封装. * 包括 1.调用接口名称 (包名+接口名) 2.调用方法名 3.调用参数Class类型数组 4.调用接口的参数数组 */ public class TransportMessage implements Serializable { //包名+接口名称 如com.lubby.rpc.service.MathService. private String interfaceName; //调用方法名 如 getSum private String methodName; //参数类型 按照接口参数顺序 getSum(int a, int b, String name)方法就是int.class int.class String.class的数组 private Class[] paramsTypes; //参数 按照接口参数顺序 getSum(int a, int b, String name)方法就是 1,3,"Tom"的数组 private Object[] parameters; public TransportMessage() { super(); // TODO Auto-generated constructor stub } public TransportMessage(String interfaceName, String methodName, Class[] paramsTypes, Object[] parameters) { super(); this.interfaceName = interfaceName; this.methodName = methodName; this.paramsTypes = paramsTypes; this.parameters = parameters; } }
2.客户端调用远程方法类 RPCClient.java
测试
public class RPCClient { // 服务端地址 private String serverAddress; // 服务端端口 private int serverPort; // 线程池大小 private int threadPoolSize = 10; // 线程池 private ExecutorService executorService = null; public RPCClient() { super(); // TODO Auto-generated constructor stub } /** * @param serverAddress * TPC服务地址 * @param serverPort * TPC服务端口 * */ public RPCClient(String serverAddress, int serverPort) { this.serverAddress = serverAddress; this.serverPort = serverPort; executorService = Executors.newFixedThreadPool(threadPoolSize); } /** * 同步的请求和接收结果 * * @param transportMessage * @return */ public Object sendAndReceive(TransportMessage transportMessage) { Object result = null; Socket socket = null; try { socket = new Socket(serverAddress, serverPort); //反序列化 TransportMessage对象 ObjectOutputStream objectOutpusStream = new ObjectOutputStream( socket.getOutputStream()); objectOutpusStream.writeObject(transportMessage); ObjectInputStream objectInputStream = new ObjectInputStream( socket.getInputStream()); //阻塞等待读取结果并反序列化结果对象 result = objectInputStream.readObject(); socket.close(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); }finally{ try { //最后关闭socket socket.close(); } catch (IOException e) { e.printStackTrace(); } } return result; } }
3.服务器处理类 RPCServer.java
public class RPCServer { private int threadSize = 10; private ExecutorService threadPool; private Map<String, Object> servicePool; private int port = 4321; public RPCServer() { super(); synchronized (this) { threadPool = Executors.newFixedThreadPool(this.threadSize); } } /** * * @param threadSize * 内部处理线程池大小 * @param port * 当前TPC服务的端口号 * */ public RPCServer(int threadSize, int port) { this.threadSize = threadSize; this.port = port; synchronized (this) { threadPool = Executors.newFixedThreadPool(this.threadSize); } } /** * * * @param servicePool * 装有service对象的Map, Key为全限定接口名,Value为接口实现类对象 * @param threadSize * 内部处理线程池大小 * @param port * 当前TPC服务的端口号 * */ public RPCServer(Map<String, Object> servicePool, int threadSize, int port) { this.threadSize = threadSize; this.servicePool = servicePool; this.port = port; synchronized (this) { threadPool = Executors.newFixedThreadPool(this.threadSize); } } /** * RPC服务端处理函数 监听指定TPC端口,每次有请求过来的时候调用服务,放入线程池中处理. * * @throws IOException */ public void service() throws IOException { ServerSocket serverSocket = new ServerSocket(port); while (true) { Socket receiveSocket = serverSocket.accept(); final Socket socket = receiveSocket; threadPool.execute(new Runnable() { public void run() { try { process(socket); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (SecurityException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }); } } /** * 调用服务 经过TCP Socket返回结果对象 * * @param receiveSocket * 请求Socket * @throws IOException * @throws ClassNotFoundException * @throws NoSuchMethodException * @throws SecurityException * @throws IllegalAccessException * @throws IllegalArgumentException * @throws InvocationTargetException * @throws InstantiationException */ private void process(Socket receiveSocket) throws IOException, ClassNotFoundException, NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, InstantiationException { /* * try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO * Auto-generated catch block e.printStackTrace(); } */ ObjectInputStream objectinputStream = new ObjectInputStream( receiveSocket.getInputStream()); TransportMessage message = (TransportMessage) objectinputStream .readObject(); // 调用服务 Object result = call(message); ObjectOutputStream objectOutputStream = new ObjectOutputStream( receiveSocket.getOutputStream()); objectOutputStream.writeObject(result); objectinputStream.close(); objectOutputStream.close(); } /** * 服务处理函数 经过包名+接口名在servicePool中找到对应服务 经过调用方法参数类型数组获取Method对象 * 经过Method.invoke(对象,参数)调用对应服务 * * @return * @throws ClassNotFoundException * @throws SecurityException * @throws NoSuchMethodException * @throws InvocationTargetException * @throws IllegalArgumentException * @throws IllegalAccessException * @throws InstantiationException */ private Object call(TransportMessage message) throws ClassNotFoundException, NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, InstantiationException { if (servicePool == null) { synchronized (this) { servicePool = new HashMap<String, Object>(); } } String interfaceName = message.getInterfaceName(); Object service = servicePool.get(interfaceName); Class<?> serviceClass = Class.forName(interfaceName); // 检查servicePool中对象,若没有着生产对象 if (service == null) { synchronized (this) { service = serviceClass.newInstance(); servicePool.put(interfaceName, service); } } Method method = serviceClass.getMethod(message.getMethodName(), message.getParamsTypes()); Object result = method.invoke(service, message.getParameters()); return result; } }
4.为了方便测试写了个接口和其实现类 MathService 和 MathServiceImpl
public interface MathService { public int getSum(int a, int b, String name); }
public class MathServiceImpl implements MathService { public int getSum(int a, int b, String name) { System.out.println(name); return a + b; } }
5.服务器端测试代码
public class ServerTest { public static void main(String[] args){ Map<String,Object> servicePool = new HashMap<String, Object>(); servicePool.put("com.lubby.rpc.service.MathService", new MathServiceImpl()); RPCServer server = new RPCServer(servicePool,4, 4321); try { server.service(); } catch (IOException e) { e.printStackTrace(); } } }
6.客户端测试代码
public class ClientTest { public static void main(String[] args) { String serverAddress = "127.0.0.1"; int serverPort = 4321; final RPCClient client = new RPCClient(serverAddress, serverPort); final TransportMessage transportMessage = buildTransportMessage(); for (int i = 0; i < 1000; i++) { final int waitTime = i * 10; new Thread(new Runnable() { public void run() { Object result = client.sendAndReceive(transportMessage); System.out.println(result); } }).start(); } } private static TransportMessage buildTransportMessage() { String interfaceName = "com.lubby.rpc.service.MathService"; Class[] paramsTypes = { int.class, int.class, String.class }; Object[] parameters = { 1, 3, "Lubby" }; String methodName = "getSum"; TransportMessage transportMessage = new TransportMessage(interfaceName, methodName, paramsTypes, parameters); return transportMessage; } }
7.并发问题
因为ServerSocket是阻塞的,因此在ServerSocket.accept()方法同一时刻只能有一个线程进入,虽然以后的处理都另起一个线程,可是有瓶颈的,
我在用400个线程并发链接服务端的时候基本没问题,可是500个线程并发链接服务端的时候就会有部分线程链接不到服务器端.后面看下NIO回头用NIO来改一下.