概念: Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 能够确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty至关简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。java
Helloword版
服务端这边绑定了两个端口,能够根据业务区别对待如端口1是作A业务,端2作B业务.编程
public class Server { public static void main(String[] args) throws InterruptedException { //1.建立两个线程组 (只有服务器端须要 ) //一个线程组专门用来管理接收客户端的请求链接的 //一个线程组进行网络通讯(读写) EventLoopGroup receiveGroup = new NioEventLoopGroup(); EventLoopGroup dealGroup = new NioEventLoopGroup(); //建立辅助工具类,用于设置服务器通道的一系列配置 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(receiveGroup, dealGroup)//绑定两个线程组 .channel(NioServerSocketChannel.class) //指定NIO的模式 .option(ChannelOption.SO_BACKLOG, 1024) //设置tcp缓冲区 .option(ChannelOption.SO_SNDBUF, 32*1024) //设置发送缓冲区大小 .option(ChannelOption.SO_RCVBUF, 32*1024) //设置接收缓冲大小 .option(ChannelOption.SO_KEEPALIVE, true) //保持链接 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //3 在这里配置具体数据接收方法的处理 sc.pipeline().addLast(new ServerHandler()); } }); //4 进行绑定 ChannelFuture cf1 = serverBootstrap.bind(8765).sync(); ChannelFuture cf2 = serverBootstrap.bind(8764).sync(); //5 等待关闭 cf1.channel().closeFuture().sync(); cf2.channel().closeFuture().sync(); receiveGroup.shutdownGracefully(); dealGroup.shutdownGracefully(); } }
服务端处理器:安全
public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "gbk"); System.out.println("Server :" + body ); String response = "进行返回给客户端的响应:" + body ; //注意使用了writeAndFlush的话就能够不释放ReferenceCountUtil.release(msg); 不然须要释放ByteBuf容器的数据。 ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); //.addListener(ChannelFutureListener.CLOSE);//监听,内容传输完毕后就关闭管道 } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("读完了"); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } }
客户端:服务器
public class Client { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //发送消息 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:777".getBytes())); Thread.sleep(1000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:666".getBytes())); cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:888".getBytes())); Thread.sleep(2000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:888".getBytes())); cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:666".getBytes())); cf1.channel().closeFuture().sync(); cf2.channel().closeFuture().sync(); group.shutdownGracefully(); } }
客户端处理器:网络
public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端的channelActive()方法"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "gbk"); System.out.println("Client :" + body ); } finally { ReferenceCountUtil.release(msg); } }
TCP是个“流”协议,所谓流,就是没有界限的一串数据。你们能够想一想河里的流水,是连成一片的,其间并无分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际状况进行包的划分,因此在业务上认为,一个完整的包可能会被TCP拆分红多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
通俗意义来讲多是三个数据如'A','B','C' 但通过TCP协议流式传输后成了'AB','C'两个数据了,这种就是粘包了数据包之间粘一块儿了。那么拆包的话有三种方式。app
两根水管(服务器与客户端)须要相互流通水(数据),那么须要一个转接头(套接字)链接,水流式没法区分一段段的数据,一种方式在流通的过程当中设置些标志性物品如记号笔勾一下(分隔符),另外一种方式则是设定每一段都是多少容量的水来区分.框架
能够理解管道流里流的都是ByteBuffer类型的数据,那么使用分隔符(非ByteBuffer类型)的话可能就意味着一个转码与解码的过程。
服务端:异步
public class Server { public static void main(String[] args) throws Exception{ //1 建立2个线程,一个是负责接收客户端的链接。一个是负责进行数据传输的 EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); //2 建立服务器辅助类 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //设置特殊分隔符 解决TCP拆包黏包问题, ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //设置字符串形式的解码 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 绑定链接 ChannelFuture cf = b.bind(8765).sync(); //等待服务器监听端口关闭 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
服务端处理器:socket
public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(" server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String request = (String)msg; System.out.println("Server channelRead:" + request); String response = "服务器响应:" + msg + "$"; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { System.out.println("exceptionCaught"); ctx.close(); } }
客户端:tcp
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("数据A$".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("数据B$".getBytes())); //等待客户端端口关闭 cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
客户端处理器:
public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String response = (String)msg; System.out.println("Client: " + response); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exceptionCaught"); ctx.close(); } }
服务端:
public class Server { public static void main(String[] args) throws Exception{ //1 建立2个线程,一个是负责接收客户端的链接。一个是负责进行数据传输的 EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); //2 建立服务器辅助类 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //设置定长字符串接收 sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); //设置字符串形式的解码 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 绑定链接 ChannelFuture cf = b.bind(8765).sync(); //等待服务器监听端口关闭 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
客户端:
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes())); cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc".getBytes())); //等待客户端端口关闭 cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
服务端与客户端的处理器参照上例以字符串分割的.
新手上路,多多关注...