阻塞式I/O的阻塞指的是,socket的read函数、write函数是阻塞的。linux
public static void main(String[] args) {
try (ServerSocket serverSocket = new ServerSocket()) {
// 绑定端口
serverSocket.bind(new InetSocketAddress(8081));
while (true) {
// 轮询established
Socket socket = serverSocket.accept();
new Thread(() -> {
try (BufferedReader buffer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true)) {
// 读消息
while (true) {
String body = buffer.readLine();
if (body == null) {
break;
}
log.info("receive body: {}", body);
}
// 写消息
printWriter.write("server receive message!");
} catch (Exception e) {
log.error(e.getMessage());
}
}).start();
}
} catch (Exception e) {
log.error(e.getMessage());
}
}
复制代码
由于socket的accept函数,read函数,write函数是同步阻塞的,因此主线程不断调用socket的accept函数,轮询状态是established的TCP链接。编程
read函数会从内核缓冲区中读取已经准备好的数据,复制到用户进程,若是内核缓冲区中没有数据,那么这个线程就的就会被挂起,相应的cpu的使用权被释放出来。当内核缓冲中准备好数据后,cpu会响应I/O的中断信号,唤醒被阻塞的线程处理数据。缓存
当一个链接在处理I/O的时候,系统是阻塞的,若是是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可让CPU去处理更多的事情。bash
阻塞式I/O模型多线程
缺少扩展性,严重依赖线程。Java的线程占用内存在512K-1M,线程数量过多会致使JVM内存溢出。大量的线程上下文切换严重消耗CPU性能。大量的I/O线程被激活会致使系统锯齿状负载。架构
同步非阻塞I/O模型并发
对于NIO来讲,若是内核缓冲区中没有数据就直接返回一个EWOULDBLOCK错误,通常来讲进程能够轮询调用read函数,当缓冲区中有数据的时候将数据复制到用户空间,而不用挂起线程。socket
因此同步非阻塞中的非阻塞指的是socket的读写函数不是阻塞的,可是用户进程依然须要轮询读写函数,因此是同步的。可是NIO给咱们提供了不须要新起线程就能够利用CPU的可能,也就是I/O多路复用技术ide
在linux系统中,可使用select/poll/epoll使用一个线程监控多个socket,只要有一个socket的读缓存有数据了,方法就当即返回,而后你就能够去读这个可读的socket了,若是全部的socket读缓存都是空的,则会阻塞,也就是将线程挂起。函数
一开始用的linux用的是select,可是selct比较慢,最终使用了epoll。
NIO其实是一个事件驱动的模型,NIO中最重要的就是多路复用器(Selector)。在NIO中它提供了选择就绪事件的能力,咱们只须要把通道(Channel) 注册到Selector上,Selector就会经过select方法(实际上操做系统是经过epoll)不断轮询注册在其上的Channel,若是某个Channel上发生了读就绪、写就绪或者链接到来就会被Selector轮询出来,而后经过SelectionKey(Channel注册到Selector上时会返回和其绑定的SelectionKey)能够获取到已经就绪的Channel集合,不然Selector就会阻塞在select方法上。
Selector调用select方法,并非一个线程经过for循环去选择就绪的Channel,而是操做系统经过epoll以事件的方式的通知JVM的线程,哪一个通道发生了读就绪或者写就绪的事件。因此select方法更像是一个监听器。
多路复用的核心目的就是使用最少的线程去操做更多的通道,在其内部并非只有一个线程。建立线程的个数是根据通道的数量来决定的,每注册1023个通道就建立1个新的线程。
NIO的核心是多路复用器和事件模型,搞清楚了这两点其实就能搞清楚NIO的基本工做原理。原来在学习NIO的时候感受很复杂,随着对TCP理解的深刻,发现NIO其实并不难。在使用NIO的时候,最核心的代就是把Channel和要监听的事件注册到Selector上。
不一样类型通道支持的事件
NIO事件模型示意图:
ServerReactor
@Slf4j
public class ServerReactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
private volatile boolean stop = false;
public ServerReactor(int port, int backlog) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(port), backlog);
serverSocket.setReuseAddress(true);
serverSocketChannel.configureBlocking(false);
// 将channel注册到多路复用器上,并监听ACCEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
public void setStop(boolean stop) {
this.stop = stop;
}
@Override
public void run() {
try {
// 无限的接收客户端链接
while (!stop && !Thread.interrupted()) {
int num = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 移除key,不然会致使事件重复消费
it.remove();
try {
handle(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handle(SelectionKey key) throws Exception {
if (key.isValid()) {
// 若是是ACCEPT事件,表明是一个新的链接请求
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// 至关于三次握手后,从全链接队列中获取可用的链接
// 必须使用accept方法消费ACCEPT事件,不然将致使多路复用器死循环
SocketChannel socketChannel = serverSocketChannel.accept();
// 设置为非阻塞模式,当没有可用的链接时直接返回null,而不是阻塞。
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String content = new String(bytes);
System.out.println("recv client content: " + content);
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(("服务端已收到: " + content).getBytes());
writeBuffer.flip();
socketChannel.write(writeBuffer);
} else if (readBytes < 0) {
key.cancel();
socketChannel.close();
}
}
}
}
}
复制代码
ClientReactor
public class ClientReactor implements Runnable {
final String host;
final int port;
final SocketChannel socketChannel;
final Selector selector;
private volatile boolean stop = false;
public ClientReactor(String host, int port) throws IOException {
this.socketChannel = SocketChannel.open();
this.socketChannel.configureBlocking(false);
Socket socket = this.socketChannel.socket();
socket.setTcpNoDelay(true);
this.selector = Selector.open();
this.host = host;
this.port = port;
}
@Override
public void run() {
try {
// 若是通道呈阻塞模式,则当即发起链接;
// 若是呈非阻塞模式,则不是当即发起链接,而是在随后的某个时间才发起链接。
// 若是链接是当即创建的,说明通道是阻塞模式,当链接成功时,则此方法返回true,链接失败出现异常。
// 若是此通道处于阻塞模式,则此方法的调用将会阻塞,直到创建链接或发生I/O错误。
// 若是链接不是当即创建的,说明通道是非阻塞模式,则此方法返回false,
// 而且之后必须经过调用finishConnect()方法来验证链接是否完成
// socketChannel.isConnectionPending()判断此通道是否正在进行链接
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
while (!stop && !Thread.interrupted()) {
int num = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 移除key,不然会致使事件重复消费
it.remove();
try {
handle(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handle(SelectionKey key) throws IOException {
if (key.isValid()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (socketChannel.finishConnect()) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
System.out.println("recv server content: " + new String(bytes));
} else if (readBytes < 0) {
key.cancel();
socketChannel.close();
}
}
}
}
private void doWrite(SocketChannel socketChannel) {
Scanner scanner = new Scanner(System.in);
new Thread(() -> {
while (scanner.hasNext()) {
try {
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(scanner.nextLine().getBytes());
writeBuffer.flip();
socketChannel.write(writeBuffer);
} catch (Exception e) {
}
}
}).start();
}
}
复制代码
参考文章: