手写RPC框架

1、RPC简介

最近看hadoop底层通讯,都是经过RPC实现的。java

RPC(Remote Procedure Call Protocol)远程调用: 远程过程调用是一种经常使用的分布式网络通讯协议,它容许运行于 一台计算机的程序调用另外一台计算机的子程序,同时将网络的通讯细节隐藏起来, 使得用户无须额外地为这个交互做用编程。分布式系统之间的通讯大都经过RPC实现编程

2、RPC请求过程

  1. client发起服务调用请求
  2. client stub代理程序将调用的方法,参数按照必定格式封装,经过服务方的地址,发起网络请求
  3. 消息经过网络发送到服务端,server stub接收到消息,进行解包,反射调用本地对应的服务
  4. 本地服务执行将结果返回给server stub,而后server stub会将结果消息打包返回到客户端
  5. client stub接收消息解码,获得最终结果.

3、RPC框架架构

要写一个RPC框架,须要哪些组成部分?bash

  1. 序列化方式。序列化主要做用是将结构化对象转为字节流以便于经过网络进行传输或写入持久存储。
  2. 远程代理对象,通常使用jdk动态代理或者cglib代理
  3. 服务暴露 设置注册中心Zookeeper
  4. 网络通讯,基于事件驱动的Reactor模式

4、RPC框架示例

  1. 服务提供者,运行在服务器端,提供服务接口定义与服务实现类
  2. 服务发布者,运行在服务器端,负责将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用
  3. 服务消费者,运行在客户端,经过远程代理对象调用远程服务

服务端代码

服务接口:服务器

//计算学生年龄和的接口
public interface CalculateService {
    String cal(Student sta, Student stb);
}

public class CalculateServiceImpl implements CalculateService {
    @Override
    public String cal(Student sta, Student stb) {
        return "学生年龄之和:" + (sta.getAge() + stb.getAge());
    }
}
复制代码

服务发布网络

public class PublishUtilI {
    //服务接口集合
    private static List<Object> serviceList;
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,10, TimeUnit.SECONDS,
                                                    new LinkedBlockingQueue<Runnable>(10));

    public static void publish(int port,Object... services) throws IOException {
        serviceList= Arrays.asList(services);
        ServerSocket server = new ServerSocket(port);
        Socket client;
        while (true) {
            //阻塞等待请求
            client = server.accept();
            //使用线程池处理请求
            executor.submit(new ServerHandler(client, serviceList));
        }

    }
}
复制代码

反射调用服务架构

  1. 读取客户端发送的服务名
  2. 判断服务是否发布
  3. 若是发布,反射调用服务端对应服务
  4. 返回结果给客户端
public class ServerHandler implements Runnable {
    private Socket client = null;

    private List<Object> serviceList = null;

    public ServerHandler(Socket client, List<Object> service) {
        this.client = client;
        this.serviceList = service;
    }

    @Override
    public void run() {
        try (
                ObjectInputStream input = new ObjectInputStream(client.getInputStream());
                ObjectOutputStream output = new ObjectOutputStream(client.getOutputStream())
        ) {
            // 读取客户端要访问那个service
            Class serviceClass = (Class) input.readObject();

            // 找到该服务类
            Object obj = findService(serviceClass);
            if (obj == null) {
                output.writeObject(serviceClass.getName() + "服务未发现");
            } else {
                //利用反射调用该方法,返回结果
                String methodName = input.readUTF(); //读取UTF编码的String字符串
                //读取参数类型
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                //读取参数
                Object[] arguments = (Object[]) input.readObject();
                Method method = obj.getClass().getMethod(methodName, parameterTypes);
                //反射执行方法
                Object result = method.invoke(obj, arguments);
                output.writeObject(result);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Object findService(Class serviceClass)  {
        for (Object obj : serviceList) {
            boolean isFather = serviceClass.isAssignableFrom(obj.getClass());
            if (isFather) {
                return obj;
            }
        }
        return null;
    }
}
复制代码

客户端代码


public class Client {
    public static void main(String[] args) {
        CallProxyHandler handler = new CallProxyHandler("127.0.0.1", 1111);
        CalculateService calculateService = handler.getService(CalculateService.class);
        Student sta = new Student(1);
        Student stb = new Student(2);
        String result = calculateService.cal(sta, stb);
        System.out.println(result);
    }
}
复制代码

建立代理类远程调用服务端发布的服务并发

public class CallProxyHandler implements InvocationHandler {

    private String ip;
    private int port;

    public CallProxyHandler(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * 获取代理对象
     * @param clazz
     * @param <T>
     * @return
     */
    @SuppressWarnings("all")
    public <T> T getService(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(CallProxyHandler.class.getClassLoader(),
                new Class<?>[] {clazz}, this);
    }
    
    /**
     * 将须要调用服务的方法名,参数类型,参数按照必定格式封装发送至服务端
     * 读取服务端返回的结果
     * @param proxy
     * @param method
     * @param args
     * @return
     * @throws Throwable
     */
    @SuppressWarnings("all")
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        try (
                Socket socket = new Socket(ip, port);
                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                ObjectInputStream input = new ObjectInputStream(socket.getInputStream())
        ) {
            output.writeObject(proxy.getClass().getInterfaces()[0]);
            output.writeUTF(method.getName());
            output.writeObject(method.getParameterTypes());
            output.writeObject(args);
            output.flush();
            Object result = input.readObject();
            if (result instanceof Throwable) {
                throw (Throwable) result;
            }
            return result;
        }
    }
}
复制代码

至此,一个简单的RPC服务调用框架完成。可是存在不少问题:框架

  1. 使用java自带的序列化,效率不高,可使用Hadoop Avro与protobuf
  2. 使用BIO方式进行网络传输,高并发状况没法应对,使用Netty框架进行网络通讯
  3. 缺乏注册中心,服务注册可使用Zookeeper进行管理。
相关文章
相关标签/搜索