RPC是目前被普遍应用于互联网服务的一项技术,关于它的基本介绍你们可经过百度了解一下,此处再也不赘述。 正所谓读万卷书不如行万里路,原理性的文章看的再多都不如亲自实现一遍RPC,方可对其了解的更加透彻。 本文将以纯技术视角,为你们演示一下RPC的工做原理与实现方案。java
正式开始以前,先罗列一下实现RPC须要运用到的技术点:算法
在具体实现上除了通讯部分咱们选用smart-socket来辅助,其他包括序列化/反序列化、反射、动态代理等部分咱们将采用JDK提供的解决方案,待您掌握RPC后可再尝试结合第三方技术来重构RPC。数组
既然RPC是跨网络通讯服务,那咱们先制定通讯规则,该部分的内容涉及到通讯、序列化/反序列化技术。网络
通讯协议咱们采用最简单的length+data模式,编解码的实现算法以下。 做为示例咱们假设readBuffer足够容纳一个完整的消息,协议中的data部分即是RPC服务序列化后的byte数组,Provider/Consumer则必须对byte数组完成反序列化后才能继续RPC服务处理。session
public class RpcProtocol implements Protocol<byte[]> { private static final int INTEGER_BYTES = Integer.SIZE / Byte.SIZE; @Override public byte[] decode(ByteBuffer readBuffer, AioSession<byte[]> session, boolean eof) { int remaining = readBuffer.remaining(); if (remaining < INTEGER_BYTES) { return null; } int messageSize = readBuffer.getInt(readBuffer.position()); if (messageSize > remaining) { return null; } byte[] data = new byte[messageSize - INTEGER_BYTES]; readBuffer.getInt(); readBuffer.get(data); return data; } @Override public ByteBuffer encode(byte[] msg, AioSession<byte[]> session) { ByteBuffer byteBuffer = ByteBuffer.allocate(msg.length + INTEGER_BYTES); byteBuffer.putInt(byteBuffer.capacity()); byteBuffer.put(msg); byteBuffer.flip(); return byteBuffer; } }
RPC请求消息由Consumer发送,Consumer须要在请求消息中提供足够信息以供Provider准确识别服务接口。核心要素包括:dom
public class RpcRequest implements Serializable { /** * 消息的惟一标识 */ private final String uuid = UUID.randomUUID().toString(); /** * 接口名称 */ private String interfaceClass; /** * 调用方法 */ private String method; /** * 参数类型字符串 */ private String[] paramClassList; /** * 入参 */ private Object[] params; getX/setX() }
RPC响应消息为Provider将接口执行结果响应给Consumer的载体。socket
public class RpcResponse implements Serializable { /** * 消息的惟一标示,与对应的RpcRequest uuid值相同 */ private String uuid; /** * 返回对象 */ private Object returnObject; /** * 返回对象类型 */ private String returnType; /** * 异常 */ private String exception; public RpcResponse(String uuid) { this.uuid = uuid; } getX/setX() }
经过上述内容便完成RPC通讯的消息设计,至于RpcRequest、RpcResponse如何转化为通讯协议要求的byte数组格式,咱们采用JDK提供的序列化方式(生产环境不建议使用)。ide
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ObjectOutput objectOutput = new ObjectOutputStream(byteArrayOutputStream); objectOutput.writeObject(request); aioSession.write(byteArrayOutputStream.toByteArray());
ObjectInputStream objectInput = new ObjectInputStream(new ByteArrayInputStream(msg)); RpcResponse resp = (RpcResponse) objectInput.readObject();
经过上文方案咱们解决了RPC的通讯问题,接下来便得根据通讯消息实现服务能力。测试
因为RPC的Consumer端只有接口,没有具体实现,但在使用上咱们又指望跟本地服务有一样的使用体验。 所以咱们须要将接口实例化成对象,并使其具有跨应用服务能力,此处便运用到动态代理。 当Consumer调用RPC接口时,代理类内部发送请求消息至Provider并获取结果。ui
obj = (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{remoteInterface}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest req = new RpcRequest(); req.setInterfaceClass(remoteInterface.getName()); req.setMethod(method.getName()); Class<?>[] types = method.getParameterTypes(); if (!ArrayUtils.isEmpty(types)) { String[] paramClass = new String[types.length]; for (int i = 0; i < types.length; i++) { paramClass[i] = types[i].getName(); } req.setParamClassList(paramClass); } req.setParams(args); RpcResponse rmiResp = sendRpcRequest(req); if (StringUtils.isNotBlank(rmiResp.getException())) { throw new RuntimeException(rmiResp.getException()); } return rmiResp.getReturnObject(); } });
Provider可将其提供的RPC服务维护在集合里,采用Map存储便可,key为暴露的接口名,value为接口的具体实现。 一旦Provider接受到RPC的请求消息,只需根据请求消息内容找到并执行对应的服务,最后将返回结果以消息的形式返回至Consumer便可。
ObjectInputStream objectInput = new ObjectInputStream(new ByteArrayInputStream(msg)); RpcRequest req = (RpcRequest) objectInput.readObject(); RpcResponse resp = new RpcResponse(req.getUuid()); try { String[] paramClassList = req.getParamClassList(); Object[] paramObjList = req.getParams(); // 获取入参类型 Class<?>[] classArray = null; if (paramClassList != null) { classArray = new Class[paramClassList.length]; for (int i = 0; i < classArray.length; i++) { Class<?> clazz = primitiveClass.get(paramClassList[i]); if (clazz == null) { classArray[i] = Class.forName(paramClassList[i]); } else { classArray[i] = clazz; } } } // 调用接口 Object impObj = impMap.get(req.getInterfaceClass()); if (impObj == null) { throw new UnsupportedOperationException("can not find interface: " + req.getInterfaceClass()); } Method method = impObj.getClass().getMethod(req.getMethod(), classArray); Object obj = method.invoke(impObj, paramObjList); resp.setReturnObject(obj); resp.setReturnType(method.getReturnType().getName()); } catch (InvocationTargetException e) { LOGGER.error(e.getMessage(), e); resp.setException(e.getTargetException().getMessage()); } catch (Exception e) { LOGGER.error(e.getMessage(), e); resp.setException(e.getMessage()); } ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); objectOutput = new ObjectOutputStream(byteArrayOutputStream); objectOutput.writeObject(resp); session.write(byteArrayOutputStream.toByteArray());
服务端定义接口DemoApi
,并将其实现示例DemoApiImpl
注册至Provider中。
public class Provider { public static void main(String[] args) throws IOException { RpcProviderProcessor rpcProviderProcessor = new RpcProviderProcessor(); AioQuickServer<byte[]> server = new AioQuickServer<>(8888, new RpcProtocol(), rpcProviderProcessor); server.start(); rpcProviderProcessor.publishService(DemoApi.class, new DemoApiImpl()); } }
Consumer调用RPC接口test
、sum
得到执行结果。
public class Consumer { public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { RpcConsumerProcessor rpcConsumerProcessor = new RpcConsumerProcessor(); AioQuickClient<byte[]> consumer = new AioQuickClient<>("localhost", 8888, new RpcProtocol(), rpcConsumerProcessor); consumer.start(); DemoApi demoApi = rpcConsumerProcessor.getObject(DemoApi.class); System.out.println(demoApi.test("smart-socket")); System.out.println(demoApi.sum(1, 2)); } }
本文简要描述了RPC服务实现的关键部分,可是提供稳定可靠的RPC服务还有不少细节须要考虑,有兴趣的朋友可自行研究。 本文示例的完整代码可从smart-socket项目中获取。