Netty框架底层是对NIO的高度封装,因此想要更好的学习Netty以前,应先了解下什么是NIO - NIO是non-blocking的简称,
在jdk1.4 里提供的新api,他的他的特性以下:
* 为全部的原始类型提供(Buffer)缓存支持,字符集编码解码解决方案。
* Channel :一个新的原始I/O 抽象。支持锁和内存映射文件的文件访问接口。提供多路(non-bloking)非阻塞式的高伸缩
性网络I/O 。
NIO是一个非阻塞式的I/O,它由一个专门的线程来处理全部的IO事件,并负责分发,而且它只有在事件到达的时候才会触发,
而不是去同步的监视事件;线程之间经过wait,notify 等方式通信。保证每次上下文切换都是有意义的。减小无谓的线程切换。
NIO和IO最大的区别是数据打包和传输方式。IO是以流的方式处理数据,而NIO是以块的方式处理数据。NIO的核心部分由
Channels、Buffers、Selectors三部分组成。java
正常的状况下,全部的IO在NIO中都从一个Channel 开始。Channel有点像流。数据能够从Channel读到Buffer中,也能够从
Buffer写到Channel中。JAVA NIO中的一些主要Channel的实现:FileChannel、DatagramChannel、SocketChannel、
ServerSocketChannel。这些实现类覆盖了UDP和TCP网络IO,以及文件IO。
而Buffer的一些实现类:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer,则覆盖
了能经过IO发送的基本数据类型:byte,short,int,long,float,double和char。算法
Selector容许单线程处理多个Channel。若是你的应用打开了多个链接(通道),但每一个链接的流量都很低,使用Selector
就会很方便。而要使用Selector,就得向Selector注册Channel,而后调用它的select()方法。这个方法会一直阻塞到某个注册的
通道有事件就绪。一旦这个方法返回,线程就能够处理这些事件,事件的例子有如新链接进来,数据接收等。bootstrap
大数据,高访问场景的互联网项目或者多系统的协同工做,使用一个服务器根本不能胜任。就须要把系统拆分红了多个服务,
根据须要部署在多个机器上,这些服务很是灵活,能够随着访问量弹性扩展。可是多个模块的跨服务通讯,时间和资源都是极大
地浪费。传统的Blocking IO不能解决,由于会有线程阻塞的问题,而使用非阻塞IO(NIO),则须要耗费太多的精力。而Netty框架
(RPC框架)则很好的解决了这个问题。
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、
高可靠性的网络服务器和客户端程序。他就是一个程序,是封装java socket noi的,咱们直接拿来用就行了。
Netty通讯服务端的步骤:
一、建立两个NIO线程组,一个专门用于网络事件处理(接受客户端的链接),另外一个则进行网络通讯的读写。
二、建立一个ServerBootstrap对象,配置Netty的一系列参数,例如接受传出数据的缓存大小等。
三、建立一个用于实际处理数据的类ChannelInitializer,进行初始化的准备工做,好比设置接受传出数据的字符集、
格式以及实际处理数据的接口。
四、绑定端口,执行同步阻塞方法等待服务器端启动便可。
五、关闭相应的资源。api
服务端栗子:
服务端的管理者:缓存
/** * 服务端处理通道.这里只是打印一下请求的内容,并不对请求进行任何的响应 * 继承自ChannelHandlerAdapter, 这个类实现了ChannelHandler接口, * ChannelHandler提供了许多事件处理的接口方法,而后你能够覆盖这些方法。 * @author lcy * */ public class DiscartServiceHandler extends ChannelHandlerAdapter { /** * 客户端收到新消息时,这个方法会被调用 * * @param ctx * 通道处理的上下文信息 * @param msg * 接受的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { // 将接收到的信息转换为缓冲区 ByteBuf str = (ByteBuf) msg; // 打印传输过来的信息 System.out.print(str.toString(CharsetUtil.UTF_8)); } finally { // 释放ByteBuf对象 ReferenceCountUtil.release(msg); } } /** * 在异常时触发 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //输出错误信息 cause.printStackTrace(); ctx.close(); } }
服务端:服务器
/** * 服务端 * @author lcy * */ public class DiscartServer { private int port; public DiscartServer(int port) { super(); this.port = port; } public void run() throws Exception { //(一)设置两个线程组 //用来接收进来的链接 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 用来处理已经接受的链接 NioEventLoopGroup workGroup = new NioEventLoopGroup(); System.out.println("准备运行的端口" + port); try { //(二)辅助工具类,用于服务器通道的一系列配置 ServerBootstrap bootstrap = new ServerBootstrap(); //(三)绑定两个线程组 // 设置group,这一步是必须的,若是没有设置group将会报java.lang.IllegalStateException:group not set异常 bootstrap = bootstrap.group(bossGroup, workGroup); //(四)指定NIO的模式 /*** * ServerSocketChannel以NIO的selector为基础进行实现的,用来接收新的链接 * 这里告诉Channel如何获取新的链接. */ bootstrap = bootstrap.channel(NioServerSocketChannel.class); //(五)配置具体的数据处理方式,就是往里添加规则 bootstrap = bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel arg0) throws Exception { //与50秒内都没有与服务端进行通讯的客户端断连 arg0.pipeline().addLast(new ReadTimeoutHandler(50)); arg0.pipeline().addLast(new HttpObjectAggregator(1048576)); // 添加实际处理数据的类 arg0.pipeline().addLast(new DiscartServiceHandler()); } }); //(六)设置TCP缓冲区 bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, 128); //保持链接 bootstrap = bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); //(七)绑定端口,启动接收进来的链接 ChannelFuture sync = bootstrap.bind(port).sync(); //(八) 这里会一直等待,直到socket被关闭 sync.channel().closeFuture().sync(); } finally { //(九)关闭资源 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } //服务开启 public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscartServer(port).run(); System.out.println("server:run()"); } }
客户端栗子:网络
实际处理数据的类:框架
public class ChannelClient extends ChannelInitializer{ @Override protected void initChannel(Channel arg0) throws Exception { //与50秒内都没有与服务端进行通讯的客户端断连 arg0.pipeline().addLast(new ReadTimeoutHandler(50)); arg0.pipeline().addLast(new HttpObjectAggregator(1048576)); //设置Channel arg0.pipeline().addLast(new ChannelHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { // 将接收到的信息转换为缓冲区 ByteBuf str = (ByteBuf) msg; // 打印传输过来的信息 System.out.print(str.toString(CharsetUtil.UTF_8)); } finally { // 释放ByteBuf对象 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //输出错误信息 cause.printStackTrace(); ctx.close(); } }); } }
客户端:异步
/** * 客户端 * @author lcy */ public class Client { @SuppressWarnings("resource") public static void main(String[] args) throws Exception { //建立一个新的线程组 NioEventLoopGroup workGroup = new NioEventLoopGroup(); //初始化Netty Bootstrap bootstrap = new Bootstrap(); //指定工做的线程组 bootstrap = bootstrap.group(workGroup); //指定 Channel的类型。由于是客户端, 所以使用了 NioSocketChannel。 bootstrap.channel(NioSocketChannel.class); /** * 设置连接的一些属性 */ //下降延迟,禁用了禁用nagle算法。nagle算法受TCP延迟确认影响,会致使相继两次向链接发送请求包。 bootstrap.option(ChannelOption.TCP_NODELAY, true); //保持链接检测对方主机是否崩溃,避免(服务器)永远阻塞于TCP链接的输入 bootstrap.option(ChannelOption.SO_KEEPALIVE, true); //使用netty默认的解码器会出现读取不完整,不会执行channelRead方法。设置这个属性惋惜保证Netty读取的完整 bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, Integer.MAX_VALUE); //设置数据处理器 bootstrap.handler(new ChannelClient()); //同步的连接 Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel(); channel.writeAndFlush(Unpooled.copiedBuffer("Hello Netty...".getBytes())); channel.closeFuture().sync(); workGroup.shutdownGracefully(); } }
一、使用长链接通道不断开的形式进行通讯,也就是服务器和客户端的通道一直处于开启状态,若是服务器性能足够好,
而且客户端数量也比较多的状况下,推荐这种方式。
二、一次性批量提交数据,采用短链接方式。也就是说先把数据保存到本地临时缓存区或者临时表,当达到界值时进行一
次性批量提交,又或者根据定时任务轮询提交。
三、使用一种特殊的长链接,在某一指定时间段内,服务器与某台客户端没有任何通讯,则断开链接。下次链接则是客户
端向服务器发送请求的时候,再次创建链接。socket
1. Decoder 解码器 负责将消息从字节或其余序列形式转成指定的消息对象。
2. Encoder 编码器 将消息对象转成字节或其余序列形式在网络上传输。
入站”ByteBuf读取bytes后由 ToIntegerDecoder 进行解码,而后将解码后的消息存入List集合中,而后传递到ChannelPipeline
中的下一个ChannelInboundHandler。
解码器:
1)ByteToMessageDecoder,需本身判断ByteBuf读取前是否有足够的字节,不然会出现沾包的现象。
2)ReplayingDecoder,无需本身检查字节长度,可是使用起来具备局限性:
* 不是全部的操做都被ByteBuf支持,若是调用一个不支持的操做会抛出DecoderException。
* ByteBuf.readableBytes()大部分时间不会返回指望值。
3)MessageToMessageDecoder(message-to-message)
解码器是用来处理入站数据,Netty提供了不少解码器的实现,能够根据需求详细了解。
编码器:
1)MessageToByteEncoder
2)MessageToMessageEncoder 须要将消息编码成其余的消息时可使用Netty提供的MessageToMessageEncoder抽象类
来实现。例如将Integer编码成String。
想要解决TCP的粘包/拆包问题,首先要知道什么是TCP粘包、拆包:
TCP是一个“流”协议,所谓流就是没有界限的遗传数据。你们能够想象一下,若是河水就比如数据,他们是连成一片的,没有
分界线,TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的具体状况进行包的划分,也就是说,在业务上一个完整的
包可能会被TCP分红多个包进行发送,也可能把多个小包封装成一个大的数据包发送出去,这就是所谓的粘包/拆包问题。
解决方案:
一、消息定长,例如每一个报文的大小固定为200个字节,若是不够,空位补空格。
二、在包尾部增长特殊字符进行分割,例如加回车等。
三、将消息分为消息头和消息体,在消息头中包含表示消息总长度的字段,而后进行业务逻辑的处理。
Netty中解决TCP粘包/拆包的方法: 一、分隔符类:DelimiterBasedFrameDecoder(自定义分隔符) 二、定长:FixedLengthFrameDecoder