Netty 做为一个网络框架,提供了诸多功能,好比咱们以前说的编解码,Netty 准备不少现成的编解码,同时,Netty 还为咱们准备了网络中,很是重要的一个服务-----心跳机制。经过心跳检查对方是否有效,这在 RPC 框架中是必不可少的功能。git
Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 检测链接的有效性。固然,你也能够本身写个任务。但咱们今天不许备使用自定义任务,而是使用 Netty 内部的。github
说如下这三个 handler 的做用。promise
序 号 | 名称 | 做用 |
---|---|---|
1 | IdleStateHandler | 当链接的空闲时间(读或者写)太长时,将会触发一个 IdleStateEvent 事件。而后,你能够经过你的 ChannelInboundHandler 中重写 userEventTrigged 方法来处理该事件。 |
2 | ReadTimeoutHandler | 若是在指定的事件没有发生读事件,就会抛出这个异常,并自动关闭这个链接。你能够在 exceptionCaught 方法中处理这个异常。 |
3 | WriteTimeoutHandler | 当一个写操做不能在必定的时间内完成时,抛出此异常,并关闭链接。你一样能够在 exceptionCaught 方法中处理这个异常。 |
注意:
其中,关于 WriteTimeoutHandler 的描述,著名的 《Netty 实战》和 他的英文原版的描述都过期了,原文描述:服务器
若是在指定的时间间隔内没有任何出站数据写入,则抛出一个 WriteTimeoutException.网络
此书出版的时候,Netty 的文档确实是这样的,但在 2015 年 12 月 28 号的时候,被一个同窗修改了逻辑,见下方 git 日志:框架
image.pngide
貌似仍是个国人妹子。。。。而如今的文档描述是:oop
Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
当一个写操做不能在必定的时间内完成时,就会产生一个 WriteTimeoutException。源码分析
ReadTimeout 事件和 WriteTimeout 事件都会自动关闭链接,并且,属于异常处理,因此,这里只是介绍如下,咱们重点看 IdleStateHandler。性能
当链接的空闲时间(读或者写)太长时,将会触发一个 IdleStateEvent 事件。而后,你能够经过你的 ChannelInboundHandler 中重写 userEventTrigged 方法来处理该事件。
IdleStateHandler 既是出站处理器也是入站处理器,继承了 ChannelDuplexHandler 。一般在 initChannel 方法中将 IdleStateHandler 添加到 pipeline 中。而后在本身的 handler 中重写 userEventTriggered 方法,当发生空闲事件(读或者写),就会触发这个方法,并传入具体事件。
这时,你能够经过 Context 对象尝试向目标 Socekt 写入数据,并设置一个 监听器,若是发送失败就关闭 Socket (Netty 准备了一个ChannelFutureListener.CLOSE_ON_FAILURE
监听器用来实现关闭 Socket 逻辑)。
这样,就实现了一个简单的心跳服务。
1.构造方法,该类有 3 个构造方法,主要对一下 4 个属性赋值:
private final boolean observeOutput;// 是否考虑出站时较慢的状况。默认值是false(不考虑)。 private final long readerIdleTimeNanos; // 读事件空闲时间,0 则禁用事件 private final long writerIdleTimeNanos;// 写事件空闲时间,0 则禁用事件 private final long allIdleTimeNanos; //读或写空闲时间,0 则禁用事件
2. handlerAdded 方法
当该 handler 被添加到 pipeline 中时,则调用 initialize 方法:
private void initialize(ChannelHandlerContext ctx) { switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { // 这里的 schedule 方法会调用 eventLoop 的 schedule 方法,将定时任务添加进队列中 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
只要给定的参数大于0,就建立一个定时任务,每一个事件都建立。同时,将 state 状态设置为 1,防止重复初始化。调用 initOutputChanged 方法,初始化 “监控出站数据属性”,代码以下:
private void initOutputChanged(ChannelHandlerContext ctx) { if (observeOutput) { Channel channel = ctx.channel(); Unsafe unsafe = channel.unsafe(); ChannelOutboundBuffer buf = unsafe.outboundBuffer(); // 记录了出站缓冲区相关的数据,buf 对象的 hash 码,和 buf 的剩余缓冲字节数 if (buf != null) { lastMessageHashCode = System.identityHashCode(buf.current()); lastPendingWriteBytes = buf.totalPendingWriteBytes(); } } }
首先说说这个 observeOutput “监控出站数据属性” 的做用。由于 github 上有人提了 issue ,issue 地址,原本是没有这个参数的。为何须要呢?
假设:当你的客户端应用每次接收数据是30秒,而你的写空闲时间是 25 秒,那么,当你数据尚未写出的时候,写空闲时间触发了。其实是不合乎逻辑的。由于你的应用根本不空闲。
怎么解决呢?
Netty 的解决方案是:记录最后一次输出消息的相关信息,并使用一个值 firstXXXXIdleEvent 表示是否再次活动过,每次读写活动都会将对应的 first 值更新为 true,若是是 false,说明这段时间没有发生过读写事件。同时若是第一次记录出站的相关数据和第二次获得的出站相关数据不一样,则说明数据在缓慢的出站,就不用触发空闲事件。
总的来讲,这个字段就是用来对付 “客户端接收数据奇慢无比,慢到比空闲时间还多” 的极端状况。因此,Netty 默认是关闭这个字段的。
3. 该类内部的 3 个定时任务类
以下图:
这 3 个定时任务分别对应 读,写,读或者写 事件。共有一个父类。这个父类提供了一个模板方法:
当通道关闭了,就不执行任务了。反之,执行子类的 run 方法。
1. 读事件的 run 方法
代码以下:
protected void run(ChannelHandlerContext ctx) { long nextDelay = readerIdleTimeNanos; if (!reading) { nextDelay -= ticksInNanos() - lastReadTime; } if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. // 用于取消任务 promise readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false; try { // 再次提交任务 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); // 触发用户 handler use channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } }
该方法很简单:
总的来讲,每次读取操做都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,若是间隔超过了设置的时间,就触发 UserEventTriggered 方法。就是这么简单。
再看看写事件任务。
2. 写事件的 run 方法
写任务的逻辑基本和读任务的逻辑同样,惟一不一样的就是有一个针对 出站较慢数据的判断。
if (hasOutputChanged(ctx, first)) { return; }
若是这个方法返回 true,就不执行触发事件操做了,即便时间到了。看看该方法实现:
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) { if (observeOutput) { // 若是最后一次写的时间和上一次记录的时间不同,说明写操做进行过了,则更新此值 if (lastChangeCheckTimeStamp != lastWriteTime) { lastChangeCheckTimeStamp = lastWriteTime; // 但若是,在这个方法的调用间隙修改的,就仍然不触发事件 if (!first) { // #firstWriterIdleEvent or #firstAllIdleEvent return true; } } Channel channel = ctx.channel(); Unsafe unsafe = channel.unsafe(); ChannelOutboundBuffer buf = unsafe.outboundBuffer(); // 若是出站区有数据 if (buf != null) { // 拿到出站缓冲区的 对象 hashcode int messageHashCode = System.identityHashCode(buf.current()); // 拿到这个 缓冲区的 全部字节 long pendingWriteBytes = buf.totalPendingWriteBytes(); // 若是和以前的不相等,或者字节数不一样,说明,输出有变化,将 "最后一个缓冲区引用" 和 “剩余字节数” 刷新 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) { lastMessageHashCode = messageHashCode; lastPendingWriteBytes = pendingWriteBytes; // 若是写操做没有进行过,则任务写的慢,不触发空闲事件 if (!first) { return true; } } } } return false; }
写了一些注释,仍是再梳理一下吧:
整个逻辑以下:
流程图
这里有个问题,为何第一次的时候必定要触发事件呢?假设,客户端开始变得很慢,这个时候,定时任务监听发现时间到了,就进入这里判断,当上次记录的缓冲区相关数据已经不一样,这个时候难道触发事件吗?
实际上,这里是 Netty 的一个考虑:假设真的发生了很写出速度很慢的问题,极可能引起 OOM,相比叫链接空闲,这要严重多了。为何第一次必定要触发事件呢?若是不触发,用户根本不知道发送了什么,当一次写空闲事件触发,随后出现了 OOM,用户能够感知到:多是写的太慢,后面的数据根本写不进去,因此发生了OOM。因此,这里的一次警告仍是必要的。
固然,这是个人一个猜想。有必要的话,能够去 Netty 那里提个 issue。
好,关于客户端写的慢的特殊处理告一段落。再看看另外一个任务的逻辑。
3. 全部事件的 run 方法
这个类叫作 AllIdleTimeoutTask ,表示这个监控着全部的事件。当读写事件发生时,都会记录。代码逻辑和写事件的的基本一致,除了这里:
long nextDelay = allIdleTimeNanos; if (!reading) { // 当前时间减去 最后一次写或读 的时间 ,若大于0,说明超时了 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime); }
这里的时间计算是取读写事件中的最大值来的。而后像写事件同样,判断是否发生了写的慢的状况。最后调用 ctx.fireUserEventTriggered(evt) 方法。
一般这个使用的是最多的。构造方法通常是:
pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));
读写都是 0 表示禁用,30 表示 30 秒内没有任务读写事件发生,就触发事件。注意,当不是 0 的时候,这三个任务会重叠。
IdleStateHandler 能够实现心跳功能,当服务器和客户端没有任何读写交互时,并超过了给定的时间,则会触发用户 handler 的 userEventTriggered 方法。用户能够在这个方法中尝试向对方发送信息,若是发送失败,则关闭链接。
IdleStateHandler 的实现基于 EventLoop 的定时任务,每次读写都会记录一个值,在定时任务运行的时候,经过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。
内部有 3 个定时任务,分别对应读事件,写事件,读写事件。一般用户监听读写事件就足够了。
同时,IdleStateHandler 内部也考虑了一些极端状况:客户端接收缓慢,一次接收数据的速度超过了设置的空闲时间
。Netty 经过构造方法中的 observeOutput 属性来决定是否对出站缓冲区的状况进行判断。
若是出站缓慢,Netty 不认为这是空闲,也就不触发空闲事件。但第一次不管如何也是要触发的。由于第一次没法判断是出站缓慢仍是空闲。固然,出站缓慢的话,OOM 比空闲的问题更大。
因此,当你的应用出现了内存溢出,OOM之类,而且写空闲极少发生(使用了 observeOutput 为 true),那么就须要注意是否是数据出站速度过慢。
默认 observeOutput 是 false,意思是,即便你的应用出站缓慢,Netty 认为是写空闲。
可见这个 observeOutput 的做用好像不是那么重要,若是真的发生了出站缓慢,判断是否空闲根本就不重要了,重要的是 OOM。因此 Netty 选择了默认 false。
还有一个注意的地方:刚开始咱们说的 ReadTimeoutHandler ,就是继承自 IdleStateHandler,当触发读空闲事件的时候,就触发 ctx.fireExceptionCaught 方法,并传入一个 ReadTimeoutException,而后关闭 Socket。
而 WriteTimeoutHandler 的实现不是基于 IdleStateHandler 的,他的原理是,当调用 write 方法的时候,会建立一个定时任务,任务内容是根据传入的 promise 的完成状况来判断是否超出了写的时间。当定时任务根据指定时间开始运行,发现 promise 的 isDone 方法返回 false,代表尚未写完,说明超时了,则抛出异常。当 write 方法完成后,会打判定时任务。
好了,关于 Netty 自带的心跳相关的类就介绍到这里。这些功能对于开发稳定的高性能 RPC 相当重要。