咱们知道NIO是同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的链接请求都会注册到多路复用器上,多路复用器轮询到链接有I/O请求时才启动一个线程进行处理。
而AIO则是则是异步非阻塞的,而且提供了异步文件通道和异步套接字通道的实现。主要经过两种方式获取操做的结果:java
public class TimeServer { public static void main(String[] args){ int port = 8080; if (args != null && args.length >0){ try{ port = Integer.parseInt(args[0]); }catch (NumberFormatException e){ } } AsyncTimeServerHandler timeServerHandler = new AsyncTimeServerHandler(port); new Thread(timeServerHandler,"AIO-AsyncTimeServerHandler-001").start(); } }
首先建立异步的时间服务处理器,而后启动线程将异步时间服务Handler拉起编程
public class AsyncTimeServerHandler implements Runnable{ private int port; CountDownLatch countDownLatch; AsynchronousServerSocketChannel channel; public AsyncTimeServerHandler(int port) { this.port = port; try { channel = AsynchronousServerSocketChannel.open(); channel.bind(new InetSocketAddress(port)); }catch (IOException e){ e.printStackTrace(); } } public void run() { countDownLatch = new CountDownLatch(1); doAccept(); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public void doAccept(){ channel.accept(this,new AcceptCompletionHandler()); } }
在构造方法中,咱们建立了一个一步的Channel,而后调用bind方法绑定了监听的端口。
在run方法中咱们初始化了一个CountDownLatch对象,是为了在完成一组正在执行的操做以前,线程一直阻塞在那儿
在doAccept方法中接收客户端的链接,咱们能够传递一个handler示例接受accept操做成功的通知消息,其代码以下:服务器
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler>{ @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) { attachment.channel.accept(attachment,this); ByteBuffer buffer = ByteBuffer.allocate(1024); result.read(buffer,buffer,new ReadCompltetionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); attachment.countDownLatch.countDown(); } }
其中,咱们在complete方法中继续调用了accept方法,是为了有新的客户端接入成功,由于一个AsynchronousServerSocketChannel能够接受成千上万个客户端
而链路创建成功之后,服务端能够接受客户端的请求消息了,经过read方法进行异步读操做,其中传入了一个Handler,接受通知回调业务。其代码以下网络
public class ReadCompltetionHandler implements CompletionHandler<Integer,ByteBuffer>{ private AsynchronousSocketChannel channel; public ReadCompltetionHandler(AsynchronousSocketChannel channel) { if (channel == null) this.channel = channel; } @Override public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); byte[] body = new byte[attachment.remaining()]; attachment.get(body); try { String req = new String(body,"UTF-8"); String curentTime = "QUERY TIME ORDER".equalsIgnoreCase(req)?new Date( System.currentTimeMillis() ).toString():"BAD ORDER"; doWrite(curentTime); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } private void doWrite(String currentTime){ if (!StringUtil.isNullOrEmpty(currentTime)){ byte[] bytes = currentTime.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(bytes.length); buffer.put(bytes); buffer.flip(); channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { if (buffer.hasRemaining()){ channel.write(buffer,buffer,this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
其中具体的处理逻辑和NIO的TimeServer相同,不作详细分析了异步
public class TimeClient { public static void main(String[] args){ int port = 8080; if (args != null && args.length >0){ try{ port = Integer.parseInt(args[0]); }catch (NumberFormatException e){ } } AsyncTimeClientHandler timeClientHandler = new AsyncTimeClientHandler("127.0.0.1",port); new Thread(timeClientHandler,"AIO-AsyncTimeClientHandler-001").start(); } }
在其中咱们经过一个I/O线程建立一步时间服务器客户端Handler,具体代码以下:ide
public class AsyncTimeClientHandler implements CompletionHandler<Void,AsyncTimeClientHandler>,Runnable{ private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch countDownLatch; public AsyncTimeClientHandler(String host, int port) { this.host = host; this.port = port; try { client = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void completed(Void result, AsyncTimeClientHandler attachment) { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer buffer = ByteBuffer.allocate(req.length); buffer.put(req); buffer.flip(); client.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { if (attachment.hasRemaining()){ client.write(attachment,attachment,this); }else{ ByteBuffer readBuffer = ByteBuffer.allocate(1024); client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { byte[] bytes = new byte[attachment.remaining()]; attachment.get(bytes); try { String body = new String(bytes,"UTF-8"); System.out.print(body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); countDownLatch.countDown(); } catch (IOException e) { e.printStackTrace(); } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); countDownLatch.countDown(); } catch (IOException e) { e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, AsyncTimeClientHandler attachment) { exc.printStackTrace(); try { client.close(); countDownLatch.countDown(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { countDownLatch = new CountDownLatch(1); client.connect(new InetSocketAddress(host,port),this,this); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } try { client.close(); } catch (IOException e) { e.printStackTrace(); } } }