Contents

- BIO
  - Server
  - Client
- NIO
  - Single-threaded model
  - Code implementation with a thread pool
- AIO
  - Single-threaded AIO implementation
BIO

When you know the number of client connections will be small, BIO is still perfectly usable: it is simple and hard to get wrong. Low efficiency does not mean it has no place.

The problem is that BIO's accept, read, and write are all blocking calls. A thread sits there blocked, doing no real work, yet with an "I'm busy" sign stuck to its forehead. That is extremely inefficient, and even with a thread pool you merely end up with a pile of threads all "busy" in the same way.
Server:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket();
        ss.bind(new InetSocketAddress("127.0.0.1", 8888));
        while (true) {
            Socket s = ss.accept(); // blocking call
            new Thread(() -> handle(s)).start(); // one thread per connection
        }
    }

    static void handle(Socket s) {
        try {
            byte[] bytes = new byte[1024];
            int len = s.getInputStream().read(bytes); // blocking call
            if (len == -1) return; // client closed the connection
            System.out.println(new String(bytes, 0, len));
            s.getOutputStream().write(bytes, 0, len); // echo it back
            s.getOutputStream().flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Client:

import java.io.IOException;
import java.net.Socket;

public class Client {
    public static void main(String[] args) throws IOException {
        Socket s = new Socket("127.0.0.1", 8888);
        s.getOutputStream().write("HelloServer".getBytes());
        s.getOutputStream().flush();
        //s.getOutputStream().close();
        System.out.println("write over, waiting for msg back...");
        byte[] bytes = new byte[1024];
        int len = s.getInputStream().read(bytes);
        System.out.println(new String(bytes, 0, len));
        s.close();
    }
}
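To try it out, start Server first, then run Client. The server prints HelloServer and echoes it back, so the client prints "write over, waiting for msg back..." followed by HelloServer, then exits.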
NIO

The Selector is the housekeeper-in-chief: its one job is listening, and it listens for connect, read, and write events.

A single thread is enough to handle the clients' connects, reads, and writes; the selector handles all of it.

The design pattern at work here is the Observer pattern.

The selector says: I am interested in certain events. Which events? Client connects, reads, and writes, all of them! Every so often I check whether any of these events has happened on the server. If a client wants to connect, the selector helps the client and the server establish the connection, one connection per arrival. Besides handling client connections, the selector also keeps an eye on the already-connected client channels for data that needs reading or writing: whatever needs to be read gets read in, whatever needs to be written gets written out.
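To make the Observer analogy concrete, here is a minimal plain-Java sketch of the pattern. MiniSelector and ChannelObserver are made-up illustrative names, not part of the NIO API:

import java.util.ArrayList;
import java.util.List;

// Observer-pattern sketch: MiniSelector plays the "subject",
// the registered handlers play the "observers".
interface ChannelObserver {
    void onEvent(String event); // e.g. "ACCEPT", "READ", "WRITE"
}

class MiniSelector {
    private final List<ChannelObserver> observers = new ArrayList<>();

    void register(ChannelObserver o) {
        observers.add(o); // "I am interested in these events"
    }

    // In real NIO this corresponds to select() finding ready keys
    void eventHappened(String event) {
        for (ChannelObserver o : observers) {
            o.onEvent(event);
        }
    }
}

public class ObserverDemo {
    public static void main(String[] args) {
        MiniSelector selector = new MiniSelector();
        selector.register(event -> System.out.println("handling " + event));
        selector.eventHappened("ACCEPT");
        selector.eventHappened("READ");
    }
}

Roughly speaking, Selector plays the subject tracking readiness, and the per-key handling code plays the observers.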
Single-threaded model:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8888));
        ssc.configureBlocking(false); // non-blocking mode

        System.out.println("server started, listening on :" + ssc.getLocalAddress());

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT); // watch for connection events

        while (true) {
            selector.select(); // blocks until at least one registered event is ready
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove(); // must remove, or the key is processed again next round
                handle(key);
            }
        }
    }

    private static void handle(SelectionKey key) {
        if (key.isAcceptable()) {
            try {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                sc.register(key.selector(), SelectionKey.OP_READ); // now watch this client for reads
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else if (key.isReadable()) {
            SocketChannel sc = null;
            try {
                sc = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(512);
                buffer.clear();
                int len = sc.read(buffer);
                if (len != -1) {
                    System.out.println(new String(buffer.array(), 0, len));
                }
                ByteBuffer bufferToWrite = ByteBuffer.wrap("HelloClient".getBytes());
                sc.write(bufferToWrite);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (sc != null) {
                    try {
                        sc.close(); // one-shot exchange: close after replying
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
With a thread pool introduced, the selector now leads a crew of workers: it has become a foreman instead of fighting alone.

The selector is the boss and is only responsible for client connections; the thread pool provides the workers. When a channel needs reading or writing, the work goes to whichever worker is idle, instead of spawning a new worker for every connection the way the BIO server above does.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolServer {

    ExecutorService pool = Executors.newFixedThreadPool(50); // the workers
    private Selector selector; // the boss

    public static void main(String[] args) throws IOException {
        PoolServer server = new PoolServer();
        server.initServer(8000);
        server.listen();
    }

    public void initServer(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
        this.selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("server started successfully!");
    }

    public void listen() throws IOException {
        // poll the selector in a loop
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);
                    channel.register(this.selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // stop watching OP_READ while a worker owns this key,
                    // otherwise the same readiness fires again on the next select()
                    key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
                    pool.execute(new ThreadHandlerChannel(key)); // hand off to an idle worker
                }
            }
        }
    }
}

// a task submitted to the pool, one per ready key
class ThreadHandlerChannel implements Runnable {

    private SelectionKey key;

    ThreadHandlerChannel(SelectionKey key) {
        this.key = key;
    }

    @Override
    public void run() {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            int size = 0;
            while ((size = channel.read(buffer)) > 0) {
                buffer.flip();
                baos.write(buffer.array(), 0, size);
                buffer.clear();
            }
            baos.close();
            // echo whatever was received back to the client
            byte[] content = baos.toByteArray();
            ByteBuffer writeBuf = ByteBuffer.allocate(content.length);
            writeBuf.put(content);
            writeBuf.flip();
            channel.write(writeBuf);
            if (size == -1) {
                channel.close(); // client closed the connection
            } else {
                // re-arm OP_READ and wake the selector so it picks up the change
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                key.selector().wakeup();
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
AIO

NIO still has to poll the server constantly to see whether any events have happened.

AIO works roughly like this: when a client wants to connect, the connecting is handed to the operating system. Once the OS has the client connected, it sends the housekeeper a message saying someone wants to connect, shall I let them in? There can be several housekeepers, though usually there is just one, and it is only responsible for connecting clients. The difference from NIO is that the housekeeper can simply sit there and wait; it does not have to loop around polling the server.
On Linux, both AIO and NIO are implemented on top of the OS's epoll system call. epoll is polling, and NIO is polling too, which is why Netty simply wraps NIO; its API just feels more like AIO.
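Since Netty comes up here, below is a minimal sketch of a Netty echo server, assuming Netty 4.x is on the classpath (NettyEchoServer is an illustrative name, not from the original). It shows the boss/worker split from the thread-pool model above, wrapped behind a callback-driven, AIO-flavored API:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyEchoServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);  // accepts connections, like the selector "boss"
        EventLoopGroup worker = new NioEventLoopGroup(); // handles reads/writes, like the worker pool
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
                             ctx.writeAndFlush(msg); // echo the received bytes back
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind(8888).sync();
            f.channel().closeFuture().sync(); // block until the server channel closes
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

Note that you never call select() yourself: like AIO, Netty calls your handler back when an event has happened, even though NIO polling runs underneath.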
Windows's AIO implementation is true asynchronous IO and is more efficient there than on Linux, but most servers run Linux rather than Windows.

The design pattern AIO relies on is the hook function, that is, the Template Method pattern.
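A minimal sketch of the hook idea (all names here are made up for illustration): the framework fixes the skeleton of the operation and calls back into hooks you override, which mirrors how the AIO runtime invokes CompletionHandler's completed/failed:

// Template Method sketch: the abstract class fixes the flow,
// subclasses fill in the hooks, just as AIO invokes your CompletionHandler.
abstract class IoOperation {
    // the template method: the skeleton is fixed by the framework
    final void execute() {
        try {
            String data = doIo(); // the framework performs the IO
            completed(data);      // hook: called on success
        } catch (Exception e) {
            failed(e);            // hook: called on failure
        }
    }

    String doIo() { return "HelloClient"; }

    // hooks the user overrides, mirroring CompletionHandler.completed/failed
    abstract void completed(String result);
    abstract void failed(Throwable exc);
}

public class HookDemo {
    public static void main(String[] args) {
        new IoOperation() {
            @Override void completed(String result) { System.out.println("got: " + result); }
            @Override void failed(Throwable exc) { exc.printStackTrace(); }
        }.execute();
    }
}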
More threads is not automatically better: too many threads just adds thread-switching overhead.

All IO is ultimately performed by the operating system. Otherwise your program could scribble whatever it liked onto the disk, and going behind the OS boss's back would be a bit too cheeky, wouldn't it?
Single-threaded AIO implementation:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class Server {
    public static void main(String[] args) throws Exception {
        final AsynchronousServerSocketChannel serverChannel =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8888));

        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Object attachment) {
                serverChannel.accept(null, this); // re-arm accept for the next client
                try {
                    System.out.println(client.getRemoteAddress());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            attachment.flip();
                            System.out.println(new String(attachment.array(), 0, result));
                            client.write(ByteBuffer.wrap("HelloClient".getBytes()));
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            exc.printStackTrace();
                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        while (true) {
            Thread.sleep(1000); // keep main alive; all real work happens in the callbacks
        }
    }
}
The same server, backed by an explicit thread pool via a channel group:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ServerWithThreadGroup {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // run the async channels over our own pool, starting with 1 thread
        AsynchronousChannelGroup threadGroup =
                AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);

        final AsynchronousServerSocketChannel serverChannel =
                AsynchronousServerSocketChannel.open(threadGroup).bind(new InetSocketAddress(8888));

        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Object attachment) {
                serverChannel.accept(null, this); // re-arm accept for the next client
                try {
                    System.out.println(client.getRemoteAddress());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            attachment.flip();
                            System.out.println(new String(attachment.array(), 0, result));
                            client.write(ByteBuffer.wrap("HelloClient".getBytes()));
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            exc.printStackTrace();
                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        while (true) {
            Thread.sleep(1000); // keep main alive; the group's pool runs the callbacks
        }
    }
}
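Either AIO server can be exercised with the BIO Client shown at the top, which already targets 127.0.0.1:8888. The same goes for the NIO servers; only PoolServer, which listens on port 8000, needs the port changed.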