【如何实现一个简单的RPC框架】系列文章:java
【远程调用框架】如何实现一个简单的RPC框架(一)想法与设计
【远程调用框架】如何实现一个简单的RPC框架(二)实现与使用
【远程调用框架】如何实现一个简单的RPC框架(三)优化一:利用动态代理改变用户服务调用方式
【远程调用框架】如何实现一个简单的RPC框架(四)优化二:改变底层通讯框架
【远程调用框架】如何实现一个简单的RPC框架(五)优化三:软负载中心设计与实现
第一个优化以及第二个优化修改后的工程代码可下载资源 如何实现一个简单的RPC框架编程
简单socket通讯BIO方式-》-》NIO方式-》使用netty服务框架
关于这部分,能够提早阅读下博客《Java NIO BIO AIO总结》bootstrap
问题描述:在目前的服务框架版本中,服务发布端和服务调用端采用的IO通讯模式为BIO,即便用最基础的Java Socket编程的方式。看过咱们以前实现介绍部分的读者都知道,服务端一直在监听请求,每当有一个请求发来,则会建立一个新的线程去处理该请求,以下代码:api
while (true){ Socket socket = serverSocket.accept(); new Thread(new ServerProcessThread(socket)).start();//开启新的线程进行链接请求的处理 }
而ServerProcessThread线程完成了服务的调用及结果的返回工做,这样的方法,有如下两个弊端:数组
(2)这种方式为阻塞式IO,即数据的读写是阻塞的,在没有有效可读/可写数据的状况下,线程会一直阻塞,形成资源的浪费。
所以为了解决上述两个弊端,咱们改变这种IO模式。服务器
step1.使用selector+channel+buffer实现NIO模式(参考博客《Java NIO BIO AIO总结》)
NIO的模式有两个特色:
(1)不用对全部链接都建立新的线程去维护,selector线程能够管理多个数据通道;
(2)IO数据读写是非阻塞的,只有当出现有效读写数据时才会出发相应的事件进行读写,节约资源。网络
关于NIO模式的基本客户端与服务端的实现代码在博客《Java NIO BIO AIO总结》中已经进行了介绍。这里我对LCRPC框架代码的改造即利用该博客中的代码。仅做为NIO通讯模式的使用示例,由于还有好多能够修改的地方。并发
@Override public boolean startLisetenByNIO() { new Thread(new NIOServerThread()).start(); return true; }
该方法开启新的线程,采用NIO的模式进行服务的监听。线程类NIOServerThread的代码与博客《Java NIO BIO AIO总结》介绍的一致,只是read事件的触发方法代码有所改动。该类的代码以下:app
public class NIOServerThread extends NIOBase implements Runnable{ @Override public void run() { try { initSelector();//初始化通道管理器Selector initServer(Constant.IP,Constant.PORT);//初始化ServerSocketChannel,开启监听 listen();//轮询处理Selector选中的事件 } catch (IOException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } /** * 初始化 该线程中的通道管理器Selector */ public void initSelector() throws IOException { this.selector = Selector.open(); } /** * 采用轮询的方式监听selector上是否有须要处理的事件,若是有,则循环处理 * 这里主要监听链接事件以及读事件 */ public void listen() throws IOException, ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { System.out.println("监听成功,可开始进行服务注册!"); //轮询访问select while(true){ //当注册的事件到达时,方法返回;不然将一直阻塞 selector.select(); //得到selector中选中的项的迭代器,选中的项为注册的事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); //循环处理注册事件 /** * 一共有四种事件: * 1. 服务端接收客户端链接事件: SelectionKey.OP_ACCEPT * 2. 客户端链接服务端事件: SelectionKey.OP_CONNECT * 3. 读事件: SelectionKey.OP_READ * 4. 写事件: SelectionKey.OP_WRITE */ while(iterator.hasNext()){ SelectionKey key = iterator.next(); //手动删除已选的key,以防重复处理 iterator.remove(); //判断事件性质 if (key.isAcceptable()){//服务端接收客户端链接事件 accept(key); }else if (key.isReadable()){//读事件 read(key); } } } } /** * 得到一个ServerSocket通道,并经过port对其进行初始化 * @param port 监听的端口号 */ private void initServer(String ip,int port) throws IOException { //step1. 得到一个ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //step2. 初始化工做 serverSocketChannel.configureBlocking(false);//设置通道为非阻塞 serverSocketChannel.socket().bind(new InetSocketAddress(ip,port)); //step3. 将该channel注册到Selector上,并为该通道注册SelectionKey.OP_ACCEPT事件 //这样一来,当有"服务端接收客户端链接"事件到达时,selector.select()方法会返回,不然将一直阻塞 serverSocketChannel.register(this.selector,SelectionKey.OP_ACCEPT); } /** * 当监听到服务端接收客户端链接事件后的处理函数 * @param key 事件key,能够从key中获取channel,完成事件的处理 */ public void accept(SelectionKey key) throws IOException { //step1. 获取serverSocketChannel ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); //step2. 得到和客户端链接的socketChannel SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false);//设置为非阻塞 //step3. 注册该socketChannel socketChannel.register(selector,SelectionKey.OP_READ);//为了接收客户端的消息,注册读事件 } public void read(SelectionKey key) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { byte[] result = getReadData(key); if (result == null) return; SocketChannel socketChannel = (SocketChannel) key.channel(); LCRPCRequestDO requestDO = (LCRPCRequestDO) ObjectAndByteUtil.toObject(result); IProviderService providerService = new ProviderServiceImpl(); //将结果写回 socketChannel.write(ByteBuffer.wrap(ObjectAndByteUtil.toByteArray(providerService.getFuncCalldata(requestDO)))); // socketChannel.close();//关闭 } }
类NIOBase为基类。代码以下:框架
public class NIOBase { // 线程中的通道管理器 public Selector selector; /** * 初始化 该线程中的通道管理器Selector */ public void initSelector() throws IOException { this.selector = Selector.open(); } public byte[] getReadData(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(10); int len = socketChannel.read(byteBuffer); if (len == -1){ socketChannel.close(); return null;//说明链接已经断开 } int lenth = 0; List<byte[]> list = new ArrayList<>(); while (len > 0){ lenth += len; byteBuffer.flip(); byte[] arr = new byte[len]; byteBuffer.get(arr,0,len); list.add(arr); byteBuffer.clear(); len = socketChannel.read(byteBuffer); } byte[] result = new byte[lenth]; int l = 0; for (int i = 0;i<list.size();i++){ for (int j = 0;j<list.get(i).length;j++){ result[l + j] = list.get(i)[j]; } l += list.get(i).length; } return result; } }
getReadData方法读取客户端发送的所有数据。利用帮助类ObjectAndByteUtil对客户端发送的数据进行序列化为reqeust对象。同时为接口IProviderService添加方法getFuncCallData,利用request对象调用相应服务方法,获得方法的返回值,反序列化后发送给客户端,该方法的代码与初版本一致。
帮助类ObjectAndByteUtil负责利用反/序列化技术进行字节数组与对象之间的转化,代码以下:
package whu.edu.lcrpc.util; import java.io.*; /** * Created by apple on 17/3/30. */ public class ObjectAndByteUtil { /** * 对象转数组 * @param obj * @return */ public static byte[] toByteArray (Object obj) { byte[] bytes = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); oos.flush(); bytes = bos.toByteArray (); oos.close(); bos.close(); } catch (IOException ex) { ex.printStackTrace(); } return bytes; } /** * 数组转对象 * @param bytes * @return */ public static Object toObject (byte[] bytes) { Object obj = null; try { ByteArrayInputStream bis = new ByteArrayInputStream (bytes); ObjectInputStream ois = new ObjectInputStream (bis); obj = ois.readObject(); ois.close(); bis.close(); } catch (IOException ex) { ex.printStackTrace(); } catch (ClassNotFoundException ex) { ex.printStackTrace(); } return obj; } }
为了采用NIO的方法开启服务发布端的服务监听,咱们修改LCRPCProviderImpl类中对startListen函数的调用改成方法startListenByNIO,使得服务端采用NIO的方式发布服务。
public Object read(SelectionKey key) throws IOException { //step1. 获得事件发生的通道 byte[] result = getReadData(key); if (result == null) return null; Object object = ObjectAndByteUtil.toObject(result); return object; }
咱们为接口IConsumerService添加方法sendDataByNIO,采用NIO的方式将服务调用端的请求信息序列化后发送给服务端,该函数代码以下:
public Object sendDataByNIO(String ip, LCRPCRequestDO requestDO) throws IOException, ClassNotFoundException { NIOClient nioClient = new NIOClient(requestDO,ip); return nioClient.run(); }
类NIOClient代码以下,其中run函数开启轮询,当所注册事件发生时,触发相应的方法。并在read事件触发后结束轮训。
public class NIOClient extends NIOBase{ private LCRPCRequestDO requestDO;//客户端对应的请求DO,发送给服务端 private String ip; public NIOClient(LCRPCRequestDO requestDO,String ip){ this.requestDO = requestDO; this.ip = ip; } public Object run() { try { initSelector();//初始化通道管理器 initClient(ip,Constant.PORT);//初始化客户端链接scoketChannel return listen();//开始轮询处理事件 } catch (Exception e) { e.printStackTrace(); return null; } } public Object listen() throws IOException { //轮询访问select boolean flag = true; Object result = null; while(flag){ //当注册的事件到达时,方法返回;不然将一直阻塞 selector.select(); //得到selector中选中的项的迭代器,选中的项为注册的事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); //循环处理注册事件 /** * 一共有四种事件: * 1. 服务端接收客户端链接事件: SelectionKey.OP_ACCEPT * 2. 客户端链接服务端事件: SelectionKey.OP_CONNECT * 3. 读事件: SelectionKey.OP_READ * 4. 写事件: SelectionKey.OP_WRITE */ while(iterator.hasNext()){ SelectionKey key = iterator.next(); //手动删除已选的key,以防重复处理 iterator.remove(); //判断事件性质 if (key.isReadable()){//读事件 result = read(key); flag = false; break; }else if (key.isConnectable()) {//客户端链接事件 connect(key); } } } return result; } /** * 得到一个SocketChannel,并对该channel作一些初始化工做,并注册到 * @param ip * @param port */ public void initClient(String ip,int port) throws IOException { //step1. 得到一个SocketChannel SocketChannel socketChannel = SocketChannel.open(); //step2. 初始化该channel socketChannel.configureBlocking(false);//设置通道为非阻塞 //step3. 客户端链接服务器,其实方法执行并无实现链接,须要再listen()方法中调用channel.finishConnect()方法才能完成链接 socketChannel.connect(new InetSocketAddress(ip,port)); //step4. 注册该channel到selector中,并为该通道注册SelectionKey.OP_CONNECT事件和SelectionKey.OP_READ事件 socketChannel.register(this.selector,SelectionKey.OP_CONNECT|SelectionKey.OP_READ); } /** * 当监听到客户端链接事件后的处理函数 * @param key 事件key,能够从key中获取channel,完成事件的处理 */ public void connect(SelectionKey key) throws IOException { //step1. 获取事件中的channel SocketChannel socketChannel = (SocketChannel) key.channel(); //step2. 若是正在链接,则完成链接 if (socketChannel.isConnectionPending()){ socketChannel.finishConnect(); } socketChannel.configureBlocking(false);//将链接设置为非阻塞 //step3. 链接后,能够给服务端发送消息 socketChannel.write(ByteBuffer.wrap(ObjectAndByteUtil.toByteArray(requestDO))); } public Object read(SelectionKey key) throws IOException { //step1. 获得事件发生的通道 byte[] result = getReadData(key); if (result == null) return null; Object object = ObjectAndByteUtil.toObject(result); return object; } }
为了使得客户端采用NIO方式进行通信,咱们修改MyInvokeHandler类:
// result = consumerService.sendData(serviceAddress,requestDO);//采用BIO的方式 result = consumerService.sendDataByNIO(serviceAddress,requestDO);//采用NIO的方式
至此,NIO通讯模式代码修改完毕。在测试的过程当中,遇到了一个问题,就是在服务调用端发出一个服务调用请求后,服务发布端一直在触发read事件,查阅资料后,了解到这种NIO的实现方式中,客户端或者服务端其中一方将链接关闭后,会一直触发另外一方的read事件,这时read会回传-1,若没有即便正确处理断线(关闭channel),read事件会一直触发,所以在getData函数读取数据时,添加以下代码:
if (len == -1){ socketChannel.close(); return null;//说明链接已经断开 }
至此,问题得以解决。
服务注册查找中心以及服务端客户端的代码都不须要改变,分别运行后,获得与初版相同的结果。(因为服务端咱们采用一个selector管理全部channel,而且没有开启新的线程去处理数据,所以客户端会以同步的方式获得四次服务调用结果)
目前为止咱们的代码中,通讯部分采用了NIO和BIO两种模式。BIO模式采用socket编程实现,NIO部分采用selector channel buffer编程实现。可是不管哪种,都只是简单的帮助咱们了解两种通讯模式的基本概念,以及如何用最简单得编程方式实现。咱们在代码中,也有很是多的异常,网络等状况没有考虑,在实际生产中,也毫不会使用这种最基本最底层的编程方式来完成远程得通讯。所以,咱们这里引入Netty开源框架来实现通讯。他帮助咱们考虑了多种情况,使得咱们以简单的代码完成高质量的远程通讯,专一于其余业务逻辑等的实现。
在分布式应用系统开发中,服务化的应用之间进行远程通讯时使用。Netty是在Java NIO的基础上封装的用于客户端服务端网络应用程序开发的框架,帮助用户考虑在分布式、高并发、高性能开发中遇到的多种情况,使得用户使用更容易的网络编程接口完成网络通讯,专一于其余业务逻辑的开发。
(1)关于Netty
(如下内容摘自知乎的帖子《通俗地讲,Netty 能作什么?》)
netty是一套在java NIO的基础上封装的便于用户开发网络应用程序的api.
Netty是什么?
1)本质:JBoss作的一个Jar包
2)目的:快速开发高性能、高可靠性的网络服务器和客户端程序
3)优势:提供异步的、事件驱动的网络应用程序框架和工具
通俗的说:一个好使的处理Socket的东东
(2)为何选择netty
如下内容摘抄自《Netty权威指南》
在上述优化中,咱们使用JDK为咱们提供的NIO的类库来修改LCRPC框架的远程通讯方式。如下总结了不选择Java原声NIO编程的缘由:
因为上述缘由,在大多数场景下,不建议你们直接食用JDK的NIO类库,除非精通NIO编程或者有特殊的需求。在绝大多数的业务场景中,咱们可使用NIO框架Netty来进行NIO编程,他既能够做为客户端也能够做为服务端,同时也支持UDP和异步文件传输,功能很是强大。
如下总结了为何选择Netty做为基础通讯框架:
(3)LCRPC服务框架优化:使用netty替换底层网络通信
与NIO的修改方式大体相同
增长四个netty服务端与客户端的类;
netty服务端开启监听的类NettyServer:
package whu.edu.lcrpc.io.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import whu.edu.lcrpc.util.Constant; /** * Created by apple on 17/4/10. */ public class NettyServer { public void bind() throws InterruptedException { //配置服务端的NIO的线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup wokerGroup = new NioEventLoopGroup(); //建立服务启动的辅助类 try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,wokerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChildChannelHandler()); //绑定端口,同步等待成功 ChannelFuture f = b.bind(Constant.PORT).sync(); System.out.println("已经开始监听,能够注册服务了"); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); }finally { //优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } } }
netty服务端hanlder类NettyServerhandler:
package whu.edu.lcrpc.io.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import whu.edu.lcrpc.entity.LCRPCRequestDO; import whu.edu.lcrpc.service.IProviderService; import whu.edu.lcrpc.service.impl.ProviderServiceImpl; import whu.edu.lcrpc.util.ObjectAndByteUtil; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Date; /** * Created by apple on 17/4/10. */ public class NettyServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); if (req == null) return; LCRPCRequestDO requestDO = (LCRPCRequestDO) ObjectAndByteUtil.toObject(req); IProviderService providerService = new ProviderServiceImpl(); ByteBuf resp = Unpooled.copiedBuffer(ObjectAndByteUtil.toByteArray(providerService.getFuncCalldata(requestDO))); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
netty客户端链接类NettyClient:
package whu.edu.lcrpc.io.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import whu.edu.lcrpc.util.Constant; import whu.edu.lcrpc.util.ObjectAndByteUtil; import java.io.UnsupportedEncodingException; /** * Created by apple on 17/4/10. */ public class NettyClient { private Object reqObj; private String ip; public NettyClient(Object reqObj, String ip){ this.reqObj = reqObj; this.ip = ip; } public Object connect() throws InterruptedException, UnsupportedEncodingException { //配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); byte[] req = ObjectAndByteUtil.toByteArray(reqObj); NettyClientHandler nettyClientHandler = new NettyClientHandler(req); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(nettyClientHandler); } }); //发起异步链接操做 ChannelFuture f = b.connect(ip, Constant.PORT).sync(); //等待客户端链路关闭 f.channel().closeFuture().sync(); //拿到异步请求结果,返回 Object responseObj = ObjectAndByteUtil.toObject(nettyClientHandler.response); return responseObj; }finally { //优雅退出,释放NIO线程组 group.shutdownGracefully(); } } }
netty客户端handler类NettyClientHandler:
package whu.edu.lcrpc.io.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * Created by apple on 17/4/11. */ public class NettyClientHandler extends ChannelHandlerAdapter { private ByteBuf firstMessage; public byte[] response; public NettyClientHandler(byte[] req){ //将请求写入缓冲区 firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; response = new byte[buf.readableBytes()]; buf.readBytes(response); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
然后修改LCRPC中本来的代码,采用netty来进行远程通讯。
首先在接口IConsumerService中增长函数sendDataByNetty,该函数采用netty的方式向服务发布端发送数据。函数实现以下:
@Override public Object sendDataByNetty(String ip, LCRPCRequestDO requestDO) throws IOException, ClassNotFoundException, InterruptedException { NettyClient nettyClient = new NettyClient(requestDO,ip); return nettyClient.connect(); }
然后在接口IProviderService增长函数startListenByNetty,该函数采用netty的方式开启服务监听。
@Override public boolean startListenByNetty() { new Thread(()->{ NettyServer nettyServer = new NettyServer(); try { nettyServer.bind(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); return true; }
而后在代理handler类MyInvocationHandler中,
修改
result = consumerService.sendDataByNIO(serviceAddress,requestDO);//采用NIO的方式
为
result = consumerService.sendDataByNetty(serviceAddress,requestDO); //采用netty的方式
采用netty的方式调用服务。
而且在类LCRPCProviderImpl中使用方法startListenByNetty开启服务的监听。
客户端以及服务端的测试工程代码均不须要改变,进行测试后,输出结果不变。
须要注意的是:上述关于Netty的使用没有考虑到TCL粘包/拆包的问题!
这个优化未完待续~