基于TCP的RPC简单实现

所谓RPC就是远程方法调用(Remote  Process Call ),简单的来讲就是经过MQ,TCP,HTTP或者本身写的网络协议来传输我要调用对方的什么接口,对方处理以后再把结果返回给我.就这么简单的一个过程.在一个大型的项目以后基本上各模块都是分开的,以提供服务的方式进行相互调用.若是可以提供智能负载均衡,可选择的java对象编码解码协议,网络传输协议,服务监控,服务版本控制等不少功能的话就是一个SOA架构了.java


前两天实现了一个基于java Socket 实现的阻塞的RPC.其原理很是简单数组

  1. 客户端用一个TransportMessage类去包装须要调用的接口,调用的方法,调用方法的参数类型,调用方法的参数值.服务器

  2. 客户端用Socet链接服务端,序列化TransportMessage,传输给服务端.网络

  3. 服务端循环接收请求,一旦受到请求就起一个线程扔到线程池去执行,执行的内容就是反序列化TransportMessage类,在servicePool池中获取接口实现类,经过调用方法参数类型数组获取Method对象.而后经过method.invoke去调用方法.架构

  4. 服务器端序列化结果,而后经过socket传输给客户端.并发

  5. 客户端收到结果,反序列化结果对象.负载均衡


具体代码实现,(为了节省篇幅,setter,getter就不放进来了):
socket

1.远程调用信息封装   TransportMessage.java函数

/**
 * @author Lubby
 * @date 2015年4月22日 下午1:06:18
 *	远程调用信息封装.
 *	包括    1.调用接口名称  (包名+接口名)   2.调用方法名  3.调用参数Class类型数组  4.调用接口的参数数组
 */
public class TransportMessage implements Serializable {
	//包名+接口名称  如com.lubby.rpc.service.MathService.
	private String interfaceName;
	//调用方法名   如 getSum
	private String methodName;
	//参数类型 按照接口参数顺序  getSum(int a, int b, String name)方法就是int.class int.class String.class的数组
	private Class[] paramsTypes;
	//参数 按照接口参数顺序 getSum(int a, int b, String name)方法就是 1,3,"Tom"的数组
	private Object[] parameters;

	public TransportMessage() {
		super();
		// TODO Auto-generated constructor stub
	}

	public TransportMessage(String interfaceName, String methodName,
			Class[] paramsTypes, Object[] parameters) {
		super();
		this.interfaceName = interfaceName;
		this.methodName = methodName;
		this.paramsTypes = paramsTypes;
		this.parameters = parameters;
	}

}

2.客户端调用远程方法类 RPCClient.java
测试

public class RPCClient {
	// 服务端地址
	private String serverAddress;
	// 服务端端口
	private int serverPort;
	// 线程池大小
	private int threadPoolSize = 10;
	// 线程池
	private ExecutorService executorService = null;

	public RPCClient() {
		super();
		// TODO Auto-generated constructor stub
	}

	/**
	 * @param serverAddress
	 *            TPC服务地址
	 * @param serverPort
	 *            TPC服务端口
	 * 
	 */
	public RPCClient(String serverAddress, int serverPort) {
		this.serverAddress = serverAddress;
		this.serverPort = serverPort;
		executorService = Executors.newFixedThreadPool(threadPoolSize);
	}

	/**
	 * 同步的请求和接收结果
	 * 
	 * @param transportMessage
	 * @return
	 */
	public Object sendAndReceive(TransportMessage transportMessage) {
		Object result = null;
		Socket socket = null;
		try {
			 socket = new Socket(serverAddress, serverPort);
			 
			 //反序列化 TransportMessage对象
			ObjectOutputStream objectOutpusStream = new ObjectOutputStream(
					socket.getOutputStream());
			objectOutpusStream.writeObject(transportMessage);

			ObjectInputStream objectInputStream = new ObjectInputStream(
					socket.getInputStream());
			//阻塞等待读取结果并反序列化结果对象
			result = objectInputStream.readObject();
			socket.close();
		} catch (UnknownHostException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}finally{
			try {
				//最后关闭socket
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return result;
	}
}

3.服务器处理类 RPCServer.java

public class RPCServer {
	private int threadSize = 10;
	private ExecutorService threadPool;
	private Map<String, Object> servicePool;
	private int port = 4321;

	public RPCServer() {
		super();
		synchronized (this) {
			threadPool = Executors.newFixedThreadPool(this.threadSize);
		}
	}

	/**
	 * 
	 * @param threadSize
	 *            内部处理线程池大小
	 * @param port
	 *            当前TPC服务的端口号
	 * 
	 */

	public RPCServer(int threadSize, int port) {
		this.threadSize = threadSize;
		this.port = port;
		synchronized (this) {
			threadPool = Executors.newFixedThreadPool(this.threadSize);
		}
	}

	/**
	 * 
	 * 
	 * @param servicePool
	 *            装有service对象的Map, Key为全限定接口名,Value为接口实现类对象
	 * @param threadSize
	 *            内部处理线程池大小
	 * @param port
	 *            当前TPC服务的端口号
	 * 
	 */
	public RPCServer(Map<String, Object> servicePool, int threadSize, int port) {
		this.threadSize = threadSize;
		this.servicePool = servicePool;
		this.port = port;
		synchronized (this) {
			threadPool = Executors.newFixedThreadPool(this.threadSize);
		}
	}

	/**
	 * RPC服务端处理函数 监听指定TPC端口,每次有请求过来的时候调用服务,放入线程池中处理.
	 * 
	 * @throws IOException
	 */
	public void service() throws IOException {
		ServerSocket serverSocket = new ServerSocket(port);
		while (true) {
			Socket receiveSocket = serverSocket.accept();
			final Socket socket = receiveSocket;
			threadPool.execute(new Runnable() {

				public void run() {
					try {
						process(socket);

					} catch (ClassNotFoundException e) {
						e.printStackTrace();
					} catch (NoSuchMethodException e) {
						e.printStackTrace();
					} catch (SecurityException e) {
						e.printStackTrace();
					} catch (IllegalAccessException e) {
						e.printStackTrace();
					} catch (IllegalArgumentException e) {
						e.printStackTrace();
					} catch (InvocationTargetException e) {
						e.printStackTrace();
					} catch (InstantiationException e) {
						e.printStackTrace();
					} catch (IOException e) {
						e.printStackTrace();
					} finally {
						try {
							socket.close();
						} catch (IOException e) {
							e.printStackTrace();
						}
					}

				}
			});
		}

	}

	/**
	 * 调用服务 经过TCP Socket返回结果对象
	 * 
	 * @param receiveSocket
	 *            请求Socket
	 * @throws IOException
	 * @throws ClassNotFoundException
	 * @throws NoSuchMethodException
	 * @throws SecurityException
	 * @throws IllegalAccessException
	 * @throws IllegalArgumentException
	 * @throws InvocationTargetException
	 * @throws InstantiationException
	 */
	private void process(Socket receiveSocket) throws IOException,
			ClassNotFoundException, NoSuchMethodException, SecurityException,
			IllegalAccessException, IllegalArgumentException,
			InvocationTargetException, InstantiationException {

		/*
		 * try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO
		 * Auto-generated catch block e.printStackTrace(); }
		 */
		ObjectInputStream objectinputStream = new ObjectInputStream(
				receiveSocket.getInputStream());
		TransportMessage message = (TransportMessage) objectinputStream
				.readObject();

		// 调用服务
		Object result = call(message);

		ObjectOutputStream objectOutputStream = new ObjectOutputStream(
				receiveSocket.getOutputStream());
		objectOutputStream.writeObject(result);
		objectinputStream.close();
		objectOutputStream.close();
	}

	/**
	 * 服务处理函数 经过包名+接口名在servicePool中找到对应服务 经过调用方法参数类型数组获取Method对象
	 * 经过Method.invoke(对象,参数)调用对应服务
	 * 
	 * @return
	 * @throws ClassNotFoundException
	 * @throws SecurityException
	 * @throws NoSuchMethodException
	 * @throws InvocationTargetException
	 * @throws IllegalArgumentException
	 * @throws IllegalAccessException
	 * @throws InstantiationException
	 */
	private Object call(TransportMessage message)
			throws ClassNotFoundException, NoSuchMethodException,
			SecurityException, IllegalAccessException,
			IllegalArgumentException, InvocationTargetException,
			InstantiationException {
		if (servicePool == null) {
			synchronized (this) {
				servicePool = new HashMap<String, Object>();
			}
		}
		String interfaceName = message.getInterfaceName();
		Object service = servicePool.get(interfaceName);
		Class<?> serviceClass = Class.forName(interfaceName);
		// 检查servicePool中对象,若没有着生产对象
		if (service == null) {
			synchronized (this) {
				service = serviceClass.newInstance();
				servicePool.put(interfaceName, service);
			}
		}
		Method method = serviceClass.getMethod(message.getMethodName(),
				message.getParamsTypes());
		Object result = method.invoke(service, message.getParameters());
		return result;

	}
}

4.为了方便测试写了个接口和其实现类 MathService 和 MathServiceImpl

public interface MathService {
	public int getSum(int a, int b, String name);
}
public class MathServiceImpl implements MathService {
	public int getSum(int a, int b, String name) {
		System.out.println(name);
		return a + b;
	}

}

5.服务器端测试代码

public class ServerTest {
	
	public static void main(String[] args){
		Map<String,Object> servicePool = new  HashMap<String, Object>();
		servicePool.put("com.lubby.rpc.service.MathService", new MathServiceImpl());
		RPCServer server = new RPCServer(servicePool,4, 4321);
		try {
			server.service();
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}

}

6.客户端测试代码

public class ClientTest {
	public static void main(String[] args) {
		String serverAddress = "127.0.0.1";
		int serverPort = 4321;
		
		final RPCClient client = new RPCClient(serverAddress, serverPort);
		final TransportMessage transportMessage = buildTransportMessage();
		
		for (int i = 0; i < 1000; i++) {
			final int waitTime = i * 10;
			new Thread(new Runnable() {
				public void run() {
					Object result = client.sendAndReceive(transportMessage);
					System.out.println(result);
				}
			}).start();
		}
	}

	private static TransportMessage buildTransportMessage() {

		String interfaceName = "com.lubby.rpc.service.MathService";
		Class[] paramsTypes = { int.class, int.class, String.class };
		Object[] parameters = { 1, 3, "Lubby" };
		String methodName = "getSum";

		TransportMessage transportMessage = new TransportMessage(interfaceName,
				methodName, paramsTypes, parameters);

		return transportMessage;
	}

}


7.并发问题

因为ServerSocket是阻塞的,因此在ServerSocket.accept()方法同一时刻只能有一个线程进入,虽然以后的处理都另起一个线程,可是有瓶颈的,

我在用400个线程并发链接服务端的时候基本没问题,可是500个线程并发链接服务端的时候就会有部分线程链接不到服务器端.后面看下NIO回头用NIO来改一下.

相关文章
相关标签/搜索