本身动手实现RPC服务调用框架

引言

本文利用java自带的socket编程实现了一个简单的rpc调用框架,由两个工程组成分别名为battercake-provider(服务提供者)、battercake-consumer(服务调用者)。java

设计思路以下:
一、在battercake-provider中,写一个服务叫BatterCakeService编程

二、在battercake-provider中,启动RpcProvider,发布该服务app

三、在battercake-consumer中,启动测试类RpcTest框架

四、在battercake-consumer中,利用jdk动态代理,得到BatterCakeService的动态代理类BatterCakeService$Proxy0socket

五、在battercake-consumer中,动态代理类BatterCakeService$Proxy0,与battercake-provider创建socket链接,battercake-provider针对每个链接,都会启动一个ServerThread处理请求,代理类则发送服务参数等相关信息ide

六、在battercake-consumer中,接收battercake-provider的ServerThread请求返回的结果。微服务

上述过程时序图以下所示测试

image

接下来上代码!!
this

服务提供者

本部分的工程为battercake-provider,项目结构图以下图所示spa

image

先上使用的部分的代码
先建立一个微服务,接口以下

package com.rjzheng.service;public interface BatterCakeService {	/**
	 * 卖煎饼的服务
	 * @param name
	 * @return
	 */
	public String sellBatterCake(String name);
}

实现类以下

package com.rjzheng.service.impl;import com.rjzheng.service.BatterCakeService;public class BatterCakeServiceImpl implements BatterCakeService {	@Override
	public String sellBatterCake(String name) {		// TODO Auto-generated method stub
		return name+"煎饼,卖的特别好";
	}

}

接下来就是发布服务

package com.rjzheng.start;import com.rjzheng.rpc.RpcProvider;import com.rjzheng.service.BatterCakeService;import com.rjzheng.service.impl.BatterCakeServiceImpl;public class RpcBootStrap {	public static void main(String[] args) throws Exception {
		BatterCakeService batterCakeService =new BatterCakeServiceImpl();		//发布卖煎饼的服务,注册在20006端口
		RpcProvider.export(20006,batterCakeService);
	}
}

接下来是rpc框架调用部分的代码,RpcProvider,该部分代码能够总结为两步

  1. 将须要发布的服务存储在一个内存变量serviceList中

  2. 启动socket,server.accept()方法阻塞在那,监听输入

  3. 针对每个请求,单独启动一个线程处理

package com.rjzheng.rpc;import java.net.ServerSocket;import java.net.Socket;import java.util.ArrayList;import java.util.Arrays;import java.util.List;/**
 * RPC服务提供器
 * @author zhengrongjun
 *
 */public class RpcProvider {	
	//存储注册的服务列表
	private static List<Object> serviceList;	
	/**
	 * 发布rpc服务
	 * @param object
	 * @param port
	 * @throws Exception
	 */
	public static void export(int port,Object... services) throws Exception {
		serviceList=Arrays.asList(services);
		ServerSocket server = new ServerSocket(port);
		Socket client = null;		while (true) {			//阻塞等待输入
			client = server.accept();			//每个请求,启动一个线程处理
			new Thread(new ServerThread(client,serviceList)).start();
		}
	}
}

接下来ServerThread线程处理类的代码,ServerThread主要作如下几个步骤

  1. 读取客户端发送的服务名

  2. 判断服务是否发布

  3. 若是发布,则走反射逻辑,动态调用,返回结果

  4. 若是未发布,则返回提示通知

package com.rjzheng.rpc;import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.Socket;import java.util.List;public class ServerThread implements Runnable {	private Socket client = null;	private List<Object> serviceList = null;	public ServerThread(Socket client, List<Object> service) {		this.client = client;		this.serviceList = service;
	}	@Override
	public void run() {
		ObjectInputStream input = null;
		ObjectOutputStream output = null;		try {
			input = new ObjectInputStream(client.getInputStream());
			output = new ObjectOutputStream(client.getOutputStream());			// 读取客户端要访问那个service
			Class serviceClass = (Class) input.readObject();			// 找到该服务类
			Object obj = findService(serviceClass);			if (obj == null) {
				output.writeObject(serviceClass.getName() + "服务未发现");
			} else {				//利用反射调用该方法,返回结果
				try {
					String methodName = input.readUTF();
					Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
					Object[] arguments = (Object[]) input.readObject();
					Method method = obj.getClass().getMethod(methodName, parameterTypes);  
                    Object result = method.invoke(obj, arguments);  
                    output.writeObject(result); 
				} catch (Throwable t) {
					output.writeObject(t);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {			try {
				client.close();
				input.close();
				output.close();
			} catch (IOException e) {				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}

	}	private Object findService(Class serviceClass) {		// TODO Auto-generated method stub
		for (Object obj : serviceList) {			boolean isFather = serviceClass.isAssignableFrom(obj.getClass());			if (isFather) {				return obj;
			}
		}		return null;
	}

}

服务消费者

本部分的工程为battercake-consumer,项目结构图以下图所示
image

先上rpc框架调用部分的代码RpcConsumer,步骤分两步

  1. 封装一个代理类处理器

  2. 返回service的代理类对象

package com.rjzheng.rpc;import java.lang.reflect.Proxy;public class RpcConsumer {	
	public static <T> T getService(Class<T> clazz,String ip,int port) {
		ProxyHandler proxyHandler =new ProxyHandler(ip,port);		return (T)Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler);
	}
}

接下来上代理类处理器的代码,代理类处理步骤分如下几步

  1. 创建socket链接

  2. 封装请求数据,发送给服务提供者

  3. 返回结果

package com.rjzheng.rpc;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.net.Socket;import com.rjzheng.service.BatterCakeService;public class ProxyHandler implements InvocationHandler {	private String ip;	private int port;	public ProxyHandler(String ip, int port) {		// TODO Auto-generated constructor stub
		this.ip = ip;		this.port = port;
	}	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {		// TODO Auto-generated method stub
		Socket socket = new Socket(this.ip, this.port);
		ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
		ObjectInputStream input = new ObjectInputStream(socket.getInputStream());		try {
			output.writeObject(proxy.getClass().getInterfaces()[0]);
			output.writeUTF(method.getName());
			output.writeObject(method.getParameterTypes());
			output.writeObject(args);
			output.flush();
			Object result = input.readObject();			if(result instanceof Throwable) {				throw (Throwable) result;
			}				return result;
		} finally {
			socket.shutdownOutput();
		}
	}

}

接下来创建一个测试类RpcTest以下(跑该测试类前,记得运行在battercake-provider端的RpcBootstrap类发布BatterCakeService服务)

package com.rjzheng.start;import com.rjzheng.rpc.RpcConsumer;import com.rjzheng.service.BatterCakeService;public class RpcTest {	public static void main(String[] args) {
		BatterCakeService batterCakeService=RpcConsumer.getService(BatterCakeService.class, "127.0.0.1", 20006);
		String result=batterCakeService.sellBatterCake("双蛋");
		System.out.println(result);
	}
}

输出结果以下

双蛋煎饼,卖的特别好

至此,咱们就实现了一个简易的rpc服务调用框架

相关文章
相关标签/搜索