记得前段时间咱们生产上的一个网关出现了故障。java
这个网关逻辑很是简单,就是接收客户端的请求而后解析报文最后发送短信。git
但这个请求并非常见的 HTTP ,而是利用 Netty 自定义的协议。程序员
有个前提是:网关是须要读取一段完整的报文才能进行后面的逻辑。github
问题是有天忽然发现网关解析报文出错,查看了客户端的发送日志也没发现问题,最后经过日志发现收到了许多不完整的报文,有些还多了。shell
因而想会不会是 TCP 拆、粘包带来的问题,最后利用 Netty 自带的拆包工具解决了该问题。api
这便有了此文。缓存
问题虽然解决了,但仍是得想一想缘由,为啥会这样?打破砂锅问到底才是一个靠谱的程序员。网络
这就得从 TCP 这个协议提及了。app
TCP 是一个面向字节流的协议,它是性质是流式的,因此它并无分段。就像水流同样,你无法知道何时开始,何时结束。框架
因此他会根据当前的套接字缓冲区的状况进行拆包或是粘包。
下图展现了一个 TCP 协议传输的过程:
发送端的字节流都会先传入缓冲区,再经过网络传入到接收端的缓冲区中,最终由接收端获取。
当咱们发送两个完整包到接收端的时候:
正常状况会接收到两个完整的报文。
但也有如下的状况:
接收到的是一个报文,它是由发送的两个报文组成的,这样对于应用程序来讲就很难处理了(这样称为粘包)。
还有可能出现上面这样的虽然收到了两个包,可是里面的内容倒是互相包含,对于应用来讲依然没法解析(拆包)。
对于这样的问题只能经过上层的应用来解决,常见的方式有:
以上的这些方式咱们在 Netty 的 pipline 中里加入对应的解码器均可以手动实现。
但其实 Netty 已经帮咱们作好了,彻底能够开箱即用。
好比:
LineBasedFrameDecoder
能够基于换行符解决。DelimiterBasedFrameDecoder
可基于分隔符解决。FixedLengthFrameDecoder
可指定长度解决。下面来模拟一下最简单的字符串传输。
仍是在以前的
进行演示。
在 Netty 客户端中加了一个入口能够循环发送 100 条字符串报文到接收端:
/** * 向服务端发消息 字符串 * @param stringReqVO * @return */
@ApiOperation("客户端发送消息,字符串")
@RequestMapping(value = "sendStringMsg", method = RequestMethod.POST)
@ResponseBody
public BaseResponse<NULLBody> sendStringMsg(@RequestBody StringReqVO stringReqVO){
BaseResponse<NULLBody> res = new BaseResponse();
for (int i = 0; i < 100; i++) {
heartbeatClient.sendStringMsg(stringReqVO.getMsg()) ;
}
// 利用 actuator 来自增
counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
return res ;
}
/** * 发送消息字符串 * * @param msg */
public void sendStringMsg(String msg) {
ByteBuf message = Unpooled.buffer(msg.getBytes().length) ;
message.writeBytes(msg.getBytes()) ;
ChannelFuture future = channel.writeAndFlush(message);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("客户端手动发消息成功={}", msg));
}
复制代码
服务端直接打印便可:
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
LOGGER.info("收到msg={}", msg);
}
复制代码
顺便提一下,这里加的有一个字符串的解码器:.addLast(new StringDecoder())
其实就是把消息解析为字符串。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.toString(charset));
}
复制代码
在 Swagger 中调用了客户端的接口用于给服务端发送了 100 次消息:
正常状况下接收端应该打印 100 次 hello
才对,可是查看日志会发现:
收到的内容有完整的、多的、少的、拼接的;这也就对应了上面提到的拆包、粘包。
该怎么解决呢?这即可采用以前提到的 LineBasedFrameDecoder
利用换行符解决。
LineBasedFrameDecoder
解码器使用很是简单,只须要在 pipline 链条上添加便可。
//字符串解析,换行防拆包
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
复制代码
构造函数中传入了 1024 是指报的长度最大不超过这个值,具体能够看下文的源码分析。
而后咱们再进行一次测试看看结果:
注意,因为 LineBasedFrameDecoder 解码器是经过换行符来判断的,因此在发送时,一条完整的消息须要加上
\n
。
最终的结果:
仔细观察日志,发现确实没有一条被拆、粘包。
目的达到了,来看看它的实现原理:
findEndOfLine
方法去找到当前报文中是否存在分隔符,存在就会返回分隔符所在的位置。从这个逻辑中能够看出就是寻找报文中是否包含换行符,并进行相应的截取。
因为是经过缓冲区读取的,因此即便此次没有换行符的数据,只要下一次的报文存在换行符,上一轮的数据也不会丢。
上面提到的其实就是在解码中进行操做,咱们也能够自定义本身的拆、粘包工具。
编解码的主要目的就是为了能够编码成字节流用于在网络中传输、持久化存储。
Java 中也能够实现 Serializable 接口来实现序列化,但因为它性能等缘由在一些 RPC 调用中用的不多。
而 Google Protocol
则是一个高效的序列化框架,下面来演示在 Netty 中如何使用。
首先第一步天然是安装:
在官网下载对应的包。
本地配置环境变量:
当执行 protoc --version
出现如下结果代表安装成功:
接着是须要按照官方要求的语法定义本身的协议格式。
好比我这里须要定义一个输入输出的报文格式:
BaseRequestProto.proto:
syntax = "proto2";
package protocol;
option java_package = "com.crossoverjie.netty.action.protocol";
option java_outer_classname = "BaseRequestProto";
message RequestProtocol {
required int32 requestId = 2;
required string reqMsg = 1;
}
复制代码
BaseResponseProto.proto:
syntax = "proto2";
package protocol;
option java_package = "com.crossoverjie.netty.action.protocol";
option java_outer_classname = "BaseResponseProto";
message ResponseProtocol {
required int32 responseId = 2;
required string resMsg = 1;
}
复制代码
再经过
protoc --java_out=/dev BaseRequestProto.proto BaseResponseProto.proto
复制代码
protoc 命令将刚才定义的协议格式转换为 Java 代码,并生成在 /dev
目录。
只须要将生成的代码拷贝到咱们的项目中,同时引入依赖:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.4.0</version>
</dependency>
复制代码
利用 Protocol 的编解码也很是简单:
public class ProtocolUtil {
public static void main(String[] args) throws InvalidProtocolBufferException {
BaseRequestProto.RequestProtocol protocol = BaseRequestProto.RequestProtocol.newBuilder()
.setRequestId(123)
.setReqMsg("你好啊")
.build();
byte[] encode = encode(protocol);
BaseRequestProto.RequestProtocol parseFrom = decode(encode);
System.out.println(protocol.toString());
System.out.println(protocol.toString().equals(parseFrom.toString()));
}
/** * 编码 * @param protocol * @return */
public static byte[] encode(BaseRequestProto.RequestProtocol protocol){
return protocol.toByteArray() ;
}
/** * 解码 * @param bytes * @return * @throws InvalidProtocolBufferException */
public static BaseRequestProto.RequestProtocol decode(byte[] bytes) throws InvalidProtocolBufferException {
return BaseRequestProto.RequestProtocol.parseFrom(bytes);
}
}
复制代码
利用 BaseRequestProto
来作一个演示,先编码再解码最后比较最终的结果是否相同。答案确定是一致的。
利用 protoc 命令生成的 Java 文件里已经帮咱们把编解码所有都封装好了,只须要简单调用就好了。
能够看出 Protocol 建立对象使用的是构建者模式,对使用者来讲清晰易读,更多关于构建器的内容能够参考这里。
更多关于 Google Protocol
内容请查看官方开发文档。
Netty 已经自带了对 Google protobuf 的编解码器,也是只须要在 pipline 中添加便可。
server 端:
// google Protobuf 编解码
.addLast(new ProtobufDecoder(BaseRequestProto.RequestProtocol.getDefaultInstance()))
.addLast(new ProtobufEncoder())
复制代码
客户端:
// google Protobuf 编解码
.addLast(new ProtobufDecoder(BaseResponseProto.ResponseProtocol.getDefaultInstance()))
.addLast(new ProtobufEncoder())
复制代码
稍微注意的是,在构建 ProtobufDecoder 时须要显式指定解码器须要解码成什么类型。
我这里服务端接收的是 BaseRequestProto,客户端收到的是服务端响应的 BaseResponseProto 因此就设置了对应的实例。
一样的提供了一个接口向服务端发送消息,当服务端收到了一个特殊指令时也会向客户端返回内容:
@Override
protected void channelRead0(ChannelHandlerContext ctx, BaseRequestProto.RequestProtocol msg) throws Exception {
LOGGER.info("收到msg={}", msg.getReqMsg());
if (999 == msg.getRequestId()){
BaseResponseProto.ResponseProtocol responseProtocol = BaseResponseProto.ResponseProtocol.newBuilder()
.setResponseId(1000)
.setResMsg("服务端响应")
.build();
ctx.writeAndFlush(responseProtocol) ;
}
}
复制代码
在 swagger 中调用相关接口:
在日志能够看到服务端收到了消息,同时客户端也收到了返回:
虽然说 Netty 封装了 Google Protobuf 相关的编解码工具,其实查看它的编码工具就会发现也是利用上文提到的 api 实现的。
Google Protocol 的使用确实很是简单,但仍是有值的注意的地方,好比它依然会有拆、粘包问题。
不妨模拟一下:
连续发送 100 次消息看服务端收到的怎么样:
会发现服务端在解码的时候报错,其实就是被拆、粘包了。
这点 Netty 天然也考虑到了,因此已经提供了相关的工具。
//拆包解码
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufVarint32LengthFieldPrepender())
复制代码
只须要在服务端和客户端加上这两个编解码工具便可,再来发送一百次试试。
查看日志发现没有出现一次异常,100 条信息所有都接收到了。
这个编解码工具能够简单理解为是在消息体中加了一个 32 位长度的整形字段,用于代表当前消息长度。
网络这块一样是计算机的基础,因为近期在作相关的工做因此接触的比较多,也算是给大学补课了。
后面会接着更新 Netty 相关的内容,最后会产出一个高性能的 HTTP 以及 RPC 框架,敬请期待。
上文相关的代码:
最近在总结一些 Java 相关的知识点,感兴趣的朋友能够一块儿维护。
欢迎关注公众号一块儿交流: