#Dubbo处理TCP拆包粘包问题java
在TCP网络传输工程中,因为TCP包的缓存大小限制,每次请求数据有可能不在一个TCP包里面,或者也可能多个请求的数据在一个TCP包里面。那么若是合理的decode接受的TCP数据很重要,须要考虑TCP拆包和粘包的问题。咱们知道在Netty提供了各类Decoder来解决此类问题,好比LineBasedFrameDecoder
,LengthFieldBasedFrameDecoder
等等,可是这些都是处理一些通用简单的协议栈,并不能处理高度自定义的协议栈。因为dubbo协议是自定义协议栈,而且包含消息头和消息体两部分,而消息头中包含消息类型、协议版本、协议魔数以及payload长度等信息。因此使用Netty自带的处理方案可能没法知足Dubbo解析自身协议的需求,因此须要Dubbo本身来处理,那本身处理,就须要本身处理TCP的拆包和粘包的问题。这里就对Dubbo处理此类问题进行探讨,从而加深本身对它的理解。web
##说明 此处所描述的协议是dubbo协议,其余的协议好比http,webservice等协议不是这里讨论范围。而且这里使用的通讯框架以Netty来说解,Mina以及grizzly也不在种类讨论范围。缓存
##NettyCodecAdapter NettyCodecAdapter
是对dubbo协议解析的入口,里面包含decoder和encoder两部分,而TCP的拆包和粘包主要是decoder部分,因此encoder这里不进行讨论。在NettyCodecAdapter
中的decoder是由InternalDecoder
来实现,它的父类是Netty的SimpleChannelUpstreamHandler
能够接受全部inbound消息,那么就能够对接受的消息进行decode。这里须要说明一下对于某一个Channel都有一个私有的InternalDecoder
对象,并非和其余的Channel共享,这里就避免了并发问题,因此在InternalDecoder
里面能够用单线程的方式去看待,这样就比较容易理解。网络
###InternalDecoder 每一个channel的inbound消息都会发送到InternalDecoder
的messageReceived
方法,而dubbo会先将接受的消息缓存到InternalDecoder
的buffer
属性中,这个变量很重要,后面会讨论。下面是messageReceived
方法中将接受的消息负载到buffer
实现。并发
private class InternalDecoder extends SimpleChannelUpstreamHandler { private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { Object o = event.getMessage(); if (! (o instanceof ChannelBuffer)) { ctx.sendUpstream(event); return; } ChannelBuffer input = (ChannelBuffer) o; int readable = input.readableBytes(); if (readable <= 0) { return; } com.alibaba.dubbo.remoting.buffer.ChannelBuffer message; if (buffer.readable()) { if (buffer instanceof DynamicChannelBuffer) { buffer.writeBytes(input.toByteBuffer()); message = buffer; } else { int size = buffer.readableBytes() + input.readableBytes(); message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer( size > bufferSize ? size : bufferSize); message.writeBytes(buffer, buffer.readableBytes()); message.writeBytes(input.toByteBuffer()); } } else { message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer( input.toByteBuffer()); } NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); Object msg; int saveReaderIndex; try { // decode object. do { saveReaderIndex = message.readerIndex(); try { msg = codec.decode(channel, message); } catch (IOException e) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw e; } if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break; } else { if (saveReaderIndex == message.readerIndex()) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw new IOException("Decode without read data."); } if (msg != null) { Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress()); } } } while (message.readable()); } finally { if (message.readable()) { message.discardReadBytes(); buffer = message; } else { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; } NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { ctx.sendUpstream(e); } }
首先是判断当前decoder对象的buffer
中是否有能够读取的消息,若是有则进行合并,而且把对象引用赋予message
局部变量,因此message
则获取了当前channel的inbound消息。获得inbound消息以后,那么接下来就是对协议的解析了。app
do { saveReaderIndex = message.readerIndex(); try { msg = codec.decode(channel, message); } catch (IOException e) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw e; } if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break; } else { if (saveReaderIndex == message.readerIndex()) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw new IOException("Decode without read data."); } if (msg != null) { Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress()); } } } while (message.readable());
这里首先要作的是把当前message的读索引保存到局部变量saveReaderIndex
中,用于后面的消息回滚。后面紧接着是对消息的decode,这里的codec
是DubboCountCodec
对象实体,这里须要注意一点,DubboCountCodec
的decode
每次只会解析出一个完整的dubbo协议栈,带着这个看看decode
的实现。框架
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { int save = buffer.readerIndex(); MultiMessage result = MultiMessage.create(); do { Object obj = codec.decode(channel, buffer); if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) { buffer.readerIndex(save); break; } else { result.addMessage(obj); logMessageLength(obj, buffer.readerIndex() - save); save = buffer.readerIndex(); } } while (true); if (result.isEmpty()) { return Codec2.DecodeResult.NEED_MORE_INPUT; } if (result.size() == 1) { return result.get(0); } return result; }
这里暂存了当前buffer
的读索引,一样也是为了后面的回滚。能够看到当decode返回的是NEED_MORE_INPUT则表示当前的buffer
中数据不足,不能完整解析出一个dubbo协议栈,同时将buffer的读索引回滚到以前暂存的索引而且退出循环,将结果返回。那接下来看看何时会返回NEED_MORE_INPUT,最终会定位到在ExchangeCodec
的decode
方法会解析出协议栈。tcp
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // check magic number. if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { int length = header.length; if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1; i < header.length - 1; i ++) { if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break; } } return super.decode(channel, buffer, readable, header); } // check length. if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } // get data length. int len = Bytes.bytes2int(header, 12); checkPayload(channel, len); int tt = len + HEADER_LENGTH; if( readable < tt ) { return DecodeResult.NEED_MORE_INPUT; } // limit input stream. ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { if (logger.isWarnEnabled()) { logger.warn("Skip input stream " + is.available()); } StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } }
这个方法开始是对telnet协议进行解析(因为dubbo支持telnet链接,因此这里提供了支持,能够忽略这一部分)。看到会有两个地方返回NEED_MORE_INPUT
,一个是当前buffer
的可读长度尚未消息头长,说明当前buffer
连协议栈的头都不完整,因此须要继续读取inbound数据,另外一个是当前buffer
包含了完整的消息头,即可以获得payload的长度,发现它的可读的长度,并无包含整个协议栈的数据,因此也须要继续读取inbound数据。若是上面两个状况都不复核,那么说明当前的buffer
至少包含一个dubbo协议栈的数据,那么从当前buffer
中读取一个dubbo协议栈的数据,解析出一个dubbo数据,固然这里可能读取完一个dubbo数据以后还会有剩余的数据。ide
上面对dubbo解析出一个完整的dubbo协议栈过程进行了讨论,可是尚未对TCP的拆包和粘包问题作过多的讨论。下面结合上面内容作一个综合讨论。url
我这里对TCP拆包和粘包分别列举一个场景来讨论。
###当反生TCP拆包问题时候 这里假设以前尚未发生过任何数据交互,系统刚刚初始化好,那么这个时候在InternalDecoder
里面的buffer
属性会是EMPTY_BUFFER
。当发生第一次inbound数据的时候,第一次在InternalDecoder
里面接收的确定是dubbo消息头的部分(这个由TCP协议保证),因为发生了拆包状况,那么此时接收的inbound消息可能存在一下几种状况
一、当前inbound消息只包含dubbo协议头的一部分
二、当前inbound消息只包含dubbo的协议头
三、当前inbound消息只包含dubbo消息头和部分payload消息
经过上面的讨论,咱们知道发生上面三种状况,都会触发ExchangeCodec
返回NEED_MORE_INPUT
,因为在DubboCountCodec
对余返回NEED_MORE_INPUT
会回滚读索引,因此此时的buffer
里面的数据能够看成并无发生过读取操做,而且DubboCountCodec
的decode也会返回NEED_MORE_INPUT
,在InternalDecoder
对于当判断返回NEED_MORE_INPUT
,也会进行读索引回滚,而且退出循环,最后会执行finally
内容,这里会判断inbound消息是否还有可读的,因为在DubboCountCodec
里面进行了读索引回滚,因此次数的buffer
里面是完整的inbound消息,等待第二次的inbound消息的到来,当第二次inbound消息过来的时候,再次通过上面的判断。
###当发生TCP粘包的时候 当发生粘包的时候是tcp将一个以上的dubbo协议栈放在一个tcp包中,那么有可能发生下面几种状况
一、当前inbound消息只包含一个dubbo协议栈
二、当前inbound消息包含一个dubbo协议栈,同时包含部分另外一个或者多个dubbo协议栈内容
若是发生只包含一个协议栈,那么当前buffer
经过ExchangeCodec
解析协议以后,当前的buffer
的readeIndex位置应该是 buffer
尾部,那么在返回到InternalDecoder
中message
的方法readable
返回的是false,那么就会对buffer
从新赋予EMPTY_BUFFER
实体,而针对包含一个以上的dubbo协议栈,固然也会解析出其中一个dubbo协议栈,可是通过ExchangeCodec
解析以后,message
的readIndex不在message
尾部,因此message
的readable
方法返回的是true
。那么则会继续遍历message
,读取下面的信息。最终要么message
恰好整数倍包含完整的dubbo协议栈,要不ExchangeCodec
返回NEED_MORE_INPUT
,最后将未读完的数据缓存到buffer
中,等待下次inbound事件,将buffer
中的消息合并到下次的inbound消息中,种类又回到了拆包的问题上。
##总结
dubbo在处理tcp的粘包和拆包时是借助InternalDecoder
的buffer
缓存对象来缓存不完整的dubbo协议栈数据,等待下次inbound事件,合并进去。因此说在dubbo中解决TCP拆包和粘包的时候是经过buffer
变量来解决的。