NIO、Netty(NIO基础)

一、阻塞与非阻塞

阻塞与非阻塞是描述进程在访问某个资源时,数据是否准备就绪的的一种处理方式。当数据没有准备就绪时:java

阻塞:线程持续等待资源中数据准备完成,直到返回响应结果。数组

非阻塞:线程直接返回结果,不会持续等待资源准备数据结束后才响应结果。缓存

 

二、同步与异步

同步与异步是指访问数据的机制,同步通常指主动请求并等待IO操做完成的方式。服务器

异步则指主动请求数据后即可以继续处理其它任务,随后等待IO操做完毕的通知网络

 

老王烧开水:异步

一、普通水壶煮水,站在旁边,主动的看水开了没有?同步的阻塞socket

二、普通水壶煮水,去干点别的事,每过一段时间去看看水开了没有,水没开就走人。 同步非阻塞ide

三、响水壶煮水,站在旁边,不会每过一段时间主动看水开了没有。若是水开了,水壶自动通知他。 异步阻塞测试

四、响水壶煮水,去干点别的事,若是水开了,水壶自动通知他。异步非阻塞this

 

三、传统BIO模型

传统BIO是一种同步的阻塞IO,IO在进行读写时,该线程将被阻塞,线程没法进行其它操做。

IO流在读取时,会阻塞。直到发生如下状况:一、有数据能够读取。二、数据读取完成。三、发生异常。

 

四、伪异步IO模型

以传统BIO模型为基础,经过线程池的方式维护全部的IO线程,实现相对高效的线程开销及管理。

 

5.NIO模型

NIO(JDK1.4)模型是一种同步非阻塞IO,主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(多路复用器)。传统IO基于字节流和字符流进行操做,而NIO基于Channel和Buffer(缓冲区)进行操做,数据老是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(多路复用器)用于监听多个通道的事件(好比:链接打开,数据到达)。所以,单个线程能够监听多个数据通道。

NIO和传统IO(一下简称IO)之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取全部字节,它们没有被缓存在任何地方。此外,它不能先后移动流中的数据。若是须要先后移动从流中读取的数据,须要先将它缓存到一个缓冲区。NIO的缓冲导向方法略有不一样。数据读取到一个它稍后处理的缓冲区,须要时可在缓冲区中先后移动。这就增长了处理过程当中的灵活性。可是,还须要检查是否该缓冲区中包含全部您须要处理的数据。并且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里还没有处理的数据。

IO的各类流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据彻底写入。该线程在此期间不能再干任何事情了。 NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,可是它仅能获得目前可用的数据,若是目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,因此直至数据变的能够读取以前,该线程能够继续作其余的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不须要等待它彻底写入,这个线程同时能够去作别的事情。 线程一般将非阻塞IO的空闲时间用于在其它通道上执行IO操做,因此一个单独的线程如今能够管理多个输入和输出通道(channel)。

NIO优势:

  1. 经过Channel注册到Selector上的状态来实现一种客户端与服务端的通讯。
  2. Channel中数据的读取是经过Buffer , 一种非阻塞的读取方式。
  3. Selector 多路复用器  单线程模型,  线程的资源开销相对比较小。

Channel(通道)

传统IO操做对read()或write()方法的调用,可能会由于没有数据可读/可写而阻塞,直到有数据响应。也就是说读写数据的IO调用,可能会无限期的阻塞等待,效率依赖网络传输的速度。最重要的是在调用一个方法前,没法知道是否会被阻塞。

NIO的Channel抽象了一个重要特征就是能够经过配置它的阻塞行为,来实现非阻塞式的通道。

Channel是一个双向通道,与传统IO操做只容许单向的读写不一样的是,NIO的Channel容许在一个通道上进行读和写的操做。

FileChannel:文件

SocketChannel:

ServerSocketChannel:

DatagramChannel: UDP

Buffer(缓冲区)

Bufer顾名思义,它是一个缓冲区,其实是一个容器,一个连续数组。Channel提供从文件、网络读取数据的渠道,可是读写的数据都必须通过Buffer。

Buffer缓冲区本质上是一块能够写入数据,而后能够从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该模块内存。为了理解Buffer的工做原理,须要熟悉它的三个属性:capacity、position和limit。

position和limit的含义取决于Buffer处在读模式仍是写模式。无论Buffer处在什么模式,capacity的含义老是同样的。见下图:

capacity:做为一个内存块,Buffer有固定的大小值,也叫做“capacity”,只能往其中写入capacity个byte、long、char等类型。一旦Buffer满了,须要将其清空(经过读数据或者清楚数据)才能继续写数据。

position:当你写数据到Buffer中时,position表示当前的位置。出事的position值为0,当写入一个字节数据到Buffer中后,position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity-1。当读取数据时,也是从某个特定位置读,讲Buffer从写模式切换到读模式,position会被重置为0。当从Buffer的position处读取一个字节数据后,position向前移动到下一个可读的位置。

limit:在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。 写模式下,limit等于Buffer的capacity。当切换Buffer到读模式时, limit表示你最多能读到多少数据。所以,当切换Buffer到读模式时,limit会被设置成写模式下的position值。换句话说,你能读到以前写入的全部数据(limit被设置成已写数据的数量,这个值在写模式下就是position)

Buffer的分配:对Buffer对象的操做必须首先进行分配,Buffer提供一个allocate(int capacity)方法分配一个指定字节大小的对象。

向Buffer中写数据:写数据到Buffer中有两种方式:

一、从channel写到Buffer

int bytes = channel.read(buf); //将channel中的数据读取到buf中

二、经过Buffer的put()方法写到Buffer

buf.put(byte); //将数据经过put()方法写入到buf中

 

flip()方法:将Buffer从写模式切换到读模式,调用flip()方法会将position设置为0,并将limit设置为以前的position的值。

 

从Buffer中读数据:从Buffer中读数据有两种方式:

一、从Buffer读取数据到Channel

int bytes = channel.write(buf); //将buf中的数据读取到channel中

二、经过Buffer的get()方法读取数据

byte bt = buf.get(); //从buf中读取一个byte

 

rewind()方法:Buffer.rewind()方法将position设置为0,使得能够重读Buffer中的全部数据,limit保持不变。

clear()与compact()方法:一旦读完Buffer中的数据,须要让Buffer准备好再次被写入,能够经过clear()或compact()方法完成。若是调用的是clear()方法,position将被设置为0,limit设置为capacity的值。可是Buffer并未被清空,只是经过这些标记告诉咱们能够从哪里开始往Buffer中写入多少数据。若是Buffer中还有一些未读的数据,调用clear()方法将被"遗忘 "。compact()方法将全部未读的数据拷贝到Buffer起始处,而后将position设置到最后一个未读元素的后面,limit属性依然设置为capacity。可使得Buffer中的未读数据还能够在后续中被使用。

mark()与reset()方法:经过调用Buffer.mark()方法能够标记一个特定的position,以后能够经过调用Buffer.reset()恢复到这个position上。

 

Selector(多路复用器)

Selector与Channel是相互配合使用的,将Channel注册在Selector上以后,才能够正确的使用Selector,但此时Channel必须为非阻塞模式。Selector能够监听Channel的四种状态(Connect、Accept、Read、Write),当监听到某一Channel的某个状态时,才容许对Channel进行相应的操做。

 

测试代码

/**
 * 服务端
 */
public class MultiplexerTimeServer implements Runnable {

	private Selector selector;
	private ServerSocketChannel serverChannel;
	private volatile boolean stop;
	
	public MultiplexerTimeServer(int port) {
		try {
			//打开ServerSocketChannel
			serverChannel = ServerSocketChannel.open();
			//设置为非阻塞模式
			serverChannel.configureBlocking(false);
			//绑定监听的端口地址
			serverChannel.socket().bind(new InetSocketAddress(port), 1024);
			//建立Selector线程
			selector = Selector.open();
			//将ServerSocketChannel注册到Selector,交给Selector监听
			serverChannel.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("The time server is start in port:"+port);
		} catch (Exception e) {
			e.printStackTrace();
			System.exit(1);
		}
	}

	public void stop(){
		this.stop = true;
	}
	@Override
	public void run() {
		while(!stop){
			try {
				//经过Selector循环准备就绪的Key
				selector.select();
				Set<SelectionKey> selectionKeys = selector.selectedKeys();
				Iterator<SelectionKey> iterator = selectionKeys.iterator();
				SelectionKey selectionKey = null;
				while(iterator.hasNext()){
					selectionKey = iterator.next();
					iterator.remove();
					try {
						handleInput(selectionKey);
					} catch (Exception e) {
						if(selectionKey!=null){
							selectionKey.cancel();
							if(selectionKey.channel()!=null){
								selectionKey.channel().close();
							}
						}
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		if(selector !=null){
			try {
				selector.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	private void handleInput(SelectionKey selectionKey) throws IOException {
		if(selectionKey.isValid()){
			if (selectionKey.isAcceptable()) {
				ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
				//多路复用器监听到新的客户端链接,处理链接请求,完成TCP三次握手。
				SocketChannel client = server.accept();
				//设置为非阻塞模式
				client.configureBlocking(false);
	            // 将新链接注册到多路复用器上,监听其读操做,读取客户端发送的消息。
	            client.register(selector, SelectionKey.OP_READ);
			}
			if(selectionKey.isReadable()){
				SocketChannel client = (SocketChannel) selectionKey.channel();
				ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
				//读取客户端请求消息到缓冲区
				int count = client.read(receivebuffer);   //非阻塞    
				if (count > 0) {
					receivebuffer.flip();
					byte[] bytes = new byte[receivebuffer.remaining()]; //remaining()方法
					//从缓冲区读取消息
					receivebuffer.get(bytes);
					String body = new String(bytes, "UTF-8");
					System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body);
					//将currentTime响应给客户端(客户端Channel)
					String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
					doWrite(client, currentTime);
				}else if(count < 0){
					selectionKey.channel();
					client.close();
				}else{
					
				}
			}
		}
	}

	private void doWrite(SocketChannel client, String currentTime) throws IOException {
		if(currentTime != null && currentTime.trim().length()>0){
			ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
			sendbuffer.put(currentTime.getBytes());
	        sendbuffer.flip();
	        //将客户端响应消息写入到客户端Channel中。
	        client.write(sendbuffer);
	        System.out.println("服务器端向客户端发送数据--:" + currentTime);
		}else{
			System.out.println("没有数据");
		}
	}

}

 

/**
 * 客户端
 */
public class TimeClientHandler implements Runnable {
	
	private String host;
	private int port;
	private SocketChannel socketChannel;
	private Selector selector;
	private volatile boolean stop;
	
	public TimeClientHandler(String host, int port) {
		this.host = host;
		this.port = port;
		try {
			//打开SocketChannel
			socketChannel = SocketChannel.open();
			//建立Selector线程
			selector = Selector.open();
			//设置为非阻塞模式
			socketChannel.configureBlocking(false);
		} catch (Exception e) {
			e.printStackTrace();
			System.exit(1);
		}
	}

	@Override
	public void run() {
		try {
			doConnect();
		} catch (Exception e) {
			e.printStackTrace();
			System.exit(1);
		}
		while(!stop){
			//轮训通道的状态
			try {
				selector.select(1000);
				Set<SelectionKey> selectionKeys = selector.selectedKeys();
				Iterator<SelectionKey> iterator = selectionKeys.iterator();
				SelectionKey selectionKey = null;
				while(iterator.hasNext()){
					selectionKey = iterator.next();
					iterator.remove();
					try {
						handleInput(selectionKey);
					} catch (Exception e) {
						if(selectionKey!=null){
							selectionKey.cancel();
							if(selectionKey.channel()!=null){
								selectionKey.channel().close();
							}
						}
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
				System.exit(1);
			}
		}
		if(selector !=null){
			try {
				selector.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	private void handleInput(SelectionKey selectionKey) throws Exception {
		if(selectionKey.isValid()){
			SocketChannel client = (SocketChannel) selectionKey.channel();
			if (selectionKey.isConnectable()){
				if(client.finishConnect()){
					client.register(selector, SelectionKey.OP_READ);
					doWrite(client);
				}else{
					System.exit(1);
				}
			}
			if (selectionKey.isReadable()) {
				ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
				int count = client.read(receivebuffer);
				if (count > 0) {
					receivebuffer.flip();
					byte[] bytes = new byte[receivebuffer.remaining()]; //remaining()方法
					receivebuffer.get(bytes);
					String body = new String(bytes, "UTF-8");
					System.out.println("Now is "+body);
					this.stop = true;
				}else if(count < 0){
					selectionKey.channel();
					client.close();
				}else{
					
				}
			}
		}
	}

	private void doConnect() throws Exception {
		//链接服务端
		boolean connect = socketChannel.connect(new InetSocketAddress(host, port));
		//判断是否链接成功,若是链接成功,则监听Channel的读状态。
		if(connect){
			socketChannel.register(selector, SelectionKey.OP_READ);
			//写数据  写给服务端
			doWrite(socketChannel);
		}else{
			//若是没有链接成功,则向多路复用器注册Connect状态
			socketChannel.register(selector, SelectionKey.OP_CONNECT);
		}
		
	}

	private void doWrite(SocketChannel channel) throws IOException {
		ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
		sendbuffer.put("QUERY TIME ORDER".getBytes());
        sendbuffer.flip();
        //向Channel中写入客户端的请求指令  写到服务端
        channel.write(sendbuffer);
        if(!sendbuffer.hasRemaining()){
        	System.out.println("Send order to server succeed.");
        }
	}
}

 

/**
* 运行服务端
**/
public class TimeServer {
	public static void main(String[] args) {
		int port=8080; //服务端默认端口
		MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port);
		new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
	}
}

/**
* 运行客户端
**/
public class TimeServerClient {
	
	public static void main(String[] args) {
		int port=8080; //服务端默认端口
		new Thread(new TimeClientHandler("127.0.0.1", port), "NIO-TimeServerClient-001").start();
	}
}
相关文章
相关标签/搜索