咱们知道Dubbo的网络通讯框架Netty是基于TCP协议的,TCP协议的网络通讯会存在粘包和拆包的问题,先看下为何会出现粘包和拆包java
以上四点基本上是出现粘包和拆包的缘由,业界的解决方法通常有如下几种:数组
在dubbo.io官网上找到一张图,协议头约定 网络
看完这张图,大体能够理解Dubbo通讯协议解决的问题,Dubbo采用消息头和消息体的方式来解决粘包拆包,并在消息头中放入了一个惟一Id来解决异步通讯关联request和response的问题,下面以一次调用为入口分为四个部分来看下源码具体实现app
private class InternalEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
}
}
复制代码
这个InternalEncoder是一个NettyCodecAdapter的内部类,咱们看到codec.encode(channel, buffer, msg)这里,这个时候codec=DubboCountCodec,这个是在构造方法中传入的,DubboCountCodec.encode-->ExchangeCodec.encode-->ExchangeCodec.encodeRequest框架
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
//获取序列化方式,默认是Hessian序列化
Serialization serialization = getSerialization(channel);
// new了一个16位的byte数组,就是request的消息头
byte[] header = new byte[HEADER_LENGTH];
// 往消息头中set magic数字,这个时候header中前2个byte已经填充
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.第三个byte已经填充
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
// set request id.这个时候是0
Bytes.long2bytes(req.getId(), header, 4);
// 编码 request data.
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
//序列化
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
//编码消息体数据
encodeRequestData(channel, out, req.getData());
}
out.flushBuffer();
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);
//在消息头中设置消息体长度
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
复制代码
就是这方法,对request请求进行了编码操做,具体操做我写在代码的注释中,就是刚刚咱们分析的消息头的代码实现异步
看到NettyCodecAdapter中的InternalDecoder这个类的messageReceived方法,这里就是Provider端对于Consumer端的request请求的解码ide
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
···
try {
// decode object.
do {
saveReaderIndex = message.readerIndex();
try {
msg = codec.decode(channel, message);
} catch (IOException e) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw e;
}
···
复制代码
进入DubboCountCodec.decode--ExchangeCodec.decode编码
// 检查 magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
···
}
// check 长度若是小于16位继续等待
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// get 消息体长度
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
//消息体长度+消息头的长度
int tt = len + HEADER_LENGTH;
//若是总长度小于tt,那么返回继续等待
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
//解析消息体内容
return decodeBody(channel, is, header);
} finally {
···
}
复制代码
这里对于刚刚的request进行解码操做,具体操做步骤写在注释中了url
当服务端执行完接口调用,看下服务端的响应编码,和消费端不同的地方是,服务端进入的是ExchangeCodec.encodeResponse方法spa
try {
//获取序列化方式 默认Hession协议
Serialization serialization = getSerialization(channel);
// 初始化一个16位的header
byte[] header = new byte[HEADER_LENGTH];
// set magic 数字
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] = serialization.getContentTypeId();
if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
// set response status.这里返回的是OK
byte status = res.getStatus();
header[3] = status;
// set request id.
Bytes.long2bytes(res.getId(), header, 4);
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// 编码返回消息体数据或者错误数据
if (status == Response.OK) {
if (res.isHeartbeat()) {
encodeHeartbeatData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult());
}
} else out.writeUTF(res.getErrorMessage());
out.flushBuffer();
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
// 发送失败信息给Consumer,不然Consumer只能等超时了
if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
try {
// FIXME 在Codec中打印出错日志?在IoHanndler的caught中统一处理?
logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
Response r = new Response(res.getId(), res.getVersion());
r.setStatus(Response.BAD_RESPONSE);
r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
}
}
// 从新抛出收到的异常
···
}
复制代码
基本上和消费方请求编码同样,多了一个步骤,一个是在消息头中加入了一个状态位,第二个是若是发送有异常,则继续发送失败信息给Consumer,不然Consumer只能等超时了
和上面的解码同样,具体操做是在ExchangeCodec.decode--DubboCodec.decodeBody中