本文内容涉及同步与异步, 阻塞与非阻塞, BIO、NIO、AIO等概念, 这块内容自己比较复杂, 很难用三言两语说明白. 而书上的定义更不容易理解是什么意思. 下面跟着我一块儿解开它们神秘的面纱。java
从简单的开始,咱们以经典的读取文件的模型举例。(对操做系统而言,全部的输入输出设备都被抽象成文件。)git
在发起读取文件的请求时,应用层会调用系统内核的I/O接口。github
若是应用层调用的是阻塞型I/O,那么在调用以后,应用层即刻被挂起,一直出于等待数据返回的状态,直到系统内核从磁盘读取完数据并返回给应用层,应用层才用得到的数据进行接下来的其余操做。编程
若是应用层调用的是非阻塞I/O,那么调用后,系统内核会当即返回(虽然尚未文件内容的数据),应用层并不会被挂起,它能够作其余任意它想作的操做。(至于文件内容数据如何返回给应用层,这已经超出了阻塞和非阻塞的辨别范畴。)数组
这即是(脱离同步和异步来讲以后)阻塞和非阻塞的区别。总结来讲,是不是阻塞仍是非阻塞,关注的是接口调用(发出请求)后等待数据返回时的状态。被挂起没法执行其余操做的则是阻塞型的,能够被当即「抽离」去完成其余「任务」的则是非阻塞型的。 缓存
阻塞和非阻塞解决了应用层等待数据返回时的状态问题,那系统内核获取到的数据到底如何返回给应用层呢?这里不一样类型的操做便体现的是同步和异步的区别。bash
对于同步型的调用,应用层须要本身去向系统内核问询,若是数据还未读取完毕,那此时读取文件的任务还未完成,应用层根据其阻塞和非阻塞的划分,或挂起或去作其余事情(因此同步和异步并不决定其等待数据返回时的状态);若是数据已经读取完毕,那此时系统内核将数据返回给应用层,应用层便可以用取得的数据作其余相关的事情。服务器
而对于异步型的调用,应用层无需主动向系统内核问询,在系统内核读取完文件数据以后,会主动通知应用层数据已经读取完毕,此时应用层便可以接收系统内核返回过来的数据,再作其余事情。网络
这即是(脱离阻塞和非阻塞来讲以后)同步和异步的区别。也就是说,是不是同步仍是异步,关注的是任务完成时消息通知的方式。由调用方盲目主动问询的方式是同步调用,由被调用方主动通知调用方任务已完成的方式是异步调用。数据结构
假设小明须要在网上下载一个软件:
若是小明点击下载按钮以后,就一直干瞪着进度条不作其余任何事情直到软件下载完成,这是同步阻塞; 若是小明点击下载按钮以后,就一直干瞪着进度条不作其余任何事情直到软件下载完成,可是软件下载完成实际上是会「叮」的一声通知的(但小明依然那样干等着),这是异步阻塞;(不常见) 若是小明点击下载按钮以后,就去作其余事情了,不过他总须要时不时瞄一眼屏幕看软件是否是下载完成了,这是同步非阻塞; 若是小明点击下载按钮以后,就去作其余事情了,软件下载完以后「叮」的一声通知小明,小明再回来继续处理下载完的软件,这是异步非阻塞。 相信看完以上这个案例以后,这几个概念已经可以分辨得很清楚了。
总的来讲,同步和异步关注的是任务完成消息通知的机制,而阻塞和非阻塞关注的是等待任务完成时请求者的状态。
咱们经过 客户端像服务端查询信息做为一个例子。分别经过三种模型来实现。
在传统的网络编程中,服务端监听端口,客户端请求服务端的ip跟监听的端口,跟服务端通讯,必须三次握手创建。若是链接成功,经过套接字(socket)进行通讯。 在BIO通讯模型:采用BIO通讯模型的服务端,一般由一个独立的Acceptor线程负责监听客户端的链接,它接收到客户端链接请求以后为每一个客户端建立一个新的线程进行链路处理没处理完成后,经过输出流返回应答给客户端,线程销毁。即典型的一请求一应答通讯模型。
/** * @author yukong * @date 2018年8月24日18:51:40 * 服务端 */
public class Server {
/** * 默认端口 */
private static final Integer DEFAULT_PORT = 6789;
public void start() throws IOException {
start(DEFAULT_PORT);
}
public void start(Integer port) throws IOException {
ServerSocket serverSocket = new ServerSocket(port);
System.out.println("小yu机器人启动,监听端口为:" + port);
//经过无线循环监听客户端链接
while (true) {
// 阻塞方法,直至有客户端链接成功
Socket socket = serverSocket.accept();
// 多线程处理客户端请求
new Thread(new ServerHandler(socket)).start();
}
}
public static void main(String[] args) throws IOException {
new Server().start();
}
}
复制代码
服务端处理器代码
/**
* @author yukong
* @date 2018年8月24日18:51:40
* 服务端业务逻辑处理器
*/
public class ServerHandler implements Runnable{
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
// 获取socket的字符缓存输入流 也就是获取客户端给服务器的字符流
in = new BufferedReader( new InputStreamReader(this.socket.getInputStream()));
// 获取socket的字符输出流 也就是发送的客户的字符流 第二个参数自动刷新
out = new PrintWriter( new OutputStreamWriter(this.socket.getOutputStream()), true);
String request, response;
// 读取输入流的消息 若是为空 则退出读取
while ((request = in.readLine()) != null) {
System.out.println("[" + Thread.currentThread().getName()+ "]" + "小yu机器人收到消息:" + request);
// 具体业务逻辑处理 查询信息。
response = ResponseUtil.queryMessage(request);
out.println(response);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 资源释放
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if (out != null) {
out.close();
out = null;
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}
复制代码
客户端代码
/** * @author yukong * @date 2018年8月24日18:51:40 * 客户端 */
public class Client {
/** * 默认端口 */
private static final Integer DEFAULT_PORT = 6789;
/** * 默认端口 */
private static final String DEFAULT_HOST = "localhost";
public void send(String key){
send(DEFAULT_PORT,key);
}
public void send(int port,String key){
System.out.println("查询的key为:" + key);
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try{
socket = new Socket(DEFAULT_HOST,port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
out.println(key);
System.out.println("查询的结果为:" + in.readLine());
}catch(Exception e){
e.printStackTrace();
}finally{
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if(out != null){
out.close();
out = null;
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
Client client = new Client();
while (scanner.hasNext()) {
String key = scanner.next();
client.send(key);
}
}
}
复制代码
从代码中能够得知,咱们每次请求都是new Thread去处理,意味着线程消耗巨大,可能会有朋友说道,那就用线程池,一样的若是使用线程池,当达到线程最大数量,也会达到瓶颈。该模式不适合高并发的访问。
NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不一样的套接字通道实现。 新增的着两种通道都支持阻塞和非阻塞两种模式。 阻塞模式使用就像传统中的支持同样,比较简单,可是性能和可靠性都很差;非阻塞模式正好与之相反。 对于低负载、低并发的应用程序,可使用同步阻塞I/O来提高开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO的非阻塞模式来开发。 下面会先对基础知识进行介绍。
Buffer是一个对象,包含一些要写入或者读出的数据。
在NIO库中,全部数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任什么时候候访问NIO中的数据,都是经过缓冲区进行操做。 缓冲区其实是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。 具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他们实现了相同的接口:Buffer。
咱们对数据的读取和写入要经过Channel,它就像水管同样,是一个通道。通道不一样于流的地方就是通道是双向的,能够用于读、写和同时读写操做。 底层的操做系统的通道通常都是全双工的,因此全双工的Channel比流能更好的映射底层操做系统的API。 Channel主要分两大类:
后面代码会涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子类。
Selector是Java NIO 编程的基础。 Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,若是某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,而后经过SelectionKey能够获取就绪Channel的集合,进行后续的I/O操做。 一个Selector能够同时轮询多个Channel,由于JDK使用了epoll()代替传统的select实现,因此没有最大链接句柄1024/2048的限制。因此,只须要一个线程负责Selector的轮询,就能够接入成千上万的客户端。 服务端代码
/** * 服务端 */
public class Server {
/** * 默认端口 */
private static final Integer DEFAULT_PORT = 6780;
public void start() throws IOException {
start(DEFAULT_PORT);
}
public void start(Integer port) throws IOException {
// 打开多路复用选择器
Selector selector = Selector.open();
// 打开服务端监听通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 绑定监听的端口
serverSocketChannel.bind(new InetSocketAddress(port));
// 将选择器绑定到监听信道,只有非阻塞信道才能够注册选择器.并在注册过程当中指出该信道能够进行Accept操做
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("小yu机器人启动,监听端口为:" + port);
new Thread(new ServerHandler(selector)).start();
}
public static void main(String[] args) throws IOException {
new Server().start();
}
}
复制代码
服务端处理器
public class ServerHandler implements Runnable{
private Selector selector;
public ServerHandler(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (true) {
// 等待某信道就绪(或超时)
if(selector.select(1000)==0){
// System.out.print("独自等待.");
continue;
}
// 取得迭代器.selectedKeys()中包含了每一个准备好某一I/O操做的信道的SelectionKey
Iterator<SelectionKey> keyIterator=selector.selectedKeys().iterator();
while (keyIterator.hasNext()){
SelectionKey sk = keyIterator.next();
// 删除已选的key 以防重负处理
keyIterator.remove();
// 处理key
handlerSelect(sk);
}
}
} catch (IOException e) {
}
}
private void handlerSelect(SelectionKey sk) throws IOException {
// 处理新接入的请求
if (sk.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) sk.channel();
//经过ServerSocketChannel的accept建立SocketChannel实例
//完成该操做意味着完成TCP三次握手,TCP物理链路正式创建
SocketChannel sc = ssc.accept();
//设置为非阻塞的
sc.configureBlocking(false);
//注册为读
sc.register(selector, SelectionKey.OP_READ);
}
// 读操做
if (sk.isReadable()) {
String request, response;
SocketChannel sc = (SocketChannel) sk.channel();
// 建立一个ByteBuffer 并设置大小为1m
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 获取到读取的字节长度
int readBytes = sc.read(byteBuffer);
// 判断是否有数据
if (readBytes > 0) {
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操做
byteBuffer.flip();
//根据缓冲区可读字节数建立字节数组
byte[] bytes = new byte[byteBuffer.remaining()];
// 复制至新的缓冲字节流
byteBuffer.get(bytes);
request = new String(bytes, "UTF-8");
System.out.println("[" + Thread.currentThread().getName()+ "]" + "小yu机器人收到消息:" + request);
// 具体业务逻辑处理 查询信息。
response = ResponseUtil.queryMessage(request);
//将消息编码为字节数组
byte[] responseBytes = response.getBytes();
//根据数组容量建立ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(responseBytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(responseBytes);
//flip操做
writeBuffer.flip();
//发送缓冲区的字节数组
sc.write(writeBuffer);
}
}
}
}
复制代码
客户端
/** * 客户端 */
public class Client {
// 通道选择器
private Selector selector;
// 与服务器通讯的通道
SocketChannel socketChannel;
/** * 默认端口 */
private static final Integer DEFAULT_PORT = 6780;
/** * 默认端口 */
private static final String DEFAULT_HOST = "127.0.0.1";
public void send(String key) throws IOException {
send(DEFAULT_PORT, DEFAULT_HOST, key);
}
public void send(int port,String host, String key) throws IOException {
init(port, host);
System.out.println("查询的key为:" + key);
//将消息编码为字节数组
byte[] bytes = key.getBytes();
//根据数组容量建立ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操做
writeBuffer.flip();
//发送缓冲区的字节数组
socketChannel.write(writeBuffer);
//****此处不含处理“写半包”的代码
}
public void init(int port,String host) throws IOException {
// 建立选择器
selector = Selector.open();
// 设置连接的服务端地址
InetSocketAddress socketAddress = new InetSocketAddress(host, port);
// 打开通道
socketChannel = SocketChannel.open(socketAddress);// 非阻塞
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
new Thread(new ClientHandler(selector)).start();
}
public static void main(String[] args) throws IOException {
Scanner scanner = new Scanner(System.in);
Client client = new Client();
while (scanner.hasNext()) {
String key = scanner.next();
client.send(key);
}
}
}
复制代码
客户端处理器
public class ClientHandler implements Runnable {
private Selector selector;
public ClientHandler(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (true) {
if(selector.select(1000) < 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIterator = keys.iterator();
while (selectionKeyIterator.hasNext()) {
SelectionKey sc = selectionKeyIterator.next();
selectionKeyIterator.remove();
//读消息
if (sc.isReadable()) {
SocketChannel socketChannel = (SocketChannel) sc.channel();
//建立ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int byteSize = socketChannel.read(byteBuffer);
if (byteSize > 0) {
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操做
byteBuffer.flip();
//根据缓冲区可读字节数建立字节数组 总长度减去空余的
byte[] bytes = new byte[byteBuffer.remaining()];
// 复制至新的缓冲字节流
byteBuffer.get(bytes);
String message = new String(bytes, "UTF-8");
System.out.println(message);
}
}
}
}
} catch (IOException e) {
}
}
}
复制代码
从代码中 咱们也能看出来,nio解决的是阻塞与非阻塞的,经过selector轮询上注册的channel的状态,来获取对应准备就绪channel的 那么请求者就不用一直去accpet阻塞,等待了。那为何是同步呢,由于仍是咱们请求者不停的轮询selector是否有彻底就绪的channel。
NIO 2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。 异步的套接字通道时真正的异步非阻塞I/O,对应于UNIX网络编程中的事件驱动I/O(AIO)。他不须要过多的Selector对注册的通道进行轮询便可实现异步读写,从而简化了NIO的编程模型。 服务端代码
/** * 异步非阻塞服务端 */
public class Sever {
/** * 默认端口 */
private static final Integer DEFAULT_PORT = 6780;
private AsynchronousServerSocketChannel serverChannel;
//做为handler接收客户端链接
class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
private AsynchronousServerSocketChannel serverChannel;
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
private CharBuffer charBuffer;
private CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
public ServerCompletionHandler(AsynchronousServerSocketChannel serverChannel) {
this.serverChannel = serverChannel;
}
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
//当即接收下一个请求,不停顿
serverChannel.accept(null, this);
try {
while (result.read(buffer).get() != -1) {
buffer.flip();
charBuffer = decoder.decode(buffer);
String request = charBuffer.toString().trim();
System.out.println("[" + Thread.currentThread().getName()+ "]" + "小yu机器人收到消息:" + request);
// 具体业务逻辑处理 查询信息。
String response = ResponseUtil.queryMessage(request);
//将消息编码为字节数组
byte[] responseBytes = response.getBytes();
//根据数组容量建立ByteBuffer
ByteBuffer outBuffer = ByteBuffer.allocate(responseBytes.length);
//将字节数组复制到缓冲区
outBuffer.put(responseBytes);
//flip操做
outBuffer.flip();
//发送缓冲区的字节数组
result.write(outBuffer).get();
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (CharacterCodingException e) {
e.printStackTrace();
} finally {
try {
result.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, Void attachment) {
//当即接收下一个请求,不停顿
serverChannel.accept(null, this);
throw new RuntimeException("connection failed!");
}
}
public void init() throws IOException, InterruptedException {
init(DEFAULT_PORT);
}
public void init(Integer port) throws IOException, InterruptedException {
// 打开异步通道
this.serverChannel = AsynchronousServerSocketChannel.open();
// 判断通道是否打开
if (serverChannel.isOpen()) {
serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverChannel.bind(new InetSocketAddress(port));
} else {
throw new RuntimeException("Channel not opened!");
}
start(port);
}
public void start(Integer port) throws InterruptedException {
System.out.println("小yu机器人启动,监听端口为:" + port);
this.serverChannel.accept(null, new ServerCompletionHandler(serverChannel));
// 保证线程不会挂了
while (true) {
Thread.sleep(5000);
}
}
public static void main(String[] args) throws IOException, InterruptedException {
Sever server = new Sever();
server.init();
}
}
复制代码
客户端
public class Client {
class ClientCompletionHandler implements CompletionHandler<Void, Void> {
private AsynchronousSocketChannel channel;
private CharBuffer charBufferr = null;
private CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
private BufferedReader clientInput = new BufferedReader(new InputStreamReader(System.in));
public ClientCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Void result, Void attachment) {
System.out.println("Input Client Reuest:");
String request;
try {
request = clientInput.readLine();
channel.write(ByteBuffer.wrap(request.getBytes()));
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
while(channel.read(buffer).get() != -1){
buffer.flip();
charBufferr = decoder.decode(buffer);
System.out.println(charBufferr.toString());
if(buffer.hasRemaining()){
buffer.compact();
}
else{
buffer.clear();
}
request = clientInput.readLine();
channel.write(ByteBuffer.wrap(request.getBytes())).get();
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, Void attachment) {
throw new RuntimeException("channel not opened!");
}
}
public void start() throws IOException, InterruptedException{
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
if(channel.isOpen()){
channel.setOption(StandardSocketOptions.SO_RCVBUF, 128*1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 128*1024);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
channel.connect(new InetSocketAddress("127.0.0.1",6780),null,new ClientCompletionHandler(channel));
while(true){
Thread.sleep(5000);
}
}
else{
throw new RuntimeException("Channel not opened!");
}
}
public static void main(String[] args) throws IOException, InterruptedException{
Client client = new Client();
client.start();
}
}
复制代码
先以一张表来直观的对比一下: