千锋扣丁学堂Java培训之探索IO模型总结整理

今天千锋扣丁学堂Java培训老师给你们分享一篇关于探索Java开发I/O模型演进的详细介绍,什么是同步?什么是异步?阻塞和非阻塞又有什么区别?本文先从Unix的I/O模型讲起,介绍了5种常见的I/O模型。然后再引出Java的I/O模型的演进过程,并用实例说明如何选择合适的JavaI/O模型来提升系统的并发量和可用性。java

同步和异步,描述的是用户线程与内核的交互方式:git

同步是指用户线程发起I/O请求后须要等待或者轮询内核I/O操做完成后才能继续执行;github

异步是指用户线程发起I/O请求后仍继续执行,当内核I/O操做完成后会通知用户线程,或者调用用户线程注册的回调函数。设计模式

阻塞和非阻塞,描述的是用户线程调用内核I/O操做的方式:安全

阻塞是指I/O操做须要完全完成后才返回到用户空间;服务器

非阻塞是指I/O操做被调用后当即返回给用户一个状态值,无需等到I/O操做完全完成。微信

一个I/O操做其实分红了两个步骤:发起I/O请求和实际的I/O操做。阻塞I/O和非阻塞I/O的区别在于第一步,发起I/O请求是否会被阻塞,若是阻塞直到完成那么就是传统的阻塞I/O,若是不阻塞,那么就是非阻塞I/O。同步I/O和异步I/O的区别就在于第二个步骤是否阻塞,若是实际的I/O读写阻塞请求进程,那么就是同步I/O。网络

UnixI/O模型多线程

Unix下共有五种I/O模型:并发

阻塞I/O

非阻塞I/O

I/O复用(select和poll)

信号驱动I/O(SIGIO)

异步I/O(POSIX的aio_系列函数)

阻塞I/O

请求没法当即完成则保持阻塞。

阶段1:等待数据就绪。网络I/O的状况就是等待远端数据陆续抵达;磁盘I/O的状况就是等待磁盘数据从磁盘上读取到内核态内存中。

阶段2:数据从内核拷贝到进程。出于系统安全,用户态的程序没有权限直接读取内核态内存,所以内核负责把内核态内存中的数据拷贝一份到用户态内存中。

非阻塞I/O

socket设置为NONBLOCK(非阻塞)就是告诉内核,当所请求的I/O操做没法完成时,不要将进程睡眠,而是返回一个错误码(EWOULDBLOCK),这样请求就不会阻塞

I/O操做函数将不断的测试数据是否已经准备好,若是没有准备好,继续测试,直到数据准备好为止。整个I/O请求的过程当中,虽然用户线程每次发起I/O请求后能够当即返回,可是为了等到数据,仍须要不断地轮询、重复请求,消耗了大量的CPU的资源

数据准备好了,从内核拷贝到用户空间。

通常不多直接使用这种模型,而是在其余I/O模型中使用非阻塞I/O这一特性。这种方式对单个I/O请求意义不大,但给I/O多路复用铺平了道路.

I/O复用(异步阻塞I/O)

I/O多路复用会用到select或者poll函数,这两个函数也会使进程阻塞,可是和阻塞I/O所不一样的的,这两个函数能够同时阻塞多个I/O操做。并且能够同时对多个读操做,多个写操做的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操做函数。

从流程上来看,使用select函数进行I/O请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操做,效率更差。可是,使用select之后最大的优点是用户能够在一个线程内同时处理多个socket的I/O请求。用户能够注册多个socket,而后不断地调用select读取被激活的socket,便可达到在同一个线程内同时处理多个I/O请求的目的。而在同步阻塞模型中,必须经过多线程的方式才能达到这个目的。

I/O多路复用模型使用了Reactor设计模式实现了这一机制。

调用select/poll该方法由一个用户态线程负责轮询多个socket,直到某个阶段1的数据就绪,再通知实际的用户线程执行阶段2的拷贝。经过一个专职的用户态线程执行非阻塞I/O轮询,模拟实现了阶段一的异步化

信号驱动I/O(SIGIO)

首先咱们容许socket进行信号驱动I/O,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个SIGIO信号,能够在信号处理函数中调用I/O操做函数处理数据。

异步I/O

调用aio_read函数,告诉内核描述字,缓冲区指针,缓冲区大小,文件偏移以及通知的方式,而后当即返回。当内核将数据拷贝到缓冲区后,再通知应用程序。

异步I/O模型使用了Proactor设计模式实现了这一机制。

告知内核,当整个过程(包括阶段1和阶段2)所有完成时,通知应用程序来读数据.

几种I/O模型的比较

前四种模型的区别是阶段1不相同,阶段2基本相同,都是将数据从内核拷贝到调用者的缓冲区。而异步I/O的两个阶段都不一样于前四个模型。

同步I/O操做引发请求进程阻塞,直到I/O操做完成。异步I/O操做不引发请求进程阻塞。

常见JavaI/O模型

在了解了UNIX的I/O模型以后,其实Java的I/O模型也是相似。

“阻塞I/O”模式

在上一节Socket章节中的EchoServer就是一个简单的阻塞I/O例子,服务器启动后,等待客户端链接。在客户端链接服务器后,服务器就阻塞读写取数据流。

EchoServer代码:

public class EchoServer {

public static int DEFAULT_PORT = 7;

public static void main(String[] args) throws IOException {

int port;

try {

port = Integer.parseInt(args[0]);

} catch (RuntimeException ex) {

port = DEFAULT_PORT;

}

try (

ServerSocket serverSocket =

new ServerSocket(port);

Socket clientSocket = serverSocket.accept();

PrintWriter out =

new PrintWriter(clientSocket.getOutputStream(), true);

BufferedReader in = new BufferedReader(

new InputStreamReader(clientSocket.getInputStream()));

) {

String inputLine;

while ((inputLine = in.readLine()) != null) {

out.println(inputLine);

}

} catch (IOException e) {

System.out.println("Exception caught when trying to listen on port "

  • port + " or listening for a connection");

System.out.println(e.getMessage());

}

}

}

改进为“阻塞I/O+多线程”模式

使用多线程来支持多个客户端来访问服务器。

主线程MultiThreadEchoServer.java

public class MultiThreadEchoServer {

public static int DEFAULT_PORT = 7;

public static void main(String[] args) throws IOException {

int port;

try {

port = Integer.parseInt(args[0]);

} catch (RuntimeException ex) {

port = DEFAULT_PORT;

}

Socket clientSocket = null;

try (ServerSocket serverSocket = new ServerSocket(port);) {

while (true) {

clientSocket = serverSocket.accept();

// MultiThread

new Thread(new EchoServerHandler(clientSocket)).start();

}

} catch (IOException e) {

System.out.println(

"Exception caught when trying to listen on port " + port + " or listening for a connection");

System.out.println(e.getMessage());

}

}

}

处理器类EchoServerHandler.java

public class EchoServerHandler implements Runnable {

private Socket clientSocket;

public EchoServerHandler(Socket clientSocket) {

this.clientSocket = clientSocket;

}

@Override

public void run() {

try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);

BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {

String inputLine;

while ((inputLine = in.readLine()) != null) {

out.println(inputLine);

}

} catch (IOException e) {

System.out.println(e.getMessage());

}

}

}

存在问题:每次接收到新的链接都要新建一个线程,处理完成后销毁线程,代价大。当有大量地短链接出现时,性能比较低。

改进为“阻塞I/O+线程池”模式

针对上面多线程的模型中,出现的线程重复建立、销毁带来的开销,能够采用线程池来优化。每次接收到新链接后从池中取一个空闲线程进行处理,处理完成后再放回池中,重用线程避免了频率地建立和销毁线程带来的开销。

主线程ThreadPoolEchoServer.java

public class ThreadPoolEchoServer {

public static int DEFAULT_PORT = 7;

public static void main(String[] args) throws IOException {

int port;

try {

port = Integer.parseInt(args[0]);

} catch (RuntimeException ex) {

port = DEFAULT_PORT;

}

ExecutorService threadPool = Executors.newFixedThreadPool(5);

Socket clientSocket = null;

try (ServerSocket serverSocket = new ServerSocket(port);) {

while (true) {

clientSocket = serverSocket.accept();

// Thread Pool

threadPool.submit(new Thread(new EchoServerHandler(clientSocket)));

}

} catch (IOException e) {

System.out.println(

"Exception caught when trying to listen on port " + port + " or listening for a connection");

System.out.println(e.getMessage());

}

}

}

存在问题:在大量短链接的场景中性能会有提高,由于不用每次都建立和销毁线程,而是重用链接池中的线程。但在大量长链接的场景中,由于线程被链接长期占用,不须要频繁地建立和销毁线程,于是没有什么优点。

虽然这种方法能够适用于小到中度规模的客户端的并发数,若是链接数超过100,000或更多,那么性能将很不理想。

改进为“非阻塞I/O”模式

“阻塞I/O+线程池”网络模型虽然比”阻塞I/O+多线程”网络模型在性能方面有提高,但这两种模型都存在一个共同的问题:读和写操做都是同步阻塞的,面对大并发(持续大量链接同时请求)的场景,须要消耗大量的线程来维持链接。CPU在大量的线程之间频繁切换,性能损耗很大。一旦单机的链接超过1万,甚至达到几万的时候,服务器的性能会急剧降低。

而NIO的Selector却很好地解决了这个问题,用主线程(一个线程或者是CPU个数的线程)保持住全部的链接,管理和读取客户端链接的数据,将读取的数据交给后面的线程池处理,线程池处理完业务逻辑后,将结果交给主线程发送响应给客户端,少许的线程就能够处理大量链接的请求。

JavaNIO由如下几个核心部分组成:

Channel

Buffer

Selector

要使用Selector,得向Selector注册Channel,而后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就能够处理这些事件,事件的例子有如新链接进来,数据接收等。

主线程NonBlokingEchoServer.java

public class NonBlokingEchoServer {

public static int DEFAULT_PORT = 7;

public static void main(String[] args) throws IOException {

int port;

try {

port = Integer.parseInt(args[0]);

} catch (RuntimeException ex) {

port = DEFAULT_PORT;

}

System.out.println("Listening for connections on port " + port);

ServerSocketChannel serverChannel;

Selector selector;

try {

serverChannel = ServerSocketChannel.open();

InetSocketAddress address = new InetSocketAddress(port);

serverChannel.bind(address);

serverChannel.configureBlocking(false);

selector = Selector.open();

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

} catch (IOException ex) {

ex.printStackTrace();

return;

}

while (true) {

try {

selector.select();

} catch (IOException ex) {

ex.printStackTrace();

break;

}

Set<SelectionKey> readyKeys = selector.selectedKeys();

Iterator<SelectionKey> iterator = readyKeys.iterator();

while (iterator.hasNext()) {

SelectionKey key = iterator.next();

iterator.remove();

try {

if (key.isAcceptable()) {

ServerSocketChannel server = (ServerSocketChannel) key.channel();

SocketChannel client = server.accept();

System.out.println("Accepted connection from " + client);

client.configureBlocking(false);

SelectionKey clientKey = client.register(selector,

SelectionKey.OP_WRITE | SelectionKey.OP_READ);

ByteBuffer buffer = ByteBuffer.allocate(100);

clientKey.attach(buffer);

}

if (key.isReadable()) {

SocketChannel client = (SocketChannel) key.channel();

ByteBuffer output = (ByteBuffer) key.attachment();

client.read(output);

}

if (key.isWritable()) {

SocketChannel client = (SocketChannel) key.channel();

ByteBuffer output = (ByteBuffer) key.attachment();

output.flip();

client.write(output);

output.compact();

}

} catch (IOException ex) {

key.cancel();

try {

key.channel().close();

} catch (IOException cex) {

}

}

}

}

}

}

改进为“异步I/O”模式

JavaSE7版本以后,引入了异步I/O(NIO.2)的支持,为构建高性能的网络应用提供了一个利器。

主线程AsyncEchoServer.java

public class AsyncEchoServer {

public static int DEFAULT_PORT = 7;

public static void main(String[] args) throws IOException {

int port;

try {

port = Integer.parseInt(args[0]);

} catch (RuntimeException ex) {

port = DEFAULT_PORT;

}

ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory());

// create asynchronous server socket channel bound to the default group

try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()) {

if (asynchronousServerSocketChannel.isOpen()) {

// set some options

asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);

asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

// bind the server socket channel to local address

asynchronousServerSocketChannel.bind(new InetSocketAddress(port));

// display a waiting message while ... waiting clients

System.out.println("Waiting for connections ...");

while (true) {

Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel

.accept();

try {

final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture

.get();

Callable<String> worker = new Callable<String>() {

@Override

public String call() throws Exception {

String host = asynchronousSocketChannel.getRemoteAddress().toString();

System.out.println("Incoming connection from: " + host);

final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

// transmitting data

while (asynchronousSocketChannel.read(buffer).get() != -1) {

buffer.flip();

asynchronousSocketChannel.write(buffer).get();

if (buffer.hasRemaining()) {

buffer.compact();

} else {

buffer.clear();

}

}

asynchronousSocketChannel.close();

System.out.println(host + " was successfully served!");

return host;

}

};

taskExecutor.submit(worker);

} catch (InterruptedException | ExecutionException ex) {

System.err.println(ex);

System.err.println("n Server is shutting down ...");

// this will make the executor accept no new threads

// and finish all existing threads in the queue

taskExecutor.shutdown();

// wait until all threads are finished

while (!taskExecutor.isTerminated()) {

}

break;

}

}

} else {

System.out.println("The asynchronous server-socket channel cannot be opened!");

}

} catch (IOException ex) {

System.err.println(ex);

}

}

}

本章例子的源码,能够在https://github.com/waylau/ess...

以上就是关于千锋扣丁学堂Java培训之探索IO模型总结整理的所有内容,但愿对你们的学习有所帮助,想要了解更多关于Java开发方面内容的小伙伴,请关注扣丁学堂Java培训官网、微信等平台,扣丁学堂IT职业在线学习教育有专业的Java讲师为您指导,此外扣丁学堂老师精心推出的Java视频教程定能让你快速掌握Java从入门到精通开发实战技能。

相关文章
相关标签/搜索