Netty解决TCP的粘包和分包(二)
使用LengthFieldBasedFrameDecoder解码器分包
先看一下这个类的属性:
// Fields of LengthFieldBasedFrameDecoder (excerpt; the enclosing class is not shown here).

// NOTE(review): not explained in the original notes; presumably the byte order in which the
// length field is read — confirm against the Netty Javadoc.
private final ByteOrder byteOrder;
// Maximum allowed length of a frame.
private final int maxFrameLength;
// Start offset of the length field.
private final int lengthFieldOffset;
// Size of the length field itself, i.e. how many bytes the variable holding the packet length occupies.
private final int lengthFieldLength;
// Derived from lengthFieldOffset and lengthFieldLength: start offset + length = end offset.
private final int lengthFieldEndOffset;
// NOTE(review): not explained in the original notes; by its name, a correction applied to the
// value of the length field — confirm against the Netty Javadoc.
private final int lengthAdjustment;
// Number of header bytes to skip from the decoded packet.
private final int initialBytesToStrip;
// Same meaning as in DelimiterBasedFrameDecoder: when true, an error is raised as soon as the data
// being parsed is found to exceed maxFrameLength; otherwise the error is raised only after the
// whole frame has been parsed.
private final boolean failFast;
// Current state of this decoder: whether it is discarding an over-long frame.
private boolean discardingTooLongFrame;
// Length of the over-long frame, when one occurs.
private long tooLongFrameLength;
// Number of bytes to discard when an over-long frame occurs.
private long bytesToDiscard;
实现细节参见:http://asialee.iteye.com/blog/1784844
http://bylijinnan.iteye.com/blog/1985706
这里的示例是根据Netty自带的一个聊天程序改的,代码如下:
服务器端代码:
public final class SecureChatServer { static final int PORT = Integer .parseInt(System.getProperty("port", "8992")); public static void main(String[] args) throws Exception { SelfSignedCertificate ssc = new SelfSignedCertificate(); SslContext sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new SecureChatServerInitializer(sslCtx)); // sync 等待异步操做的完成(done) // closeFuture().sync()等待socket关闭操做的完成(done) b.bind(PORT).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class SecureChatServerInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public SecureChatServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(sslCtx.newHandler(ch.alloc())); // On top of the SSL handler, add the text line codec. // 4字节表示消息体的长度,服务器端解码器 pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); // and then business logic. pipeline.addLast(new SecureChatServerHandler()); } }
public class SecureChatServerHandler extends SimpleChannelInboundHandler<String> { static final ChannelGroup channels = new DefaultChannelGroup( GlobalEventExecutor.INSTANCE); @Override public void channelActive(final ChannelHandlerContext ctx) { // Once session is secured, send a greeting and register the channel to the global channel // list so the channel received the messages from others. ctx.pipeline().get(SslHandler.class).handshakeFuture() .addListener(new GenericFutureListener<Future<Channel>>() { @Override public void operationComplete(Future<Channel> future) throws Exception { String welcome = "Welcome to " + InetAddress.getLocalHost().getHostName() + " secure chat service!"; ctx.writeAndFlush(ctx.alloc().buffer() .writeInt(welcome.length()) .writeBytes(welcome.getBytes())); String message = "Your session is protected by " + ctx.pipeline().get(SslHandler.class).engine() .getSession().getCipherSuite() + " cipher suite."; ctx.writeAndFlush(ctx.alloc().buffer() .writeInt(message.length()) .writeBytes(message.getBytes())); channels.add(ctx.channel()); } }); } @Override public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("[" + msg + "]"); String returnMsg; // Send the received message to all channels but the current one. for (Channel c : channels) { if (c != ctx.channel()) { returnMsg = "[" + ctx.channel().remoteAddress() + "] " + msg; c.writeAndFlush(ctx.alloc().buffer() .writeInt(returnMsg.length()) .writeBytes(returnMsg.getBytes())); } else { returnMsg = "[you] " + msg; c.writeAndFlush(ctx.alloc().buffer() .writeInt(returnMsg.length()) .writeBytes(returnMsg.getBytes())); } } // Close the connection if the client has sent 'bye'. if ("bye".equals(msg.toLowerCase())) { ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
客户端代码:
public final class SecureChatClient { static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer .parseInt(System.getProperty("port", "8992")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx = SslContext .newClientContext(InsecureTrustManagerFactory.INSTANCE); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .handler(new SecureChatClientInitializer(sslCtx)); // Start the connection attempt. // sync 等待链接创建成功。由于链接在这里表现为异步操做,因此要等待链接的Future完成(done)。 Channel ch = b.connect(HOST, PORT).sync().channel(); // Read commands from the stdin. ChannelFuture lastWriteFuture = null; BufferedReader in = new BufferedReader(new InputStreamReader( System.in)); for (;;) { String line = in.readLine(); if (line == null) { break; } //获取用户的输入,而后构造消息,发送消息 ByteBuf content = ch.alloc().buffer().writeInt(line.length()) .writeBytes(line.getBytes()); // Sends the received line to the server. lastWriteFuture = ch.writeAndFlush(content); // If user typed the 'bye' command, wait until the server closes // the connection. if ("bye".equals(line.toLowerCase())) { ch.closeFuture().sync(); break; } } // Wait until all messages are flushed before closing the channel. if (lastWriteFuture != null) { lastWriteFuture.sync(); } } finally { // The connection is closed automatically on shutdown. group.shutdownGracefully(); } } }
public class SecureChatClientInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public SecureChatClientInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(sslCtx.newHandler(ch.alloc(), SecureChatClient.HOST, SecureChatClient.PORT)); // On top of the SSL handler, add the text line codec. //使用该解码器解码服务器返回消息 pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); //字符串编码器 // and then business logic. pipeline.addLast(new SecureChatClientHandler()); } }
/**
 * Client-side handler: prints every decoded message from the server to standard error and
 * closes the channel when an exception reaches it.
 */
public class SecureChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the failure's stack trace, then closes the channel.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Writes the received message to {@code System.err}.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded message text
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, String msg) {
        System.err.println(msg);
    }
}
Netty原来的example在这里:https://github.com/netty/netty/tree/master/example/src/main/java/io/netty/example/securechat
如何在这里使用LengthFieldBasedFrameDecoder解码器?以下是服务器端解码器的配置:
// lengthFieldLength =4 字节表示实际消息体的长度 // initialBytesToStrip =4 字节表示解码消息体的时候跳过长度 pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
当服务器端发送消息的时候,是这样发的:
for (Channel c : channels) { if (c != ctx.channel()) { returnMsg = "[" + ctx.channel().remoteAddress() + "] " + msg; c.writeAndFlush(ctx.alloc().buffer() .writeInt(returnMsg.length()) .writeBytes(returnMsg.getBytes())); } else { returnMsg = "[you] " + msg; c.writeAndFlush(ctx.alloc().buffer() .writeInt(returnMsg.length()) .writeBytes(returnMsg.getBytes())); } }
一定要把实际消息体的长度(以字节为单位)写入到ByteBuf中。相应的客户端配置见上面的代码。
==================END==================