04.第四阶段、基于Netty的RPC架构实战演练-1. nio

1、概要java

传统IO特色
阻塞点
server.accept();
inputStream.read(bytes);

单线程状况下只能有一个客户端


用线程池能够有多个客户端链接,可是很是消耗性能


=======================分割线==========================

NIO的特色

ServerSocketChannel	ServerSocket

SocketChannel		Socket

Selector

SelectionKey

NIO的一些疑问
  1. 客户端关闭的时候会抛出异常,死循环服务器

    解决方案socket

    int read = channel.read(buffer);
     	if(read > 0){
     		byte[] data = buffer.array();
     		String msg = new String(data).trim();
     		System.out.println("服务端收到信息:" + msg);
    
     		//回写数据
     		ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
     		channel.write(outBuffer);// 将消息回送给客户端
     	}else{
     		System.out.println("客户端关闭");
     		key.cancel();
     	}

二、selector.select();阻塞,那为何说nio是非阻塞的IO?ide

selector.select()
selector.select(1000);不阻塞
selector.wakeup();也能够唤醒selector
selector.selectNow();也能够立马返还

有的同窗说了,怎么证实这个write是wakeup方法调用的,而不是其余方法呢,这个很好证实,咱们多调用几回:


public class SelectorTest {  
	public static void main(String[] args) throws Exception {  
		Selector selector = Selector.open();  
		selector.wakeup();  
		selector.selectNow();  
		selector.wakeup();  
		selector.selectNow();  
		selector.wakeup();  
	}  
}  

	修改程序调用三次wakeup,心细的朋友确定注意到咱们还调用了两次selectNow,这是由于在两次成功的select方法之间调用wakeup多 次都只算作一次,为了显示3次write,这里就每次调用前select一下将前一次写入的字节读到,一样执行上面的strace调用,输出:




Process 29313 attached  
[pid 29303] write(36, "\1", 1)          = 1  
[pid 29303] write(36, "\1", 1)          = 1  
[pid 29303] write(36, "\1", 1)          = 1  
Process 29313 detached  


	 果真是3次write的系统调用,都是写入一个字节,若是咱们去掉selectNow,那么三次wakeup仍是等于一次:

public class SelectorTest {  
	public static void main(String[] args) throws Exception {  
		Selector selector = Selector.open();  
		selector.wakeup();  
		selector.wakeup();  
		selector.wakeup();  
	}  
}  


   输出:


Process 29339 attached  
Process 29340 attached  
Process 29341 attached  
[pid 29331] write(36, "\1", 1)          = 1  
Process 29341 detached  
Process 29337 detached  


	  wakeup方法的API说明没有欺骗咱们。wakeup方法的API还告诉咱们,若是当前Selector没有阻塞在select方法上,那么本次 wakeup调用会在下一次select阻塞的时候生效,这个道理很简单,wakeup方法写入一个字节,下次poll等待的时候当即发现可读并返回,因 此不会阻塞。
  1. SelectionKey.OP_WRITE是表明什么意思性能

    OP_WRITE表示底层缓冲区是否有空间,是则响应返还true测试

    NIO.pngthis

    socketIO.png.net

    oio线程

    package oio;
    
    	import java.io.IOException;
    	import java.io.InputStream;
    	import java.net.ServerSocket;
    	import java.net.Socket;
    	import java.util.concurrent.ExecutorService;
    	import java.util.concurrent.Executors;
    
    	/**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/7    11:24 AM
    	 */
    
    	public class OioServer {
    
    		@SuppressWarnings("resource")
    		public static void main(String[] args) throws Exception {
    
    			ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
    			//建立socket服务,监听10101端口
    			ServerSocket server=new ServerSocket(10101);
    			System.out.println("服务器启动!");
    			while(true){
    				//获取一个套接字(阻塞)
    				final Socket socket = server.accept();
    				System.out.println("来个一个新客户端!");
    				newCachedThreadPool.execute(new Runnable() {
    
    					@Override
    					public void run() {
    						//业务处理
    						handler(socket);
    					}
    				});
    
    			}
    		}
    
    		/**
    		 * 读取数据
    		 * @param socket
    		 * @throws Exception
    		 */
    		public static void handler(Socket socket){
    			try {
    				byte[] bytes = new byte[1024];
    				InputStream inputStream = socket.getInputStream();
    
    				while(true){
    					System.out.println("read前");
    
    					//读取数据(阻塞)
    					int read = inputStream.read(bytes);
    					System.out.println("read后");
    					if(read != -1){
    						System.out.println(new String(bytes, 0, read));
    					}else{
    						break;
    					}
    				}
    			} catch (Exception e) {
    				e.printStackTrace();
    			}finally{
    				try {
    					System.out.println("socket关闭");
    					socket.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}

    niocode

    package nio;
    
    	/**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/7    12:11 PM
    	 */
    
    	import java.io.IOException;
    	import java.net.InetSocketAddress;
    	import java.nio.ByteBuffer;
    	import java.nio.channels.SelectionKey;
    	import java.nio.channels.Selector;
    	import java.nio.channels.ServerSocketChannel;
    	import java.nio.channels.SocketChannel;
    	import java.util.Iterator;
    
    	public class NIOServer {
    		// 通道管理器
    		private Selector selector;
    
    		/**
    		 * 得到一个ServerSocket通道,并对该通道作一些初始化的工做
    		 *
    		 * @param port
    		 *            绑定的端口号
    		 * @throws IOException
    		 */
    		public void initServer(int port) throws IOException {
    			// 得到一个ServerSocket通道
    			ServerSocketChannel serverChannel = ServerSocketChannel.open();
    			// 设置通道为非阻塞
    			serverChannel.configureBlocking(false);
    			// 将该通道对应的ServerSocket绑定到port端口
    			serverChannel.socket().bind(new InetSocketAddress(port));
    			// 得到一个通道管理器
    			this.selector = Selector.open();
    			// 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
    			// 当该事件到达时,selector.select()会返回,若是该事件没到达selector.select()会一直阻塞。
    			serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    		}
    
    		/**
    		 * 采用轮询的方式监听selector上是否有须要处理的事件,若是有,则进行处理
    		 *
    		 * @throws IOException
    		 */
    		public void listen() throws IOException {
    			System.out.println("服务端启动成功!");
    			// 轮询访问selector
    			while (true) {
    				// 当注册的事件到达时,方法返回;不然,该方法会一直阻塞
    				selector.select();
    				// 得到selector中选中的项的迭代器,选中的项为注册的事件
    				Iterator<?> ite = this.selector.selectedKeys().iterator();
    				while (ite.hasNext()) {
    					SelectionKey key = (SelectionKey) ite.next();
    					// 删除已选的key,以防重复处理
    					ite.remove();
    
    					handler(key);
    				}
    			}
    		}
    
    		/**
    		 * 处理请求
    		 *
    		 * @param key
    		 * @throws IOException
    		 */
    		public void handler(SelectionKey key) throws IOException {
    
    			// 客户端请求链接事件
    			if (key.isAcceptable()) {
    				handlerAccept(key);
    				// 得到了可读的事件
    			} else if (key.isReadable()) {
    				handelerRead(key);
    			}
    		}
    
    		/**
    		 * 处理链接请求
    		 *
    		 * @param key
    		 * @throws IOException
    		 */
    		public void handlerAccept(SelectionKey key) throws IOException {
    			ServerSocketChannel server = (ServerSocketChannel) key.channel();
    			// 得到和客户端链接的通道
    			SocketChannel channel = server.accept();
    			// 设置成非阻塞
    			channel.configureBlocking(false);
    
    			// 在这里能够给客户端发送信息哦
    			System.out.println("新的客户端链接");
    			// 在和客户端链接成功以后,为了能够接收到客户端的信息,须要给通道设置读的权限。
    			channel.register(this.selector, SelectionKey.OP_READ);
    		}
    
    		/**
    		 * 处理读的事件
    		 *
    		 * @param key
    		 * @throws IOException
    		 */
    		public void handelerRead(SelectionKey key) throws IOException {
    			// 服务器可读取消息:获得事件发生的Socket通道
    			SocketChannel channel = (SocketChannel) key.channel();
    			// 建立读取的缓冲区
    			ByteBuffer buffer = ByteBuffer.allocate(1024);
    			int read = channel.read(buffer);
    			if(read > 0){
    				byte[] data = buffer.array();
    				String msg = new String(data).trim();
    				System.out.println("服务端收到信息:" + msg);
    
    				//回写数据
    				ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
    				channel.write(outBuffer);// 将消息回送给客户端
    			}else{
    				System.out.println("客户端关闭");
    				key.cancel();
    			}
    		}
    
    		/**
    		 * 启动服务端测试
    		 *
    		 * @throws IOException
    		 */
    		public static void main(String[] args) throws IOException {
    			NIOServer server = new NIOServer();
    			server.initServer(8000);
    			server.listen();
    		}
    
    	}
    访问
     telnet 127.0.0.1 10101
    
     send hello
相关文章
相关标签/搜索