简易RPC框架实现

写在最前面

PRC(Remote Procedure Call) 远程过程调用。通俗的讲就是程序经过RPC框架调用远程主机的方法就如同调用本地方法同样。Dubbo就是这样一个Rpc框架,本文主要参考Dubbo的设计思路,简易实现了Rpc框架。 本文涉及到知识点包括:java

  • Jdk 动态代理
  • serialization 序列化
  • Netty 相关
  • Zookeeper 使用

一、Rpc框架

Rpc 框架通常分为三个部分,Registry(注册中心)、Provider(提供者)、Consumer(消费者)。git

  1. Registry 服务的注册中心,能够经过zookeeper、redis等实现。
  2. Provider 服务提供者被调用方,提供服务供消费者调用
  3. Consumer 消费者,经过订阅相应的服务获取须要调用服务的ip和端口号调用远程provider提供的服务。

二、代理

java中常见的代理有JDK动态代理、Cglib动态代理、静态代理(ASM等字节码技术)。github

2.一、JDK 代理

举个例子redis

@Override
   @SuppressWarnings("unchecked")
   public <T> T createProxyBean(Class<T> rpcServiceInterface) {
       return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{rpcServiceInterface}, new AryaRpcInvocationHandler());
   }
复制代码

JDK代理生成代理对象主要经过java.lang.reflect.Proxy类的newProxyInstance方法。JDK代理须要被代理对象必须实现接口。bootstrap

2.二、Cglib

Cglib其实是对ASM的易用性封装,Cglib不须要目标对象必须实现某一个接口,相对JDK动态代理更加灵活。bash

Enhancer en = new Enhancer();  
       en.setSuperclass(clazz);  
       en.setCallback(new MethodInterceptor() {  
           @Override  
           public Object intercept(Object arg0, Method method, Object[] args, MethodProxy arg3) throws Throwable {  
               Object o = method.invoke(object, args);  
               return o;  
           }  
       });  
       return en.create();
复制代码

2.三、静态代理

经过字节码技术对class文件进行修改,使用和学习成本相对较高,须要对Class的文件结构以及各类符号引用有比较深的认识,才能较好的使用,由于是对字节码的修改因此相对的性能上也比动态代理要好一些。服务器

三、序列化

咱们知道数据在网络上传输都是经过二进制流的形式进行进行的。当Consumer调用Provider时传输的参数须要先进行序列化,provider接收到参数时须要进行反序列化才能拿到须要的参数数据,因此序列化的性能对RPC的调用性能有很大的影响。目前主流的序列化方式有不少包括:Kryo、Protostuff、hessian。等网络

Protostuff是google序列化Protosbuff的开源实现,项目中咱们用到它的序列化方式app

/** * @author HODO */
public class ProtostuffSerializer implements Serializer {

   @Override
   public byte[] serialize(Object object) {
       Class targetClass = object.getClass();
       RuntimeSchema schema = RuntimeSchema.createFrom(targetClass);
       LinkedBuffer linkedBuffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
       return ProtostuffIOUtil.toByteArray(object, schema, linkedBuffer);
   }

   @SuppressWarnings("unchecked")
   @Override
   public <T> T deserialize(byte[] bytes, Class<T> targetClass) {
       RuntimeSchema schema = RuntimeSchema.createFrom(targetClass);
       T object = (T) schema.newMessage();
       ProtostuffIOUtil.mergeFrom(bytes, object, schema);
       return object;
   }
}

复制代码

四、Netty

Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持。举个例子: Netty 服务端代码框架

public class NettyServer {

   private ApplicationContext applicationContext;

   public NettyServer(ApplicationContext applicationContext) {
       this.applicationContext = applicationContext;
   }

   public void init(int port) {
       EventLoopGroup boss = new NioEventLoopGroup();
       EventLoopGroup worker = new NioEventLoopGroup();

       try {
           ServerBootstrap bootstrap = new ServerBootstrap();
           bootstrap.group(boss, worker);
           bootstrap.channel(NioServerSocketChannel.class);
           bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
           bootstrap.option(ChannelOption.TCP_NODELAY, true);
           bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
           bootstrap.localAddress(port);
           bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
               @Override
               public void initChannel(SocketChannel socketChannel) throws Exception {
                   ChannelPipeline channelPipeline = socketChannel.pipeline();
                   channelPipeline.addLast(new NettyServerHandler(applicationContext));
               }
           });
           ChannelFuture f = bootstrap.bind().sync();
           if (f.isSuccess()) {
               System.out.println("Netty端口号:" + port);
           }
           f.channel().closeFuture().sync();
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           boss.shutdownGracefully();
           worker.shutdownGracefully();
       }
   }

}
复制代码

Netty 客服端代码

public class NettyClient {

    private int port;
    private String host;

    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    SerializerFactory serializerFactory = new SerializerFactory();
    Serializer serializer = serializerFactory.getSerialize(ProtostuffSerializer.class);


    public NettyClient(String host, int port) {
        this.port = port;
        this.host = host;
    }

    public NettyClient(String inetAddress) {
        if (inetAddress != null && inetAddress.length() != 0) {
            String[] strings = inetAddress.split(":");
            this.host = strings[0];
            this.port = Integer.valueOf(strings[1]);
        }
    }

    public RpcResponse invoker(RpcRequest rpcRequest) throws InterruptedException {

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            final NettyClientHandler clientHandler = new NettyClientHandler();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(clientHandler);
                }});
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)).sync();

            serializer.serialize(rpcRequest);
            future.channel().writeAndFlush(Unpooled.buffer().writeBytes(serializer.serialize(rpcRequest)));
            countDownLatch.await();
            // 等待连接关闭
            //future.channel().closeFuture().sync();
            return clientHandler.getRpcResponse();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {


        private RpcResponse rpcResponse;

        /** * 接收 Rpc 调用结果 * * @param ctx netty 容器 * @param msg 服务端答复消息 * @throws Exception */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            rpcResponse = serializer.deserialize(req, RpcResponse.class);
            countDownLatch.countDown();
        }

        RpcResponse getRpcResponse() {
            return rpcResponse;
        }
    }
}

复制代码

五、注册中心 zookeeper

选用了zookeeper做为注册中心,在建议Rpc框架中提供了注册中心的扩展。只要实现RegistryManager接口便可。zookeeper经常使用的命令行:

一、客服端脚本链接zookeeper服务器不指定-server默认链接本地服务

./zkCli -service ip:port

二、建立

create [-s] [-e] path data acl

建立一个节点-s -e分别指定节点的类型和特性:顺序和临时节点默认建立的是临时节点,acl用于权限控制

三、读取

ls path只能看指定节点下的一级节点

get path查看指定节点的数据和属性信息

四、更新

set path data [version]

能够指定更新操做是基于哪个版本当更新的 path 不存在时报 Node does not exist

五、删除

`delete path [version]``

六、Spring 支持

在框架中还提供了两个注解@RpcConsumerRpcProvider 在项目中只要引入

<dependency>
			<groupId>com.yoku.arya</groupId>
			<artifactId>arya</artifactId>
			<version>1.0-SNAPSHOT</version>
		</dependency>
复制代码

在provider端容器注入

@Bean
	public RpcProviderProcessor rpcProviderProcessor() {
		return new RpcProviderProcessor();
	}
复制代码

在comsumer端容器注入

@Bean
	public RpcConsumerProcessor rpcConsumerProcessor() {
		return new RpcConsumerProcessor();
	}
复制代码

项目完整的代码 arya github.com/hoodoly/ary…

框架使用Demo github.com/hoodoly/ary…

欢迎 star

联系方式:gunriky@163.com 有问题能够直接联系