反应堆模式:“反应”器名字中”反应“的由来:数组
注意,Reactor的单线程模式的单线程主要是针对于I/O操做而言,也就是全部的I/O的accept()、read()、write()以及connect()操做都在一个线程上完成的。服务器
先简单介绍NIO中几个重要对象:
Selector并发
Channels
通道,被创建的一个应用程序和操做系统交互事件、传递内容的渠道(注意是链接到操做系统)。那么既然是和操做系统进行内容的传递,那么说明应用程序能够经过通道读取数据,也能够经过通道向操做系统写数据。socket
通道中的数据老是要先读到一个Buffer,或者老是要从一个Buffer中写入。ide
服务端处理器:性能
/** * 类说明:nio通讯服务端处理器 */ public class NioServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * 构造方法 * @param port 指定要监听的端口号 */ public NioServerHandle(int port) { try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.register(selector,SelectionKey.OP_ACCEPT); started = true; System.out.println("服务器已启动,端口号:"+port); } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { //循环遍历selector while(started){ try{ //阻塞,只有当至少一个注册的事件发生的时候才会继续. selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Throwable t){ t.printStackTrace(); } } //selector关闭后会自动释放里面管理的资源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //处理新接入的请求消息 if(key.isAcceptable()){ //得到关心当前事件的channel ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); //经过ServerSocketChannel的accept建立SocketChannel实例 //完成该操做意味着完成TCP三次握手,TCP物理链路正式创建 SocketChannel sc = ssc.accept(); System.out.println("======socket channel 创建链接" ); //设置为非阻塞的 sc.configureBlocking(false); //链接已经完成了,能够开始关心读事件了 sc.register(selector,SelectionKey.OP_READ); } //读消息 if(key.isReadable()){ System.out.println("======socket channel 数据准备完成," + "能够去读==读取======="); SocketChannel sc = (SocketChannel) key.channel(); //建立ByteBuffer,并开辟一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //读取请求码流,返回读取到的字节数 int readBytes = sc.read(buffer); //读取到字节,对字节进行编解码 if(readBytes>0){ //将缓冲区当前的limit设置为position=0, // 用于后续对缓冲区的读取操做 buffer.flip(); //根据缓冲区可读字节数建立字节数组 byte[] bytes = new byte[buffer.remaining()]; //将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String message = new String(bytes,"UTF-8"); System.out.println("服务器收到消息:" + message); //处理数据 String result = response(message) ; //发送应答消息 doWrite(sc,result); } //链路已经关闭,释放资源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } //发送应答消息 private void doWrite(SocketChannel channel,String response) throws IOException { //将消息编码为字节数组 byte[] bytes = response.getBytes(); //根据数组容量建立ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //将字节数组复制到缓冲区 writeBuffer.put(bytes); //flip操做 writeBuffer.flip(); //发送缓冲区的字节数组 channel.write(writeBuffer); } } public class NioServer { private static NioServerHandle nioServerHandle; public static void start(){ if(nioServerHandle !=null) nioServerHandle.stop(); nioServerHandle = new NioServerHandle(DEFAULT_PORT); new Thread(nioServerHandle,"Server").start(); } public static void main(String[] args){ start(); } }
客户端处理器:大数据
public class NioClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean started; public NioClientHandle(String ip, int port) { this.host = ip; this.port = port; try { //建立选择器 selector = Selector.open(); //打开通道 socketChannel = SocketChannel.open(); //若是为 true,则此通道将被置于阻塞模式; // 若是为 false,则此通道将被置于非阻塞模式 socketChannel.configureBlocking(false); started = true; } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } //循环遍历selector while(started){ try { //阻塞,只有当至少一个注册的事件发生的时候才会继续 selector.select(); //获取当前有哪些事件可使用 Set<SelectionKey> keys = selector.selectedKeys(); //转换为迭代器 Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { e.printStackTrace(); if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } //selector关闭后会自动释放里面管理的资源 if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } //具体的事件处理方法 private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //得到关心当前事件的channel SocketChannel sc = (SocketChannel)key.channel(); if(key.isConnectable()){//链接事件 if(sc.finishConnect()){} else{System.exit(1);} } //有数据可读事件 if(key.isReadable()){ //建立ByteBuffer,并开辟一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //读取请求码流,返回读取到的字节数 int readBytes = sc.read(buffer); //读取到字节,对字节进行编解码 if(readBytes>0){ //将缓冲区当前的limit设置为position,position=0, // 用于后续对缓冲区的读取操做 buffer.flip(); //根据缓冲区可读字节数建立字节数组 byte[] bytes = new byte[buffer.remaining()]; //将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("accept message:"+result); }else if(readBytes<0){ key.cancel(); sc.close(); } } } } //发送消息 private void doWrite(SocketChannel channel,String request) throws IOException { //将消息编码为字节数组 byte[] bytes = request.getBytes(); //根据数组容量建立ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //将字节数组复制到缓冲区 writeBuffer.put(bytes); //flip操做 writeBuffer.flip(); //发送缓冲区的字节数组 channel.write(writeBuffer); } private void doConnect() throws IOException { /*若是此通道处于非阻塞模式, 则调用此方法将启动非阻塞链接操做。 若是当即创建链接,就像本地链接可能发生的那样,则此方法返回true。 不然,此方法返回false, 稍后必须经过调用finishConnect方法完成链接操做。*/ if(socketChannel.connect(new InetSocketAddress(host,port))){} else{ //链接还未完成,因此注册链接就绪事件,向selector表示关注这个事件 socketChannel.register(selector,SelectionKey.OP_CONNECT); } } //写数据对外暴露的API public void sendMsg(String msg) throws Exception{ socketChannel.register(selector,SelectionKey.OP_READ); doWrite(socketChannel,msg); } } public class NioClient { private static NioClientHandle nioClientHandle; public static void start(){ if(nioClientHandle !=null) nioClientHandle.stop(); nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT); new Thread(nioClientHandle,"Client").start(); } //向服务器发送消息 public static boolean sendMsg(String msg) throws Exception{ nioClientHandle.sendMsg(msg); return true; } public static void main(String[] args) throws Exception { start(); System.out.println("请输入请求信息:"); Scanner scanner = new Scanner(System.in); while(NioClient.sendMsg(scanner.next())); } }
服务端过程:this
客户端过程:编码
初始化工做如打开selector,channel,设置通道模式是否阻塞.spa
但在单线程Reactor模式中,不只I/O操做在该Reactor线程上,连非I/O的业务操做也在该线程上进行处理了,这可能会大大延迟I/O请求的响应。因此咱们应该将非I/O的业务逻辑操做从Reactor线程上卸载,以此来加速Reactor线程对I/O请求的响应.
添加了一个工做者线程池,并将非I/O操做从Reactor线程中移出转交给工做者线程池来执行。这样可以提升Reactor线程的I/O响应,不至于由于一些耗时的业务逻辑而延迟对后面I/O请求的处理。
改进的版本中,因此的I/O操做依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操做。
对于一些小容量应用场景,可使用单线程模型。可是对于高负载、大并发或大数据量的应用场景却不合适,主要缘由以下:
Reactor线程池中的每一Reactor线程都会有本身的Selector、线程和分发的事件循环逻辑。
mainReactor能够只有一个,但subReactor通常会有多个。mainReactor线程主要负责接收客户端的链接请求,而后将接收到的SocketChannel传递给subReactor,由subReactor来完成和客户端的通讯。
流程:
注意,因此的I/O操做(包括,I/O的accept()、read()、write()以及connect()操做)依旧仍是在Reactor线程(mainReactor线程 或 subReactor线程)中完成的。Thread Pool(线程池)仅用来处理非I/O操做的逻辑。
多Reactor线程模式将“接受客户端的链接请求”和“与该客户端的通讯”分在了两个Reactor线程来完成。mainReactor完成接收客户端链接请求的操做,它不负责与客户端的通讯,而是将创建好的链接转交给subReactor线程来完成与客户端的通讯,这样一来就不会由于read()数据量太大而致使后面的客户端链接请求得不到即时处理的状况。而且多Reactor线程模式在海量的客户端并发请求的状况下,还能够经过实现subReactor线程池来将海量的链接分发给多个subReactor线程,在多核的操做系统中这能大大提高应用的负载和吞吐量。
Netty服务端使用了“多Reactor线程模式”