用netty实现一个rpc

        之前在作项目的时候接触到了rpc,感受颇有意思,可是那个框架用了rabbitmq来进行channel的管理。正好前几天看到了netty,一个高效的JAVA NIO框架,因此萌生了想本身写一个rpc框架的想法。java

RPC原理简介

        RPC(Remote Procedure Call)指远程过程调用。它的做用大概能够这么描述一下:B 程序员 程序想要调用 A 程序的某个函数,可是因为 A 与 B 是两个独立的项目,B 不可能直接调用 A 中的任何一个类里的任何一个函数。这时 RPC 就能起到它的做用了。         为了完成 B 程序的需求, A 程序 对 B 程序进行规定,若是 B 想要调用 A 的方法,须要给 A 一个规定的数据格式,而后 A 在本地执行完 B 所想要使用的函数后将结果 封装成一个规定好的数据格式后发送给 B。这样 B 就达到了不拷贝 A 的代码的状况下完成其所须要的业务功能。 程序员

rpc调用流程
rpc调用流程

RPC具体实现

通讯框架的选择

        一个rpc底层应该支持io/nio,这种实现方法大体有两种,一是经过代码彻底有本身实现,可是这种方法对技术要求比较高,并且容易出现隐藏的bug,另外一种就是利用现有的开源框架,Netty 是个不错的选择,它是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。它能大大的简化咱们的开发流程,使得代码更加牢靠。在此次的RPC中,咱们使用 Netty 来做为链接客户端与服务端的桥梁。redis

数据的序列化与反序列化

        一个好的rpc不该该受到语言的限制,因此client端到server端的数据交换格式应该有一个良好的定义,好比json、xml。如今这方面成熟的框架有不少好比Thrift、Protobuf等等。咱们不用本身去定义以及实现一个交换格式,这些成熟的框架都是久经考验的。在本例中因为是抱着学习的目的,本人采用java自带的序列化与反序列化方法。json

服务的注册与发现

        client的端想要调用服务端的某个方法,须要得知这个方法的某些信息,而如今问题就来了,得知这个信息的时候是由写 A 程序的人去直接告诉 B 程序的人,仍是由 B 程序主动去发现 A 的服务。很明显,第一种方法很不牢靠,如果采用这种方法的话,A服务的每次改动都要通知到 B ,B也要每次根据 A服务的改变,而重写本身的代码。相比之下,第二种方法更显得可行。         其实实现服务的注册与发现的方法也有不少,好比zookeeper,redis等等。大体原理就是,A 服务将本身暴露出的方法信息存在zookeeper或者redis上,每次更改由A主动通知或由 B 去zookeeper或redis上自动去拉取最新的信息。对于zookeeper来讲存储方法信息的是一个个固定的节点,对于reids来讲就是一个key值。用zookeeper还解决了在分布式的部署方案下,某个服务down机的问题。由于zookeeper与生俱来的容灾能力(好比leader选举),能够确保服务注册表的高可用性。在本例中,我并未实现服务的注册于发现。服务器

client与server

        client端与server端有各自须要处理的发送格式与接受格式。对于client端来讲须要封装好本身所要请求的方法信息发送给server端,并等待server端返回的结果。server端则是接收client的请求数据,处理完成后返回给client端结果数据。         其实RPC在调用的时候应该让调用者像调用本地服务通常的去完成业务逻辑。这种实如今java中就应该用代理来实现。网络

关键代码

数据交换格式定义

client请求格式 利用java自带的序列化方法要继承Serializable方法而且要实现无参构造方法。app

package com.example.nettyrpcfirst.netty.entity;

import java.io.Serializable;

/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class Request implements Serializable {
    private static final long serialVersionUID  = -1L;
    private String clientId;
    private String className;
    private String methodName;
    private Class[] paramterTypes;
    private Object[] parameters;


    public Request(String clientId, String className, String methodName, Class[] paramterTypes, Object[] parameters) {
        this.clientId = clientId;
        this.className = className;
        this.methodName = methodName;
        this.paramterTypes = paramterTypes;
        this.parameters = parameters;
    }

    public Request() {
    }
//getter and setter
}

复制代码

server 响应数据格式 具体要求与client端相同框架

package com.example.nettyrpcfirst.netty.entity;

import java.io.Serializable;

/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class Response implements Serializable {
    private static final long serialVersionUID = -1L;
    private String clientId;
    private Throwable err;
    private Object result;


    public Response() {
    }
// getter and setter
}

复制代码

注意 clientId字段的设置是为了保证返回的数据是本身想要的。dom

server 实现

netty不熟悉的能够去官网写写几个例子socket

package com.example.nettyrpcfirst.netty.server;



/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class ServerHandler extends SimpleChannelInboundHandler {
    Logger logger = LoggerFactory.getLogger(ServerHandler.class);

    private final Map<String, Object> services;

    public ServerHandler(Map<String, Object> services) {
        this.services = services;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("{} has created a channel",ctx.channel().remoteAddress());
    }


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
       Runnable r = () ->{
           Request request = (Request) o;
           Response response = new Response();
           response.setClientId(request.getClientId());
           try {
               Object service = services.get(request.getClassName());
               FastClass serviceFastClass = FastClass.create(service.getClass());
               FastMethod serviceFastMethod = serviceFastClass.getMethod(request.getMethodName(), request.getParamterTypes());
               Object result = serviceFastMethod.invoke(service, request.getParameters());
               response.setResult(result);
           }catch (Exception e){
               response.setErr(e);
           }
           channelHandlerContext.writeAndFlush(response).addListener(new ChannelFutureListener() {
               @Override
               public void operationComplete(ChannelFuture channelFuture) throws Exception {
                   logger.info("send response for request: "+request.getClientId());
               }
           });
       };
       Server.submit(r);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        logger.error("rpc server err occur" + cause.getMessage()+" | "+ctx.channel().remoteAddress());
        ctx.close();
    }
}

复制代码

最主要的就是channelRead0方法,这里定义了在接收到客户端的数据后如何去调用本地方法,具体是用cglib代理完成。 server具体代码

package com.example.nettyrpcfirst.netty.server;



/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
@Configuration
public class Server implements BeanNameAware, BeanFactoryAware, ApplicationContextAware,InitializingBean {
    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Server.class);

    private Map<String,Object> services = new ConcurrentHashMap<>();

    private static ExecutorService threadPoolExecutor;

    public Server(){
    }

    /** * 启动netty server * @throws Exception */
    @Override
    public void afterPropertiesSet() throws Exception {
        logger.info("afterPropertiesSet");
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(1024,ClassResolvers.cacheDisabled(this.getClass().getClassLoader())))
                                    .addLast(new ObjectEncoder())
                                    .addLast(new ServerHandler(services));
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture future = b.bind(8080).sync();
            future.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
            logger.error("an error occur ----------->"+e.getMessage());
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /** * 经过扫描全部带有@RPCServer注解的类进行注册 * @param applicationContext * @throws BeansException */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            logger.info("setApplicationContext");
            Map<String,Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RPCServer.class);
            if(!serviceBeanMap.isEmpty()){
                for (Object service :serviceBeanMap.values()){
                    String interfaceName = service.getClass().getAnnotation(RPCServer.class).value().getName();
                    logger.info("RPCService: {}" , interfaceName);
                    this.services.put(interfaceName,service);
                }
            }
    }
    public static void submit(Runnable task){
        if(threadPoolExecutor == null){
            synchronized (RPCServer.class){
                if(threadPoolExecutor == null){
                    threadPoolExecutor = Executors.newFixedThreadPool(16);
                }
            }
        }
        threadPoolExecutor.submit(task);
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        logger.info("setBeanFactory()");
    }

    @Override
    public void setBeanName(String s) {
        logger.info("setBeanName() {}", s);
    }
}

复制代码

RPCServer 自定义注解

package com.example.nettyrpcfirst.netty.annoations;


/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface RPCServer {
    Class<?> value();
}

复制代码

client实现

client代理类

package com.example.nettyrpcfirst.netty.client;


/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class ClientProxy {
    @SuppressWarnings("unchecked")
    public <T> T create(Class<?> interfaceClass){

        return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Request request = new Request();
                        request.setClientId(UUID.randomUUID().toString());
                        request.setClassName(method.getDeclaringClass().getName());
                        request.setMethodName(method.getName());
                        request.setParamterTypes(method.getParameterTypes());
                        request.setParameters(args);
                        Client client = new Client("127.0.0.1",8080);
                        Response response = client.send(request);
                        if(response.getErr()!=null){
                            throw response.getErr();
                        }else{
                            return response.getResult();
                        }
                    }
                });
    }
}

复制代码

client 与server 链接发送数据并等待数据返回

package com.example.nettyrpcfirst.netty.client;



/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class Client extends SimpleChannelInboundHandler<Response> {
    private static org.slf4j.Logger logger = LoggerFactory.getLogger(Client.class);

    private Response response;
    private final static Object obj = new Object();
    private String host;
    private int port;
    ChannelFuture future;
    public Client(String host,int port){
        this.host = host;
        this.port = port;
    }

    /** * 接收到消息后唤醒线程 * @param channelHandlerContext * @param response * @throws Exception */

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
        this.response = response;
        synchronized (obj){
            obj.notifyAll();
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("client caught exception", cause);
        ctx.close();
    }

    /** * 链接server端channel,发送完数据后锁定线程,等待数据返回 * @param request * @return * @throws Exception */
    public Response send(Request request) throws Exception{
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(1024,ClassResolvers.cacheDisabled(this.getClass().getClassLoader())))
                                    .addLast(new ObjectEncoder())
                                    .addLast(Client.this);
                        }
                    })
                    .option(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture future = b.connect(host,port).sync();
            future.channel().writeAndFlush(request).sync();
            System.out.println("2 "+Thread.currentThread().getName());
            synchronized (obj){
                System.out.println("1111111111111111");
                obj.wait();
            }
            if(response != null){
                System.out.println("3333333333333");
            }
            return response;
        }finally {
            if(future!=null){
                future.channel().closeFuture().sync();
            }
            eventLoopGroup.shutdownGracefully();
        }
    }
}

复制代码

测试

package com.example.nettyrpcfirst.netty.client;

import com.example.nettyrpcfirst.netty.entity.TestService;

/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class TestMain {
    public static void main(String[] args){
        TestService testService = new ClientProxy().create(TestService.class);
        String result = testService.play();
        System.out.println("收到消息 ------------> "+result);
    }
}

复制代码

不出意外的话 控制台会成功打印

相关文章
相关标签/搜索