AIO是在NIO基础上实现的异步非阻塞通讯java
Windows下提供了IOCP技术,I/O Completion Port,称为I/O完成端口。IOCP是一个消息队列。当监听到客户请求的时候就把请求加到消息队列中。而后已有的线程去逐一处理,处理完成后须要获得反馈的工做线程就会收到通知,而后前去处理。当没有请求加入到消息队列的时候,相应的线程也就处理挂起的状态进行等待。异步
因此Windows下算是有实际意义上的异步非阻塞async
同步异步是消息通讯的机制
阻塞非阻塞是事件处理ide
阻塞:死等着被调用方回信,中间什么不干
非阻塞:没收到被调用方回信时,中间干点别的
同步:一下子一趟,没有结果就反复问
异步:问完等对方通知反馈测试
同步阻塞:到服务台反复问讯,死等服务生反馈this
同步非阻塞:到服务台反复问询,在等服务生反馈期间玩手机.net
异步阻塞:到服务台问讯,死等服务生反馈,服务生确认后通知我线程
异步非阻塞:到服务台问询,问完就玩手机去了,服务生确认后通知我unix
Linux用epoll进行相关实现netty
AIO服务端实现
package netty.aio;
/**
*/
public class AIOSocketServerMain {
public static void main(String[] args) {
int port = 9999; AIOSocketServer selector = new AIOSocketServer(port); new Thread(selector).start();
}
}
package netty.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
/**
*/
public class AIOSocketServer implements Runnable{
private int port;
//用于限制一个线程等待其余线程各自执行完毕后再执行。
CountDownLatch countDownLatch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
//传入端口参数
public AIOSocketServer(int port) {
this.port = port; try { asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); }
}
@Override
public void run() {
//这里使用countdownlatch是为了把线程阻止住 countDownLatch = new CountDownLatch(1); accept(); try { //多个线程在开始执行任务前首先 coundownlatch.await(), //当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。 countDownLatch.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }
}
private void accept() {
asynchronousServerSocketChannel.accept(this,new AIOHandler());
}
}
package netty.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AIOSocketHandler implements CompletionHandler<Integer,ByteBuffer>{
private AsynchronousSocketChannel asynchronousSocketChannel; public AIOSocketHandler(AsynchronousSocketChannel asynchronousSocketChannel) { if(this.asynchronousSocketChannel==null) this.asynchronousSocketChannel = asynchronousSocketChannel; } @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); try { String info = new String(bytes,"UTF-8"); System.out.println("收到信息:"+info); sendMessage(new Long(System.currentTimeMillis()).toString()); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void sendMessage(String message) { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put(message.getBytes()); buffer.flip(); asynchronousSocketChannel.write(buffer,buffer,new CompletionHandler<Integer,ByteBuffer>(){ @Override public void completed(Integer result, ByteBuffer buffer) { if(buffer.hasRemaining()) { asynchronousSocketChannel.write(buffer,buffer,this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { asynchronousSocketChannel.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { asynchronousSocketChannel.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
package netty.aio;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AIOHandler implements CompletionHandler<AsynchronousSocketChannel,AIOSocketServer>{
@Override public void completed(AsynchronousSocketChannel channel, AIOSocketServer handler) { // TODO Auto-generated method stub handler.asynchronousServerSocketChannel.accept(handler,this); ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer, buffer, new AIOSocketHandler(channel)); } @Override public void failed(Throwable exc, AIOSocketServer handler) { // TODO Auto-generated method stub handler.countDownLatch.countDown(); }
}
客户端测试代码
package netty.aio;
public class ClientThreadMain {
public static void main(String[] args) { String ip = "127.0.0.1"; int port = 9999; new Thread(new AIOSocketClient(ip,port)).start(); }
}
package netty.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import io.netty.channel.unix.Buffer;
/**
*/
public class AIOSocketClient implements CompletionHandler<Void,AIOSocketClient>, Runnable{
private AsynchronousSocketChannel asynchronousSocketChannel;
private String ip;
private int port;
private CountDownLatch countDownLatch;
public AIOSocketClient(String ip,int port) {
this.ip = ip; this.port = port; //创建通道 try { asynchronousSocketChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }
}br/>@Override
public void run() {
// TODO Auto-generated method stub
}
@Override
public void completed(Void result, AIOSocketClient attachment) {
byte[] bytes = "Hello JAVA AIO WORLD".getBytes(); ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length); byteBuffer.put(bytes); byteBuffer.flip(); asynchronousSocketChannel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer,ByteBuffer>(){ @Override public void completed(Integer result, ByteBuffer buffer) { if(buffer.hasRemaining()) { asynchronousSocketChannel.write(buffer,buffer,this); }else { ByteBuffer reader = ByteBuffer.allocate(1024); asynchronousSocketChannel.read(reader, reader, new CompletionHandler<Integer,ByteBuffer>(){ @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String info = ""; try { info = new String(bytes,"UTF-8"); System.out.println("读入信息:"+info); countDownLatch.countDown(); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer buffer) { // TODO Auto-generated method stub try { asynchronousSocketChannel.close(); countDownLatch.countDown(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { asynchronousSocketChannel.close(); countDownLatch.countDown(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } });
}
@Override
public void failed(Throwable exc, AIOSocketClient attachment) {
// TODO Auto-generated method stub
try {
asynchronousSocketChannel.close();
countDownLatch.countDown(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }
}
}
这里用到了不少回调的地方,会稍后补一个回调的小例子进行说明
BIO,简单但不堪重负
NIO,华丽但不完美
因此后边要用Netty