1、概要java
什么是NIO Buffer Channel 网络编程 AIO 为何须要了解NIO和AIO?
什么是NIO编程
NIO是New I/O的简称,与旧式的基于流的I/O方法相对,从名字看,它表示新的一套Java I/O标 准。它是在Java 1.4中被归入到JDK中的,并具备如下特性: – NIO是基于块(Block)的,它以块为基本单位处理数据 – 为全部的原始类型提供(Buffer)缓存支持 – 增长通道(Channel)对象,做为新的原始 I/O 抽象 – 支持锁和内存映射文件的文件访问接口 – 提供了基于Selector的异步网络I/O
Buffer && Channel缓存
import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.Charset; /** * description: * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/4 11:14 PM */ public class DelayMain { public static void main(String[] args) throws IOException { //把DelayMain.java复制一份给DelayMains.java nioCopyFile("/Users/heliming/IdeaProjects/democloud/jvm/target/classes/DelayMain.class","/Users/heliming/IdeaProjects/democloud/jvm/target/classes/DelayMains.class"); test2(); // test3(); test4(); } public static void nioCopyFile(String resource, String destination) throws IOException { FileInputStream fis = new FileInputStream(resource); FileOutputStream fos = new FileOutputStream(destination); FileChannel readChannel = fis.getChannel(); FileChannel writeChannel = fos.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); while (true) { buffer.clear(); int len = readChannel.read(buffer); if (len == -1) { break; //读取完毕 } buffer.flip(); //写入文件 writeChannel.write(buffer); } readChannel.close(); writeChannel.close(); } public static void test4() throws IOException { RandomAccessFile raf = new RandomAccessFile("/Users/heliming/IdeaProjects/democloud/jvm/src/main/java/DelayMain.java", "rw"); FileChannel fc = raf.getChannel(); //将文件映射到内存中 MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length()); //中文 Charset charset = Charset.defaultCharset(); CharBuffer charBuffer = charset.decode(mbb); while (charBuffer.hasRemaining()) { System.out.print((char) charBuffer.get()); } mbb.put(0, (byte) 98); //修改文件 raf.close(); } public static void test2() throws IOException { //2. ByteBuffer b = ByteBuffer.allocate(15); //15个字节大小的缓冲区 System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); for (int i = 0; i < 10; i++) { //存入10个字节数据 b.put((byte) i); } System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); b.flip(); //重置position System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); for (int i = 0; i < 5; i++) { System.out.print(b.get()); } System.out.println(); System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); b.flip(); System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); } public static void test3() throws IOException { RandomAccessFile raf = new RandomAccessFile("/Users/heliming/IdeaProjects/democloud/jvm/src/main/java/DelayMain.java", "rw"); FileChannel fc = raf.getChannel(); //将文件映射到内存中 MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length()); while (mbb.hasRemaining()) { System.out.print((char) mbb.get()); } mbb.put(0, (byte) 98); //修改文件 raf.close(); } }
网络编程服务器
服务端网络
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * description: 服务端 * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/5 6:12 PM */ public class EchoServer { private static ExecutorService tp = Executors.newCachedThreadPool(); public static void main(String args[]) { ServerSocket echoServer = null; Socket clientSocket = null; try { echoServer = new ServerSocket(8000); } catch (IOException e) { System.out.println(e); } while (true) { try { clientSocket = echoServer.accept(); System.out.println(clientSocket.getRemoteSocketAddress() + " connect!"); tp.execute(new HandleMsg(clientSocket)); } catch (IOException e) { System.out.println(e); } } } static class HandleMsg implements Runnable { private Socket clientSocket ; public HandleMsg(Socket socket) { this.clientSocket = socket; } public void run() { PrintWriter os = null; BufferedReader is = null; try { is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); os = new PrintWriter(clientSocket.getOutputStream(), true);// 从InputStream当中读取客户端所发送的数据 String inputLine = null; long b = System.currentTimeMillis(); while ((inputLine = is.readLine()) != null) { os.println(inputLine); } long e = System.currentTimeMillis(); System.out.println("spend:" + (e - b) + "ms"); } catch (IOException e) { e.printStackTrace(); } finally { // 关闭资源 try { is.close(); os.close(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
客户端app
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; /** * description: 客户端 * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/5 9:21 PM */ public class EchoServerclient { public static void main(String[] args) throws IOException { Socket client = null; PrintWriter writer = null; BufferedReader reader = null; try { client = new Socket(); client.connect(new InetSocketAddress("localhost", 8000)); writer = new PrintWriter(client.getOutputStream(), true); writer.println("Hello!"); writer.flush(); reader = new BufferedReader(new InputStreamReader(client.getInputStream())); System.out.println("from server: " + reader.readLine()); } catch (Exception e) { } finally { //资源关闭 client.close(); writer.close(); reader.close(); } } }
网络编程-模拟低效的客户端dom
修改客户端代码异步
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.LockSupport; /** * description: 客户端 * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/5 9:21 PM */ public class EchoServerclient { private static final int sleep_time = 1000 * 1000 * 1000; private static ExecutorService tp = Executors.newCachedThreadPool(); public static void main(String[] args) { for(int i =0 ;i<10;i++){ tp.execute(new EchoClient()); } tp.shutdown(); } public static class EchoClient implements Runnable { Socket client = null; PrintWriter writer = null; BufferedReader reader = null; public void run() { try { client = new Socket(); client.connect(new InetSocketAddress("localhost", 8000)); writer = new PrintWriter(client.getOutputStream(), true); writer.print("H"); LockSupport.parkNanos(sleep_time); writer.print("e"); LockSupport.parkNanos(sleep_time); writer.print("l"); LockSupport.parkNanos(sleep_time); writer.print("l"); LockSupport.parkNanos(sleep_time); writer.print("o"); LockSupport.parkNanos(sleep_time); writer.print("!"); LockSupport.parkNanos(sleep_time); writer.println(); writer.flush(); reader = new BufferedReader(new InputStreamReader(client.getInputStream())); System.out.println("from server: " + reader.readLine()); } catch (Exception e) { } finally { //资源关闭 try { client.close(); writer.close(); reader.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服务器输出以下:jvm
spend:6038ms spend:6038ms spend:6040ms spend:6041ms spend:6042ms spend:6043ms spend:6043ms spend:6043ms spend:6045ms spend:6046ms
网络编程-NIOsocket
服务端代码替换为:
/** * description: * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/5 9:15 PM */ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class NIOEchoServer { private static ExecutorService tp = Executors.newCachedThreadPool(); public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>(); private static ServerSocketChannel ssc = null; private static Selector selector = null; private static final int PORT = 8000; public static void startServer() throws IOException { ssc = ServerSocketChannel.open(); selector = Selector.open(); ssc.configureBlocking(false); final ServerSocket serverSocket = ssc.socket(); serverSocket.bind(new InetSocketAddress(PORT)); ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { int n = selector.select(); if (n == 0) continue; final Set<SelectionKey> readyKeys = selector.selectedKeys(); final Iterator<SelectionKey> it = readyKeys.iterator(); long e = 0; while (it.hasNext()) { final SelectionKey key = it.next(); it.remove(); if(key.isAcceptable()){ doAccept(key); }else if(key.isValid() && key.isReadable()){ if (!geym_time_stat.containsKey(((SocketChannel) key .channel()).socket())) { geym_time_stat.put( ((SocketChannel) key.channel()).socket(), System.currentTimeMillis()); } doRead(key); }else if (key.isValid() && key.isWritable()) { doWrite(key); e = System.currentTimeMillis(); long b = geym_time_stat.remove(((SocketChannel) key .channel()).socket()); System.out.println("spend:" + (e - b) + "ms"); } } } } private static void doWrite(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); EchoClient echoClient = (EchoClient) key.attachment(); LinkedList<ByteBuffer> outq = echoClient.getOutputQueue(); ByteBuffer bb = outq.getLast(); try { int len = channel.write(bb); if (len == -1) { disconnect(key); return; } if (bb.remaining() == 0) { outq.removeLast(); } } catch (Exception e) { disconnect(key); } if (outq.size() == 0) { key.interestOps(SelectionKey.OP_READ); } } private static void doRead(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer bb = ByteBuffer.allocate(8192); int len; try { len = channel.read(bb); if (len < 0) { disconnect(key); return; } } catch (Exception e) { disconnect(key); return; } bb.flip(); tp.execute(new NIOEchoServer.HandleMsg(key, bb)); } private static void disconnect(SelectionKey sk) { try { sk.channel().close(); } catch (IOException e) { e.printStackTrace(); } } static class HandleMsg implements Runnable { SelectionKey sk; ByteBuffer bb; public HandleMsg(SelectionKey sk, ByteBuffer bb) { super(); this.sk = sk; this.bb = bb; } @Override public void run() { EchoClient echoClient = (EchoClient) sk.attachment(); echoClient.enqueue(bb); sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); //强迫selector当即返回 selector.wakeup(); } } private static void doAccept(SelectionKey key) { SocketChannel clientChannel= null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); try { clientChannel = server.accept(); clientChannel.configureBlocking(false); SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ); EchoClient echoClient = new EchoClient(); clientKey.attach(echoClient); InetAddress clientAddress = clientChannel.socket().getInetAddress(); System.out.println("Acception connection from "+ clientAddress.getHostAddress()+" ! "); } catch (IOException e) { System.out.println( " Failed to accept new client."); e.printStackTrace(); } } public static void main(String[] args) throws IOException { NIOEchoServer.startServer(); } } class EchoClient { private LinkedList<ByteBuffer> outq; public EchoClient() { this.outq = new LinkedList<ByteBuffer>(); } public LinkedList<ByteBuffer> getOutputQueue(){ return outq; } public void enqueue(ByteBuffer bb){ outq.addFirst(bb); } }
服务器输出以下:
spend:7ms spend:2ms spend:2ms spend:4ms spend:1ms spend:1ms spend:1ms spend:0ms spend:0ms spend:0ms
网络编程 AIO
读完了再通知我 不会加快IO,只是在读完后进行通知 使用回调函数,进行业务处理
服务器
/** * description: * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/5 11:43 PM */ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.Future; public class Server { public static void main(String[] args) { try { Server server = new Server(); } catch (Exception e) { e.printStackTrace(); } } public Server() throws Exception { AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8000); serverSocketChannel.bind(inetSocketAddress); Future<AsynchronousSocketChannel> accept; while (true) { // accept()不会阻塞。 accept = serverSocketChannel.accept(); System.out.println("================="); System.out.println("服务器等待链接..."); AsynchronousSocketChannel socketChannel = accept.get();// get()方法将阻塞。 System.out.println("服务器接受链接"); System.out.println("服务器与" + socketChannel.getRemoteAddress() + "创建链接"); ByteBuffer buffer = ByteBuffer.wrap("zhangphil".getBytes()); Future<Integer> write=socketChannel.write(buffer); while(!write.isDone()) { Thread.sleep(10); } System.out.println("服务器发送数据完毕."); socketChannel.close(); } } }
客户端
/** * description: * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/4 8:07 PM */ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.Future; public class Client { public static void main(String[] args) { AsynchronousSocketChannel socketChannel = null; try { socketChannel = AsynchronousSocketChannel.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8000); Future<Void> connect = socketChannel.connect(inetSocketAddress); while (!connect.isDone()) { Thread.sleep(10); } System.out.println("创建链接" + socketChannel.getRemoteAddress()); ByteBuffer buffer = ByteBuffer.allocate(1024); Future<Integer> read = socketChannel.read(buffer); while (!read.isDone()) { Thread.sleep(10); } System.out.println("接收服务器数据:" + new String(buffer.array(), 0, read.get())); } catch (Exception e) { e.printStackTrace(); } } }
Future接口 public V get(long timeout, TimeUnit unit)调用 private int awaitDone(boolean timed, long nanos)调用 LockSupport类的park方法和unpark方法调用 UNSAFE类的park方法和unpark方法调用 native方法阻塞当前线程。 阻塞park方法,解除阻塞unpark方法 Unsafe(提供CAS操做) LockSupport(提供park/unpark操做) 最终都会回到操做系统的cas操做