AIO也叫NIO2.0或者NIO.2,随JDK1.7发布。NIO的API比较麻烦,易错,开发效率低。AIO通过回调函数的方式来表示异步通信,API相对简单一些。
AIO在Windows系统中底层使用IOCP这样系统级的支持,比NIO的性能要好。但Java的服务端程序很少将Windows系统作为生产服务器。而在Linux系统上(内核2.6以上),AIO的底层使用的依然是epoll技术,与NIO一样,只不过封装成异步IO的样子,简化了API而已。
接下来通过一个客户端与服务端通信的例子,来学习使用AIO。客户端每隔1秒向服务端发送请求,服务端响应并返回数据。可以与上一篇的NIO对比学习。
服务端:
package cn.testAio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * Entry point of the AIO (NIO.2) demo server: starts an {@link AioServer} on its own thread.
 *
 * <p>Author: houshuiqiang@163.com, 2017-10-03 13:38:39<br>
 * Modified: houshuiqiang@163.com, 2017-10-03
 */
public class AioDemoServer {

    /** Port used when none is supplied on the command line. */
    private static final int DEFAULT_PORT = 8181;

    /**
     * Starts the demo server.
     *
     * @param args optional; {@code args[0]} is the listening port (defaults to 8181)
     * @throws NumberFormatException if {@code args[0]} is present but not an integer
     */
    public static void main(String[] args) {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : DEFAULT_PORT;
        AioServer aioServer = new AioServer(port);
        new Thread(aioServer, "aio-server-test").start();
    }
}

/**
 * Accepts client connections asynchronously and hands every accepted channel to a
 * {@link ReadsCompletionHandler}.
 */
class AioServer implements Runnable {

    private AsynchronousServerSocketChannel assChannel;

    /** Released when accepting fails, which lets {@link #run()} return. */
    private final CountDownLatch cdl = new CountDownLatch(1);

    /**
     * Opens the server channel and binds it to {@code port}. If either step fails the stack
     * trace is printed and the JVM exits with status 1.
     *
     * @param port TCP port to listen on
     */
    public AioServer(int port) {
        try {
            assChannel = AsynchronousServerSocketChannel.open();
            assChannel.bind(new InetSocketAddress(port));
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Registers the accept callback, then parks this thread on the latch until accepting fails
     * or the thread is interrupted. The server channel is closed before returning.
     */
    @Override
    public void run() {
        assChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServer>() {

            /** A connection was accepted. */
            @Override
            public void completed(AsynchronousSocketChannel result, AioServer attachment) {
                // Re-register the accept first so the next connection can be taken.
                assChannel.accept(attachment, this);
                // 1 KiB read buffer; a larger request arrives over several read completions.
                ByteBuffer buffer = ByteBuffer.allocate(ReadsCompletionHandler.BUFFER_SIZE);
                result.read(buffer, buffer, new ReadsCompletionHandler(result));
            }

            /** Accepting failed: report it and let the server thread finish. */
            @Override
            public void failed(Throwable exc, AioServer attachment) {
                exc.printStackTrace();
                attachment.cdl.countDown();
            }
        });

        try {
            cdl.await(); // block this thread so the server keeps running
        } catch (InterruptedException e) {
            // Restore the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            closeServerChannel();
        }
    }

    /** Closes the listening channel, printing (not propagating) any close failure. */
    private void closeServerChannel() {
        try {
            assChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Read callback for one client connection: decodes the request, produces a reply, writes it
 * back, and then registers itself for the next read.
 *
 * <p>There is no message framing: each completed read is treated as one request.
 */
class ReadsCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

    /** Size in bytes of the buffer used for each read. */
    static final int BUFFER_SIZE = 1024;

    private final AsynchronousSocketChannel channel;

    /**
     * @param channel the client connection this handler serves
     */
    public ReadsCompletionHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    /**
     * Handles a finished read.
     *
     * @param result number of bytes read, or -1 when the client has closed the connection
     * @param attachment the buffer the bytes were read into
     */
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        if (result < 0) {
            // End of stream: there is nothing to answer, so release the connection instead of
            // replying to an empty "request" and reading again.
            closeChannel();
            return;
        }
        try {
            String body = getBody(attachment);      // request content
            String resultBody = handlerBody(body);  // simulated request processing
            write2Client(resultBody);               // send the reply back
        } catch (IOException e) {
            e.printStackTrace();
            // No further read gets registered on this path, so do not leave the channel open.
            closeChannel();
        }
    }

    /** A read failed: report the cause and close the connection. */
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        exc.printStackTrace();
        closeChannel();
    }

    /** Flips the buffer and decodes everything it holds as UTF-8 text. */
    private String getBody(ByteBuffer byteBuffer) {
        byteBuffer.flip();
        byte[] body = new byte[byteBuffer.remaining()];
        byteBuffer.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    /**
     * Logs the request and builds the reply text.
     *
     * @throws IOException if the remote address cannot be obtained from the channel
     */
    private String handlerBody(String body) throws IOException {
        String remoteAddress = String.valueOf(channel.getRemoteAddress());
        System.out.println("message from client : " + remoteAddress + ", content: " + body);
        return "server received message: " + body;
    }

    /**
     * Writes the reply as UTF-8 and, once it has been written completely, registers this
     * handler for the client's next request.
     */
    private void write2Client(String resultBody) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(resultBody.getBytes(StandardCharsets.UTF_8));
        channel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Partial write: keep going with the same buffer.
                    channel.write(attachment, attachment, this);
                } else {
                    // Reply sent; wait for the next request on a fresh buffer.
                    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
                    channel.read(buffer, buffer, ReadsCompletionHandler.this);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
                closeChannel();
            }
        });
    }

    /** Closes the client connection, printing (not propagating) any close failure. */
    private void closeChannel() {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
客户端:
package cn.testAio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Entry point of the AIO (NIO.2) demo client: queues one request per second, five in total,
 * then closes the connection.
 *
 * <p>Author: houshuiqiang@163.com, 2017-10-03 15:16:10<br>
 * Modified: houshuiqiang@163.com, 2017-10-03
 */
public class AioDemoClient {

    /** Server address used when none is supplied on the command line. */
    private static final String DEFAULT_HOST = "192.168.10.47";

    /** Server port used when none is supplied on the command line. */
    private static final int DEFAULT_PORT = 8181;

    /**
     * Runs the demo client.
     *
     * @param args optional; {@code args[0]} is the server host and {@code args[1]} the server
     *     port (defaults: 192.168.10.47 and 8181)
     * @throws InterruptedException if the main thread is interrupted between requests
     * @throws NumberFormatException if {@code args[1]} is present but not an integer
     */
    public static void main(String[] args) throws InterruptedException {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        AioClient aioClient = new AioClient(host, port);
        Thread aioClientThread = new Thread(aioClient, "aio-client-test");
        aioClientThread.start();

        for (int i = 0; i < 5; i++) {
            aioClient.getQueue().offer("time" + i);
            Thread.sleep(1000);
        }
        aioClient.stop();
    }
}

/**
 * Asynchronous client: connects to the server, then alternates between sending the next queued
 * request and reading the server's reply.
 */
class AioClient implements Runnable {

    /** Size in bytes of the buffer used to read each reply. */
    private static final int READ_BUFFER_SIZE = 1024;

    private final int port;
    private final String address;
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private AsynchronousSocketChannel channel;

    /**
     * Opens the client channel. If that fails the stack trace is printed and the JVM exits with
     * status 1.
     *
     * @param address server host name or IP address
     * @param port server TCP port
     */
    public AioClient(String address, int port) {
        this.address = address;
        this.port = port;
        try {
            channel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** Closes the connection if it is still open; a close failure is printed, not propagated. */
    public void stop() {
        if (channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts the asynchronous connect and returns immediately; the request/response cycle then
     * continues in the completion callbacks.
     */
    @Override
    public void run() {
        channel.connect(new InetSocketAddress(address, port), this,
                new CompletionHandler<Void, AioClient>() {

                    @Override
                    public void completed(Void result, AioClient attachment) {
                        sendRequest();
                    }

                    @Override
                    public void failed(Throwable exc, AioClient attachment) {
                        exc.printStackTrace();
                        // The channel cannot be used after a failed connect; release it.
                        stop();
                    }
                });
    }

    /**
     * @return the queue that callers put request strings into
     */
    public BlockingQueue<String> getQueue() {
        return queue;
    }

    /**
     * Takes the next request from the queue (blocking until one is available), writes it as
     * UTF-8 and, once fully written, registers a read for the reply.
     *
     * <p>NOTE(review): this is invoked from completion handlers, so {@code queue.take()} blocks
     * the thread that ran the handler until a request is queued. That is tolerable for this
     * single-connection demo; confirm before reusing the pattern elsewhere.
     */
    private void sendRequest() {
        final String requestBody;
        try {
            requestBody = queue.take();
        } catch (InterruptedException e) {
            // Restore the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }

        ByteBuffer byteBuffer = ByteBuffer.wrap(requestBody.getBytes(StandardCharsets.UTF_8));
        channel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Partial write: keep going with the same buffer.
                    channel.write(attachment, attachment, this);
                } else {
                    // Request sent; wait for the server's reply.
                    readResult();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
                stop();
            }
        });
    }

    /**
     * Registers a read for the server's reply, prints it, and then moves on to the next request.
     */
    private void readResult() {
        ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        channel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result < 0) {
                    // End of stream: the server closed the connection, so there is no reply to
                    // print and nothing further can be sent.
                    stop();
                    return;
                }
                attachment.flip();
                byte[] bytes = new byte[attachment.remaining()];
                attachment.get(bytes);
                String body = new String(bytes, StandardCharsets.UTF_8);
                System.out.println("The msg from Server is :" + body);
                sendRequest(); // reply handled; send the next queued request
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
                stop();
            }
        });
    }
}