1.2 问题分析
首先对 GC 数据进行分析,发现老年代已满,发生屡次 Full GC,耗时达 3 分多,系统已经没法正常运行(示例):算法
图 1 直播高峰期服务端 GC 统计数据
Dump 内存堆栈进行分析,发现大量的发送任务堆积,致使内存溢出(示例):数组
图 2 直播高峰期服务端内存 Dump 文件分析
经过以上分析能够看出,在直播高峰期,服务端向上万客户端推送消息时,发生了发送队列积压,引发内存泄漏,最终致使服务端频繁 GC,没法正常处理业务。promise
1.3 解决策略
服务端在进行消息发送的时候作保护,具体策略以下:安全
根据可接入的最大用户数作客户端并发接入数流控,须要根据内存、CPU 处理能力,以及性能测试结果作综合评估。网络
设置消息发送的高低水位,针对消息的平均大小、客户端并发接入数、JVM 内存大小进行计算,得出一个合理的高水位取值。服务端在推送消息时,对 Channel 的状态进行判断,若是达到高水位以后,Channel 的状态会被 Netty 置为不可写,此时服务端不要继续发送消息,防止发送队列积压。并发
服务端基于上述策略优化了代码,内存泄漏问题获得解决。app
1.4. 总结
尽管 Netty 框架自己作了大量的可靠性设计,可是对于具体的业务场景,仍然须要用户作针对特定领域和场景的可靠性设计,这样才能提高应用的可靠性。框架
除了消息发送积压致使的内存泄漏,Netty 还有其它常见的一些内存泄漏点,本文将针对这些可能致使内存泄漏的功能点进行分析和总结。异步
消息收发防内存泄漏策略
2.1. 消息接收
2.1.1 消息读取
Netty 的消息读取并不存在消息队列,可是若是消息解码策略不当,则可能会发生内存泄漏,主要有以下几点:ide
畸形码流***:若是客户端按照协议规范,将消息长度值故意伪造的很是大,可能会致使接收方内存溢出。
代码 BUG:错误的将消息长度字段设置或者编码成一个很是大的值,可能会致使对方内存溢出。
避免内存泄漏的策略以下:
不管采用哪一种×××实现,都对消息的最大长度作限制,当超过限制以后,抛出解码失败异常,用户能够选择忽略当前已经读取的消息,或者直接关闭连接。
以 Netty 的 DelimiterBasedFrameDecoder 代码为例,建立 DelimiterBasedFrameDecoder 对象实例时,指定一个比较合理的消息最大长度限制,防止内存溢出:
复制代码
/**
{1}
{1}
*@parammaxFrameLength the maximum length of the decoded frame.
{1}
publicDelimiterBasedFrameDecoder(
intmaxFrameLength,booleanstripDelimiter, ByteBuf delimiter) {
this(maxFrameLength, stripDelimiter,true, delimiter);
}
须要根据单个 Netty 服务端能够支持的最大客户端并发链接数、消息的最大长度限制以及当前 JVM 配置的最大内存进行计算,并结合业务场景,合理设置 maxFrameLength 的取值。
2.1.2 ChannelHandler 的并发执行
Netty 的 ChannelHandler 支持串行和异步并发执行两种策略,在将 ChannelHandler 加入到 ChannelPipeline 时,若是指定了 EventExecutorGroup,则 ChannelHandler 将由 EventExecutorGroup 中的 EventExecutor 异步执行。这样的好处是能够实现 Netty I/O 线程与业务 ChannelHandler 逻辑执行的分离,防止 ChannelHandler 中耗时业务逻辑的执行阻塞 I/O 线程。
ChannelHandler 异步执行的流程以下所示:
图 3 ChannelHandler 异步并发执行流程
若是业务 ChannelHandler 中执行的业务逻辑耗时较长,消息的读取速度又比较快,很容易发生消息在 EventExecutor 中积压的问题,若是建立 EventExecutor 时没有经过 io.netty.eventexecutor.maxPendingTasks 参数指定积压的最大消息个数,则默认取值为 0x7fffffff,长时间的积压将致使内存溢出,相关代码以下所示(异步执行 ChannelHandler,将消息封装成 Task 加入到 taskQueue 中):
复制代码
public void execute(Runnable task) {
if(task==null) {
thrownewNullPointerException("task");
}
boolean inEventLoop =inEventLoop();
if(inEventLoop) {
addTask(task);
}else{
startThread();
addTask(task);
if(isShutdown()&&removeTask(task)) {
reject();
}
}
解决对策:对 EventExecutor 中任务队列的容量作限制,能够经过 io.netty.eventexecutor.maxPendingTasks 参数作全局设置,也能够经过构造方法传参设置。结合 EventExecutorGroup 中 EventExecutor 的个数来计算 taskQueue 的个数,根据 taskQueue N 任务队列平均大小 maxPendingTasks < 系数 K(0 < K < 1) 总内存的公式来进行计算和评估。
2.2. 消息发送
2.2.1 如何防止发送队列积压
为了防止高并发场景下,因为对方处理慢致使自身消息积压,除了服务端作流控以外,客户端也须要作并发保护,防止自身发生消息积压。
利用 Netty 提供的高低水位机制,能够实现客户端更精准的流控,它的工做原理以下:
图 4 Netty 高水位接口说明
当发送队列待发送的字节数组达到高水位上限时,对应的 Channel 就变为不可写状态。因为高水位并不影响业务线程调用 write 方法并把消息加入到待发送队列中,所以,必需要在消息发送时对 Channel 的状态进行判断:当到达高水位时,Channel 的状态被设置为不可写,经过对 Channel 的可写状态进行判断来决定是否发送消息。
在消息发送时设置高低水位并对 Channel 状态进行判断,相关代码示例以下:
复制代码
public void channelActive(finalChannelHandlerContextctx){
ctx.channel().config().setWriteBufferHighWaterMark(10 *1024*1024);
loadRunner =newRunnable(){
@Override
public void run(){
try{
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
ByteBuf msg = null;
while(true) {
if(ctx.channel().isWritable()) {
msg =Unpooled.wrappedBuffer("Netty OOM Example".getBytes());
ctx.writeAndFlush(msg);
}else{
LOG.warning("The write queue is busy : "+ ctx.channel().unsafe().outboundBuffer().nioBufferSize());
}
}
}
};
newThread(loadRunner,"LoadRunner-Thread").start();
}
对上述代码作验证,客户端代码中打印队列积压相关日志,说明基于高水位的流控机制生效,日志以下:
警告: The write queue is busy : 17
经过内存监控,发现内存占用平稳:
图 5 进行高低水位保护优化以后内存占用状况
在实际项目中,根据业务 QPS 规划、客户端处理性能、网络带宽、链路数、消息平均码流大小等综合因素计算并设置高水位(WriteBufferHighWaterMark)阈值,利用高水位作消息发送速率的流控,既能够保护自身,同时又能减轻服务端的压力,防止服务端被压挂。
2.2.2 其它可能致使发送队列积压的因素
须要指出的是,并不是只有高并发场景才会触发消息积压,在一些异常场景下,尽管系统流量不大,但仍然可能会致使消息积压,可能的场景包括:
网络瓶颈,发送速率超过网络连接处理能力时,会致使发送队列积压。
对端读取速度小于己方发送速度,致使自身 TCP 发送缓冲区满,频繁发生 write 0 字节时,待发送消息会在 Netty 发送队列排队。
当出现大量排队时,很容易致使 Netty 的直接内存泄漏,示例以下:
图 6 消息积压致使内存泄漏相关堆栈
咱们在设计系统时,须要根据业务的场景、所处的网络环境等因素进行综合设计,为潜在的各类故障作容错和保护,防止由于外部因素致使自身发生内存泄漏。
事实上,这种观点是错误的,即使 ByteBuf 是 Netty 建立的,若是使用不当仍然会发生内存泄漏。在实际项目中如何更好的管理 ByteBuf,下面咱们分四种场景进行说明。
3.2 ByteBuf 的释放策略
3.2.1 基于内存池的请求 ByteBuf
这类 ByteBuf 主要包括 PooledDirectByteBuf 和 PooledHeapByteBuf,它由 Netty 的 NioEventLoop 线程在处理 Channel 的读操做时分配,须要在业务 ChannelInboundHandler 处理完请求消息以后释放(一般是解码以后),它的释放有 2 种策略:
策略 1:业务 ChannelInboundHandler 继承自 SimpleChannelInboundHandler,实现它的抽象方法 channelRead0(ChannelHandlerContext ctx, I msg),ByteBuf 的释放业务不用关心,由 SimpleChannelInboundHandler 负责释放,相关代码以下所示(SimpleChannelInboundHandler):
复制代码
@Override
public void channelRead(ChannelHandlerContextctx, Objectmsg)throws Exception {
boolean release =true;
try{
if(acceptInboundMessage(msg)) {
I imsg = (I) msg;
channelRead0(ctx,imsg);
}else{
release =false;
ctx.fireChannelRead(msg);
}
} finally {
if(autoRelease&&release) {
ReferenceCountUtil.release(msg);
}
}
}
若是当前业务 ChannelInboundHandler 须要执行,则调用完 channelRead0 以后执行 ReferenceCountUtil.release(msg) 释放当前请求消息。若是没有匹配上须要继续执行后续的 ChannelInboundHandler,则不释放当前请求消息,调用 ctx.fireChannelRead(msg) 驱动 ChannelPipeline 继续执行。
继承自 SimpleChannelInboundHandler,即使业务不释放请求 ByteBuf 对象,依然不会发生内存泄漏,相关示例代码以下所示:
复制代码
publiccla***outerServerHandlerV2extendsSimpleChannelInboundHandler<ByteBuf>{
// 代码省略...
@Override
publicvoidchannelRead0(ChannelHandlerContext ctx, ByteBuf msg){
byte[] body =newbyte[msg.readableBytes()];
executorService.execute(()->
{
// 解析请求消息,作路由转发,代码省略...
// 转发成功,返回响应给客户端
ByteBuf respMsg = allocator.heapBuffer(body.length);
respMsg.writeBytes(body);// 做为示例,简化处理,将请求返回
ctx.writeAndFlush(respMsg);
});
}
对上述代码作性能测试,发现内存占用平稳,无内存泄漏问题,验证了以前的分析结论。
策略 2:在业务 ChannelInboundHandler 中调用 ctx.fireChannelRead(msg) 方法,让请求消息继续向后执行,直到调用到 DefaultChannelPipeline 的内部类 TailContext,由它来负责释放请求消息,代码以下所示(TailContext):
复制代码
protectedvoidonUnhandledInboundMessage(Object msg){
try{
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. "+
"Please check your pipeline configuration.", msg);
}finally{
ReferenceCountUtil.release(msg);
}
}
3.2.2 基于非内存池的请求 ByteBuf
若是业务使用非内存池模式覆盖 Netty 默认的内存池模式建立请求 ByteBuf,例如经过以下代码修改内存申请策略为 Unpooled:
复制代码
// 代码省略...
.childHandler(newChannelInitializer<SocketChannel>() {
@Override
publicvoidinitChannel(SocketChannel ch)throwsException{
ChannelPipeline p = ch.pipeline(); ch.config().setAllocator(UnpooledByteBufAllocator.DEFAULT);
p.addLast(newRouterServerHandler());
}
});
}
也须要按照内存池的方式去释放内存。
3.2.3 基于内存池的响应 ByteBuf
只要调用了 writeAndFlush 或者 flush 方法,在消息发送完成以后都会由 Netty 框架进行内存释放,业务不须要主动释放内存。
它的工做原理以下:
调用 ctx.writeAndFlush(respMsg) 方法,当消息发送完成以后,Netty 框架会主动帮助应用来释放内存,内存的释放分为两种场景:
若是是堆内存(PooledHeapByteBuf),则将 HeapByteBuffer 转换成 DirectByteBuffer,并释放 PooledHeapByteBuf 到内存池,代码以下(AbstractNioChannel 类):
复制代码
protected final ByteBufnewDirectBuffer(ByteBufbuf){
finalintreadableBytes = buf.readableBytes();
if(readableBytes==0) {
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if(alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf,buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
} }
// 后续代码省略
}
若是消息完整的被写到 SocketChannel 中,则释放 DirectByteBuffer,代码以下(ChannelOutboundBuffer)所示:
复制代码
public boolean remove(){
Entry e = flushedEntry;
if(e==null) {
clearNioBuffers();
returnfalse;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
intsize = e.pendingSize;
removeEntry(e);
if(!e.cancelled) {
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size,false,true);
}
// 后续代码省略
}
对 Netty 源码进行断点调试,验证上述分析:
断点 1:在响应消息发送处打印断点,获取到 PooledUnsafeHeapByteBuf 实例 ID 为 1506。
图 7 响应发送处断点调试
断点 2:在 HeapByteBuffer 转换成 DirectByteBuffer 处打断点,发现实例 ID 为 1506 的 PooledUnsafeHeapByteBuf 被释放。
图 8 响应消息释放处断点
断点 3:转换以后待发送的响应消息 PooledUnsafeDirectByteBuf 实例 ID 为 1527。
图 9 响应消息转换处断点
断点 4:响应消息发送完成以后,实例 ID 为 1527 的 PooledUnsafeDirectByteBuf 被释放到内存池。
图 10 转换以后的响应消息释放处断点
若是是 DirectByteBuffer,则不须要转换,当消息发送完成以后,由 ChannelOutboundBuffer 的 remove() 负责释放。
3.2.4 基于非内存池的响应 ByteBuf
不管是基于内存池仍是非内存池分配的 ByteBuf,若是是堆内存,则将堆内存转换成堆外内存,而后释放 HeapByteBuffer,待消息发送完成以后,再释放转换后的 DirectByteBuf;若是是 DirectByteBuffer,则无需转换,待消息发送完成以后释放。所以对于须要发送的响应 ByteBuf,由业务建立,可是不须要业务来释放。
以 Netty HTTPS 服务端为例,典型的业务组网示例以下所示:
图 11 Netty HTTPS 组网图
客户端采用 HTTP 链接池的方式与服务端进行 RPC 调用,单个客户端链接池上限为 200,客户端部署了 30 个实例,而服务端只部署了 3 个实例。在业务高峰期,每一个服务端须要处理 6000 个 HTTP 链接,当服务端时延增大以后,会致使客户端批量超时,超时以后客户端会关闭链接从新发起 connect 操做,在某个瞬间,几千个 HTTPS 链接同时发起 SSL 握手操做,因为服务端此时也处于高负荷运行状态,就会致使部分链接 SSL 握手失败或者超时,超时以后客户端会继续重连,进一步加剧服务端的处理压力,最终致使服务端来不及释放客户端 close 的链接,引发 NioSocketChannel 大量积压,最终 OOM。
经过客户端的运行日志能够看到一些 SSL 握手发生了超时,示例以下:
图 12 SSL 握手超时日志
服务端并无对客户端的链接数作限制,这会致使尽管 ESTABLISHED 状态的链接数并不会超过 6000 上限,可是因为一些 SSL 链接握手失败,再加上积压在服务端的链接并无及时释放,最终引发了 NioSocketChannel 的大量积压。
4.2.Netty HTTS 并发链接数流控
在服务端增长对客户端并发链接数的控制,原理以下所示:
图 13 服务端 HTTS 链接数流控
基于 Netty 的 Pipeline 机制,能够对 SSL 握手成功、SSL 链接关闭作切面拦截(相似于 Spring 的 AOP 机制,可是没采用反射机制,性能更高),经过流控切面接口,对 HTTPS 链接作计数,根据计数器作流控,服务端的流控算法以下:
获取流控阈值。
从全局上下文中获取当前的并发链接数,与流控阈值对比,若是小于流控阈值,则对当前的计数器作原子自增,容许客户端链接接入。
若是等于或者大于流控阈值,则抛出流控异常给客户端。
SSL 链接关闭时,获取上下文中的并发链接数,作原子自减。
在实现服务端流控时,须要注意以下几点:
流控的 ChannelHandler 声明为 @ChannelHandler.Sharable,这样全局建立一个流控实例,就能够在全部的 SSL 链接中共享。
经过 userEventTriggered 方法拦截 SslHandshakeCompletionEvent 和 SslCloseCompletionEvent 事件,在 SSL 握手成功和 SSL 链接关闭时更新流控计数器。
流控并非单针对 ESTABLISHED 状态的 HTTP 链接,而是针对全部状态的链接,由于客户端关闭链接,并不意味着服务端也同时关闭了链接,只有 SslCloseCompletionEvent 事件触发时,服务端才真正的关闭了 NioSocketChannel,GC 才会回收链接关联的内存。
流控 ChannelHandler 会被多个 NioEventLoop 线程调用,所以对于相关的计数器更新等操做,要保证并发安全性,避免使用全局锁,能够经过原子类等提高性能。
图 14 NioEventLoop 定时任务执行接口
建议业务在使用时,对 NioEventLoop 队列的积压状况进行采集和告警。
5.1.2 客户端链接池
业务在初始化链接池时,若是采用每一个客户端链接对应一个 EventLoopGroup 实例的方式,即每建立一个客户端链接,就会同时建立一个 NioEventLoop 线程来处理客户端链接以及后续的网络读写操做,采用的策略是典型的 1 个 TCP 链接对应一个 NIO 线程的模式。当系统的链接数不少、堆内存又不足时,就会发生内存泄漏或者线程建立失败异常。问题示意以下:
图 15 错误的客户端线程模型
优化策略:客户端建立链接池时,EventLoopGroup 能够重用,优化以后的链接池线程模型以下所示:
图 16 正确的客户端线程模型
5.2 内存泄漏问题定位
5.2.1 堆内存泄漏
经过 jmap -dump:format=b,file=xx pid 命令 Dump 内存堆栈,而后使用 MemoryAnalyzer 工具对内存占用进行分析,查找内存泄漏点,而后结合代码进行分析,定位内存泄漏的具体缘由,示例以下所示:
图 17 经过 MemoryAnalyzer 工具分析内存堆栈
5.2.2 堆外内存泄漏
建议策略以下:
排查下业务代码,看使用堆外内存的地方是否存在忘记释放问题。
若是使用到了 Netty 的 TLS/SSL/openssl,建议到 Netty 社区查下 BUG 列表,看是不是 Netty 老版本已知的 BUG,此类 BUG 经过升级 Netty 版本能够解决。
若是上述两个步骤排查没有结果,则能够经过 google-perftools 工具协助进行堆外内存分析