前言java
对于应用层通信协议而言,目前流行的协议虽然能够很好地支持业务的快速迭代,可是不能否认存在安全性、可拓展性等问题。在消息队列或者微服务框架中,利用自定义协议提升通信效率很常见的现象。是否你也曾想自定义协议但无从入手而苦恼,跟着小栈一块儿动手实现一个自定义协议吧!安全
请求响应协议设计bash
通用设计:考虑多协议通信,利用版本号以及协议类型使得协议能够平滑引入新协议拓展;已有的协议升级则利用协议版本供拓展。
服务器
请求-响应设计:区分请求或响应类型,引入消息类型标识REQUEST(byte)0、RESPONCE(byte)1;消息主体需利用编码器序列化框架进行编码解码,引入编码类型;在请求响应过程当中,利用消息id做为惟一标识,利用超时时间来检测服务器处理时效,超时直接不返回;引入body_length来标识消息主体的长度,使得协议可变长。app
协议处理实现框架
编码ide
在UniqueEncoder构建可变储存ChannelBuffer,读取协议类型并在协议工厂查找协议对业务对象编码。函数
public class UniqueEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
int type = 0;
if (msg instanceof ResponceWrapper) {
type = ((ResponceWrapper) msg).getProtocolType();
} else if (msg instanceof RequestWrapper) {
type = ((RequestWrapper) msg).getProtocolType();
}
return ProtocolFactory.getProtocol(type).encode(msg, channelBuffer);
}
}复制代码
那编码究竟是怎么处理的呢?微服务
能够清晰看到,ProcessorProtocol类前面明确地说明了协议头、协议主体的结构信息,其实跟刚开始贴的图是一致的。其实这个方即可以清晰地根听说明对业务编码,能够养成这个好的习惯。编码过程先校验业务对象类型,是请求 OR 响应类型,并调用ChannelBuffer将协议相关信息依次写入,再也不赘述。能够看到写入长度是5 * 1B + 3 * 4B + body主体。ui
/** * PROTOCOL HEADER * VERSION (1B) version * TYPE (1B) type * PROCESSOR PROTOCOL * VERSION (1B) * TYPE (1B) REQUEST RESPONCE * CODEC (1B) serialize/deserialize * ID (4B) msg id * TIMEOUT (4B) timeout * BODYLENGHT (4B) body length * BODY CONTEXT (BODYLENGTH) * @author duxuan */
public class ProcessorProtocol implements Protocol {
private final int type = 1;
private final int version = 1;
private final int CUSTOMPROTOCOL_HEADER_LENGTH = 3 * 1 + 3 * 4;
public static final byte REQUEST = (byte) 1;
public static final byte RESPONCE = (byte) 0;
/** * @param msg 消息实体 * @param channelBuffer 接收编码数据 * @return * @throws Exception */
@Override
public ChannelBuffer encode(Object msg, ChannelBuffer channelBuffer) throws Exception {
if (!(msg instanceof RequestWrapper ||
msg instanceof ResponceWrapper)) {
throw new Exception();
}
int processType = REQUEST;
int codec = 0;
int id = 0;
int timeout = 0;
int bodyLength = 0;
byte[] body = new byte[0];
if (msg instanceof RequestWrapper) {
RequestWrapper requestWrapper = (RequestWrapper) msg;
processType = REQUEST;
codec = requestWrapper.getCodecType();
id = requestWrapper.getId();
timeout = requestWrapper.getTimeout();
body = Codecs.getEncoder(codec).encode(requestWrapper.getMsg());
} else if (msg instanceof ResponceWrapper) {
processType = RESPONCE;
ResponceWrapper responceWrapper = (ResponceWrapper) msg;
codec = responceWrapper.getCodecType();
id = responceWrapper.getId();
body = Codecs.getEncoder(codec).encode(responceWrapper.getBody());
}
bodyLength = body.length;
/** * 5 * 1B */
// default version
channelBuffer.writeByte(1);
channelBuffer.writeByte(type);
channelBuffer.writeByte(version);
channelBuffer.writeByte(processType);
channelBuffer.writeByte(codec);
/** * 3 * 4B */
channelBuffer.writeInt(id);
channelBuffer.writeInt(timeout);
channelBuffer.writeInt(bodyLength);
/** * body */
channelBuffer.writeBytes(body);
return channelBuffer;
}
}复制代码
解码
利用FrameDecoder实现无感知粘包拆包处理,而UniqueDecoder只须要负责协议解析便可,如解析失败,则重置读指针originPos;解析成功则返回已解析对象。
(粘包拆包如何处理?未了解的同窗可点击查看)
public class UniqueDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
final int originPos = buffer.readerIndex();
if (buffer.readableBytes() < 2) {
return null;
}
int version = buffer.readByte();
if (version == 1) {
int type = buffer.readByte();
// 协议支持 协议解析+body
Protocol protocol = ProtocolFactory.getProtocol(type);
if (protocol == null) {
throw new Exception("UnSupport protocol");
}
// reset
protocol.decode(buffer, null, originPos);
} else {
throw new Exception("UnSupport version:" + version);
}
return null;
}
}复制代码
能够看到在UniqueDecoder中读取通信版本version、协议类型type,并查找已注册到协议工厂的协议来负责解析,若是查找不到,则抛出异常。
/**
* PROTOCOL HEADER
* VERSION (1B) version
* TYPE (1B) type
* PROCESSOR PROTOCOL
* VERSION (1B)
* TYPE (1B) REQUEST RESPONCE
* CODEC (1B) serialize/deserialize
* ID (4B) msg id
* TIMEOUT (4B) timeout
* BODYLENGHT (4B) body length
* BODY CONTEXT (BODYLENGTH)
* @author duxuan
*/
public class ProcessorProtocol implements Protocol {
private final int type = 1;
private final int version = 1;
private final int CUSTOMPROTOCOL_HEADER_LENGTH = 3 * 1 + 3 * 4;
public static final byte REQUEST = (byte) 1;
public static final byte RESPONCE = (byte) 0;
/**
* @param channelBuffer 已读取过通信版本以及协议类型(2B)
* @param errorObject 设置解码失败返回的类型
* @param originPos 已记录解码前的readerIndex,用于读取失败重置
* @return * @throws Exception
*/
@Override
public Object decode(ChannelBuffer channelBuffer, Object errorObject, final int originPos) throws Exception {
if (channelBuffer.readableBytes() < CUSTOMPROTOCOL_HEADER_LENGTH) {
channelBuffer.readerIndex(originPos);
return errorObject;
}
int version = channelBuffer.readByte();
if (version == 1) {
int type = channelBuffer.readByte();
int codec = channelBuffer.readByte();
int msgId = channelBuffer.readInt();
int timeout = channelBuffer.readInt();
int bodyLength = channelBuffer.readInt();
byte[] body = new byte[bodyLength];
channelBuffer.readBytes(body);
// decode
Decoder decoder = Codecs.getDecoder(codec);
if (decoder == null) {
throw new Exception("could not support codec decoder");
}
if (type == REQUEST) {
RequestWrapper requestWrapper = new RequestWrapper(body, msgId, timeout, codec, type);
return requestWrapper;
} else if (type == RESPONCE) {
ResponceWrapper responceWrapper = new ResponceWrapper(body, msgId, codec, type);
return responceWrapper;
}
}else {
throw new Exception("could not support processorProtocol version");
}
return null;
}
} 复制代码
解码时校验可读字节数是否小于协议头长度,若是小于,重置readerIndex;不然依次读取协议版本、请求或者响应类型、编码类型、消息id、超时时间、消息主体长度、消息主体,并根据编码类型调用序列化框架将消息解码成业务对象供业务处理器使用。
经常使用序列化框架介绍
Kryo | 速度快,序列化后体积小 | 跨语言支持较复杂 |
Hessian | 默认支持跨语言 | 较慢 |
Protostuff | 速度快,基于protobuf | 需静态编译 |
Protostuff-Runtime | 无需静态编译,但序列化前需预先传入schema | 不支持无默认构造函数的类,反序列化时需用户本身初始化序列化后的对象,其只负责将该对象进行赋值 |
Java | 使用方便,可序列化全部类 | 速度慢,占空间 |
序列化处理能够根据须要实现并注册到协议编解码器便可!
能够扫描关注路上小栈或者加wx(arron1126912882)备注掘金欢迎交流!