NIO.2特性总结(三)AIO

接上一篇NIO.2特性总结(二)加强的通道 NetworkChannel java

We’ve finally reached the most powerful feature introduced in NIO.2, the asynchronous channel API. 多线程

我以为第一个问题就是同步和异步的区别,The Difference Between Synchronous And Asynchronous。异步IO有时候也会叫overlapped I/O。这里别跟阻塞和非阻塞混淆了。咱们这节主要说的是异步的问题,若是你不了解阻塞和非阻塞,你能够看以前的文章,再罗嗦一点,异步和非阻塞不是一个东西。 app

同步I/O操做:In a synchronous I/O operation, a thread enters into action and waits until the I/O request is completed (the program is “stuck” waiting for the process to end, with no way out)。其实看这段我特别别扭,怎么看怎么像阻塞的解释。不过我在网上查到了这么一段话:阻塞IO,非阻塞IO,IO复用,信号驱动IO,异步IO,前四种都属于同步IO。这么一看好像仍是那么一回事儿。具体我还得去核实一下。 dom

异步I/O操做:When the same action occurs in an asynchronous environment, a thread performs the I/O operation with more kernel help. Actually, it immediately passes the request to the kernel and continues on to process another job. The kernel signals to the thread when the operation has completed, and the thread “respects” the signal by interrupting its current job and processing the data from the I/O operation as necessary。这个是否是又容易和Non-blocking混淆了,个人理解,异步操做会将任务直接交给内核,非阻塞虽然也返回,可是始终是在本身的线程上,他们之间的关系仍是看下面: 异步


网上也有不少例子,也有很多UML的图来解释这四个词的含义,这个须要你们慢慢去体会,水仍是很深的。 socket

Java7中异步IO的实现是创建在两件事情上: async

1.         在异步通道上独立执行的线程(读、写、链接、关闭等)操做 this

2.         在操做初始化以后的控制机制 spa

Java7的异步IO操做有两种form .net

1.         Pending Result:这个结果返回的是concurrent包中的Future对象。

2.         Complete Result:采用CompleteHandler的回调机制返回结果。

咱们看异步通道全部的异步通道有须要遵循一个规则,在不阻塞应用去执行其余任务的状况下初始化IO操做和IO结束后的通知机制。在java7中有三个异步通道:AsynchronousFileChannel、AsynchronousSocketChannel和AsynchronousServerSocketChannel。另外还有一个比较重要的概念是group,要有组的概念是为了把资源共享,每个异步的channel都会属于一个group,同一个group里的对象就能够共享一个线程池。这个group由AsynchronousChannelGroup,实现。

接下来很少写了,直接上代码,仍是代码容易理解,更多的代码请看《pro java7 nio.2》,我只是摘了一部分:

异步文件:

package com.a2.nio2.chapter9.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class AIOFileChannal {
	public static void main(String[] args) {
		ByteBuffer buffer = ByteBuffer.allocate(100);
		String encoding = System.getProperty("file.encoding");
		Path path = Paths.get("C:/rafaelnadal/grandslam/RolandGarros",
				"story.txt");
		try (AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel
				.open(path, StandardOpenOption.READ)) {
			Future<Integer> result = asynchronousFileChannel.read(buffer, 0);
			while (!result.isDone()) {
				System.out.println("Do something else while reading ...");
			}
			System.out.println("Read done: " + result.isDone());
			System.out.println("Bytes read: " + result.get());
		} catch (Exception ex) {
			System.err.println(ex);
		}
		buffer.flip();
		System.out.print(Charset.forName(encoding).decode(buffer));
		buffer.clear();
	}
}
异步socket,服务端:
package com.a2.nio2.chapter9.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class EchoServer {
	public static void main(String[] args) {
		final int DEFAULT_PORT = 5555;
		final String IP = "127.0.0.1";
		// create an asynchronous server socket channel bound to the default
		// group
		try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel
				.open()) {
			if (asynchronousServerSocketChannel.isOpen()) {
				// set some options
				asynchronousServerSocketChannel.setOption(
						StandardSocketOptions.SO_RCVBUF, 4 * 1024);
				asynchronousServerSocketChannel.setOption(
						StandardSocketOptions.SO_REUSEADDR, true);
				// bind the asynchronous server socket channel to local address
				asynchronousServerSocketChannel.bind(new InetSocketAddress(IP,
						DEFAULT_PORT));// display a waiting message while ...
										// waiting clients
				System.out.println("Waiting for connections ...");
				while (true) {
					Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel
							.accept();
					try (AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture
							.get()) {
						System.out.println("Incoming connection from: "
								+ asynchronousSocketChannel.getRemoteAddress());
						final ByteBuffer buffer = ByteBuffer
								.allocateDirect(1024);
						// transmitting data
						while (asynchronousSocketChannel.read(buffer).get() != -1) {
							buffer.flip();
							asynchronousSocketChannel.write(buffer).get();
							if (buffer.hasRemaining()) {
								buffer.compact();
							} else {
								buffer.clear();
							}
						}
						System.out.println(asynchronousSocketChannel
								.getRemoteAddress()
								+ " was successfully served!");
					} catch (IOException | InterruptedException
							| ExecutionException ex) {
						System.err.println(ex);
					}
				}
			} else {
				System.out
						.println("The asynchronous server-socket channel cannot be opened!");
			}
		} catch (IOException ex) {
			System.err.println(ex);
		}
	}
}
客户端:
package com.a2.nio2.chapter9.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Random;
import java.util.concurrent.ExecutionException;

public class EchoClient {
	public static void main(String[] args) {
		final int DEFAULT_PORT = 5555;
		final String IP = "127.0.0.1";
		ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
		ByteBuffer helloBuffer = ByteBuffer.wrap("Hello !".getBytes());
		ByteBuffer randomBuffer;
		CharBuffer charBuffer;
		Charset charset = Charset.defaultCharset();
		CharsetDecoder decoder = charset.newDecoder();
		// create an asynchronous socket channel bound to the default group
		try (AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel
				.open()) {
			if (asynchronousSocketChannel.isOpen()) {
				// set some options
				asynchronousSocketChannel.setOption(
						StandardSocketOptions.SO_RCVBUF, 128 * 1024);
				asynchronousSocketChannel.setOption(
						StandardSocketOptions.SO_SNDBUF, 128 * 1024);
				asynchronousSocketChannel.setOption(
						StandardSocketOptions.SO_KEEPALIVE, true);
				// connect this channel's socket
				Void connect = asynchronousSocketChannel.connect(
						new InetSocketAddress(IP, DEFAULT_PORT)).get();
				if (connect == null) {
					System.out.println("Local address: "
							+ asynchronousSocketChannel.getLocalAddress());
					// transmitting data
					asynchronousSocketChannel.write(helloBuffer).get();
					while (asynchronousSocketChannel.read(buffer).get() != -1) {
						buffer.flip();
						charBuffer = decoder.decode(buffer);
						System.out.println(charBuffer.toString());
						if (buffer.hasRemaining()) {
							buffer.compact();
						} else {
							buffer.clear();
						}
						int r = new Random().nextInt(100);
						if (r == 50) {
							System.out
									.println("50 was generated! Close the asynchronous socket channel!");
							break;
						} else {
							randomBuffer = ByteBuffer.wrap("Random number:"
									.concat(String.valueOf(r)).getBytes());
							asynchronousSocketChannel.write(randomBuffer).get();
						}
					}
				} else {
					System.out.println("The connection cannot be established!");
				}
			} else {
				System.out
						.println("The asynchronous socket channel cannot be opened!");
			}
		} catch (IOException | InterruptedException | ExecutionException ex) {
			System.err.println(ex);
		}
	}
}
-------------------------------------------------------

其余还有好比怎么启多线程去处理请求,怎么操做group,书上都有就不贴出来了。这个还真须要点时间去研究研究。

固然还有个问题就是非阻塞怎么和异步去结合,这个问题我也会抽空去研究一下。

后面的相关文章会补上NIO.2在文件系统操做上的新特性.

相关文章
相关标签/搜索