RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote machine over the network, without having to understand the underlying network technology. Put more plainly: the client invokes an object running on a remote machine without knowing the details of the call, as if it were invoking a local object. Typical RPC frameworks include Thrift (open-sourced by Facebook) and Dubbo (open-sourced by Alibaba). An RPC framework makes the network protocol and the network I/O model transparent: from the calling client's point of view, it is simply invoking a local object. Whether the transport layer uses TCP, UDP, or HTTP is of no concern to the caller, and neither is whether the network I/O underneath is carried by select, poll, epoll, or IOCP (I/O Completion Ports).
Most mainstream RPC frameworks today support cross-language calls via a so-called IDL (Interface Definition Language). Strictly speaking, RPC does not require this: if your framework has no cross-language requirement, you can do without an IDL.
Finally, it is worth noting that the choice of network I/O model is crucial to how well an RPC framework performs. On that basis, an RPC server may be designed to support blocking synchronous I/O, non-blocking synchronous I/O, multiplexed I/O, or asynchronous I/O; under high concurrency, different I/O models yield very different throughput. Another yardstick is the transport protocol: whether it is TCP, HTTP, or UDP also affects performance to some degree. As far as I have seen, most open-source RPC frameworks are based on TCP or HTTP; I have yet to find one that uses UDP as its primary transport.
Having understood how RPC works and what performance demands it faces, can we set aside the existing open-source RPC frameworks and build a high-performance RPC server ourselves? I believe we can. In this article I will implement one in Java, on top of Netty.
How is it implemented, on what principles, and how does it perform under concurrency? Read on.
To raise the communication throughput and performance of a single node on a Java back end, the usual first choice is an NIO (non-blocking I/O) framework. The catch is that Java NIO takes considerable skill and accumulated experience to master. Building a back-end TCP/HTTP server directly on raw NIO means handling TCP packet sticking, network exceptions, connection management, and many other low-level communication details, which sets the bar too high for most developers; the sensible choice is to use a mainstream NIO framework. The main ones are Netty and Mina. Both are designed around TCP, non-blocking I/O, and flexible I/O thread pools, and cope comfortably with highly concurrent request loads. As excellent NIO frameworks like Netty and Mina have matured, high-performance back-end server development in Java has gained solid technical backing, breaking the monopoly C++ once held over server back ends. Before that, Java NIO had a poor reputation and people kept their distance from it.
Since this RPC server is built on Netty, a few words about Netty itself. Netty is a further encapsulation of the Java NIO framework; its site is http://netty.io/. This article uses Netty 4.0, which can be downloaded from http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2. So how do you build an RPC server with Netty? It is actually not hard; the basic idea is as follows:
1. Define the RPC request and response message structures. The request carries the interface definition of the remote call: the class name, method name, parameter types, parameter values, and so on.
2. At startup, the server loads the mapping between RPC interface definitions and their implementation objects from a container, then waits for clients to issue calls.
3. The client's RPC message contains the class name, method name, parameter types, and parameter values of the remote call, and is sent over the network to the RPC server as a byte stream. On receiving the request, the server looks up the concrete implementation object mapped to the client's interface in its container.
4. Using the parameter information, the RPC server invokes the corresponding method on the implementation object through the reflection mechanism, then wraps the result in an RPC response message and sends it back to the client.
5. The client receives the byte-stream RPC response over the network, unpacks and parses it, and presents the result of the remote call.
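As a minimal, self-contained illustration of steps 3 and 4 (look up the implementation object, then call it reflectively), consider the sketch below. The names Dispatcher, EchoService, and EchoServiceImpl are invented for this example; the actual server code later in this article does the same job with Spring-managed beans and commons-beanutils:

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the server-side dispatch: a map from interface
// name to implementation object, plus a reflective method call.
public class Dispatcher {

    public interface EchoService {
        String echo(String msg);
    }

    public static class EchoServiceImpl implements EchoService {
        public String echo(String msg) { return "echo:" + msg; }
    }

    private final Map<String, Object> handlerMap = new HashMap<>();

    public Dispatcher() {
        // step 2: bind interface name -> implementation object
        handlerMap.put(EchoService.class.getName(), new EchoServiceImpl());
    }

    public Object invoke(String className, String methodName,
                         Class<?>[] types, Object[] args) throws Exception {
        Object serviceBean = handlerMap.get(className);          // step 3: look up the impl
        Method method = Class.forName(className).getMethod(methodName, types);
        return method.invoke(serviceBean, args);                 // step 4: reflective call
    }

    public static void main(String[] args) throws Exception {
        Object result = new Dispatcher().invoke(
                EchoService.class.getName(), "echo",
                new Class<?>[]{String.class}, new Object[]{"hi"});
        System.out.println(result); // prints echo:hi
    }
}
```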
That sounds simple enough, but the implementation has to deal with the following issues:
1. The RPC server's transport layer is TCP, so what about sticky packets? Wouldn't they make the server fail to parse client requests? Fortunately, Netty already ships with a decoder that solves the TCP sticky-packet problem: LengthFieldBasedFrameDecoder handles it with ease.
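To see what length-field framing actually does on the wire, here is a small Netty-free sketch of the same idea: every message is prefixed with a 4-byte length, and the receiver only consumes complete frames from the accumulated stream. This is an illustrative equivalent of LengthFieldPrepender plus LengthFieldBasedFrameDecoder, not Netty's own code:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of 4-byte length-prefix framing: however TCP
// fragments or coalesces the writes, the receiver can always split the
// byte stream back into whole messages.
public class LengthFieldFraming {

    // Encode: prepend a 4-byte big-endian length before the payload.
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length).put(payload).array();
    }

    // Decode: consume only complete frames from an accumulated byte stream.
    static List<byte[]> deframe(byte[] stream) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);
        while (buf.remaining() >= 4) {
            buf.mark();
            int len = buf.getInt();
            if (buf.remaining() < len) { buf.reset(); break; } // half a frame: wait for more bytes
            byte[] frame = new byte[len];
            buf.get(frame);
            frames.add(frame);
        }
        return frames;
    }

    public static void main(String[] args) throws Exception {
        // Two messages written back-to-back may arrive as one TCP chunk ("sticky packets")...
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        wire.write(frame("hello".getBytes(StandardCharsets.UTF_8)));
        wire.write(frame("world".getBytes(StandardCharsets.UTF_8)));
        // ...yet the decoder still recovers exactly two frames.
        List<byte[]> frames = deframe(wire.toByteArray());
        System.out.println(frames.size());                                    // prints 2
        System.out.println(new String(frames.get(1), StandardCharsets.UTF_8)); // prints world
    }
}
```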
2. Should the Netty server's threading model be single-threaded, multi-threaded (one thread accepts client connections, then hands them to a back-end I/O thread pool), or master-worker (both connection acceptance and back-end I/O are backed by thread pools)? For performance, this article uses Netty's master-worker (boss/worker) thread pool model.
3. What if Netty's I/O thread pool hits a very time-consuming business operation and blocks? That could easily hang or stall the back-end NIO threads. The approach taken here is to dispatch complex business work to a dedicated business thread pool and handle it asynchronously via callbacks.
4. RPC messages travel between NIO channels as byte streams; how exactly is that implemented? This article uses codecs based on Java's native object serialization (ObjectEncoder, ObjectDecoder). For performance this is probably not the optimal choice; a better design would make the codec pluggable and configurable, using, say, protobuf or JBoss Marshalling to improve wire efficiency.
5. The RPC server must be thread-safe under multi-threaded, highly concurrent use. In addition, prefer the lighter-weight ReentrantLock over synchronized for conditional locking of critical sections; the RPC message-handling callback in this article is one place where this is applied.
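The lock-and-condition pattern referred to here can be boiled down to a few lines. The sketch below (class names invented for the example) shows a caller blocking on a ReentrantLock/Condition pair until another thread delivers a result, which is essentially what the MessageCallBack class later in this article does:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch of the callback pattern: the calling thread waits (with
// a timeout) until the I/O thread delivers the response and signals.
public class ResultHolder {

    private final Lock lock = new ReentrantLock();
    private final Condition finished = lock.newCondition();
    private Object result;

    public Object await(long timeoutMillis) throws InterruptedException {
        lock.lock();
        try {
            // loop to guard against spurious wakeups, as the Condition docs advise
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (result == null && nanos > 0L) {
                nanos = finished.awaitNanos(nanos);
            }
            return result; // may be null if the timeout elapsed with no answer
        } finally {
            lock.unlock();
        }
    }

    public void complete(Object value) {
        lock.lock();
        try {
            result = value;
            finished.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        ResultHolder holder = new ResultHolder();
        new Thread(() -> holder.complete(42)).start(); // simulated I/O thread
        System.out.println(holder.await(2000));
    }
}
```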
6. The service interface objects and their implementation objects should be easy to configure, load, and unload. Here, the objects are managed uniformly by a Spring container.
To sum up, the call flow of the RPC server designed in this article is shown in the diagram below:
Clients issue concurrent RPC requests. Netty's acceptor dispatches N NIO connection threads, at which point the acceptor's job is done. The NIO connections are then managed by Netty's NIO worker thread pool, which performs message encoding, decoding, handling, and the rest for each RPC connection. In the final handling stage (the Handler), for performance, complex message processing is handed straight to a dedicated RPC business thread pool, and the Handler's NIO thread returns immediately without blocking. At that point the call is in flight: the client waits asynchronously for the server's result, which this article delivers via a message callback mechanism (MessageCallBack).
Next, the modules and flow of Netty's decoding, encoding, and handling of RPC messages, as shown in the diagram below:
The diagram shows the modules involved in encoding, decoding, and handling RPC messages on the client and server, and the order in which they are invoked. Netty chains such handlers one after another into a chain of responsibility and invokes them uniformly.
That is enough talk; let's take a quick look at the package layout of my NettyRPC implementation:
The newlandframework.netty.rpc.core package is the core of NettyRPC. The newlandframework.netty.rpc.model package holds the RPC request/response message structures, along with the container that binds RPC service interfaces to their implementations. newlandframework.netty.rpc.config defines the server-side configuration properties of NettyRPC.
First, the contents of the newlandframework.netty.rpc.model package: the RPC request and response message structures.
The RPC request message structure:
/**
 * @filename:MessageRequest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC request message structure
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class MessageRequest implements Serializable {

    private String messageId;
    private String className;
    private String methodName;
    private Class<?>[] typeParameters;
    private Object[] parametersVal;

    public String getMessageId() { return messageId; }
    public void setMessageId(String messageId) { this.messageId = messageId; }

    public String getClassName() { return className; }
    public void setClassName(String className) { this.className = className; }

    public String getMethodName() { return methodName; }
    public void setMethodName(String methodName) { this.methodName = methodName; }

    public Class<?>[] getTypeParameters() { return typeParameters; }
    public void setTypeParameters(Class<?>[] typeParameters) { this.typeParameters = typeParameters; }

    public Object[] getParameters() { return parametersVal; }
    public void setParameters(Object[] parametersVal) { this.parametersVal = parametersVal; }

    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("messageId", messageId).append("className", className)
                .append("methodName", methodName).toString();
    }
}
The RPC response message structure:
/**
 * @filename:MessageResponse.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC response message structure
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class MessageResponse implements Serializable {

    private String messageId;
    private String error;
    private Object resultDesc;

    public String getMessageId() { return messageId; }
    public void setMessageId(String messageId) { this.messageId = messageId; }

    public String getError() { return error; }
    public void setError(String error) { this.error = error; }

    public Object getResult() { return resultDesc; }
    public void setResult(Object resultDesc) { this.resultDesc = resultDesc; }

    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("messageId", messageId).append("error", error).toString();
    }
}
The container that binds RPC service interface definitions to their implementations, exposed to Spring as a bean:
/**
 * @filename:MessageKeyVal.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC service mapping container
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.util.Map;

public class MessageKeyVal {

    private Map<String, Object> messageKeyVal;

    public void setMessageKeyVal(Map<String, Object> messageKeyVal) { this.messageKeyVal = messageKeyVal; }
    public Map<String, Object> getMessageKeyVal() { return messageKeyVal; }
}
With the core model defined, here are the key parts of the NettyRPC core package, newlandframework.netty.rpc.core, starting with the business thread pool classes:
The thread factory:
/**
 * @filename:NamedThreadFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: thread factory
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {

    private static final AtomicInteger threadNumber = new AtomicInteger(1);
    private final AtomicInteger mThreadNum = new AtomicInteger(1);
    private final String prefix;
    private final boolean daemoThread;
    private final ThreadGroup threadGroup;

    public NamedThreadFactory() {
        this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemo) {
        this.prefix = prefix + "-thread-";
        daemoThread = daemo;
        SecurityManager s = System.getSecurityManager();
        threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    public Thread newThread(Runnable runnable) {
        String name = prefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(threadGroup, runnable, name, 0);
        ret.setDaemon(daemoThread);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return threadGroup;
    }
}
The business thread pool:
/**
 * @filename:RpcThreadPool.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC thread pool wrapper
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RpcThreadPool {

    // A separate pool exists so that complex, I/O-heavy business work does not block
    // Netty's handler threads. If the business logic is simple enough, running it
    // directly inside the Netty handler (ChannelInboundHandlerAdapter) is also fine.
    public static Executor getExecutor(int threads, int queues) {
        String name = "RpcThreadPool";
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>()
                        : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
    }
}
/**
 * @filename:AbortPolicyWithReport.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: thread pool rejection policy
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    private final String threadName;

    public AbortPolicyWithReport(String threadName) {
        this.threadName = threadName;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("RpcServer["
                + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d),"
                + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)]",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(),
                e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(),
                e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
        System.out.println(msg);
        throw new RejectedExecutionException(msg);
    }
}
The RPC client executor:
/**
 * @filename:MessageSendExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client executor
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.lang.reflect.Proxy;

public class MessageSendExecutor {

    private RpcServerLoader loader = RpcServerLoader.getInstance();

    public MessageSendExecutor(String serverAddress) {
        loader.load(serverAddress);
    }

    public void stop() {
        loader.unLoad();
    }

    public static <T> T execute(Class<T> rpcInterface) {
        return (T) Proxy.newProxyInstance(
                rpcInterface.getClassLoader(),
                new Class<?>[]{rpcInterface},
                new MessageSendProxy<T>(rpcInterface));
    }
}
The RPC client actually works by dynamically proxying through MessageSendProxy, using the JDK's native dynamic proxy; you could switch to CGLIB (Code Generation Library) instead. In my tests, however, the CGLIB approach threw NullPointerExceptions under high concurrency, while the JDK dynamic proxy had no such problem under the same load; at lower concurrency both worked fine. Something to dig into later. Without further ado, here is MessageSendProxy:
/**
 * @filename:MessageSendProxy.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client message handling
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;
import newlandframework.netty.rpc.model.MessageRequest;

public class MessageSendProxy<T> implements InvocationHandler {

    private Class<T> cls;

    public MessageSendProxy(Class<T> cls) {
        this.cls = cls;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        MessageRequest request = new MessageRequest();
        request.setMessageId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setTypeParameters(method.getParameterTypes());
        request.setParameters(args);

        MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
        MessageCallBack callBack = handler.sendRequest(request);
        return callBack.start();
    }
}
A closer look shows that MessageSendProxy hands the message to the RpcServerLoader module, whose code is as follows:
/**
 * @filename:RpcServerLoader.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server configuration loader
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class RpcServerLoader {

    private volatile static RpcServerLoader rpcServerLoader;
    private final static String DELIMITER = ":";

    // number of processors available to the JVM
    private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
    // netty NIO thread pool
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
    private static ThreadPoolExecutor threadPoolExecutor =
            (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
    private MessageSendHandler messageSendHandler = null;

    // signals that the Netty channel to the server has been established
    private Lock lock = new ReentrantLock();
    private Condition signal = lock.newCondition();

    private RpcServerLoader() {
    }

    // double-checked locking singleton
    public static RpcServerLoader getInstance() {
        if (rpcServerLoader == null) {
            synchronized (RpcServerLoader.class) {
                if (rpcServerLoader == null) {
                    rpcServerLoader = new RpcServerLoader();
                }
            }
        }
        return rpcServerLoader;
    }

    public void load(String serverAddress) {
        String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
        if (ipAddr.length == 2) {
            String host = ipAddr[0];
            int port = Integer.parseInt(ipAddr[1]);
            final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
            threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, this));
        }
    }

    public void setMessageSendHandler(MessageSendHandler messageInHandler) {
        try {
            lock.lock();
            this.messageSendHandler = messageInHandler;
            // wake up all client RPC threads waiting for the channel
            signal.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public MessageSendHandler getMessageSendHandler() throws InterruptedException {
        try {
            lock.lock();
            // park until the Netty channel to the server has been established
            // (a while loop guards against spurious wakeups)
            while (messageSendHandler == null) {
                signal.await();
            }
            return messageSendHandler;
        } finally {
            lock.unlock();
        }
    }

    public void unLoad() {
        messageSendHandler.close();
        threadPoolExecutor.shutdown();
        eventLoopGroup.shutdownGracefully();
    }
}
Now, in one go, the RPC client's message encoding, decoding, and handling modules:
/**
 * @filename:MessageSendInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client connection task
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;

public class MessageSendInitializeTask implements Runnable {

    private EventLoopGroup eventLoopGroup = null;
    private InetSocketAddress serverAddress = null;
    private RpcServerLoader loader = null;

    MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcServerLoader loader) {
        this.eventLoopGroup = eventLoopGroup;
        this.serverAddress = serverAddress;
        this.loader = loader;
    }

    public void run() {
        Bootstrap b = new Bootstrap();
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new MessageSendChannelInitializer());
        ChannelFuture channelFuture = b.connect(serverAddress);
        channelFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(final ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
                    MessageSendInitializeTask.this.loader.setMessageSendHandler(handler);
                }
            }
        });
    }
}
/**
 * @filename:MessageSendChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client pipeline initialization
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {

    // ObjectDecoder's base class, the half-packet decoder LengthFieldBasedFrameDecoder,
    // handles TCP sticky packets with a length field at the head of each message,
    // occupying 4 bytes. MESSAGE_LENGTH keeps the formats compatible.
    final public static int MESSAGE_LENGTH = 4;

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // Stay wire-compatible with ObjectDecoder's base class LengthFieldBasedFrameDecoder,
        // whose internal initialization is effectively super(maxObjectSize, 0, 4, 0, 4).
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
                MessageSendChannelInitializer.MESSAGE_LENGTH, 0,
                MessageSendChannelInitializer.MESSAGE_LENGTH));
        // LengthFieldPrepender fills in the length-field header that ObjectDecoder expects
        pipeline.addLast(new LengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));
        pipeline.addLast(new ObjectEncoder());
        // weakCachingConcurrentResolver is chosen for concurrent performance;
        // in ordinary cases cacheDisabled would be enough
        pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
        pipeline.addLast(new MessageSendHandler());
    }
}
/**
 * @filename:MessageSendHandler.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client handler
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageSendHandler extends ChannelInboundHandlerAdapter {

    private ConcurrentHashMap<String, MessageCallBack> mapCallBack = new ConcurrentHashMap<String, MessageCallBack>();
    private volatile Channel channel;
    private SocketAddress remoteAddr;

    public Channel getChannel() {
        return channel;
    }

    public SocketAddress getRemoteAddr() {
        return remoteAddr;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.remoteAddr = this.channel.remoteAddress();
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        this.channel = ctx.channel();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageResponse response = (MessageResponse) msg;
        String messageId = response.getMessageId();
        MessageCallBack callBack = mapCallBack.get(messageId);
        if (callBack != null) {
            mapCallBack.remove(messageId);
            callBack.over(response);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    public void close() {
        channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    public MessageCallBack sendRequest(MessageRequest request) {
        MessageCallBack callBack = new MessageCallBack(request);
        mapCallBack.put(request.getMessageId(), callBack);
        channel.writeAndFlush(request);
        return callBack;
    }
}
Finally, the RPC server side. The MessageRecvExecutor module uses Spring to load the RPC service interfaces and bind them to their implementation containers, and initializes Netty's boss/worker thread pools. Here is its code:
/**
 * @filename:MessageRecvExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server executor
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import newlandframework.netty.rpc.model.MessageKeyVal;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {

    private String serverAddress;
    private final static String DELIMITER = ":";
    private Map<String, Object> handlerMap = new ConcurrentHashMap<String, Object>();
    private static ThreadPoolExecutor threadPoolExecutor;

    public MessageRecvExecutor(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    public static void submit(Runnable task) {
        if (threadPoolExecutor == null) {
            synchronized (MessageRecvExecutor.class) {
                if (threadPoolExecutor == null) {
                    threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
                }
            }
        }
        threadPoolExecutor.submit(task);
    }

    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        try {
            MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
            Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();
            for (Map.Entry<String, Object> entry : rpcServiceObject.entrySet()) {
                handlerMap.put(entry.getKey(), entry.getValue());
            }
        } catch (ClassNotFoundException ex) {
            java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void afterPropertiesSet() throws Exception {
        // Netty's thread model is configured as boss/worker pools, which copes with high concurrency.
        // Netty also supports single-threaded and multi-threaded I/O models, configurable to fit the business.
        ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");
        // number of processors available to the JVM
        int parallel = Runtime.getRuntime().availableProcessors() * 2;
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup(parallel, threadRpcFactory, SelectorProvider.provider());
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new MessageRecvChannelInitializer(handlerMap))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);
            if (ipAddr.length == 2) {
                String host = ipAddr[0];
                int port = Integer.parseInt(ipAddr[1]);
                ChannelFuture future = bootstrap.bind(host, port).sync();
                System.out.printf("[author tangjie] Netty RPC Server start success ip:%s port:%d\n", host, port);
                future.channel().closeFuture().sync();
            } else {
                System.out.printf("[author tangjie] Netty RPC Server start fail!\n");
            }
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
And, as usual, the core server-side message encoding, decoding, and handling modules:
/**
 * @filename:MessageRecvChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server pipeline initialization
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.Map;

public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {

    // ObjectDecoder's base class, the half-packet decoder LengthFieldBasedFrameDecoder,
    // handles TCP sticky packets with a length field at the head of each message,
    // occupying 4 bytes. MESSAGE_LENGTH keeps the formats compatible.
    final public static int MESSAGE_LENGTH = 4;

    private Map<String, Object> handlerMap = null;

    MessageRecvChannelInitializer(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // Stay wire-compatible with ObjectDecoder's base class LengthFieldBasedFrameDecoder,
        // whose internal initialization is effectively super(maxObjectSize, 0, 4, 0, 4).
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
                MessageRecvChannelInitializer.MESSAGE_LENGTH, 0,
                MessageRecvChannelInitializer.MESSAGE_LENGTH));
        // LengthFieldPrepender fills in the length-field header that ObjectDecoder expects
        pipeline.addLast(new LengthFieldPrepender(MessageRecvChannelInitializer.MESSAGE_LENGTH));
        pipeline.addLast(new ObjectEncoder());
        // weakCachingConcurrentResolver is chosen for concurrent performance;
        // in ordinary cases cacheDisabled would be enough
        pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
        pipeline.addLast(new MessageRecvHandler(handlerMap));
    }
}
/**
 * @filename:MessageRecvHandler.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server message handler
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageRecvHandler extends ChannelInboundHandlerAdapter {

    private final Map<String, Object> handlerMap;

    public MessageRecvHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageRequest request = (MessageRequest) msg;
        MessageResponse response = new MessageResponse();
        MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap, ctx);
        // do not block the NIO thread: hand complex business logic to the dedicated pool
        MessageRecvExecutor.submit(recvTask);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // close the channel on a network exception
        ctx.close();
    }
}
/**
 * @filename:MessageRecvInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server message processing task
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.apache.commons.beanutils.MethodUtils;

public class MessageRecvInitializeTask implements Runnable {

    private MessageRequest request = null;
    private MessageResponse response = null;
    private Map<String, Object> handlerMap = null;
    private ChannelHandlerContext ctx = null;

    public MessageResponse getResponse() {
        return response;
    }

    public MessageRequest getRequest() {
        return request;
    }

    public void setRequest(MessageRequest request) {
        this.request = request;
    }

    MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap, ChannelHandlerContext ctx) {
        this.request = request;
        this.response = response;
        this.handlerMap = handlerMap;
        this.ctx = ctx;
    }

    public void run() {
        response.setMessageId(request.getMessageId());
        try {
            Object result = reflect(request);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t.toString());
            t.printStackTrace();
            System.err.printf("RPC Server invoke error!\n");
        }
        ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("RPC Server Send message-id response: " + request.getMessageId());
            }
        });
    }

    private Object reflect(MessageRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
    }
}
Then the RPC message-callback module:
/**
 * @filename:MessageCallBack.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC message callback
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageCallBack {

    private MessageRequest request;
    private MessageResponse response;
    private Lock lock = new ReentrantLock();
    private Condition finish = lock.newCondition();

    public MessageCallBack(MessageRequest request) {
        this.request = request;
    }

    public Object start() throws InterruptedException {
        try {
            lock.lock();
            // Wait with a timeout: if the RPC server takes too long to respond, return null by default.
            finish.await(10 * 1000, TimeUnit.MILLISECONDS);
            if (this.response != null) {
                return this.response.getResult();
            } else {
                return null;
            }
        } finally {
            lock.unlock();
        }
    }

    public void over(MessageResponse response) {
        try {
            lock.lock();
            this.response = response;
            finish.signal();
        } finally {
            lock.unlock();
        }
    }
}
That completes the key modules of NettyRPC, server and client, all implemented with Netty. Here is the Spring configuration, rpc-invoke-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
    <context:component-scan base-package="newlandframework.netty.rpc.core"/>
    <context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server.properties"/>
    <bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal">
        <property name="messageKeyVal">
            <map>
                <entry key="newlandframework.netty.rpc.servicebean.Calculate">
                    <ref bean="calc"/>
                </entry>
            </map>
        </property>
    </bean>
    <bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>
    <bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
        <constructor-arg name="serverAddress" value="${rpc.server.addr}"/>
    </bean>
</beans>
And the configuration file that binds the RPC server's IP address, rpc-server.properties:
#rpc server's ip address config
rpc.server.addr=127.0.0.1:18888
Finally, the NettyRPC server can be started like this:
new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config.xml");
If all goes well, the console output will look like the screenshot below:
If that message appears, the NettyRPC server has started successfully!
So how does this Netty-based RPC server handle concurrency? Practice is the sole criterion of truth, so let's put it to the test.
The test case below makes an RPC call to a function that adds two numbers and returns the result. The client starts 10,000 threads that fire their calculation requests at the very same instant, and we then observe whether the Netty RPC server responds normally and whether every client call returns a correct result. Note that the test is 10,000 threads issuing requests simultaneously, not 10,000 threads issuing requests in a loop. Those are very different yardsticks for an RPC server's concurrent performance, and the former is far more demanding.
First, the RPC calculator interface and its implementation:
/**
 * @filename:Calculate.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: calculator interface
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

public interface Calculate {
    // add two numbers
    int add(int a, int b);
}
/**
 * @filename:CalculateImpl.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: calculator interface implementation
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

public class CalculateImpl implements Calculate {
    // add two numbers
    public int add(int a, int b) {
        return a + b;
    }
}
Here is the instantaneous-concurrency RPC test:
/**
 * @filename:CalcParallelRequestThread.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: concurrent request thread
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

import newlandframework.netty.rpc.core.MessageSendExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CalcParallelRequestThread implements Runnable {

    private CountDownLatch signal;
    private CountDownLatch finish;
    private MessageSendExecutor executor;
    private int taskNumber = 0;

    public CalcParallelRequestThread(MessageSendExecutor executor, CountDownLatch signal, CountDownLatch finish, int taskNumber) {
        this.signal = signal;
        this.finish = finish;
        this.taskNumber = taskNumber;
        this.executor = executor;
    }

    public void run() {
        try {
            signal.await();
            Calculate calc = executor.execute(Calculate.class);
            int add = calc.add(taskNumber, taskNumber);
            System.out.println("calc add result:[" + add + "]");
            finish.countDown();
        } catch (InterruptedException ex) {
            Logger.getLogger(CalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
/**
 * @filename:RpcParallelTest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC concurrency test
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

import java.util.concurrent.CountDownLatch;
import newlandframework.netty.rpc.core.MessageSendExecutor;
import org.apache.commons.lang.time.StopWatch;

public class RpcParallelTest {

    public static void main(String[] args) throws Exception {
        final MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:18888");
        // degree of parallelism: 10000
        int parallel = 10000;
        // start the stopwatch
        StopWatch sw = new StopWatch();
        sw.start();
        CountDownLatch signal = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(parallel);
        for (int index = 0; index < parallel; index++) {
            CalcParallelRequestThread client = new CalcParallelRequestThread(executor, signal, finish, index);
            new Thread(client).start();
        }
        // release all 10000 concurrent threads at the same instant
        signal.countDown();
        finish.await();
        sw.stop();
        String tip = String.format("Total RPC time: [%s] ms", sw.getTime());
        System.out.println(tip);
        executor.stop();
    }
}
Now start the NettyRPC server, make sure it is up, then run the concurrent RPC client and look at the computed results and the elapsed time.
As shown above, 10,000 instantaneous RPC calculation requests took close to 11 seconds in total. Now let's look at what happened on the NettyRPC server side:
It is quite clear that the RPC server received every calculation request the clients issued and returned a response for each.
Finally, let's verify whether the RPC server suffered any packet loss, sticky packets, or I/O blocking: were all 10,000 concurrent calculation requests received, processed, and answered? The facts speak for themselves, as shown below:
Impressive: the RPC server did receive all 10,000 instantaneous concurrent calculation requests from the client and answered every one, with no packet loss, no sticky packets, and no I/O blocking. And did the RPC client get all its results back?
Indeed it did: the client received the server's computed results for all 10,000 addition requests, in roughly 11 seconds. Evidently a NettyRPC server built on Netty plus a business thread pool handles concurrent multi-threaded RPC requests with ease.
To conclude: this article used the Netty NIO framework to implement a simple "high-performance" RPC server. The code works, but there is still room for improvement, for example:
1. Object serialization could support the mainstream serialization frameworks: protobuf, JBoss Marshalling, Avro, and so on.
2. Netty's threading model could be tailored to the business; not every workload needs this much concurrent processing power.
3. At present each RPC service interface maps to exactly one implementation; one-to-many bindings should be supported later.
4. The business thread pool's startup parameters, its blocking-queue type, and so on could be made configurable.
5. Netty's Handler stage currently dispatches all complex business logic to a dedicated thread pool for asynchronous background processing. You could also decouple this with JMS (a message queue), dispatching work uniformly to queue subscribers. There are many open-source options to consider, such as ActiveMQ and RocketMQ.
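On improvement 1, a minimal way to make serialization pluggable is to hide the wire format behind a small strategy interface. The sketch below (names invented for illustration, with JDK serialization as the default strategy, matching what ObjectEncoder/ObjectDecoder use) shows the general shape; a protobuf or JBoss Marshalling codec could then slot in as an alternative implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical sketch: a serialization strategy interface so the wire
// format can be swapped (JDK serialization, protobuf, Marshalling, ...)
// without touching the rest of the pipeline.
public class SerializerDemo {

    public interface Serializer {
        byte[] serialize(Object obj) throws IOException;
        Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException;
    }

    // Default strategy: Java's built-in object serialization.
    public static class JdkSerializer implements Serializer {
        public byte[] serialize(Object obj) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
                oos.writeObject(obj);
            }
            return out.toByteArray();
        }
        public Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ois.readObject();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Serializer s = new JdkSerializer(); // the concrete strategy could come from configuration
        byte[] wire = s.serialize("ping");
        System.out.println(s.deserialize(wire)); // prints ping
    }
}
```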
The NettyRPC presented here surely leaves plenty of room for you to polish and improve; I leave that optimization work to you.
My skills and understanding are limited, so if anything in this article is wrong, I welcome corrections from fellow readers! Thank you for reading to the end; I hope you now have a deeper understanding of developing high-performance server applications in Java. This article is a milestone summary of my Netty studies; when time permits I will publish more articles on industrial-grade Netty development, so stay tuned!
PS: interested readers may also want to read my other article, "Netty实现高性能RPC服务器优化篇之消息序列化" (optimizing the NettyRPC server with message serialization). Also, since publishing two articles on building a high-performance RPC server with Netty on cnblogs, I have received many requests from readers for the source code. To make things easy, I have open-sourced NettyRPC on GitHub; everyone interested is welcome to study and explore it together!
The NettyRPC project can be downloaded here: https://github.com/tang-jie/NettyRPC
A follow-up in this industrial-grade Netty development series: "Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇" (an architecture guide to building a distributed message queue, AvatarMQ, with Netty).
The two articles so far — this one, and "Netty实现高性能RPC服务器优化篇之消息序列化" — build a high-performance RPC server on Netty. That groundwork paves the way for designing and implementing a Netty-based distributed message queue system, which I have named AvatarMQ. As the advanced installment of this industrial-grade Netty series, the AvatarMQ architecture guide is worth a look for interested readers; it will not disappoint!
The AvatarMQ project is open-sourced at: https://github.com/tang-jie/AvatarMQ.