Netty网络编程实战 - 通讯框架,私有协议、生产级报文追踪、认证机制、自动空闲检测、断线自动重连

前言
bootstrap

前面咱们已经基于不一样维度介绍关于Netty的不少知识了,包括通讯原理、框架工做机制、核心组件、应用实战,以及不一样场景对不一样方案的选择等等。那么咱们此次就研究一下咱们项目中基于Netty端对端开发中如何搭建一个完整的应用框架,以供开发人员嵌入他们关注的各类应用部件等。缓存


实现Netty应用级框架须要考虑哪些因素服务器

image.png

不少人问,咱们在基于某种网络通讯框架构建咱们本身的应用框架的时候,究竟须要考虑到哪些方面?咱们如何构建一个与业务解耦的应用基础设施、定制协议格式、健壮性的机制等来支撑咱们的开发呢?你们也能够在下方的留言讨论,而就我的的理解和相关实践经验,我认为至少应考虑到如下的问题:网络

  1. 网络通讯协议的选择,方案的比较, 咱们应基于TCP?UDP?仍是应用层的一些成熟协议?...session

  2. 网络I/O模型该采用何种?BIO?NIO?IO复用?AIO?仍是信号驱动IO呢?架构

  3. 底层通讯框架咱们要定制,仍是沿用已有的成熟框架?框架

  4. 咱们场景是否须要统必定制全局可复用的交互协议、报文等?异步

  5. 是否应该建设一种高效可靠的消息编解码机制支撑快速通讯?ide

  6. 来自客户端的链接、业务请求等是否须要有认证和鉴权?工具

  7. 当业务通讯发生异常了,可否方便看到和追踪通讯报文细节(更便于咱们一步一步查找问题缘由)??

  8. 当服务端由于非预期缘由断开或崩溃,而后发现修复重启后是否每一个客户端都要手动再链接一下??

  9. 当服务器空闲一段时间后,是否该有一种机制自动触发心跳检测网络的健康情况??

   ... ...

  还有不少本文就不一一列举。那咱们今天就以上考虑到的问题点来手写实现一个基于Netty通讯框架的应用级框架,而后验证咱们的问题是否能得以圆满解决。


应用框架实战

  在如下应用框架中咱们将给出以上问题的解决方案

  1. 网络通讯协议的选择,方案的比较, 咱们应基于TCP?UDP?仍是应用层的一些成熟协议? 采用TCP/IP

  2. 网络I/O模型该采用何种?BIO?NIO?IO复用?AIO?仍是信号驱动IO呢?采用NIO非阻塞模型

  3. 底层通讯框架咱们要定制,仍是沿用已有的成熟框架? 基于Netty框架

  4. 咱们场景是否须要统必定制全局可复用的交互协议、报文等? 私有协议定义和交互约定方式

  5. 是否应该建设一种高效可靠的消息编解码机制支撑快速通讯?采用ByteToMessage/MessageToByte/Kryo

  6. 来自客户端的链接、业务请求等服务端是否须要有认证和鉴权?采用服务端白名单

  7. 当业务通讯发生异常了,可否方便看到和追踪通讯报文细节(更便于咱们一步一步查找问题缘由)??采用Netty内置Logging机制

  8. 当服务端由于非预期缘由断开或崩溃,而后发现修复重启后是否每一个客户端都要手动再链接一下?? 设计断线自动尝试重连

  9. 当服务器空闲一段时间后,是否该有一种机制自动触发心跳检测网络的健康情况??采用IdleState和自动触发机制


1、基础设施部分

/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:Kryo序列化器单例
*/
public class KryoBuilder {
   private KryoBuilder(){}

   /**
    * 获取单例
    *
@return
   
*/
   
public static Kryo getInstance(){
       return SingleKryo.builder;
   
}
   private static class SingleKryo{
       private static Kryo builder = new Kryo();
   
}

   /**
    * 构建kryo对象和注册
    *
@return
   
*/
   
public static Kryo build(){
       Kryo kryo = getInstance();
       
kryo.setRegistrationRequired(false);
       
kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
       
kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());
       
kryo.register(InvocationHandler.class, new JdkProxySerializer());
       
kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());
       
kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
       
kryo.register(Pattern.class, new RegexSerializer());
       
kryo.register(BitSet.class, new BitSetSerializer());
       
kryo.register(URI.class, new URISerializer());
       
kryo.register(UUID.class, new UUIDSerializer());
       
UnmodifiableCollectionsSerializer.registerSerializers(kryo);
       
SynchronizedCollectionsSerializer.registerSerializers(kryo);

       
kryo.register(HashMap.class);
       
kryo.register(ArrayList.class);
       
kryo.register(LinkedList.class);
       
kryo.register(HashSet.class);
       
kryo.register(TreeSet.class);
       
kryo.register(Hashtable.class);
       
kryo.register(Date.class);
       
kryo.register(Calendar.class);
       
kryo.register(ConcurrentHashMap.class);
       
kryo.register(SimpleDateFormat.class);
       
kryo.register(GregorianCalendar.class);
       
kryo.register(Vector.class);
       
kryo.register(BitSet.class);
       
kryo.register(StringBuffer.class);
       
kryo.register(StringBuilder.class);
       
kryo.register(Object.class);
       
kryo.register(Object[].class);
       
kryo.register(String[].class);
       
kryo.register(byte[].class);
       
kryo.register(char[].class);
       
kryo.register(int[].class);
       
kryo.register(float[].class);
       
kryo.register(double[].class);

       return
kryo;
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:Kryo序列化器
*/
public class KryoSerializer {
   private static final Kryo kryo = KryoBuilder.build();

   
/**
    * 序列化
    *
    *
@param object
   
* @param buf
   
*/
   
public static void serialize(Object object, ByteBuf buf) {
       ByteArrayOutputStream stream = new ByteArrayOutputStream();
       try
{
           Output out = new Output(stream);
           
kryo.writeClassAndObject(out, object);
           
out.flush();
           
out.close();

           byte
[] bytes = stream.toByteArray();
           
stream.flush();
           
stream.close();
           
/**
            * 写入buffer
            */
           
buf.writeBytes(bytes);
       
} catch (Exception e) {
           e.printStackTrace();
       
}
   }

   /**
    * 反序列化
    *
@param buf 数据缓冲
    *
@return
   
*/
   
public static Object deserialize(ByteBuf buf) {
       try(ByteBufInputStream stream = new ByteBufInputStream(buf)) {
           Input input = new Input(stream);
           return
kryo.readClassAndObject(input);
       
} catch (Exception e) {
           e.printStackTrace();
       
}
       return null;
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:Kryo编码器
*/
public class KryoEncoder extends MessageToByteEncoder<Message> {
   /**
    * 编码实现
    *
@param channelHandlerContext 处理器上下文
    *
@param message 报文
    *
@param byteBuf 对端数据缓冲
    *
@throws Exception
    */
   
@Override
   
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
       KryoSerializer.serialize(message, byteBuf);
       
channelHandlerContext.flush();
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:Kryo解码器
*/
public class KryoDecoder extends ByteToMessageDecoder {
   /**
    * 解码实现
    *
@param channelHandlerContext 处理器上下文
    *
@param byteBuf 对端缓冲
    *
@param list 反序列化列表
    *
@throws Exception
    */
   
@Override
   
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
       Object object = KryoSerializer.deserialize(byteBuf);
       
list.add(object);
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:系统快捷工具类
*/
public final class Utility {
   /**
    * 构建报文
    *
@param sessionId 会话Id
    *
@param msg 主体
    *
@return
   
*/
   
public static Message buildMessage(int sessionId, Object msg, byte type){
       //获取校验码
       
final int OFFSET = 9;
       int
seed = sessionId+(sessionId > OFFSET ? sessionId : sessionId+OFFSET);
       
String crc = CRC.getCRC16(seed);
       
MessageHeader header = new MessageHeader();
       
header.setCrc(crc);
       
header.setLength(calcBufferLen(msg));
       
header.setSessionId(sessionId);
       
header.setType(type);

       
Message message = new Message();
       
message.setHeader(header);
       
message.setBody(msg);

       return
message;
   
}

   /**
    * 是否IP认证经过
    *
@return
   
*/
   
public static boolean isIPPassed(String ip){
       for (String p : Constant.WHITELIST){
           if(ip.equals(p)){
               return true;
           
}
       }
       return false;
   
}

   /**
    * 计算报文长度
    *
@param msg 报文对象
    *
@return int
    */
   
private static int calcBufferLen(Object msg){
       try(ByteArrayOutputStream stream = new ByteArrayOutputStream();
           
ObjectOutputStream output = new ObjectOutputStream(stream)){
           output.writeObject(msg);
           return
stream.toByteArray().length;
       
}catch (IOException e){
           e.printStackTrace();
       
}
       return 0;
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:私有报文
*/
public final class Message {//<T extends Object>
   
/**
    * 报文头
    */
   
private MessageHeader header;
   
/**
    * 报文主体
    */
   
private Object body;
   public
MessageHeader getHeader() {
       return header;
   
}

   public void setHeader(MessageHeader header) {
       this.header = header;
   
}

   public Object getBody() {
       return body;
   
}

   public void setBody(Object body) {
       this.body = body;
   
}

   @Override
   
public String toString() {
       return "Message [header=" + this.header + "][body="+this.body+"]";
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:报文头
*/
public final class MessageHeader {
   /**
    * CRC校验码
    */
   
private String crc;
   
/**
    * 会话id
    */
   
private int sessionId;
   
/**
    * 报文长度
    */
   
private int length;
   
/**
    * 报文类型码
    */
   
private byte type;
   
/**
    * 报文优先级
    */
   
private int priority;
   
/**
    * 报文附件
    */
   
private Map<String,Object> attachment = new HashMap<>();

   public
String getCrc() {
       return crc;
   
}

   public void setCrc(String crc) {this.crc = crc;}

   public int getSessionId() {
       return sessionId;
   
}

   public void setSessionId(int sessionId) {
       this.sessionId = sessionId;
   
}

   public int getLength() {
       return length;
   
}

   public void setLength(int length) {
       this.length = length;
   
}

   public byte getType() {
       return type;
   
}

   public void setType(byte type) {
       this.type = type;
   
}

   public int getPriority() {
       return priority;
   
}

   public void setPriority(int priority) {
       this.priority = priority;
   
}

   public Map<String, Object> getAttachment() {
       return attachment;
   
}

   public void setAttachment(Map<String, Object> attachment) {
       this.attachment = attachment;
   
}
   @Override
   
public String toString() {
       return "MessageHeader [crc=" + this.crc + ", length=" + this.length
               
+ ", sessionId=" + this.sessionId + ", type=" + this.type + ", priority="
               
+ this.priority + ", attachment=" + this.attachment + "]";
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:数据报文类型
*/
public enum MessageType {
   /**
    * 认证请求
    */
   
AUTH_REQUEST((byte)0),
   
/**
    * 认证应答
    */
   
AUTH_REPLY((byte)1),
   
/**
    * 心跳请求
    */
   
HEARTBEAT_REQUEST((byte)2),
   
/**
    * 心跳应答
    */
   
HEARTBEAT_REPLY((byte)3),
   
/**
    * 普通请求
    */
   
REQUEST((byte)4),
   
/**
    * 普通应答
    */
   
REPLY((byte)5);

   public byte
getValue() {
       return value;
   
}

   private final byte value;
   
MessageType(byte b) {
       this.value = b;
   
};
}

2、客户端部分

/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:框架客户端启动器类
*/
public class ClientStarter {
   /**
    * 日志处理
    */
   
private static final Log log = LogFactory.getLog(ClientStarter.class);

   
/**
    * 客户端启动
    *
@param args
   
*/
   
public static void main(String[] args) throws Exception {
       ClientTask client = new ClientTask(Constant.SERV_HOST, Constant.SERV_PORT);
       new
Thread(client).start();
       while
(!client.isConnected()){
           synchronized (client){
               client.wait();
           
}
       }
       log.info("与服务器链接已创建,准备通讯...");
       
/**
        * 采用在控制台适时输入消息主体的方式,发送报文
        */
       
Scanner scanner = new Scanner(System.in);
       for
(;;){
           String body = scanner.next();
           if
(null != body && !"".equals(body)){
               if(!body.equalsIgnoreCase("exit")){
                   client.send(body);
               
}else{
                   client.close();
                   
/**
                    * 等待链接正常关闭通知
                    */
                   
while (client.isConnected()){
                       synchronized(client){
                           client.wait();
                       
}
                   }
                   scanner.close();
                   
System.exit(0);//提示正常退出
               
}
           }
       }
   }
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:客户端封装
*/
public class ClientTask implements Runnable {
   private final String host;
   private final int
port;
   public
ClientTask(String host, int port) {
       this.host = host;
       this
.port = port;
   
}
   /**
    * 日志处理
    */
   
private static final Log log = LogFactory.getLog(ClientTask.class);
   
/**
    * 报文计数器
    */
   
public final static AtomicInteger counter = new AtomicInteger(0);
   
/**
    * 这里用1个后台线程,定时执行检测客户端链接是否断开,若非用户断开则自动尝试重连
    */
   
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
   
/**
    * 客户端链接通道
    */
   
private Channel channel;
   
/**
    * 工做线程池组
    */
   
private EventLoopGroup group = new NioEventLoopGroup();
   
/**
    * 是否意外关闭:出异常或网络断开(区别于人为主动关闭)
    */
   
private volatile boolean except_closed = true;
   
/**
    * 是否链接成功
    */
   
private volatile boolean connected = false;
   private static
Object _obj = new Object();

   
/**
    * 是否链接
    *
@return
   
*/
   
public boolean isConnected(){
       return this.connected;
   
}

   /**
    * 执行客户端
    */
   
@Override
   
public void run() {
       try {
           this.connect();
       
} catch (Exception e) {
           e.printStackTrace();
       
}
   }
   /**
    * 链接服务器端
    */
   
public void connect() throws Exception{
       try {
           Bootstrap bootstrap = new Bootstrap();
           
bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    //设置TCP底层保温发送不延迟
                   
.option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializerExt());

           
//(1)异步链接服务器端,等待发送和接收报文
           
ChannelFuture future = bootstrap.connect(new InetSocketAddress(this.host, this.port)).sync();
           this
.channel = future.sync().channel();
           
//(2)通知其余等待线程,链接已创建
           
synchronized(this){
               this.connected = true;
               this
.notifyAll();
           
}
           this.channel.closeFuture().sync();
       
} finally {
           //检测并执行重链接
           
this.reconnect();
       
}
   }

   /**
    * 关闭链接:非正常关闭
    */
   
public void close(){
       this.except_closed = false;
       this
.channel.close();
   
}

   /**
    * 发送报文
    *
@param body 报文主体
    */
   
public void send(String body){
       if(null != this.channel && this.channel.isActive()){
           Message message = Utility.buildMessage(counter.incrementAndGet(), body, MessageType.REQUEST.getValue());
           this
.channel.writeAndFlush(message);
           return;
       
}
       log.info("通讯还没有创建,请稍后再试...");
   
}
   /**
    * 执行重链接
    *
@throws Exception
    */
   
private void reconnect() throws Exception{
       //主动关闭被检测到
       
if(this.except_closed){
           log.info("链接非正常关闭,准备尝试重连...");
           this
.executor.execute(new ReconnectTask(this.host, this.port));
       
}else{
           //主动关闭链接:释放资源
           
this.relese();
       
}
   }

   /**
    * 关闭链接释放资源,通知其它等待的线程
    *
@throws Exception
    */
   
private void relese() throws Exception{
       this.channel = null;
       this
.group.shutdownGracefully().sync();
       synchronized
(this){
           this.connected = false;
           this
.notifyAll();
       
}
   }

   /**
    * 尝试重连服务器任务
    */
   
private class ReconnectTask implements Runnable{
       private final String h;
       private final int
p;
       public
ReconnectTask(String h, int p) {
           this.h = h;
           this
.p = p;
       
}
       /**
        * 尝试重连
        */
       
@Override
       
public void run() {
           try {
               //间隔1秒重试一次
               
Thread.sleep(1000);
               
connect();
           
} catch (Exception e) {
               e.printStackTrace();
           
}
       }
   }
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:客户端通道初始化器扩展
*/
public class ChannelInitializerExt extends ChannelInitializer<SocketChannel> {
   /**
    * 初始化通道
    *
@param channel 通道
    *
@throws Exception 异常
    */
   
@Override
   
protected void initChannel(SocketChannel channel) throws Exception {
       ChannelPipeline pipeline = channel.pipeline();
       
//(1)报文粘包处理
       
pipeline.addLast(new LengthFieldBasedFrameDecoder(Constant.MAX_MSG_LEN,0,2,0,2));
       
//(2)给报文增长分割长度
       
pipeline.addLast(new LengthFieldPrepender(2));
       
//(3)报文解码器
       
pipeline.addLast(new KryoDecoder());
       
//(4)报文编码器
       
pipeline.addLast(new KryoEncoder());
       
//(5)链接超时检测
       
pipeline.addLast(new ReadTimeoutHandler(Constant.TIMEOUT_SECONDS));
       
//(6)认证请求
       
pipeline.addLast(new AuthenticationHandler());
       
//(7)心跳处理:发送心跳
       
pipeline.addLast(new HeartbeatHandler());
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:客户端认证请求
*/
public class AuthenticationHandler extends ChannelInboundHandlerAdapter {
   /**
    * 日志处理
    */
   
private static final Log log = LogFactory.getLog(AuthenticationHandler.class);
   
/**
    * 全局计数器
    */
   
private final static AtomicInteger counter = new AtomicInteger(0);
   
/**
    * 通道开启事件
    *
@param ctx 处理器上下文
    *
@throws Exception
    */
   
@Override
   
public void channelActive(ChannelHandlerContext ctx) throws Exception {
       /**
        * 发起认证请求
        */
       
Message message = Utility.buildMessage(counter.incrementAndGet(), "Auth Request",
               
MessageType.AUTH_REQUEST.getValue());
       
ctx.writeAndFlush(message);
   
}

   /**
    * 处理网络读取事件
    *
@param ctx 处理器上下文
    *
@param msg 报文
    *
@throws Exception
    */
   
@Override
   
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       Message message = (Message)msg;
       if
(null != message){
           MessageHeader header = message.getHeader();
           
//处理认证应答
           
if(null != header && header.getType() == MessageType.AUTH_REPLY.getValue()){
               String body = message.getBody().toString();
               if
(body.equals(AuthenticationResult.FAILED)){
                   log.info("Authentication failed, channel close..");
                   
ctx.close();
                   return;
               
}
               log.info("Authentication is ok: "+message);
           
}
       }
       ctx.fireChannelRead(msg);
   
}

   /**
    * 客户端认证异常处理
    *
@param ctx
   
* @param cause
   
* @throws Exception
    */
   
@Override
   
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       
ctx.fireExceptionCaught(cause);
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:客户端心跳处理器
*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
   /**
    * 日志处理
    */
   
private static final Log log = LogFactory.getLog(AuthenticationHandler.class);
   
/**
    * 心跳定时任务
    */
   
private volatile ScheduledFuture<?> scheduleHeartbeat;
   
/**
    * 处理客户端心跳请求报文
    *
@param ctx 处理器上下文
    *
@param msg 消息对象
    *
@throws Exception
    */
   
@Override
   
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       Message message = (Message)msg;
       if
(null != message){
           MessageHeader header = message.getHeader();
           
//处理认证应答
           
if(header.getType() == MessageType.AUTH_REPLY.getValue()){
               //登陆完成后,开启客户端对服务端心跳
               
this.scheduleHeartbeat = ctx.executor().scheduleAtFixedRate(new HeartbeatTask(ctx),
                       
0, Constant.HEARTBEAT_TIMEOUT,
                       
TimeUnit.MILLISECONDS);
               return;
           
}
           //处理心跳应答
           
if(header.getType() == MessageType.HEARTBEAT_REPLY.getValue()){
               log.info("Client recevied server heartbeat: "+message);
               
ReferenceCountUtil.release(msg);
               return;
           
}
       }
       ctx.fireChannelRead(msg);
   
}

   /**
    * 客户端捕获心跳异常
    *
@param ctx 处理器上下文
    *
@param cause 异常
    *
@throws Exception
    */
   
@Override
   
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       if
(null != this.scheduleHeartbeat){
           this.scheduleHeartbeat.cancel(true);
           this
.scheduleHeartbeat = null;
       
}
       //传递给TailHandler处理
       
ctx.fireExceptionCaught(cause);
   
}

   /**
    * 定义心跳任务
    */
   
private class HeartbeatTask implements Runnable{
       /**
        * 心跳计数器
        */
       
private final AtomicInteger counter = new AtomicInteger(0);
       private final
ChannelHandlerContext ctx;
       public
HeartbeatTask(ChannelHandlerContext ctx) {
           this.ctx = ctx;
       
}
       /**
        * 心跳任务执行
        */
       
@Override
       
public void run() {
           //客户端心跳报文
           
Message heartbeat = Utility.buildMessage(this.counter.incrementAndGet(), Constant.HEARTBEAT_ACK,
                   
MessageType.HEARTBEAT_REQUEST.getValue());
           this
.ctx.writeAndFlush(heartbeat);
       
}
   }
}

3、服务器部分

/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:框架服务端启动器类
*/
public class ServerStarter {
   /**
    * 日志处理
    */
   
private static final Log log = LogFactory.getLog(ServerStarter.class);
   
/**
    * 服务端启动
    *
@param args
   
*/
   
public static void main(String[] args) throws Exception {
       //主线程池组:负责处理链接
       
EventLoopGroup main = new NioEventLoopGroup();
       
//工做线程池组:负责请求对应的业务Handler处理
       
EventLoopGroup work = new NioEventLoopGroup();

       
ServerBootstrap bootstrap = new ServerBootstrap();
       
bootstrap.group(main, work)
                .channel(NioServerSocketChannel.class)
                //设置底层协议接收缓存队列最大长度
               
.option(ChannelOption.SO_BACKLOG, Constant.TCP_MAX_QUEUE_SIZE)
                .childHandler(new ChannelInitializerExt());
       
//绑定端口,等待同步报文
       
bootstrap.bind(Constant.SERV_PORT).sync();
       
log.info("Server started and listen port: "+Constant.SERV_PORT+"...");
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:服务器通道初始化器
*/
public class ChannelInitializerExt extends ChannelInitializer<SocketChannel> {
   /**
    * 初始化处理器
    *
@param channel 链接通道
    *
@throws Exception 异常
    */
   
@Override
   
protected void initChannel(SocketChannel channel) throws Exception {
       ChannelPipeline pipeline = channel.pipeline();
       
//(1)日志打印处理:能够打印报文字节码
       
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
       
//(2)处理粘包问题:带长度
       
pipeline.addLast(new LengthFieldBasedFrameDecoder(Constant.MAX_MSG_LEN,0,2,0,2));
       
//(3)报文编码器:消息发送增长分隔符
       
pipeline.addLast(new LengthFieldPrepender(2));
       
//(4)私有报文解码
       
pipeline.addLast(new KryoDecoder());
       
//(5)私有报文编码
       
pipeline.addLast(new KryoEncoder());
       
//(6)通道链接超时检测,发送心跳
       
pipeline.addLast(new ReadTimeoutHandler(Constant.TIMEOUT_SECONDS));
       
//(7)身份认证应答
       
pipeline.addLast(new AuthenticationHandler());
       
//(8)心跳应答
       
pipeline.addLast(new HeartbeatHandler());
       
//(9)其余业务处理...
       
pipeline.addLast(new OtherServiceHandler());
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:服务端身份认证处理器
*/
public class AuthenticationHandler extends ChannelInboundHandlerAdapter {
   /**
    * 日志处理
    */
   
private static final Log log = LogFactory.getLog(AuthenticationHandler.class);
   
/**
    * 定义认证业务计数器
    */
   
private static final AtomicInteger counter = new AtomicInteger(0);
   
/**
    * 缓存已认证ip列表
    */
   
private static final List<String>  authedIPList = new LinkedList<>();
   
/**
    * 认证业务处理
    *
@param ctx
   
* @param msg
   
* @throws Exception
    */
   
@Override
   
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       Message message = (Message)msg;
       
Message authMessage;
       if
(null != message){
           MessageHeader header = message.getHeader();
           
//处理认证请求
           
if(null != header && header.getType() == MessageType.AUTH_REQUEST.getValue()){
               String ip = ctx.channel().remoteAddress().toString();
               
String result;
               
//重复登陆
               
if(authedIPList.contains(ip)){
                   result = AuthenticationResult.REPEAT_AUTH.toString();
               
}else{
                   //是否ip认证经过
                   
if(Utility.isIPPassed(ip)){
                       authedIPList.add(ip);
                       
result = AuthenticationResult.SUCCESS.toString();
                   
}else{
                       result = AuthenticationResult.FAILED.toString();
                   
}
               }
               authMessage = Utility.buildMessage(counter.incrementAndGet(), result, MessageType.AUTH_REPLY.getValue());
               
ctx.writeAndFlush(authMessage);
               
//释放对象,再也不向后传递
               
ReferenceCountUtil.release(msg);
               
log.info("Server reply client auth request:"+authMessage);
               return;
           
}
       }
       ctx.fireChannelRead(msg);
   
}

   /**
    * 认证处理器捕获异常处理
    *
@param ctx 处理器上下文
    *
@param cause 异常
    *
@throws Exception
    */
   
@Override
   
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       
authedIPList.remove(ctx.channel().remoteAddress().toString());
       
ctx.close();
       
ctx.fireExceptionCaught(cause);
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:服务器端心跳包处理器
*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
   /**
    * 日志处理
    */
   
private static final Log log = LogFactory.getLog(HeartbeatHandler.class);
   
/**
    * 会话计数器
    */
   
private final AtomicInteger counter = new AtomicInteger(0);
   
/**
    * 处理心跳报文
    *
@param ctx 处理器上下文
    *
@param msg 消息报文
    *
@throws Exception
    */
   
@Override
   
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       Message message = (Message)msg;
       if
(null != message){
           MessageHeader header = message.getHeader();
           
/**
            * 处理心跳请求
            */
           
if(null != header && header.getType() == MessageType.HEARTBEAT_REQUEST.getValue()){
               log.info("Server recevied client heartbeat: "+message);
               
//应答报文
               
Message heartbeat = Utility.buildMessage(counter.incrementAndGet(), Constant.HEARTBEAT_ACK,
                       
MessageType.HEARTBEAT_REPLY.getValue());
               
ctx.writeAndFlush(heartbeat);
               
//引用计数器释放对象
               
ReferenceCountUtil.release(msg);
               return;
           
}
       }
       ctx.fireChannelRead(msg);
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:其它业务处理
*/
public class OtherServiceHandler extends SimpleChannelInboundHandler<Message> {
   /**
    * 日志处理
    */
   
private static final Log log = LogFactory.getLog(OtherServiceHandler.class);
   
/**
    * 读取对端发送的报文
    *
@param ctx 处理器上下文
    *
@param message 报文
    *
@throws Exception
    */
   
@Override
   
protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
       log.info(message);
   
}
   /**
    * 链接断开事件
    *
@param ctx 上下文
    *
@throws Exception
    */
   
@Override
   
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       log.info("["+ctx.channel().remoteAddress()+"]断开了链接...");
   
}
}


运行检验

image.png

image.png

image.png

image.png

image.png

开发小结

以上就是咱们基于Netty实现的一整套通讯应用框架的和核心代码。全部的业务开发均可定制和构建相似此的基础应用框架,开发Handler处理器的业务人员可任意嵌入被解耦化的业务领域Handler。可采用Handler自动注入Netty管道的方式零侵入框架,支持更多更复杂的业务!但愿本文能给你们提供靠谱和一站式的借鉴。你们有任何关于Netty的问题能够在下方留言,谢谢关注!

相关文章
相关标签/搜索