《精通并发与Netty》学习笔记(14 - 解决TCP粘包拆包(二)Netty自定义协议解决粘包拆包)

1、Netty粘包和拆包解决方案

Netty提供了多个解码器,能够进行分包的操做,分别是: 
* LineBasedFrameDecoder (换行)
   LineBasedFrameDecoder是回车换行解码器,若是用户发送的消息以回车换行符做为消息结束的标识,则能够直接使用Netty的LineBasedFrameDecoder对消息进行解码,只须要在初始化Netty服务端或者客户端时将LineBasedFrameDecoder正确的添加到ChannelPipeline中便可,不须要本身从新实现一套换行解码器。
   LineBasedFrameDecoder的工做原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”,若是有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。若是连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉以前读到的异常码流。防止因为数据报没有携带换行符致使接收到ByteBuf无限制积压,引发系统内存溢出。java

* DelimiterBasedFrameDecoder(添加特殊分隔符报文来分包) 
   DelimiterBasedFrameDecoder是分隔符解码器,用户能够指定消息结束的分隔符,它能够自动完成以分隔符做为码流结束标识的消息的解码。
   回车换行解码器其实是一种特殊的DelimiterBasedFrameDecoder解码器。spring

* FixedLengthFrameDecoder(使用定长的报文来分包) 
    FixedLengthFrameDecoder是固定长度解码器,它可以按照指定的长度对消息进行自动解码,开发者不须要考虑TCP的粘包/拆包等问题,很是实用。
    对于定长消息,若是消息实际长度小于定长,则每每会进行补位操做,它在必定程度上致使了空间和资源的浪费。可是它的优势也是很是明显的,编解码比较简单,所以在实际项目中仍然有必定的应用场景。bootstrap

* LengthFieldBasedFrameDecoder (自定义解码器跟编码器)服务器

   本文介绍的重点LengthFieldBasedFrameDecoder,通常包含了消息头(head)、消息体(body):消息头是固定的长度,通常有有如下信息 -> 是否压缩(zip)、消息类型(type or cmdid)、消息体长度(body length);消息体长度不是固定的,其大小由消息头记载,通常记载业务交互信息。dom

  netty对应来讲就是编码器(Encoder)跟解码器(Decoder),通常其中会有一个基本消息类对外输出socket

2、实例演示ide

首先编写自定义协议类:oop

 

package com.spring.netty.handler2;

/**
 * 自定义Person协议
 */
public class PersonProtocol {
    private int length;
    private byte[] content;

    public int getLength() {
        return length;
    }
    public void setLength(int length) {
        this.length = length;
    }
    public byte[] getContent() {
        return content;
    }
    public void setContent(byte[] content) {
        this.content = content;
    }
}

 

新建服务端代码:ui

 

package com.spring.netty.handler2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).
                    childHandler(new MyServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

 

 

package com.spring.netty.handler2;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyPersonDecoder());
        pipeline.addLast(new MyPersonEncoder());

        pipeline.addLast(new MyServerHandler());
    }
}

 

package com.spring.netty.handler2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

/**
 * Person解码器
 */
public class MyPersonDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyPersonDecoder decode invoked!");

        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);

        PersonProtocol personProtocol = new PersonProtocol();
        personProtocol.setLength(length);
        personProtocol.setContent(content);
        out.add(personProtocol);
    }
}

 

package com.spring.netty.handler2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 编码器
 */
public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyPersonEncoder encode invoked!");

        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent());
    }
}

 

package com.spring.netty.handler2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;
import java.util.UUID;

public class MyServerHandler extends SimpleChannelInboundHandler<PersonProtocol> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PersonProtocol msg) throws Exception {
        int length = msg.getLength();
        byte[] content = msg.getContent();
        System.out.println("服务端接收到的数据:");
        System.out.println("长度:"+length);
        System.out.println("内容:"+new String(content, Charset.forName("utf-8")));
        System.out.println("服务端接收到的消息数量:"+(++this.count));
        //服务端向客户端返回uuid
        String responseMessage = UUID.randomUUID().toString();
        int responseLength = responseMessage.getBytes("utf-8").length;
        byte[] responseContent = responseMessage.getBytes("utf-8");

        PersonProtocol personProtocol = new PersonProtocol();
        personProtocol.setLength(length);
        personProtocol.setContent(content);
        ctx.writeAndFlush(responseContent);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

编写客户端程序:this

package com.spring.netty.handler2;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new MyPersonDecoder());
                            pipeline.addLast(new MyPersonEncoder());

                            pipeline.addLast(new MyClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

 

package com.spring.netty.handler2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;

public class MyClientHandler extends SimpleChannelInboundHandler<PersonProtocol> {
    private int count;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i=0;i<10;++i){
            String messageToBeSent = "sent from client";
            byte[] content = messageToBeSent.getBytes(Charset.forName("utf-8"));
            int length = messageToBeSent.getBytes(Charset.forName("utf-8")).length;

            PersonProtocol personProtocol = new PersonProtocol();
            personProtocol.setLength(length);
            personProtocol.setContent(content);

            ctx.writeAndFlush(personProtocol);
        }
    }

    //从服务器端接收数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PersonProtocol msg) throws Exception {
        int length = msg.getLength();
        byte[] content = msg.getContent();
        System.out.println("客户端接收到的数据:");
        System.out.println("长度:"+length);
        System.out.println("内容:"+new String(content, Charset.forName("utf-8")));
        System.out.println("客户端接收到的消息数量:"+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

分别启动服务端和客户端查看效果:

服务端效果如图:

客户端效果如图:

相关文章
相关标签/搜索