客户机与服务端之间的数据交互须要遵照必定的约定,好比协议版本,数据类型,是否有缓存,是否有压缩等,只有在这些约定的基础上才能相互之间愉快的工做。css
这时说的是基于TCP/IP的Netty之间的通讯。TCP/IP协议下客户端与服务端之间要进行数据交互,通常须要将数据转换成二进制格式,直接传java bean是不能支持的。在RPC模式下客户端在向服务端发起请求前须要将数据作编码,服务端在接收客户端发的数据后须要作解码以后才能正常工做。java
为了更好的控制RPC客户端与服务端之间的通讯,也能够编写私有的协议栈来支撑。git
相似HTTP协议,包含头信息以及内容信息。github
public class RpcMessage implements Serializable { private RpcMessageHeader messageHeader; private Object messageBody; }
头信息,包含内容体长度,消息类型等信息。能够根据消息类型来作不一样的业务,好比区分是心跳信息仍是业务或者是监控之类的信息。web
public class RpcMessageHeader implements Serializable { private int length; private int type; }
由于TCP/IP协议容易出现粘包拆包现象,这里为了简单直接选择继承组件提供的LengthFieldBasedFrameDecoder,只须要重写下面的方法便可:缓存
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame=(ByteBuf)super.decode(ctx,in); if(null==frame){ return null; } RpcMessage message=new RpcMessage(); RpcMessageHeader messageHeader=new RpcMessageHeader(); messageHeader.setLength(frame.readInt()); message.setMessageHeader(messageHeader); byte[] data = new byte[message.getMessageHeader().getLength()]; frame.readBytes(data); Object obj = ProtoStuffSerializeUtil.deserialize(data, genericClass); message.setMessageBody(obj); return message; }
编码器继承MessageToByteEncoder,将对象转换成字节的编码器网络
public class RpcEncoder extends MessageToByteEncoder<RpcMessage>
重点是下面的编码函数,在ByteBuf中输出数据长度以及数据体,若有其它须要能够补充其它的字段,好比消息类型。ide
public void encode(ChannelHandlerContext ctx, RpcMessage in, ByteBuf out) throws Exception { if(null==in){ throw new RpcException("RpcMessage is null"); } if (genericClass.isInstance(in.getMessageBody())) { byte[] data = ProtoStuffSerializeUtil.serialize(in.getMessageBody()); out.writeInt(data.length); out.writeBytes(data); } }
public class RpcServerInvoker extends AbstractInvoker<RpcMessage>
从服务端方法获取到返回的结果后,从新封装成消息对象(RpcMessage)发送给客户端。函数
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage message) { this.executor.execute(new Runnable() { @Override public void run() { RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this); RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation((RpcRequest) message.getMessageBody())); RpcMessage responseMessage=new RpcMessage(); byte[] data = ProtoStuffSerializeUtil.serialize(response); RpcMessageHeader messageHeader=new RpcMessageHeader(); messageHeader.setLength(data.length); responseMessage.setMessageHeader(messageHeader); responseMessage.setMessageBody(response); channelHandlerContext.writeAndFlush(responseMessage); } }); }
public class RpcClientInvoker extends AbstractInvoker<RpcMessage>
接收的返回结果修改成RpcMessage,从body属性中获取原来的RpcResponse对象ui
public void channelRead0(ChannelHandlerContext ctx, RpcMessage message) { RpcResponse response=(RpcResponse) message.getMessageBody(); String requestId = response.getRequestId(); ResponseFuture responseFuture = pendingRPC.get(requestId); if (responseFuture != null) { pendingRPC.remove(requestId); responseFuture.done(response); } }
public ResponseFuture invoke(RpcInvocation invocation) { RpcRequest request=this.getRpcRequest(); ResponseFuture responseFuture = new ResponseFuture(request); pendingRPC.put(request.getRequestId(), responseFuture); RpcMessage message=new RpcMessage(); byte[] data = ProtoStuffSerializeUtil.serialize(request); RpcMessageHeader messageHeader=new RpcMessageHeader(); messageHeader.setLength(data.length); message.setMessageHeader(messageHeader); message.setMessageBody(request); channel.writeAndFlush(message); return responseFuture; }
https://github.com/jiangmin168168/jim-framework
文中代码是依赖上述项目的,若是有不明白的可下载源码