引言
本文利用java自带的socket编程实现了一个简单的rpc调用框架,由两个工程组成分别名为battercake-provider(服务提供者)、battercake-consumer(服务调用者)。html
设计思路以下:
一、在battercake-provider中,写一个服务叫BatterCakeServicejava
二、在battercake-provider中,启动RpcProvider,发布该服务编程
三、在battercake-consumer中,启动测试类RpcTestmarkdown
四、在battercake-consumer中,利用jdk动态代理,得到BatterCakeService的动态代理类BatterCakeService$Proxy0框架
五、在battercake-consumer中,动态代理类BatterCakeService$Proxy0,与battercake-provider创建socket链接,battercake-provider针对每个链接,都会启动一个ServerThread处理请求,代理类则发送服务参数等相关信息socket
六、在battercake-consumer中,接收battercake-provider的ServerThread请求返回的结果。ide
上述过程时序图以下所示微服务
接下来上代码!!post
服务提供者
本部分的工程为battercake-provider,项目结构图以下图所示测试
先上使用的部分的代码
先建立一个微服务,接口以下
package com.rjzheng.service; public interface BatterCakeService { /** * 卖煎饼的服务 * @param name * @return */ public String sellBatterCake(String name); }
实现类以下
package com.rjzheng.service.impl; import com.rjzheng.service.BatterCakeService; public class BatterCakeServiceImpl implements BatterCakeService { @Override public String sellBatterCake(String name) { // TODO Auto-generated method stub return name+"煎饼,卖的特别好"; } }
接下来就是发布服务
package com.rjzheng.start; import com.rjzheng.rpc.RpcProvider; import com.rjzheng.service.BatterCakeService; import com.rjzheng.service.impl.BatterCakeServiceImpl; public class RpcBootStrap { public static void main(String[] args) throws Exception { BatterCakeService batterCakeService =new BatterCakeServiceImpl(); //发布卖煎饼的服务,注册在20006端口 RpcProvider.export(20006,batterCakeService); } }
接下来是rpc框架调用部分的代码,RpcProvider,该部分代码能够总结为两步
- 将须要发布的服务存储在一个内存变量serviceList中
- 启动socket,server.accept()方法阻塞在那,监听输入
- 针对每个请求,单独启动一个线程处理
package com.rjzheng.rpc; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** * RPC服务提供器 * @author zhengrongjun * */ public class RpcProvider { //存储注册的服务列表 private static List<Object> serviceList; /** * 发布rpc服务 * @param object * @param port * @throws Exception */ public static void export(int port,Object... services) throws Exception { serviceList=Arrays.asList(services); ServerSocket server = new ServerSocket(port); Socket client = null; while (true) { //阻塞等待输入 client = server.accept(); //每个请求,启动一个线程处理 new Thread(new ServerThread(client,serviceList)).start(); } } }
接下来ServerThread线程处理类的代码,ServerThread主要作如下几个步骤
- 读取客户端发送的服务名
- 判断服务是否发布
- 若是发布,则走反射逻辑,动态调用,返回结果
- 若是未发布,则返回提示通知
package com.rjzheng.rpc; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.Socket; import java.util.List; public class ServerThread implements Runnable { private Socket client = null; private List<Object> serviceList = null; public ServerThread(Socket client, List<Object> service) { this.client = client; this.serviceList = service; } @Override public void run() { ObjectInputStream input = null; ObjectOutputStream output = null; try { input = new ObjectInputStream(client.getInputStream()); output = new ObjectOutputStream(client.getOutputStream()); // 读取客户端要访问那个service Class serviceClass = (Class) input.readObject(); // 找到该服务类 Object obj = findService(serviceClass); if (obj == null) { output.writeObject(serviceClass.getName() + "服务未发现"); } else { //利用反射调用该方法,返回结果 try { String methodName = input.readUTF(); Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); Method method = obj.getClass().getMethod(methodName, parameterTypes); Object result = method.invoke(obj, arguments); output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } } } catch (Exception e) { e.printStackTrace(); } finally { try { client.close(); input.close(); output.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } private Object findService(Class serviceClass) { // TODO Auto-generated method stub for (Object obj : serviceList) { boolean isFather = serviceClass.isAssignableFrom(obj.getClass()); if (isFather) { return obj; } } return null; } }
服务消费者
本部分的工程为battercake-consumer,项目结构图以下图所示
先上rpc框架调用部分的代码RpcConsumer,步骤分两步
- 封装一个代理类处理器
- 返回service的代理类对象
package com.rjzheng.rpc; import java.lang.reflect.Proxy; public class RpcConsumer { public static <T> T getService(Class<T> clazz,String ip,int port) { ProxyHandler proxyHandler =new ProxyHandler(ip,port); return (T)Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler); } }
接下来上代理类处理器的代码,代理类处理步骤分如下几步
- 创建socket链接
- 封装请求数据,发送给服务提供者
- 返回结果
package com.rjzheng.rpc; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.net.Socket; import com.rjzheng.service.BatterCakeService; public class ProxyHandler implements InvocationHandler { private String ip; private int port; public ProxyHandler(String ip, int port) { // TODO Auto-generated constructor stub this.ip = ip; this.port = port; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // TODO Auto-generated method stub Socket socket = new Socket(this.ip, this.port); ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { output.writeObject(proxy.getClass().getInterfaces()[0]); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(args); output.flush(); Object result = input.readObject(); if(result instanceof Throwable) { throw (Throwable) result; } return result; } finally { socket.shutdownOutput(); } } }
接下来创建一个测试类RpcTest以下(跑该测试类前,记得运行在battercake-provider端的RpcBootstrap类发布BatterCakeService服务)
package com.rjzheng.start; import com.rjzheng.rpc.RpcConsumer; import com.rjzheng.service.BatterCakeService; public class RpcTest { public static void main(String[] args) { BatterCakeService batterCakeService=RpcConsumer.getService(BatterCakeService.class, "127.0.0.1", 20006); String result=batterCakeService.sellBatterCake("双蛋"); System.out.println(result); } }
输出结果以下
双蛋煎饼,卖的特别好
至此,咱们就实现了一个简易的rpc服务调用框架