【远程调用框架】如何实现一个简单的RPC框架(四)优化二:改变底层通讯框架

【如何实现一个简单的RPC框架】系列文章:java

【远程调用框架】如何实现一个简单的RPC框架(一)想法与设计 
【远程调用框架】如何实现一个简单的RPC框架(二)实现与使用 
【远程调用框架】如何实现一个简单的RPC框架(三)优化一:利用动态代理改变用户服务调用方式 
【远程调用框架】如何实现一个简单的RPC框架(四)优化二:改变底层通讯框架 
【远程调用框架】如何实现一个简单的RPC框架(五)优化三:软负载中心设计与实现 
第一个优化以及第二个优化修改后的工程代码可下载资源 如何实现一个简单的RPC框架编程

 

 

二、优化二: 改变底层通讯框架

简单socket通讯BIO方式-》-》NIO方式-》使用netty服务框架 
关于这部分,能够提早阅读下博客《Java NIO BIO AIO总结》bootstrap

2.1 目的

问题描述:在目前的服务框架版本中,服务发布端和服务调用端采用的IO通讯模式为BIO,即便用最基础的Java Socket编程的方式。看过咱们以前实现介绍部分的读者都知道,服务端一直在监听请求,每当有一个请求发来,则会建立一个新的线程去处理该请求,以下代码:api

while (true){
    Socket socket = serverSocket.accept();
    new Thread(new ServerProcessThread(socket)).start();//开启新的线程进行链接请求的处理
}
  • 1
  • 2
  • 3
  • 4

而ServerProcessThread线程完成了服务的调用及结果的返回工做,这样的方法,有如下两个弊端:数组

  • (1)对于每个socket链接都建立一个线程去维护,当链接逐渐增多的状况下,建立的线程数随之增长,对虚拟机形成必定压力。
  • (2)这种方式为阻塞式IO,即数据的读写是阻塞的,在没有有效可读/可写数据的状况下,线程会一直阻塞,形成资源的浪费。 
    所以为了解决上述两个弊端,咱们改变这种IO模式。服务器

  • step1.使用selector+channel+buffer实现NIO模式(参考博客《Java NIO BIO AIO总结》
    NIO的模式有两个特色: 
    (1)不用对全部链接都建立新的线程去维护,selector线程能够管理多个数据通道; 
    (2)IO数据读写是非阻塞的,只有当出现有效读写数据时才会出发相应的事件进行读写,节约资源。网络

  • step2.使用netty/mina的框架来实现。

2.2 实现

2.2.1 NIO模式

关于NIO模式的基本客户端与服务端的实现代码在博客《Java NIO BIO AIO总结》中已经进行了介绍。这里我对LCRPC框架代码的改造即利用该博客中的代码。仅做为NIO通讯模式的使用示例,由于还有好多能够修改的地方。并发

  • (1)服务发布端的代码修改 
    这里咱们为接口IProviderService添加一个方法:startListenByNIO,该方法实现采用NIO的模式进行服务的监听,与之对应的是该接口中的startListen方法使用BIO的模式进行服务的监听,即咱们初版本中的内容。startListenByNIO方法的实现代码以下:
@Override
public boolean startLisetenByNIO() {
    new Thread(new NIOServerThread()).start();
    return true;
}
  • 1
  • 2
  • 3
  • 4
  • 5

该方法开启新的线程,采用NIO的模式进行服务的监听。线程类NIOServerThread的代码与博客《Java NIO BIO AIO总结》介绍的一致,只是read事件的触发方法代码有所改动。该类的代码以下:app

public class NIOServerThread extends NIOBase implements Runnable{


    @Override
    public void run() {
        try {
            initSelector();//初始化通道管理器Selector
            initServer(Constant.IP,Constant.PORT);//初始化ServerSocketChannel,开启监听
            listen();//轮询处理Selector选中的事件
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

    }

    /**
     * 初始化 该线程中的通道管理器Selector
     */
    public void initSelector() throws IOException {
        this.selector = Selector.open();
    }


    /**
     * 采用轮询的方式监听selector上是否有须要处理的事件,若是有,则循环处理
     * 这里主要监听链接事件以及读事件
     */
    public void listen() throws IOException, ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
        System.out.println("监听成功,可开始进行服务注册!");
        //轮询访问select
        while(true){
            //当注册的事件到达时,方法返回;不然将一直阻塞
            selector.select();
            //得到selector中选中的项的迭代器,选中的项为注册的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            //循环处理注册事件
            /**
             * 一共有四种事件:
             * 1. 服务端接收客户端链接事件: SelectionKey.OP_ACCEPT
             * 2. 客户端链接服务端事件:    SelectionKey.OP_CONNECT
             * 3. 读事件:                SelectionKey.OP_READ
             * 4. 写事件:                SelectionKey.OP_WRITE
             */
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                //手动删除已选的key,以防重复处理
                iterator.remove();
                //判断事件性质
                if (key.isAcceptable()){//服务端接收客户端链接事件
                    accept(key);
                }else if (key.isReadable()){//读事件
                    read(key);
                }
            }
        }
    }

    /**
     * 得到一个ServerSocket通道,并经过port对其进行初始化
     * @param port    监听的端口号
     */
    private void initServer(String ip,int port) throws IOException {
        //step1. 得到一个ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        //step2. 初始化工做
        serverSocketChannel.configureBlocking(false);//设置通道为非阻塞
        serverSocketChannel.socket().bind(new InetSocketAddress(ip,port));

        //step3. 将该channel注册到Selector上,并为该通道注册SelectionKey.OP_ACCEPT事件
        //这样一来,当有"服务端接收客户端链接"事件到达时,selector.select()方法会返回,不然将一直阻塞
        serverSocketChannel.register(this.selector,SelectionKey.OP_ACCEPT);
    }


    /**
     * 当监听到服务端接收客户端链接事件后的处理函数
     * @param key 事件key,能够从key中获取channel,完成事件的处理
     */
    public void accept(SelectionKey key) throws IOException {

        //step1. 获取serverSocketChannel
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        //step2. 得到和客户端链接的socketChannel
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);//设置为非阻塞
        //step3. 注册该socketChannel
        socketChannel.register(selector,SelectionKey.OP_READ);//为了接收客户端的消息,注册读事件
    }

    public void read(SelectionKey key) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
        byte[] result = getReadData(key);
        if (result == null) return;
        SocketChannel socketChannel = (SocketChannel) key.channel();

        LCRPCRequestDO requestDO = (LCRPCRequestDO) ObjectAndByteUtil.toObject(result);
        IProviderService providerService = new ProviderServiceImpl();
        //将结果写回
        socketChannel.write(ByteBuffer.wrap(ObjectAndByteUtil.toByteArray(providerService.getFuncCalldata(requestDO))));
//        socketChannel.close();//关闭
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113

类NIOBase为基类。代码以下:框架

public class NIOBase {

    // 线程中的通道管理器
    public Selector selector;


    /**
     * 初始化 该线程中的通道管理器Selector
     */
    public void initSelector() throws IOException {
        this.selector = Selector.open();
    }



    public byte[] getReadData(SelectionKey key) throws IOException {

        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(10);
        int len = socketChannel.read(byteBuffer);
        if (len == -1){
            socketChannel.close();
            return null;//说明链接已经断开
        }
        int lenth = 0;
        List<byte[]> list = new ArrayList<>();
        while (len > 0){
            lenth += len;
            byteBuffer.flip();
            byte[] arr = new byte[len];

            byteBuffer.get(arr,0,len);
            list.add(arr);
            byteBuffer.clear();
            len = socketChannel.read(byteBuffer);
        }

        byte[] result = new byte[lenth];
        int l = 0;
        for (int i = 0;i<list.size();i++){
            for (int j = 0;j<list.get(i).length;j++){
                result[l + j] = list.get(i)[j];
            }
            l += list.get(i).length;
        }
        return result;
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

getReadData方法读取客户端发送的所有数据。利用帮助类ObjectAndByteUtil对客户端发送的数据进行序列化为reqeust对象。同时为接口IProviderService添加方法getFuncCallData,利用request对象调用相应服务方法,获得方法的返回值,反序列化后发送给客户端,该方法的代码与初版本一致。 
帮助类ObjectAndByteUtil负责利用反/序列化技术进行字节数组与对象之间的转化,代码以下:

package whu.edu.lcrpc.util;

import java.io.*;

/**
 * Created by apple on 17/3/30.
 */

public class ObjectAndByteUtil {

    /**
     * 对象转数组
     * @param obj
     * @return
     */
    public static byte[] toByteArray (Object obj) {
        byte[] bytes = null;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(obj);
            oos.flush();
            bytes = bos.toByteArray ();
            oos.close();
            bos.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        return bytes;
    }

    /**
     * 数组转对象
     * @param bytes
     * @return
     */
    public static Object toObject (byte[] bytes) {
        Object obj = null;
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream (bytes);
            ObjectInputStream ois = new ObjectInputStream (bis);
            obj = ois.readObject();
            ois.close();
            bis.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        return obj;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

为了采用NIO的方法开启服务发布端的服务监听,咱们修改LCRPCProviderImpl类中对startListen函数的调用改成方法startListenByNIO,使得服务端采用NIO的方式发布服务。

  • (2)客户端代码修改 
    客户端的代码大部分与博客《》中的一致,只不过仍是在read事件出发函数中,有所修改,主要流程就是读取到服务端返回的数据后进行序列化,代码以下:
public Object read(SelectionKey key) throws IOException {

    //step1. 获得事件发生的通道
    byte[] result = getReadData(key);
    if (result == null) return null;

    Object object = ObjectAndByteUtil.toObject(result);

    return object;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

咱们为接口IConsumerService添加方法sendDataByNIO,采用NIO的方式将服务调用端的请求信息序列化后发送给服务端,该函数代码以下:

public Object sendDataByNIO(String ip, LCRPCRequestDO requestDO) throws IOException, ClassNotFoundException {
    NIOClient nioClient = new NIOClient(requestDO,ip);
    return nioClient.run();
}
  • 1
  • 2
  • 3
  • 4

类NIOClient代码以下,其中run函数开启轮询,当所注册事件发生时,触发相应的方法。并在read事件触发后结束轮训。

public class NIOClient extends NIOBase{


    private LCRPCRequestDO requestDO;//客户端对应的请求DO,发送给服务端
    private String ip;

    public NIOClient(LCRPCRequestDO requestDO,String ip){
        this.requestDO = requestDO;
        this.ip = ip;
    }

    public Object run() {
        try {
            initSelector();//初始化通道管理器
            initClient(ip,Constant.PORT);//初始化客户端链接scoketChannel
            return listen();//开始轮询处理事件

        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    public Object listen() throws IOException {
        //轮询访问select
        boolean flag = true;
        Object result = null;
        while(flag){
            //当注册的事件到达时,方法返回;不然将一直阻塞
            selector.select();
            //得到selector中选中的项的迭代器,选中的项为注册的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            //循环处理注册事件
            /**
             * 一共有四种事件:
             * 1. 服务端接收客户端链接事件: SelectionKey.OP_ACCEPT
             * 2. 客户端链接服务端事件:    SelectionKey.OP_CONNECT
             * 3. 读事件:                SelectionKey.OP_READ
             * 4. 写事件:                SelectionKey.OP_WRITE
             */
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                //手动删除已选的key,以防重复处理
                iterator.remove();
                //判断事件性质
                if (key.isReadable()){//读事件
                    result = read(key);
                    flag = false;
                    break;
                }else if (key.isConnectable()) {//客户端链接事件
                    connect(key);
                }
            }
        }

        return result;

    }

    /**
     * 得到一个SocketChannel,并对该channel作一些初始化工做,并注册到
     * @param ip
     * @param port
     */
    public void initClient(String ip,int port) throws IOException {
        //step1. 得到一个SocketChannel
        SocketChannel socketChannel = SocketChannel.open();



        //step2. 初始化该channel
        socketChannel.configureBlocking(false);//设置通道为非阻塞


        //step3. 客户端链接服务器,其实方法执行并无实现链接,须要再listen()方法中调用channel.finishConnect()方法才能完成链接
        socketChannel.connect(new InetSocketAddress(ip,port));

        //step4. 注册该channel到selector中,并为该通道注册SelectionKey.OP_CONNECT事件和SelectionKey.OP_READ事件
        socketChannel.register(this.selector,SelectionKey.OP_CONNECT|SelectionKey.OP_READ);
    }

    /**
     * 当监听到客户端链接事件后的处理函数
     * @param key 事件key,能够从key中获取channel,完成事件的处理
     */
    public void connect(SelectionKey key) throws IOException {
        //step1. 获取事件中的channel
        SocketChannel socketChannel = (SocketChannel) key.channel();


        //step2. 若是正在链接,则完成链接
        if (socketChannel.isConnectionPending()){
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);//将链接设置为非阻塞
        //step3. 链接后,能够给服务端发送消息
        socketChannel.write(ByteBuffer.wrap(ObjectAndByteUtil.toByteArray(requestDO)));

    }

    public Object read(SelectionKey key) throws IOException {

        //step1. 获得事件发生的通道
        byte[] result = getReadData(key);
        if (result == null) return null;

        Object object = ObjectAndByteUtil.toObject(result);

        return object;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112

为了使得客户端采用NIO方式进行通信,咱们修改MyInvokeHandler类:

//            result = consumerService.sendData(serviceAddress,requestDO);//采用BIO的方式
            result = consumerService.sendDataByNIO(serviceAddress,requestDO);//采用NIO的方式
  • 1
  • 2

至此,NIO通讯模式代码修改完毕。在测试的过程当中,遇到了一个问题,就是在服务调用端发出一个服务调用请求后,服务发布端一直在触发read事件,查阅资料后,了解到这种NIO的实现方式中,客户端或者服务端其中一方将链接关闭后,会一直触发另外一方的read事件,这时read会回传-1,若没有即便正确处理断线(关闭channel),read事件会一直触发,所以在getData函数读取数据时,添加以下代码:

if (len == -1){
    socketChannel.close();
    return null;//说明链接已经断开
}
  • 1
  • 2
  • 3
  • 4

至此,问题得以解决。 
服务注册查找中心以及服务端客户端的代码都不须要改变,分别运行后,获得与初版相同的结果。(因为服务端咱们采用一个selector管理全部channel,而且没有开启新的线程去处理数据,所以客户端会以同步的方式获得四次服务调用结果)

2.2.2 netty/mina

目前为止咱们的代码中,通讯部分采用了NIO和BIO两种模式。BIO模式采用socket编程实现,NIO部分采用selector channel buffer编程实现。可是不管哪种,都只是简单的帮助咱们了解两种通讯模式的基本概念,以及如何用最简单得编程方式实现。咱们在代码中,也有很是多的异常,网络等状况没有考虑,在实际生产中,也毫不会使用这种最基本最底层的编程方式来完成远程得通讯。所以,咱们这里引入Netty开源框架来实现通讯。他帮助咱们考虑了多种情况,使得咱们以简单的代码完成高质量的远程通讯,专一于其余业务逻辑等的实现。 
在分布式应用系统开发中,服务化的应用之间进行远程通讯时使用。Netty是在Java NIO的基础上封装的用于客户端服务端网络应用程序开发的框架,帮助用户考虑在分布式、高并发、高性能开发中遇到的多种情况,使得用户使用更容易的网络编程接口完成网络通讯,专一于其余业务逻辑的开发。 
(1)关于Netty 
(如下内容摘自知乎的帖子《通俗地讲,Netty 能作什么?》) 
netty是一套在java NIO的基础上封装的便于用户开发网络应用程序的api. 
Netty是什么?

1)本质:JBoss作的一个Jar包

2)目的:快速开发高性能、高可靠性的网络服务器和客户端程序

3)优势:提供异步的、事件驱动的网络应用程序框架和工具 
通俗的说:一个好使的处理Socket的东东

(2)为何选择netty 
如下内容摘抄自《Netty权威指南》 
在上述优化中,咱们使用JDK为咱们提供的NIO的类库来修改LCRPC框架的远程通讯方式。如下总结了不选择Java原声NIO编程的缘由: 
这里写图片描述
因为上述缘由,在大多数场景下,不建议你们直接食用JDK的NIO类库,除非精通NIO编程或者有特殊的需求。在绝大多数的业务场景中,咱们可使用NIO框架Netty来进行NIO编程,他既能够做为客户端也能够做为服务端,同时也支持UDP和异步文件传输,功能很是强大。 
如下总结了为何选择Netty做为基础通讯框架: 
这里写图片描述

(3)LCRPC服务框架优化:使用netty替换底层网络通信

与NIO的修改方式大体相同

增长四个netty服务端与客户端的类; 
netty服务端开启监听的类NettyServer:

package whu.edu.lcrpc.io.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import whu.edu.lcrpc.util.Constant;

/**
 * Created by apple on 17/4/10.
 */
public class NettyServer {
    public void bind() throws InterruptedException {

        //配置服务端的NIO的线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup wokerGroup = new NioEventLoopGroup();

        //建立服务启动的辅助类
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,wokerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childHandler(new ChildChannelHandler());
            //绑定端口,同步等待成功
            ChannelFuture f = b.bind(Constant.PORT).sync();
            System.out.println("已经开始监听,能够注册服务了");
            //等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        }finally {
            //优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            wokerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new NettyServerHandler());
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

netty服务端hanlder类NettyServerhandler:

package whu.edu.lcrpc.io.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import whu.edu.lcrpc.entity.LCRPCRequestDO;
import whu.edu.lcrpc.service.IProviderService;
import whu.edu.lcrpc.service.impl.ProviderServiceImpl;
import whu.edu.lcrpc.util.ObjectAndByteUtil;

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Date;

/**
 * Created by apple on 17/4/10.
 */
public class NettyServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        if (req == null) return;

        LCRPCRequestDO requestDO = (LCRPCRequestDO) ObjectAndByteUtil.toObject(req);
        IProviderService providerService = new ProviderServiceImpl();
        ByteBuf resp = Unpooled.copiedBuffer(ObjectAndByteUtil.toByteArray(providerService.getFuncCalldata(requestDO)));
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

netty客户端链接类NettyClient:

package whu.edu.lcrpc.io.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import whu.edu.lcrpc.util.Constant;
import whu.edu.lcrpc.util.ObjectAndByteUtil;

import java.io.UnsupportedEncodingException;

/**
 * Created by apple on 17/4/10.
 */


public class NettyClient {


    private Object reqObj;
    private String ip;
    public NettyClient(Object reqObj, String ip){
        this.reqObj = reqObj;
        this.ip = ip;
    }

    public Object connect() throws InterruptedException, UnsupportedEncodingException {
        //配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            byte[] req = ObjectAndByteUtil.toByteArray(reqObj);
            NettyClientHandler nettyClientHandler = new NettyClientHandler(req);
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(nettyClientHandler);
                        }
                    });
            //发起异步链接操做
            ChannelFuture f = b.connect(ip, Constant.PORT).sync();
            //等待客户端链路关闭
            f.channel().closeFuture().sync();
            //拿到异步请求结果,返回
            Object responseObj = ObjectAndByteUtil.toObject(nettyClientHandler.response);
            return responseObj;

        }finally {
            //优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

netty客户端handler类NettyClientHandler:

package whu.edu.lcrpc.io.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Created by apple on 17/4/11.
 */
public class NettyClientHandler extends ChannelHandlerAdapter {


    private  ByteBuf firstMessage;
    public byte[] response;
    public NettyClientHandler(byte[] req){

        //将请求写入缓冲区
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        response = new byte[buf.readableBytes()];
        buf.readBytes(response);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

然后修改LCRPC中本来的代码,采用netty来进行远程通讯。

首先在接口IConsumerService中增长函数sendDataByNetty,该函数采用netty的方式向服务发布端发送数据。函数实现以下:

@Override
public Object sendDataByNetty(String ip, LCRPCRequestDO requestDO) throws IOException, ClassNotFoundException, InterruptedException {
    NettyClient nettyClient = new NettyClient(requestDO,ip);
    return nettyClient.connect();
}
  • 1
  • 2
  • 3
  • 4
  • 5

然后在接口IProviderService增长函数startListenByNetty,该函数采用netty的方式开启服务监听。

@Override
public boolean startListenByNetty() {
        new Thread(()->{
            NettyServer nettyServer = new NettyServer();
            try {
                nettyServer.bind();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }).start();
    return true;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

而后在代理handler类MyInvocationHandler中, 
修改

result = consumerService.sendDataByNIO(serviceAddress,requestDO);//采用NIO的方式
  • 1

result = consumerService.sendDataByNetty(serviceAddress,requestDO); //采用netty的方式
  • 1

采用netty的方式调用服务。 
而且在类LCRPCProviderImpl中使用方法startListenByNetty开启服务的监听。 
客户端以及服务端的测试工程代码均不须要改变,进行测试后,输出结果不变。

须要注意的是:上述关于Netty的使用没有考虑到TCL粘包/拆包的问题!

三、优化三:服务框架工做日志

这个优化未完待续~

相关文章
相关标签/搜索