因为rpc底层涉及网络编程接口,线程模型,网络数据结构,服务协议,细到字节的处理。牵涉内容较多,今天就先从一个点提及。
说说,dubbo经过netty框架作传输层,从接到数据字节流到把字节转换为dubbo上层可读的Request消息对象的过程。当前dubbo还支持mina,grizzly作底层传输层。
这里包括两部,反序列化和解码。我打算分两篇写。这篇主要是说解码的过程。
就是下面这个dubbo架构图中,红框中的部分。java
既然是netty作传输层,netty的基础得提一点。
netty框架是经过管道(ChannelPipeline)模型处理网络数据流的,每一个管道中有多个处理接点(ChannelHandler),
节点分为,进站(client请求进服务端口)和出站(请求响应出服务端口)两种。好比一个进站消息老是,顺序的(顺序是程序中编码指定的)经过进站处理节点。
同理出站消息,老是顺序的经过出站节点到达网络接口。编程
dubbo2.5.6版本,传输层dubbo提供有netty3和netty4两种实现,初始化netty通道都在NettyServer类里,两个类同名,包名不一样。bootstrap
具体,netty3在NettyServer类里doOpen()方法:数组
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { //编解码器的初始化 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder());//设置解码hander pipeline.addLast("encoder", adapter.getEncoder());//设置编码hander pipeline.addLast("handler", nettyHandler);//自定义NettyHandler 扩展自netty双向handler基类,能够接受进站和出站数据流 return pipeline; //进站的请求,先通过adapter.getDecoder()handler处理,再由nettyHandler处理 //出站的请求,先通过nettyHandler处理 再由adapter.getEncoder()handler处理 } }); // bind channel = bootstrap.bind(getBindAddress()); }
netty4版本NettyServer类里doOpen()方法:网络
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ServerBootstrap(); //acceptor 事件循环线程 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); //client channel事件循环线程 workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { //编解码器的初始化 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder())//设置解码hander .addLast("encoder", adapter.getEncoder())//设置编码hander .addLast("handler", nettyServerHandler);//自定义NettyServerHandler 扩展netty双向handler基类,能够接受进站和出站数据流 //进站的请求,先通过adapter.getDecoder()handler处理,再由nettyServerHandler处理 //出站的请求,先通过nettyServerHandler处理 再由adapter.getEncoder()handler处理 } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
本次说的是进站流程,请求数据解析成Request对象过程,经过上面的代码和netty特性可知,
进站数据先经过解码hander,经解码成消息Request对象后,再到自定义handler,而后由自定义hanlder经过装饰模式,调用实际服务。数据结构
先看下解码handler的实现:
由adapter.getDecoder()这句跟踪到NettyCodecAdapter类的getDecoder()方法
public ChannelHandler getDecoder() {
return decoder;
}
能够看到,这个方法获取的解码handler,decoder,是NettyCodecAdapter类的私有属性架构
private final ChannelHandler decoder = new InternalDecoder();并发
看下InternalDecoder类定义,netty3版本:app
* 这里须要些netty的知识,继承SimpleChannelUpstreamHandler,代表它是进站handler * 因此进站的数据流,都会通过本handler对象,具体就是messageReceived方法。 */ private class InternalDecoder extends SimpleChannelUpstreamHandler { private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;//这是dubbo根据nio本身实现的buffer @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { Object o = event.getMessage(); if (!(o instanceof ChannelBuffer)) {//这里ChnnelBuffer是netty基于jdk nio ByteBuffer 实现 ctx.sendUpstream(event); return; } ChannelBuffer input = (ChannelBuffer) o; int readable = input.readableBytes();//到这就是从netty event对象取数据的过程。 if (readable <= 0) { return; } com.alibaba.dubbo.remoting.buffer.ChannelBuffer message; if (buffer.readable()) { if (buffer instanceof DynamicChannelBuffer) { buffer.writeBytes(input.toByteBuffer()); message = buffer; } else { int size = buffer.readableBytes() + input.readableBytes(); message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer( size > bufferSize ? size : bufferSize);//bufferSize 不指定是8k,这里表示最小8K message.writeBytes(buffer, buffer.readableBytes()); message.writeBytes(input.toByteBuffer());//把netty 读到的字节流写入message } } else {//直接经过构造器,构造message message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer( input.toByteBuffer()); } NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); Object msg; int saveReaderIndex; //从netty框架,读取到的数据,放入message后,下面就是针对message的反序列化和解码过程。 try { // decode object. do { saveReaderIndex = message.readerIndex(); try { //解码,这里面包括的反序列化 msg = codec.decode(channel, message);//重要!!!经过具体编解码实例codec完成解码 } catch (IOException e) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw e; } if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {//若是解码结果是,Codec2.DecodeResult.NEED_MORE_INPUT,表示,须要更多数据 message.readerIndex(saveReaderIndex);//很重要,设置readerIndex为,解码读取前的位置,为了下次再从头读取。 break; } else { if (saveReaderIndex == message.readerIndex()) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw new IOException("Decode without read data."); } if (msg != null) {//解码完成,这里的msg已是Request对象。!! Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress()); } } } while (message.readable()); } finally { if (message.readable()) { message.discardReadBytes(); buffer = message; } else { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; } NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } //处理异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { ctx.sendUpstream(e);//向下一个传递。通常在最后一个handler处理异常 } }
netty4版本:框架
/*** * netty4 扩展了ByteToMessageDecoder * 重写decode 方法,解码完成后,不用像netty3手动Channels.fireMessageReceived 发送事件, * netty4自动把对象,传递到下一个handler */ private class InternalDecoder extends ByteToMessageDecoder { protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception { ChannelBuffer message = new NettyBackedChannelBuffer(input); NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); Object msg; int saveReaderIndex; try { // decode object. do { //保存初始,读取位置 saveReaderIndex = message.readerIndex(); try { msg = codec.decode(channel, message);//重要!!!经过具体编解码实例codec完成解码 } catch (IOException e) { throw e; } if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { //解码失败后,从新设置readerIndex为读取前位置 message.readerIndex(saveReaderIndex); break; } else { //is it possible to go here ? if (saveReaderIndex == message.readerIndex()) { throw new IOException("Decode without read data."); } if (msg != null) { //解码成功后,加入到out list中,传递到下一个处理handler out.add(msg); } } } while (message.readable()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } }
能够看到,不管netty3,仍是netty4都是经过,NettyCodecAdapter的codec属性完成解码的,
这里有个概念,编解码handler是经过编解码实例完成编解码的,这里的编解码实例就是codec
而codec实例是由它构造函数从上层方法传递的。以下
public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) { this.codec = codec; this.url = url; this.handler = handler; int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE); this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE; }
再回到NettySever类中NettyCodecAdapter的构造语句
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
跟进getCodec()方法,这个方就是获取实际编解码方案的。这个方法的实如今NettyServer的祖先类AbstractEndpoint中:
protected Codec2 getCodec() { return codec; } //能够看到codec在构造方法里建立的 public AbstractEndpoint(URL url, ChannelHandler handler) { super(url, handler); this.codec = getChannelCodec(url);//根据url配置,构造编码解码器(经过spi获得DubboCountCodec类实例) this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); } // 再跟到getChannelCodec方法 protected static Codec2 getChannelCodec(URL url) { //经过spi机制,从url里获取编解码方案,这里是dubbo。取不到就是telnet //dubbo编解码方案,实现类是DubboCountCodec String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class) .getExtension(codecName)); } }
这里看下,dubbo中的全部编解码类结构:
能够看到,全部编解码器实现,都扩展了Codec2接口。同时Codec2也是个spi扩展点。
接口Codec2,以下:
@SPI public interface Codec2 { /** * spi 获取编码器 * @param channel * @param buffer * @param message * @throws IOException */ @Adaptive({Constants.CODEC_KEY}) void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException; /*** * spi 获取解码器 * @param channel * @param buffer * @return * @throws IOException */ @Adaptive({Constants.CODEC_KEY}) Object decode(Channel channel, ChannelBuffer buffer) throws IOException; enum DecodeResult { NEED_MORE_INPUT, SKIP_SOME_INPUT } }
具体实现经过spi获取,dubbo编解码方案实例就是DubboCountCodec
那么看下DubboCountCodec类,以及decode方法:
public final class DubboCountCodec implements Codec2 { private DubboCodec codec = new DubboCodec();//具体dubbo协议编解码方案实现 public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { codec.encode(channel, buffer, msg); } public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { int save = buffer.readerIndex();//记录读取初始位置 MultiMessage result = MultiMessage.create();//解码后对象容器。list,可放多个消息 do { Object obj = codec.decode(channel, buffer);//解码过程在DubboCodec类中的decode方法里 if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {//须要接受更多信息 buffer.readerIndex(save);//恢复读取位置 break; } else {//解码完成,加到对象容器 result.addMessage(obj); logMessageLength(obj, buffer.readerIndex() - save);//记录日志,可忽略 save = buffer.readerIndex();//更新读取位置 } } while (true); if (result.isEmpty()) { return Codec2.DecodeResult.NEED_MORE_INPUT; } if (result.size() == 1) { return result.get(0);//返回解码后对象 } return result; } private void logMessageLength(Object result, int bytes) { if (bytes <= 0) { return; } if (result instanceof Request) { try { ((RpcInvocation) ((Request) result).getData()).setAttachment( Constants.INPUT_KEY, String.valueOf(bytes)); } catch (Throwable e) { /* ignore */ } } else if (result instanceof Response) { try { ((RpcResult) ((Response) result).getResult()).setAttachment( Constants.OUTPUT_KEY, String.valueOf(bytes)); } catch (Throwable e) { /* ignore */ } } } }
分析DubboCodec类以前,先说下dubbo协议消息格式,它包括消息头和消息体:
前2个字节:
为协议魔数,固定值oxdabb
第三字节:
第1比特(0/1)表示是请求消息,仍是响应消息
第2比特(0/1)表示是是否必须双向通讯,即有请求,必有响应
第3比特(0/1)表示是是不是,心跳消息
第低5位比特,表示一个表示消息序列化的方式(1,是dubbo ,2,是hessian...)
第四字节:
只在响应消息中用到,表示响应消息的状态,是成功,失败等
第5-12字节:
8个字节,表示一个long型数字,是reqeustId
第13—16字节:
4个字节,表示消息体的长度(字节数)
消息体,不固定长度
是请求消息时,表示请求数据
是响应消息时,表示方法调用返回结果。
编码和解码主要是对消息头的设置和解析。序列化和反序列化主要是对消息体的操做。
先看DubboCodec的关系图:
DubboCodec类decode方法的实如今其父类ExchangeCodec中:
//先看下类中定义的常量: // header length.消息头长度 protected static final int HEADER_LENGTH = 16; // magic header. protected static final short MAGIC = (short) 0xdabb;//1101 1010 1011 1011魔数 protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];//高字节 protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];//低字节 // message flag. protected static final byte FLAG_REQUEST = (byte) 0x80;//1000,0000//表示消息类型 protected static final byte FLAG_TWOWAY = (byte) 0x40;//0100,0000//表示是否双向通讯 protected static final byte FLAG_EVENT = (byte) 0x20;//0010,0000//表示是不是心跳事件 protected static final int SERIALIZATION_MASK = 0x1f;//0001,1111/表示是序列化实现类型 /*** * 解码入口方法 * @param channel * @param buffer * @return * @throws IOException */ public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { //取得可读的字节数 int readable = buffer.readableBytes(); //header 最大16字节 byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; //从buffer中读16字节到header数组中 buffer.readBytes(header); //调用自己decode方法 return decode(channel, buffer, readable, header); } /*** * 具体协议解析方法,本方法主要是读取验证消息头的过程 * @param channel * @param buffer * @param readable * @param header * @return * @throws IOException */ protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // check magic number. if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { //前两字节,不是dubbo魔数 int length = header.length; //若是可读的字节数目,大于16字节 if (header.length < readable) { //给header扩容到readable大小 header = Bytes.copyOf(header, readable); //把buffer剩下的字节读到header中,这里多于16字节 buffer.readBytes(header, length, readable - length); } //上层方法,第一字节已经验证过,这个从第二字节开始验证。 for (int i = 1; i < header.length - 1; i++) { //若是发现,后续字节有dubbo协议的开头 if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { //重置readerIndex 位置,到魔数开始的地方。 buffer.readerIndex(buffer.readerIndex() - header.length + i); //把魔数开始位置,之前的数据,覆盖赋给header(下面调用super.decode解析) header = Bytes.copyOf(header, i); break; } } //上层方法是dubbo telnet 编解码实现 return super.decode(channel, buffer, readable, header); } // check length. if (readable < HEADER_LENGTH) {//小于16字节,返回 须要更多对象 return DecodeResult.NEED_MORE_INPUT; } //get data length.读取header[]最后四字节,构造一个int的数据, //根据dubbo协议,这个是消息体的长度 int len = Bytes.bytes2int(header, 12); //检查数据大小 checkPayload(channel, len);//默认为8M int tt = len + HEADER_LENGTH;//总的消息大小,消息头加消息实体 if (readable < tt) {//若是可读取的,不够消息总大小,就返回 须要更多数据 return DecodeResult.NEED_MORE_INPUT; } // limit input stream.这个时候,buffer的readerIndex位置已经是,读完header后的位置,接下来的len长度的数据,全是消息体的数据。 ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { //解码反序列化成Reqeust或者Response对象,decodeBody方法被子类DubboCodec重写了 //这里要看DubboCodec的decodeBody的方法 return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { if (logger.isWarnEnabled()) { logger.warn("Skip input stream " + is.available()); } StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } }
DubboCodec类的decodeBody方法:
/*** * 解码,是从输出流 is 取字节数据,经反序列化,构造Request 和Response对象的过程。 * @param channel * @param is * @param header * @return * @throws IOException */ protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { //消息头第3字节和SERIALIZATION_MASK&操做后,就能够获得,序列化/反序列化方案 byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); //根据序列化方案id,或者url指定,经过spi机制去获取序列化实现。dubbo协议默认用hession2序列化方案 //是放在消息头flag 里的。这里proto 值是2 //获取具体用序列化/反序列化实现 Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // get request id.header字节从第5到12 8个字节,是请求id long id = Bytes.bytes2long(header, 4); if ((flag & FLAG_REQUEST) == 0) {//根据flag&FLAG_REQUEST后,判断须要解码的消息类型 // decode response. Response res = new Response(id); if ((flag & FLAG_EVENT) != 0) { res.setEvent(Response.HEARTBEAT_EVENT); } // get status. 获取响应 状态 成功,失败等 byte status = header[3]; res.setStatus(status); if (status == Response.OK) {//返回结果状态 ok成功 try { Object data; if (res.isHeartbeat()) { data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (res.isEvent()) {//事件消息,反序列化 data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else {//业务调用结果消息 解码构造 DecodeableRpcResult 对象的过程 DecodeableRpcResult result; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//是否在io 线程内解码 result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); //!!!Response消息反序列化 就是把调用结果返回值 从is里反序列化出来,放在 DecodeableRpcResult类的result 字段的过程。 result.decode(); } else { //不在io线程解码,要先经过readMessageData方法把调用结果数组取出后, //放在UnsafeByteArrayInputStream对象,存在DecodeableRpcResult对象里,后续经过上层方法解码。 result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } //同时把DecodeableRpcResult对象放入Response result字段。 res.setResult(data); } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode response failed: " + t.getMessage(), t); } //异常处理,设置status和异常信息 res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } } else { //异常处理,设置异常信息 res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); } return res; } else {//反解码Requset 消息类型 // decode request. Request req = new Request(id); req.setVersion("2.0.0"); req.setTwoWay((flag & FLAG_TWOWAY) != 0); if ((flag & FLAG_EVENT) != 0) { req.setEvent(Request.HEARTBEAT_EVENT); } try { Object data; if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (req.isEvent()) { data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else {//业务调用请求消息 解码构造 DecodeableRpcInvocation 对象的过程 DecodeableRpcInvocation inv; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//是否 io 线程内解码 inv = new DecodeableRpcInvocation(channel, req, is, proto); //!!!Requset 类型反序列化方法 inv.decode(); } else { inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } //同时把DecodeableRpcInvocation对象放入Request data字段。 req.setData(data); } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode request failed: " + t.getMessage(), t); } // bad request 异常请求对象设置 req.setBroken(true); req.setData(t); } return req; } } /*** * 获取反序列化方案 * @param serialization * @param url * @param is * @return * @throws IOException */ private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) throws IOException { return serialization.deserialize(url, is); } /*** * 读取is 里的可用数据 * @param is * @return * @throws IOException */ private byte[] readMessageData(InputStream is) throws IOException { if (is.available() > 0) { byte[] result = new byte[is.available()]; is.read(result); return result; } return new byte[]{}; }
RPC调用请求:DecodeableRpcInvocation 类反序列化方法:
public void decode() throws Exception { if (!hasDecoded && channel != null && inputStream != null) { try {//具体在decode重载方法里 decode(channel, inputStream); } catch (Throwable e) {//异常请求设置 if (log.isWarnEnabled()) { log.warn("Decode rpc invocation failed: " + e.getMessage(), e); } request.setBroken(true); request.setData(e); } finally { hasDecoded = true; } } } /** * 反序列化,解码 经过反序列化还原 * RpcInvocation 类的 * private String methodName; private Class<?>[] parameterTypes; private Object[] arguments; private Map<String, String> attachments; 是个属性值,就像在客户端请求时设置的同样。 * @param channel channel. * @param input input stream. * @return * @throws IOException */ public Object decode(Channel channel, InputStream input) throws IOException { //获取反序列化方案 ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); //反序列化出,dubbo版本,路径,服务版本信息,设置到attachments里,这读取顺序和序列化时的写入顺序也一致 //固然,序列化方案也一致。这里默认都是hissen2 //调用ObjectInput的readUTF()反序列化方法,依次获取调用信息 setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF()); setAttachment(Constants.PATH_KEY, in.readUTF()); setAttachment(Constants.VERSION_KEY, in.readUTF()); //读取方法名 setMethodName(in.readUTF()); //反序列化,方法请求参数类型, try { Object[] args; Class<?>[] pts; String desc = in.readUTF(); if (desc.length() == 0) { pts = DubboCodec.EMPTY_CLASS_ARRAY; args = DubboCodec.EMPTY_OBJECT_ARRAY; } else { pts = ReflectUtils.desc2classArray(desc); args = new Object[pts.length]; for (int i = 0; i < args.length; i++) { try { //更具类型读取请求参数值 //调用ObjectInput的readObject()反序列化方法,反序列化出参数值 args[i] = in.readObject(pts[i]); } catch (Exception e) { if (log.isWarnEnabled()) { log.warn("Decode argument failed: " + e.getMessage(), e); } } } } //设置保存请求参数类型 setParameterTypes(pts); //反序列化,attachment map //调用ObjectInput的readObject()反序列化方法,反序列化出attachemnet值 Map<String, String> map = (Map<String, String>) in.readObject(Map.class); if (map != null && map.size() > 0) { Map<String, String> attachment = getAttachments(); if (attachment == null) { attachment = new HashMap<String, String>(); } attachment.putAll(map); setAttachments(attachment); } //decode argument ,may be callback 回调参数设置,这个再说。 for (int i = 0; i < args.length; i++) { args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]); } //保存请求参数值 setArguments(args); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read invocation data failed.", e)); } return this; }
RPC调用结果:DecodeableRpcResult 反序列化 方法decode()
public void decode() throws Exception { if (!hasDecoded && channel != null && inputStream != null) { try {//具体实如今decode(channel, inputStream)重载方法里 decode(channel, inputStream); } catch (Throwable e) {//设置异常返回response if (log.isWarnEnabled()) { log.warn("Decode rpc result failed: " + e.getMessage(), e); } response.setStatus(Response.CLIENT_ERROR); response.setErrorMessage(StringUtils.toString(e)); } finally { hasDecoded = true; } } } /*** * 反序列化,解码过程,读取input的调用结果字节数据,经反序列化成方法返回类型对象 * 并发放回结果设置到RpcResult的result字段里 * 以及异常返回字段的设置。 * @param channel channel. * @param input input stream. * @return * @throws IOException */ public Object decode(Channel channel, InputStream input) throws IOException { //获取反序列化方案 ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); //反序列化出 结果标识 调用ObjectInput的readByte方法,反序列化出一个byte值 byte flag = in.readByte();// switch (flag) { case DubboCodec.RESPONSE_NULL_VALUE://null 值 break; case DubboCodec.RESPONSE_VALUE: //非空值 try { //根据invocation获取调用方法的放回类型 Type[] returnType = RpcUtils.getReturnTypes(invocation); //根据返回类型,反序列出结果并这是到RpcResult 的result字段里。 //void 类型 结果值 是null ;int 等基本类型,自动装箱 Integer; //具体调用ObjectInput的readObject重载的两个方法,反序列出结果对象 setValue(returnType == null || returnType.length == 0 ? in.readObject() : (returnType.length == 1 ? in.readObject((Class<?>) returnType[0]) : in.readObject((Class<?>) returnType[0], returnType[1]))); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; case DubboCodec.RESPONSE_WITH_EXCEPTION://异常信息反序列化,设置到exception字段 try { //具体调用ObjectInput的readObject重载的两个方法,反序列出异常对象 Object obj = in.readObject(); if (obj instanceof Throwable == false) throw new IOException("Response data error, expect Throwable, but get " + obj); setException((Throwable) obj); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; default: throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag); } return this; }
下一篇再说说反序列化。