一、reactor(反应器)模式html
使用单线程模拟多线程,提升资源利用率和程序的效率,增长系统吞吐量。下面例子比较形象的说明了什么是反应器模式:java
一个老板经营一个饭店,react
传统模式 - 来一个客人安排一个服务员招呼,客人很满意;(至关于一个链接一个线程)服务器
后来客人愈来愈多,须要的服务员愈来愈多,资源条件不足以再请更多的服务员了,传统模式已经不能知足需求。老板之因此为老板天然有过人之处,老板发现,服务员在为客人服务时,当客人点菜的时候,服务员基本处于等待状态,(阻塞线程,不作事)。网络
因而乎就让服务员在客人点菜的时候,去为其余客人服务,当客人菜点好后再招呼服务员便可。 --反应器(reactor)模式诞生了多线程
饭店的生意红红火火,几个服务员就足以支撑大量的客流量,老板用有限的资源赚了更多的money~~~~^_^app
二、NIO中的重要概念 通道、缓冲区、选择器异步
通道:相似于流,可是能够异步读写数据(流只能同步读写),通道是双向的,(流是单向的),通道的数据老是要先读到一个buffer 或者 从一个buffer写入,即通道与buffer进行数据交互。socket
通道类型: ide
FileChannel比较特殊,它能够与通道进行数据交互, 不能切换到非阻塞模式,套接字通道能够切换到非阻塞模式;
缓冲区 - 本质上是一块能够存储数据的内存,被封装成了buffer对象而已!
缓冲区类型:
经常使用方法:
缓冲区的一些属性:
切换到读模式时,position会被置为0,表示当前读的位置
选择器:至关于一个观察者,用来监听通道感兴趣的事件,一个选择器能够绑定多个通道;
通道向选择器注册时,须要指定感兴趣的事件,选择器支持如下事件:
若是你对不止一种事件感兴趣,那么能够用“位或”操做符将常量链接起来,以下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
通道向选择器注册时,会返回一个 SelectionKey对象,具备以下属性
用“位与”操做interest 集合和给定的SelectionKey常量,能够肯定某个肯定的事件是否在interest 集合中。
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
ready 集合是通道已经准备就绪的操做的集合。在一次选择(Selection)以后,你会首先访问这个ready set。Selection将在下一小节进行解释。能够这样访问ready集合:
int readySet = selectionKey.readyOps();
也可使用如下四个方法获取已就绪事件,返回值为boolean:
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
能够将一个对象或者更多信息附着到SelectionKey上,即记录在附加对象上,方法以下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
能够经过选择器的select方法获取是否有就绪的通道;
返回值表示上次执行select以后,就绪通道的个数。
能够经过selectedKeySet获取已就绪的通道。返回值是SelectionKey 的集合,处理完相应的通道以后,须要removed 由于Selector不会本身removed
select阻塞后,能够用wakeup唤醒;执行wakeup时,若是没有阻塞的select 那么执行完wakeup后下一个执行select就会当即返回。
调用close() 方法关闭selector
下面是一个简单的实例代码,帮助理解上面的内容:
package com.pt.nio; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.Iterator; import java.util.Set; public class Reactor implements Runnable { public int id = 100001; public int bufferSize = 2048; @Override public void run() { // TODO Auto-generated method stub init(); } public void init() { try { // 建立通道和选择器 ServerSocketChannel socketChannel = ServerSocketChannel.open(); Selector selector = Selector.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress( InetAddress.getLocalHost(), 4700); socketChannel.socket().bind(inetSocketAddress); // 设置通道非阻塞 绑定选择器 socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_ACCEPT).attach( id++); System.out.println("Server started .... port:4700"); listener(selector); } catch (Exception e) { // TODO: handle exception } } public void listener(Selector in_selector) { try { while (true) { Thread.sleep(1*1000); in_selector.select(); // 阻塞 直到有就绪事件为止 Set<SelectionKey> readySelectionKey = in_selector .selectedKeys(); Iterator<SelectionKey> it = readySelectionKey.iterator(); while (it.hasNext()) { SelectionKey selectionKey = it.next(); // 判断是哪一个事件 if (selectionKey.isAcceptable()) {// 客户请求链接 System.out.println(selectionKey.attachment() + " - 接受请求事件"); // 获取通道 接受链接, // 设置非阻塞模式(必须),同时须要注册 读写数据的事件,这样有消息触发时才能捕获 ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey .channel(); serverSocketChannel .accept() .configureBlocking(false) .register( in_selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE).attach(id++); System.out .println(selectionKey.attachment() + " - 已链接"); // 下面这种写法是有问题的 不该该在serverSocketChannel上面注册 /* * serverSocketChannel.configureBlocking(false); * serverSocketChannel.register(in_selector, * SelectionKey.OP_READ); * serverSocketChannel.register(in_selector, * SelectionKey.OP_WRITE); */ } if (selectionKey.isReadable()) {// 读数据 System.out.println(selectionKey.attachment() + " - 读数据事件"); SocketChannel clientChannel=(SocketChannel)selectionKey.channel(); ByteBuffer receiveBuf = ByteBuffer.allocate(bufferSize); clientChannel.read(receiveBuf); System.out.println(selectionKey.attachment() + " - 读取数据:" + getString(receiveBuf)); } if (selectionKey.isWritable()) {// 写数据 System.out.println(selectionKey.attachment() + " - 写数据事件"); SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); ByteBuffer sendBuf = ByteBuffer.allocate(bufferSize); String sendText = "hello\n"; sendBuf.put(sendText.getBytes()); sendBuf.flip(); //写完数据后调用此方法 clientChannel.write(sendBuf); } if (selectionKey.isConnectable()) { System.out.println(selectionKey.attachment() + " - 链接事件"); } // 必须removed 不然会继续存在,下一次循环还会进来, // 注意removed 的位置,针对一个.next() remove一次 it.remove(); } } } catch (Exception e) { // TODO: handle exception System.out.println("Error - " + e.getMessage()); e.printStackTrace(); } } /** * ByteBuffer 转换 String * @param buffer * @return */ public static String getString(ByteBuffer buffer) { String string = ""; try { for(int i = 0; i<buffer.position();i++){ string += (char)buffer.get(i); } return string; } catch (Exception ex) { ex.printStackTrace(); return ""; } } }
package com.pt.bio; import java.io.*; import java.net.*; public class BioServer implements Runnable { @Override public void run() { // TODO Auto-generated method stub System.out.println("Hello Server!!"); try { ServerSocket server = null; try { server = new ServerSocket(4700); // 建立一个ServerSocket在端口4700监听客户请求 } catch (Exception e) { System.out.println("can not listen to:" + e); // 出错,打印出错信息 } Socket socket = null; try { socket = server.accept(); // 使用accept()阻塞等待客户请求,有客户 // 请求到来则产生一个Socket对象,并继续执行 } catch (Exception e) { System.out.println("Error." + e); // 出错,打印出错信息 } String line; BufferedReader is = new BufferedReader(new InputStreamReader( socket.getInputStream())); // 由Socket对象获得输入流,并构造相应的BufferedReader对象 // 由Socket对象获得输出流,并构造PrintWriter对象 // BufferedReader sin = new BufferedReader(new InputStreamReader( // System.in)); // 由系统标准输入设备构造BufferedReader对象 System.out.println("Client:" + is.readLine()); PrintWriter os = new PrintWriter(socket.getOutputStream()); // 在标准输出上打印从客户端读入的字符串 line = "hello"; // 从标准输入读入一字符串 // while (!line.equals("bye")) { // 若是该字符串为 "bye",则中止循环 os.println(line); // 向客户端输出该字符串 os.flush(); // 刷新输出流,使Client立刻收到该字符串 // System.out.println("Server:" + line); // 在系统标准输出上打印读入的字符串 // System.out.println("Client:" + is.readLine()); // 从Client读入一字符串,并打印到标准输出上 // line = sin.readLine(); // 从系统标准输入读入一字符串 // } // 继续循环 // os.close(); // 关闭Socket输出流 is.close(); // 关闭Socket输入流 socket.close(); // 关闭Socket server.close(); // 关闭ServerSocket } catch (Exception e) { System.out.println("Error." + e); // 出错,打印出错信息 } } }
package com.pt; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import org.junit.Test; import com.pt.bio.BioServer; import com.pt.nio.Reactor; public class TestReactor { @Test public void testConnect() throws Exception{ Socket socket=new Socket("192.168.82.35",4700);//BIO 阻塞 System.out.println("链接成功"); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //下面这种写法,不用关闭客户端,服务器端也是能够收到的 { PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true); printWriter.println("hi"); printWriter.flush(); } //这种写法必须关闭客户端,服务器端才能够收到 NIO不用 { // socket.getOutputStream().write(new byte[]{'h','i'}); // socket.getOutputStream().flush(); //必须关闭BIO服务器才能收到消息.NIO服务器不须要关闭 //socket.close(); } byte[] buf = new byte[2048]; System.out.println("准备读取数据~~"); while(true){ try { //两种读取数据方式 int count = socket.getInputStream().read(buf); //会阻塞 //String readFromServer = bufferedReader.readLine();//能够读取到数据 会阻塞,直到碰见\n //System.out.println("方式二: 读取数据" + readFromServer); System.out.println("方式一: 读取数据" + new String(buf) + " count = " + count); Thread.sleep(1*1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //break; } } @Test public void testNioServer(){ Thread server = new Thread(new Reactor()); server.start(); while(true){ try { Thread.sleep(3*1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } @Test public void testBioServer(){ Thread server = new Thread(new BioServer()); server.start(); while(true){ try { Thread.sleep(3*1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
其中 testNioServer()方法,是启动NIO服务器端;
testBioServer()方法是启动BIO服务器端
testConnect()是BIO的一个链接
基于NIO实现的时钟服务器:http://www.cnblogs.com/tengpan-cn/p/6529628.html
一篇写的比较详细的JAVA NIO的文章:http://www.iteye.com/magazines/132-Java-NIO