NIO、Netty(Netty基础)

1、概述

Netty是一个Java的开源框架。提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。java

Netty是一个NIO客户端,服务端框架。容许快速简单的开发网络应用程序。例如:服务端和客户端之间的协议,它简化了网络编程规范。算法

 

2、NIO开发的问题

一、NIO类库和API复杂,使用麻烦。编程

二、须要具有Java多线程编程能力(涉及到Reactor模式)。后端

三、客户端断线重连、网络不稳定、半包读写、失败缓存、网络阻塞和异常码流等问题处理难度很是大数组

四、存在部分BUG缓存

 

NIO进行服务器开发的步骤:安全

一、建立ServerSocketChannel,配置为非阻塞模式;服务器

二、绑定监听,配置TCP参数;网络

三、建立一个独立的IO线程,用于轮询多路复用器Selector;多线程

四、建立Selector,将以前建立的ServerSocketChannel注册到Selector上,监听Accept事件;

五、启动IO线程,在循环中执行Select.select()方法,轮询就绪的Channel;

六、当轮询处处于就绪状态的Channel时,须要对其进行判断,若是是OP_ACCEPT状态,说明有新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端;

七、设置新接入的客户端链路SocketChannel为非阻塞模式,配置TCP参数;

八、将SocketChannel注册到Selector上,监听READ事件;

九、若是轮询的Channel为OP_READ,则说明SocketChannel中有新的准备就绪的数据包须要读取,则构造ByteBuffer对象,读取数据包;

十、若是轮询的Channel为OP_WRITE,则说明还有数据没有发送完成,须要继续发送。

3、Netty的优势

一、API使用简单,开发门槛低;

二、功能强大,预置了多种编解碼功能,支持多种主流协议;

三、定制功能强,能够经过ChannelHandler对通讯框架进行灵活的扩展;

四、性能高,经过与其余业界主流的NIO框架对比,Netty综合性能最优;

五、成熟、稳定,Netty修复了已经发现的NIO全部BUG;

六、社区活跃;

七、经历了不少商用项目的考验。

/**
 * 服务端
 */
public class TimeServer {

	public static void main(String[] args) throws Exception {
		int port=8080; //服务端默认端口
		new TimeServer().bind(port);
	}
	
	public void bind(int port) throws Exception{
		//1用于服务端接受客户端的链接
		EventLoopGroup acceptorGroup = new NioEventLoopGroup();
		//2用于进行SocketChannel的网络读写
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			//Netty用于启动NIO服务器的辅助启动类
			ServerBootstrap sb = new ServerBootstrap();
			//将两个NIO线程组传入辅助启动类中
			sb.group(acceptorGroup, workerGroup)
				//设置建立的Channel为NioServerSocketChannel类型
				.channel(NioServerSocketChannel.class)
				//配置NioServerSocketChannel的TCP参数
				.option(ChannelOption.SO_BACKLOG, 1024)
				//设置绑定IO事件的处理类
				.childHandler(new ChannelInitializer<SocketChannel>() {
					//建立NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件
					@Override
					protected void initChannel(SocketChannel arg0) throws Exception {
						
						arg0.pipeline().addLast(new TimeServerHandler());
					}
				});
			//绑定端口,同步等待成功(sync():同步阻塞方法,等待bind操做完成才继续)
			//ChannelFuture主要用于异步操做的通知回调
			ChannelFuture cf = sb.bind(port).sync();
			System.out.println("服务端启动在8080端口。");
			//等待服务端监听端口关闭
			cf.channel().closeFuture().sync();
		} finally {
			//优雅退出,释放线程池资源
			acceptorGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}
/**
 * 服务端channel
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		//buf.readableBytes():获取缓冲区中可读的字节数;
		//根据可读字节数建立数组
		byte[] req = new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body = new String(req, "UTF-8");
		System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
		
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		//将待发送的消息放到发送缓存数组中
		ctx.writeAndFlush(resp);
	}
}
/**
 * 客户端
 */
public class TimeClient {
	public static void main(String[] args) throws Exception {
		int port=8080; //服务端默认端口
		new TimeClient().connect(port, "127.0.0.1");
	}
	public void connect(int port, String host) throws Exception{
		//配置客户端NIO线程组
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bs = new Bootstrap();
			bs.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					//建立NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件
					protected void initChannel(SocketChannel arg0) throws Exception {
						arg0.pipeline().addLast(new TimeClientHandler());
					}
				});
			//发起异步链接操做
			ChannelFuture cf = bs.connect(host, port).sync();
			//等待客户端链路关闭
			cf.channel().closeFuture().sync();
		} finally {
			//优雅退出,释放NIO线程组
			group.shutdownGracefully();
		}
	}
}
/**
 * 客户端channel
 */
public class TimeClientHandler extends ChannelHandlerAdapter {

	@Override
	//向服务器发送指令
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		for (int i = 0; i < 1; i++) {
			byte[] req = "QUERY TIME ORDER".getBytes();
			ByteBuf firstMessage = Unpooled.buffer(req.length);
			firstMessage.writeBytes(req);
			ctx.writeAndFlush(firstMessage);
		}
	}

	@Override
	//接收服务器的响应
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		//buf.readableBytes():获取缓冲区中可读的字节数;
		//根据可读字节数建立数组
		byte[] req = new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body = new String(req, "UTF-8");
		System.out.println("Now is : "+body);
	}

	@Override
	//异常处理
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//释放资源
		ctx.close();
	}
	
}

4、粘包/拆包问题

TCP是一个“流”协议,所谓流,就是没有界限的一串数据。能够想象为河流中的水,并无分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际状况进行包的划分,因此在业务上认为,一个完整的包可能会被TCP拆分红多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

TCP粘包拆包问题示例图:

假设客户端分别发送了两个数据包D1和D2给服务端,因为服务端一次读取到的字节数是不肯定的,可能存在如下4种状况。

一、服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;

二、服务端一次接收到了两个数据包,D1和D2粘合在一块儿,被称为TCP粘包;

三、服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部份内容,第二次读取到了D2包的剩余部份内容,这被称为TCP拆包;

四、服务端分两次读取到了两个数据包,第一次读取到了D1包的部份内容D1_1,第二次读取到了D1包的剩余内容D1_1和D2包的完整内容;

若是此时服务器TCP接收滑窗很是小,而数据包D1和D2比较大,颇有可能发生第五种状况,既服务端分屡次才能将D1和D2包接收彻底,期间发生屡次拆包;

问题的解决策略

因为底层的TCP没法理解上层的业务数据,因此在底层是没法保证数据包不被拆分和重组的,这个问题只能经过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案可概括以下:

一、消息定长,例如每一个报文的大小为固定长度200字节,若是不够,空位补空格;

二、在包尾增长回车换行符进行分割,例如FTP协议;

三、将消息分为消息头和消息体,消息头中包含消息总长度(或消息体总长度)的字段,一般设计思路为消息头的第一个字段使用int32来表示消息的总程度;

四、更复杂的应用层协议;

LineBasedFrameDecoder

为了解决TCP粘包/拆包致使的半包读写问题,Netty默认提供了多种编解碼器用于处理半包。

LinkeBasedFrameDecoder的工做原理是它一次遍历ByteBuf中的可读字节,判断看是否有“\n”、“\r\n”,若是有,就一次位置为结束位置,从可读索引到结束位置区间的字节就组成一行。它是以换行符为结束标志的编解碼,支持携带结束符或者不携带结束符两种解碼方式,同时支持配置单行的最大长度。若是连续读取到最大长度后任然没有发现换行符,就会抛出异常,同时忽略掉以前读到的异常码流。

/**
 * 服务端 
 */
public class TimeServer {

	public static void main(String[] args) throws Exception {
		int port=8080; //服务端默认端口
		new TimeServer().bind(port);
	}
	public void bind(int port) throws Exception{
		//Reactor线程组
		//1用于服务端接受客户端的链接
		EventLoopGroup acceptorGroup = new NioEventLoopGroup();
		//2用于进行SocketChannel的网络读写
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			//Netty用于启动NIO服务器的辅助启动类
			ServerBootstrap sb = new ServerBootstrap();
			//将两个NIO线程组传入辅助启动类中
			sb.group(acceptorGroup, workerGroup)
				//设置建立的Channel为NioServerSocketChannel类型
				.channel(NioServerSocketChannel.class)
				//配置NioServerSocketChannel的TCP参数
				.option(ChannelOption.SO_BACKLOG, 1024)
				//设置绑定IO事件的处理类
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel arg0) throws Exception {
						//处理粘包/拆包问题
						arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
						arg0.pipeline().addLast(new StringDecoder());
						
						arg0.pipeline().addLast(new TimeServerHandler());
					}
				});
			//绑定端口,同步等待成功(sync():同步阻塞方法)
			//ChannelFuture主要用于异步操做的通知回调
			ChannelFuture cf = sb.bind(port).sync();
				
			//等待服务端监听端口关闭
			cf.channel().closeFuture().sync();
		} finally {
			//优雅退出,释放线程池资源
			acceptorGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}
/**
 * 服务端channel
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

	private int counter;
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//		ByteBuf buf = (ByteBuf) msg;
//		//buf.readableBytes():获取缓冲区中可读的字节数;
//		//根据可读字节数建立数组
//		byte[] req = new byte[buf.readableBytes()];
//		buf.readBytes(req);
//		String body = new String(req, "UTF-8");
		String body = (String) msg;
		System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
		currentTime = currentTime + System.getProperty("line.separator");
		
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		//将待发送的消息放到发送缓存数组中
		ctx.writeAndFlush(resp);
	}

}
/**
 * 客户端
 */
public class TimeClient {
	public static void main(String[] args) throws Exception {
		int port=8080; //服务端默认端口
		new TimeClient().connect(port, "127.0.0.1");
	}
	public void connect(int port, String host) throws Exception{
		//配置客户端NIO线程组
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bs = new Bootstrap();
			bs.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					//建立NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件
					protected void initChannel(SocketChannel arg0) throws Exception {
						//处理粘包/拆包问题
						arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
						arg0.pipeline().addLast(new StringDecoder());
						
						arg0.pipeline().addLast(new TimeClientHandler());
					}
				});
			//发起异步链接操做
			ChannelFuture cf = bs.connect(host, port).sync();
			//等待客户端链路关闭
			cf.channel().closeFuture().sync();
		} finally {
			//优雅退出,释放NIO线程组
			group.shutdownGracefully();
		}
	}
}
/**
 * 客户端channel
 */
public class TimeClientHandler extends ChannelHandlerAdapter {

	private int counter;
	private byte[] req;
	
	@Override
	//向服务器发送指令
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ByteBuf message=null;
		//模拟一百次请求,发送重复内容
		for (int i = 0; i < 200; i++) {
			req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();
			message=Unpooled.buffer(req.length);
			message.writeBytes(req);
			ctx.writeAndFlush(message);
		}
		
	}

	@Override
	//接收服务器的响应
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//		ByteBuf buf = (ByteBuf) msg;
//		//buf.readableBytes():获取缓冲区中可读的字节数;
//		//根据可读字节数建立数组
//		byte[] req = new byte[buf.readableBytes()];
//		buf.readBytes(req);
//		String body = new String(req, "UTF-8");
		String body = (String) msg;
		System.out.println("Now is : "+body+". the counter is : "+ ++counter);
	}

	@Override
	//异常处理
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//释放资源
		ctx.close();
	}
	
}

DelimiterBasedFrameDecoder

实现自定义分隔符做为消息的结束标志,完成解碼。

/**
 * 服务端
 */
public class TimeServer {
	public static void main(String[] args) throws Exception {
		int port=8080; //服务端默认端口
		new TimeServer().bind(port);
	}

	public void bind(int port) throws Exception{
		//Reactor线程组
		//1用于服务端接受客户端的链接
		EventLoopGroup acceptorGroup = new NioEventLoopGroup();
		//2用于进行SocketChannel的网络读写
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			//Netty用于启动NIO服务器的辅助启动类
			ServerBootstrap sb = new ServerBootstrap();
			//将两个NIO线程组传入辅助启动类中
			sb.group(acceptorGroup, workerGroup)
				//设置建立的Channel为NioServerSocketChannel类型
				.channel(NioServerSocketChannel.class)
				//配置NioServerSocketChannel的TCP参数
				.option(ChannelOption.SO_BACKLOG, 1024)
				//设置绑定IO事件的处理类
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel arg0) throws Exception {
						//处理粘包/拆包问题
						ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
						arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
						arg0.pipeline().addLast(new StringDecoder());
						arg0.pipeline().addLast(new TimeServerHandler());
					}
				});
			//绑定端口,同步等待成功(sync():同步阻塞方法)
			//ChannelFuture主要用于异步操做的通知回调
			ChannelFuture cf = sb.bind(port).sync();
				
			//等待服务端监听端口关闭
			cf.channel().closeFuture().sync();
		} finally {
			//优雅退出,释放线程池资源
			acceptorGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}
/**
 * 服务端channel
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

	private int counter;
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String body = (String) msg;
		System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
		currentTime += "$_";
		
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		//将待发送的消息放到发送缓存数组中
		ctx.writeAndFlush(resp);
	}
}
/**
 * 客户端
 */
public class TimeClient {
	public static void main(String[] args) throws Exception {
		int port=8080; //服务端默认端口
		new TimeClient().connect(port, "127.0.0.1");
	}
	public void connect(int port, String host) throws Exception{
		//配置客户端NIO线程组
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bs = new Bootstrap();
			bs.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					//建立NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件
					protected void initChannel(SocketChannel arg0) throws Exception {
						//处理粘包/拆包问题
						ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
						arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
						arg0.pipeline().addLast(new StringDecoder());
						
						arg0.pipeline().addLast(new TimeClientHandler());
					}
				});
			//发起异步链接操做
			ChannelFuture cf = bs.connect(host, port).sync();
			//等待客户端链路关闭
			cf.channel().closeFuture().sync();
		} finally {
			//优雅退出,释放NIO线程组
			group.shutdownGracefully();
		}
	}
}
/**
 * 客户端channel
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
	
	private int counter;
	private byte[] req;
	
	@Override
	//向服务器发送指令
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ByteBuf message=null;
		//模拟一百次请求,发送重复内容
		for (int i = 0; i < 200; i++) {
			req = ("QUERY TIME ORDER"+"$_").getBytes();
			message=Unpooled.buffer(req.length);
			message.writeBytes(req);
			ctx.writeAndFlush(message);
		}
		
	}

	@Override
	//接收服务器的响应
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String body = (String) msg;
		System.out.println("Now is : "+body+". the counter is : "+ ++counter);
	}

	@Override
	//异常处理
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//释放资源
		ctx.close();
	}
	
}

FixedLengthFrameDecoder

是固定长度解碼器,可以按照指定的长度对消息进行自动解碼,开发者不须要考虑TCP的粘包/拆包问题。

 

5、Netty的高性能

一、异步非阻塞通讯

在IO编程过程当中,当须要同时处理多个客户端接入请求时,能够利用多线程或者IO多路复用技术进行处理。IO多路复用技术经过把多个IO的阻塞复用到同一个Selector的阻塞上,从而使得系统在单线程的状况下能够同时处理多个客户端请求。与传统的多线程/多进程模型相比,IO多路复用的最大优点是系统开销小,系统不须要建立新的额外进程或者线程,也不须要维护这些进程和线程的运行,下降了系统的维护工做量,节省了系统资源。

Netty的IO线程NioEventLoop因为聚合了多路复用器Selector,能够同时并发处理成百上千个客户端SocketChannel。因为读写操做都是非阻塞的,这就能够充分提高IO线程的运行效率,避免由频繁的IO阻塞致使的线程挂起。另外,因为Netty采用了异步通讯模式,一个IO线程能够并发处理N个客户端链接和读写操做,这从根本上解决了传统同步阻塞IO中 一链接一线程模型,架构的性能、弹性伸缩能力和可靠性都获得了极大的提高。

 

二、高效的Reactor线程模型

经常使用的Reactor线程模型有三种,分别以下:

1.Reactor单线程模型;

2.Reactor多线程模型;

3.主从Reactor多线程模型;

 

Reactor单线程模型,指的是全部的IO操做都在同一个NIO线程上面完成,NIO线程职责以下:

一、做为NIO服务端,接收客户端的TCP链接;

二、做为NIO客户端,向服务端发起TCP链接;

三、读取通讯对端的请求或者应答消息;

四、向通讯对端发送请求消息或者应答消息;

 

因为Reactor模式使用的是异步非阻塞IO,全部的IO操做都不会致使阻塞,理论上一个线程能够独立处理全部IO相关操做。从架构层面看,一个NIO线程确实能够完成其承担的职责。例如,经过Acceptor接收客户端的TCP链接请求消息,链路创建成功以后,经过Dispatch将对应的ByteBuffer派发到指定的Handler上进行消息编码。用户Handler能够经过NIO线程将消息发送给客户端。

对于一些小容量应用场景,可使用单线程模型,可是对于高负载、大并发的应用却不合适,主要缘由以下:

一、一个NIO线程同时处理成百上千的链路,性能上没法支撑。即使NIO线程的CPU负荷达到100%,也没法知足海量消息的编码、解碼、读取和发送;

二、当NIO线程负载太重后,处理速度将变慢,这会致使大量客户端链接超时,超时以后每每会进行重发,这更加剧了NIO线程的负载,最终会致使大量消息积压和处理超时,NIO线程会成为系统的性能瓶颈;

三、可靠性问题。一旦NIO线程意外进入死循环,会致使整个系统通讯模块不可用,不能接收和处理外部消息,形成节点故障。

为了解决这些问题,从而演进出了Reactor多线程模型。

 

Reactor多线程模型与单线程模型最大的区别就是有一组NIO线程处理IO操做,特色以下:

一、有一个专门的NIO线程——Acceptor线程用于监听服务端,接收客户端TCP链接请求;

二、网络IO操做——读、写等由一个NIO线程池负责,线程池能够采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、编码、解碼和发送;

三、1个NIO线程能够同时处理N条链路,可是1个链路只对应1个NIO线程,防止发生并发操做问题。

在绝大多数场景下,Reactor多线程模型均可以知足性能需求;可是,在极特殊应用场景中,一个NIO线程负责监听和处理全部的客户端链接可能会存在性能问题。例如百万客户端并发链接,或者服务端须要对客户端的握手消息进行安全认证,认证自己很是损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型——主从Reactor多线程模型。

 

主从Reactor线程模型的特色是:服务端用于接收客户端链接的再也不是一个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP链接请求处理完成后(可能包含接入认证等),将新建立的SocketChannel注册到IO线程池(subReactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解碼工做。Acceptor线程池只用于客户端的登陆、握手和安全认证,一旦链路创建成功,就将链路注册到后端subReactor线程池的IO线程上,由IO线程负责后续的IO操做。

利用主从NIO线程模型,能够解决1个服务端监听线程没法有效处理全部客户端链接的性能不足问题。Netty官方推荐使用该线程模型。它的工做流程总结以下:

一、从主线程池中随机选择一个Reactor线程做为Acceptor线程,用于绑定监听端口,接收客户端链接;

二、Acceptor线程接收客户端链接请求以后,建立新的SocketChannel,将其注册到主线程池的其余Reactor线程上,由其负责接入认证、IP黑白名单过滤、握手等操做;

三、而后也业务层的链路正式创建成功,将SocketChannel从主线程池的Reactor线程的多路复用器上摘除,从新注册到Sub线程池的线程上,用于处理IO的读写操做。

 

三、无化的串行设计

在大多数场景下,并行多线程处理能够提高系统的并发性能。可是,若是对于共享资源的并发访问处理不当,会带来严重的锁竞争,这最终会致使性能的降低。为了尽量地避免锁竞争带来的性能损耗,能够经过串行化设计,既消息的处理尽量在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。

为了尽量提高性能,Netty采用了串行无锁化设计,在IO线程内部进行串行操做,避免多线程竞争致使的性能降低。表面上看,串行化设计彷佛CPU利用率不高,并发程度不够。可是,经过调整NIO线程池的线程参数,能够同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列——多个工做线程模型性能更优。

Netty串行化设计工做原理图以下:

Netty的NioEventLoop读取到消息后,直接调用ChannelPipeline的fireChannelRead(Object msg),只要用户不主动切换线程,一直会由NioEventLoop调用到用户的Handler,期间不进行线程切换。这种串行化处理方式避免了多线程致使的锁竞争,从性能角度看是最优的。

 

四、高效的并发编程

Netty高效并发编程主要体现

一、volatile的大量、正确使用;

二、CAS和原子类的普遍使用;

三、线程安全容器的使用;

四、经过读写锁提高并发性能。

 

五、高性能的序列化框架

    影响序列化性能的关键因素总结以下:

    一、序列化后的码流大小(网络宽带的占用);

    二、序列化与反序列化的性能(CPU资源占用);

    三、是否支持跨语言(异构系统的对接和开发语言切换)。

    Netty默认提供了对GoogleProtobuf的支持,经过扩展Netty的编解碼接口,用户能够实现其余的高性能序列化框架

 

六、零拷贝

    Netty的“零拷贝”主要体如今三个方面:

    1)、Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不须要进行字节缓冲区的二次拷贝。若是使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,而后才写入Socket中。相比于堆外直接内存,消息在发送过程当中多了一次缓冲区的内存拷贝。

    2)、第二种“零拷贝 ”的实现CompositeByteBuf,它对外将多个ByteBuf封装成一个ByteBuf,对外提供统一封装后的ByteBuf接口。

    3)、第三种“零拷贝”就是文件传输,Netty文件传输类DefaultFileRegion经过transferTo方法将文件发送到目标Channel中。不少操做系统直接将文件缓冲区的内容发送到目标Channel中,而不须要经过循环拷贝的方式,这是一种更加高效的传输方式,提高了传输性能,下降了CPU和内存占用,实现了文件传输的“零拷贝”。

        

七、内存池

    随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个很是轻量级的工做。可是对于缓冲区Buffer,状况却稍有不一样,特别是对于堆外直接内存的分配和回收,是一件耗时的操做。为了尽可能重用缓冲区,Netty提供了基于内存池的缓冲区重用机制。

  

八、灵活的TCP参数配置能力

    Netty在启动辅助类中能够灵活的配置TCP参数,知足不一样的用户场景。合理设置TCP参数在某些场景下对于性能的提高能够起到的显著的效果,总结一下对性能影响比较大的几个配置项:

    1)、SO_RCVBUF和SO_SNDBUF:一般建议值为128KB或者256KB;

    2)、TCP_NODELAY:NAGLE算法经过将缓冲区内的小封包自动相连,组成较大的封包,阻止大量小封包的发送阻塞网络,从而提升网络应用效率。可是对于时延敏感的应用场景须要关闭该优化算法;

    3)、软中断:若是Linux内核版本支持RPS(2.6.35以上版本),开启RPS后能够实现软中断,提高网络吞吐量。RPS根据数据包的源地址,目的地址以及目的和源端口,计算出一个hash值,而后根据这个hash值来选择软中断运行的CPU。从上层来看,也就是说将每一个链接和CPU绑定,并经过这个hash值,来均衡软中断在多个CPU上,提高网络并行处理性能。

相关文章
相关标签/搜索