在通讯中设计的心跳消息,一般是为了检查网络链路是否正常。虽然TCP协议提供keep-alive机制,但须要在链路空闲2小时后才触发检测,这显然对业务很是不友好。当存在大量链接异常,而服务端却须要等2个小时后才感知到的时候,有限的系统资源会被逐渐耗尽,最终没法为新链接请求继续提供服务。java
要解决此类问题,业界的广泛作法是在应用层加入心跳机制。心跳消息能够是单向心跳也能够是双向心跳,所谓单向心跳表示由服务端或者客户端的其中一方主动发送心跳请求消息,而另外一方返回响应消息(以下图)。双向心跳表示服务端与客户端相互发送心跳请求和响应。由于不管何种类型,实现方案都是同样的,本文以单向心跳为例给你们作讲解。git
心跳消息一般是周期性的发送,或者是在链路空闲必定时长后触发。若是经历几个周期后都未收到响应,则能够视为链路异常。此时能够继续尝试发送心跳,也能够执行告警并断开链接。网络
在 smart-socket 中咱们提供了现成的心跳插件 HeartPlugin,能够很方便的实现心跳。本文是假定读者朋友对 smart-socket 已有了初步的了解,因此不会涉及 smart-socket 的基础使用,重点描述如何在服务中集成心跳插件。session
在HeartPlugin中有三种心跳策略可供选择,经过选择不一样的构造方案肯定。socket
心跳策略肯定好后,下一步就是如何去发送心跳消息,以及如何识别收到的消息是否为响应消息。在 HeartPlugin 中已经定义了这两个接口,须要开发人员去实现处理逻辑:ide
public void sendHeartRequest(AioSession session) throws IOException{ WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_req".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); }
public boolean isHeartMessage(AioSession session, String msg) { //心跳请求消息,返回响应 if("heart_req".equals(msg)){ try { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_rsp".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); }catch (Exception e){ } return true; } //是否为心跳响应消息 return "heart_rsp".equals(msg); }
public class HeartServer { private static final Logger LOGGER = LoggerFactory.getLogger(HeartServer.class); public static void main(String[] args) throws IOException { //定义消息处理器 AbstractMessageProcessor<String> processor = new AbstractMessageProcessor<String>() { @Override public void process0(AioSession<String> session, String msg) { LOGGER.info("收到客户端:{}消息:{}", session.getSessionID(), msg); } @Override public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { switch (stateMachineEnum) { case SESSION_CLOSED: LOGGER.info("客户端:{} 断开链接", session.getSessionID()); break; } } }; //注册心跳插件:每隔1秒发送一次心跳请求,5秒内未收到消息超时关闭链接 processor.addPlugin(new HeartPlugin<String>(1, 5, TimeUnit.SECONDS) { @Override public void sendHeartRequest(AioSession session) throws IOException { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_req".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); } @Override public boolean isHeartMessage(AioSession session, String msg) { //心跳请求消息,返回响应 if ("heart_req".equals(msg)) { try { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_rsp".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); } catch (Exception e) { } return true; } //是否为心跳响应消息 if ("heart_rsp".equals(msg)) { LOGGER.info("收到来自客户端:{} 的心跳响应消息", session.getSessionID()); return true; } return false; } }); //启动服务 AioQuickServer<String> server = new AioQuickServer<>(8888, new StringProtocol(), processor); server.start(); } }
public class HeartClient { private static final Logger LOGGER = LoggerFactory.getLogger(HeartClient.class); public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { AbstractMessageProcessor<String> client_1_processor = new AbstractMessageProcessor<String>() { @Override public void process0(AioSession<String> session, String msg) { LOGGER.info("client_1 收到服务端消息:" + msg); } @Override public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { LOGGER.info("stateMachineEnum:{}", stateMachineEnum); } }; AioQuickClient<String> client_1 = new AioQuickClient<>("localhost", 8888, new StringProtocol(), client_1_processor); client_1.start(); AbstractMessageProcessor<String> client_2_processor = new AbstractMessageProcessor<String>() { @Override public void process0(AioSession<String> session, String msg) { LOGGER.info("client_2 收到服务端消息:" + msg); try { if ("heart_req".equals(msg)) { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_rsp".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); LOGGER.info("client_2 发送心跳响应消息"); } } catch (Exception e) { e.printStackTrace(); } } @Override public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { LOGGER.info("stateMachineEnum:{}", stateMachineEnum); } }; AioQuickClient<String> client_2 = new AioQuickClient<>("localhost", 8888, new StringProtocol(), client_2_processor); client_2.start(); } }
服务端
ui
客户端
编码
本文围绕着心跳原理做了简单的实践分享。现实场景中若是对接的设备数量高达几万,甚至十几万,本文的心跳方案是否依旧适用,欢迎一块儿交流讨论。插件
本文涉及到的示例代码可从smart-socket仓库中下载设计