rpc的简单实现

前文: java

1.以前公司用hessian&ws来作远程调用,因为客户机的ip常常变换,没法实时跟着做变更。 服务器

2.消息中间件服务load小,没有充分发挥用处。 网络

处理方案,利用MOM被客户机订阅的机制获取客户的id,在MOM的层面作RPC。这样作解决了上面的问题还能作到以下好处: 框架

1.很好的作到服务注册,监控等功能。 异步

2.自然的支持分布式 socket

3.自然支持异步 分布式

下面贴出最简要的代码: ide


  • 基础类
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class RpcExample {
    
        /**
         * 暴露服务
         * 
         * @param service 服务实现
         * @param port 服务端口
         * @throws Exception
         */
        public static void service(final Object service, int port) throws Exception {
            ServerSocket server = new ServerSocket(port);
            for(;;) {
                try {
                    final Socket socket = server.accept();
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                try {
                                    ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                    try {
                                        String methodName = input.readUTF();
                                        Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                        Object[] arguments = (Object[])input.readObject();
                                        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                                        try {
                                            Method method = service.getClass().getMethod(methodName, parameterTypes);
                                            Object result = method.invoke(service, arguments);
                                            output.writeObject(result);
                                        } catch (Throwable t) {
                                            output.writeObject(t);
                                        } finally {
                                            output.close();
                                        }
                                    } finally {
                                        input.close();
                                    }
                                } finally {
                                    socket.close();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }).start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 引用服务
         * 
         * @param <T> 接口泛型
         * @param interfaceClass 接口类型
         * @param host 服务器主机名
         * @param port 服务器端口
         * @return 远程服务
         * @throws Exception
         */
        @SuppressWarnings("unchecked")
        public static <T> T reference(final Class<T> interfaceClass, final String host, final int port) throws Exception {  return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {
                public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                    Socket socket = new Socket(host, port);
                    try {
                        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                        try {
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(arguments);
                            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                            try {
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable) result;
                                }
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });
        }
    }

RpcExample解释: spa

1.例子中是利用socket来实现网络传输,在产品中是利用ActiveMQ来实现,只须要将invoke中的代码用queue来实现。host和port就能够不用传入了。 .net

2.固然你能够不用ActiveMQ(用这个的理由上面已经说了),甚至socket也不本身写,你能够用现成的框架作,例如mina,netty

3.想实现简单的服务治理和监控 只要在run和invoke中加入相应的代码。


  • 生产者(服务提供者)
    public class RpcProvider {
        //IService是接口,ServiceImpl是相应的实现类
        public static void main(String[] args) throws Exception {
            IService service = new ServiceImpl();
            RpcExample.service(service, 8161);
        }
    }


  • 消费者

    public class RpcConsumer {
        public static void main(String[] args) throws Exception {
            IService service = RpcExample.reference(IService.class, "127.0.0.1", 8161);
            String print = service.getString();
            System.out.println(print);
        }
    }


生产者,消费者代码解释:

1.接口和实现就不贴出来了,里面就一个方法getString(),

2.接口(IService)必须作成jar,生产者和消费者都须要引用,若是print方法中包含其余的传输对象,必须序列化和作成相应的jar。

3.固然,若是要作成跨语言的就不要作成java对象了,你能够用各类传输协议和格式来作。

4.相应的序列化手段也能够作成插入式的形式让使用者定制本身的序列化方式。

相关文章
相关标签/搜索