徒手撸一个简单的RPC框架(2)——项目改造

在上一篇的徒手撸一个简单的RPC框架中再最后的服务器和客户端链接的时候只是简单的写了Socket链接,以为有些不妥。正好最近学习了Netty,在平时工做中没机会运用,因而本身就给本身出需求将以前的项目改造一下。git

Netty是什么?

在学习Netty以前呢咱们首先得了解IO和NIOgithub

IO模型

IO编程模型简单来讲就是上一篇我写的服务端与客户端的链接,客户端与服务端创建链接通讯后,必须等待服务端返回信息才能进行下一步的动做,这期间线程一直是等待状态。IO模型在客户端较少的状况下是没问题的,可是一旦有大量客户端与服务端进行链接,那么就会出问题。咱们简单的分析一下缘由。编程

  1. 首先我以前写的代码实际上是有问题的,为何呢?由于每次链接通讯一次我就将其链接关闭了。Socket链接时TCP,双方每次创建链接时都会耗费时间和资源,不能每通讯一次就关闭链接。就比如你和别人打电话你说一句对方说一句,而后挂电话,而后再打过去。确定是不能这么作的
  2. 若是是想要保持通讯,那么程序中就得将其监听的代码放入while循环中用专门的线程来维护。可是线程是操做系统中很是宝贵的资源,每一个操做系统能建立的线程也是有限的。
  3. CPU频繁的在线程之间切换是很是损耗性能的。
  4. 咱们能够看到以前编写的代码中客户端与服务端交流的媒介是字节流,效率不高。

IO模型有这么多的问题,因而在JDK1.4中提出了NIO的概念,就是为了解决以上的问题bootstrap

NIO模型

咱们一一对应上面的问题来看NIO用什么技术来解决的bash

第一个问题

第一个问题是代码的问题,咱们就不讨论了服务器

第二个问题

线程有限的问题:NIO中提出了Selector概念,IO中是每一个链接都会由一个线程阻塞来维护,NIO中用Selector来管理这些链接,若是有消息的传入或传出,那么就创建相应的线程了处理。这样服务器只须要阻塞一个Selector线程,就能够管理多个链接了。框架

具体的Selector文章能够看我以前的NIO中选择器Selector,里面有介绍详细的Selector用法。异步

这里举个例子应该就明白的,比如你去钓鱼,IO就是一人一个鱼竿,等着鱼上来,中间哪也不能去,而NIO就是一我的能守着好几个鱼竿。ide

这就是NIO模型解决操做系统中线程有限的问题。函数

第三个问题

CPU在线程之间频繁切换,因为NIO中只管理了一个Selector线程,那么这个问题也就相应的解决了

第四个问题

NIO中提出了ChannelBuffer的概念,就比如在向往的生活第一季中摘玉米中,是用竹筐一次一次背快呢?仍是接一辆车子来回运送快?固然是车子来回运送快了,而这里的Buffer就比如车子。具体的ChannelBuffer的解释能够看我以前的文章Java中IO和NIOJAVA中NIO再深刻

Netty

那么为何就和Netty扯上关系了呢?其实我以为NIO之于Netty的关系就比如Servlet之于Tomcat的关系,Netty只是对于NIO进行了进一步的封装,让使用者更加简便的编程。

改造

此次改造分为服务端和客户端的改造

服务端

接下来咱们就利用Netty将咱们的服务器端与客户端链接通讯部分进行改造一下,首先咱们先加上对于Netty的依赖

compile 'io.netty:netty-all:4.1.6.Final'
复制代码

而后编写服务端的代码,服务端的代码很是简单

ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
        .group(boos, worker)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            protected void initChannel(NioSocketChannel ch) {
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                        //得到实现类处理事后的返回值
                        String invokeMethodMes = CommonDeal.getInvokeMethodMes(msg);
                        ByteBuf encoded = ctx.alloc().buffer(4 * invokeMethodMes.length());
                        encoded.writeBytes(invokeMethodMes.getBytes());
                        ctx.writeAndFlush(encoded);
                    }
                });
            }
        }).bind(20006);

复制代码

这和咱们日常写的Socket链接有些区别,能够看到咱们建了两个NioEventLoopGroup一个boss一个worker,为何会有两个呢?

从这个图里面咱们能够看到,boss是专门用来对外链接的,worker则是像NIO中Selector用来处理各类读写的请求。

客户端

其实难点就是在客户端,由于Netty是异步事件驱动的框架,什么是异步呢?

客户端与服务端的任何I/O操做都将当即返回,等待服务端处理完成之后会调用指定的回调函数进行处理。在这个过程当中客户端一直没有阻塞。因此咱们在客户端与服务端请求处理时,若是得到异步处理的结果呢?Netty提供有一种获取异步回调结果的,可是那是添加监听器。而咱们的RPC调用在最后返回结果的时候必须得阻塞等待结果的返回,因此咱们须要本身写一个简单的获取异步回调结果的程序。想法以下。

  1. 想要得到服务端返回的消息时,阻塞等待。
  2. Netty客户端读取到客户端消息时,唤醒等待的线程

那么咱们就围绕这两步来进行编码。

客户端想要获取服务端消息时如何等待呢?这里咱们就能够用wait()

public Response getMessage(){
    synchronized (object){

        while (!success){
            try {
                object.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        return response;
    }
}

复制代码

那么读到消息之后如何唤醒呢?

public void setMessage(Response response){
    synchronized (object){
        this.response = response;
        this.success = true;
        object.notify();
    }
}

复制代码

这样就解决了咱们上面提出的两个问题了。接下来编写客户端的代码

private final Map<Long,MessageFuture> futureMap = new ConcurrentHashMap<>();
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void connect(String requestJson,Long threadId){
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) {
                ch.pipeline().
                        addLast(new StringDecoder()).
                        addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                Gson gson = new Gson();
                                Response response = gson.fromJson(msg, Response.class);
                                MessageFuture messageFuture = futureMap.get(threadId);
                                messageFuture.setMessage(response);
                            }

                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                futureMap.put(threadId,new MessageFuture());
                                countDownLatch.countDown();
                                ByteBuf encoded = ctx.alloc().buffer(4 * requestJson.length());
                                encoded.writeBytes(requestJson.getBytes());
                                ctx.writeAndFlush(encoded);
                            }
                        });
            }
        }).connect("127.0.0.1", 20006);
    }

    public Response getResponse(Long threadId){
        MessageFuture messageFuture = null;
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        messageFuture = futureMap.get(threadId);
        return messageFuture.getMessage();
    }

复制代码

这里面咱们用到了CountDownLatch类,即等待发送完消息之后通知我能获取数据了。这里面的代码和服务端的差很少,其中有区别的地方就是在发送数据的时候将线程ID和MessageFuture放入Map中,在获得服务端发送的数据时取出并放入获得的Response。

总结

到目前为止咱们就完成了咱们的项目改造,只是简单的应用了一下Netty的客户端和服务端的通讯,由于在学习的过程当中若是没有运用的话,那么感受记忆没有那么牢靠,因此就有了这次的项目改造的计划。虽然完成了简单的通讯,可是我知道还有些地方须要优化,例如用synchronized在之后学习了AQS之后但愿也可以学以至用将这里给改一下。

完整的项目地址

徒手撸一个简单的RPC框架

徒手撸一个简单的IOC

相关文章
相关标签/搜索
本站公众号
   欢迎关注本站公众号,获取更多信息