选择器 Selector 是 I/O 多路复用的核心组件,它能够监控实现了 SelectableChannel 的通道的就绪状况。有了多路复用(multiplexing) I/O 模型,使得单线程的 Java 程序在极端状况下可以处理数万个链接,极大提升了程序的并发数。css
I/O 多路复用模型是操做系统提供给应用程序的一种进行 I/O 操做的模型。应用程序经过 select/poll 系统调用监控多个 I/O 设备,一旦某个或者多个 I/O 设备的处于就绪状态(例如:可读)则返回,应用程序随后可对就绪的设备进行操做。html
java
大体流程以下:git
1)应用程序向内核发起 select 系统调用,该调用会阻塞应用程序。github
2)内核等待数据到达。数据可能由 DMA 复制到内核缓冲区,也有多是 CPU 进行复制。服务器
3)数据准备完毕,select 调用返回。select 返回的是一个集和,可能有多个链接都已经就绪。多线程
4)应用程序发起 read 系统调用。并发
5)操做系统将数据有内核缓冲区复制到用户缓冲区。app
6)read 调用返回。异步
I/O 多路复用模型本质上是一种阻塞 I/O,进行读操做的 read 系统调用是阻塞的,select 的时候也是阻塞的。不过 I/O 多路复用模型的优点在于阻塞时能够等待多路 I/O 就绪,而后一并处理。与多线程处理多路 I/O 相比,它是单线程的,没有线程切换的开销,单位时间内可以处理多的链接数。
在 Java 中,通道 Channel 能够表示 I/O 链接,而选择器能够监控某些 I/O 事件就绪的通道,选择通道中就绪的 I/O 事件。这里的通道必须是实现了 SelectableChannel 接口的通道,例如:SocketChannel, DatagramChannel 等;而文件通道 FileChannel 没有实现该接口,因此不支持选择器。
选择器 Selector 监控通道时监控的是通道中的事件,选择键 SelectionKey 就表明着 I/O 事件。程序经过调用 Selector.select() 方法来选中选择器所监控的通道中的就绪的 I/O 事件的集合,而后遍历集合,对事件做出相应的处理。
选择键 SelectionKey 能够表示 4 种事件,这 4 种事件使用 int 类型的常量来表示。
1)SelectionKey.OP_ACCEPT 表示 accept 事件就绪。例如:对于 ServerSocketChannel 来讲,该事件就绪表示能够调用 accept() 方法来得到与客户端链接的通道 SocketChannel。
2)SelectionKey.OP_CONNECT 表示客户端与服务端链接成功。
3)SelectionKey.OP_READ 表示通道中已经有了可读数据,能够调用 read() 方法从通道中读取数据。
4)SelectionKey.OP_WRITE 表示写事件就绪,能够调用 write() 方法往通道中写入数据。
不一样的通道所可以支持的 I/O 事件不一样,例如:ServerSocketChannel 只支持 accept 事件,而 DatagramChannel 只支持 read 和 write 事件。要查看通道所支持的事件,能够查看通道的 javadoc 文档,或者调用通道的 validOps() 方法来进行判断。例如:channel.validOps() & SelectionKey.OP_READ > 0 表示 channel 支持读事件。
与通道和缓冲区的获取相似,选择器的获取也是经过静态工厂方法 open() 来获得的。
Selector selector = Selector.open(); // 获取一个选择器实例
可以被选择器监控的通道必须实现了 SelectableChannel 接口,而且须要将通道配置成非阻塞模式,不然后续的注册步骤会抛出 IllegalBlockingModeException。
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); // 打开 SocketChannel 并链接到本机 9090 端口 socketChannel.configureBlocking(false); // 配置通道为非阻塞模式
通道在被指定的选择器监控以前,应该先告诉选择器,而且告知监控的事件,即:将通道注册到选择器。
通道的注册经过 SelectableChannel.register(Selector selector, int ops) 来完成,ops 表示关注的事件,若是须要关注该通道的多个 I/O 事件,能够传入这些事件类型或运算以后的结果。这些事件必须是通道所支持的,不然抛出 IllegalArgumentException。
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 将套接字经过到注册到选择器,关注 read 和 write 事件
经过调用选择器的 Selector.select() 方法能够获取就绪事件,该方法会将就绪事件放到一个 SelectionKey 集合中,而后返回就绪的事件的个数。这个方法映射多路复用 I/O 模型中的 select 系统调用,它是一个阻塞方法。正常状况下,直到至少有一个就绪事件,或者其它线程调用了当前 Selector 对象的 wakeup() 方法,或者当前线程被中断时返回。
while (selector.select() > 0){ // 轮询,且返回时有就绪事件 Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪事件集合 ....... }
有 3 种方式能够 select 就绪事件:
1)select() 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup() 或者当前线程被中断时返回。
2)select(long timeout) 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup(),或者当前线程被中断,或者阻塞时长达到了 timeout 时返回。不抛出超时异常。
3)selectNode() 不阻塞,若是无就绪事件,则返回 0;若是有就绪事件,则将就绪事件放到一个集合,返回就绪事件的数量。
每次能够 select 出一批就绪的事件,因此须要对这些事件进行迭代。从一个 SelectionKey 对象能够获得:1)就绪事件的对应的通道;2)就绪的事件。经过这些信息,就能够很方便地进行 I/O 操做。
for(SelectionKey key : keys){ if(key.isWritable()){ // 可写事件 if("Bye".equals( (line = scanner.nextLine()) )){ socketChannel.shutdownOutput(); socketChannel.close(); break; } buf.put(line.getBytes()); buf.flip(); socketChannel.write(buf); buf.compact(); } } keys.clear(); // 清除选择键(事件)集,避免下次循环的时候重复处理。
须要注意的是,处理完 I/O 事件以后,须要清除选择键集和,避免下一轮循环的时候对同一事件重复处理。
下面给出一个完整的实例,实例中包含 TCP 客户端 TcpClient, UDP 客户端 UdpClient 和服务端 EchoServer。服务端 EchoServer 能够同时处理 UDP 请求和 TCP 请求,用户能够在客户端控制台输入内容,按回车发送给服务端,服务端打印客户端发送过来的内容。完整代码:https://github.com/Robothy/java-experiments/tree/main/nio/Selector
public class EchoServer { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); // 获取选择器 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开服务器通道 serverSocketChannel.configureBlocking(false); // 服务器通道配置为非阻塞模式 serverSocketChannel.bind(new InetSocketAddress(9090)); // 绑定 TCP 端口 9090 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 将服务器通道注册到选择器 selector 中,注册事件为 ACCEPT DatagramChannel datagramChannel = DatagramChannel.open(); // 打开套接字通道 datagramChannel.configureBlocking(false); // 配置通道为非阻塞模式 datagramChannel.bind(new InetSocketAddress(9090)); // 绑定 UDP 端口 9090 datagramChannel.register(selector, SelectionKey.OP_READ); // 将通道注册到选择器 selector 中,注册事件为读取数据 ByteBuffer buf = ByteBuffer.allocate(1024); // 分配一个 1024 字节的堆字节缓冲区 while (selector.select() > 0){ // 轮询已经就绪的注册的通道的 I/O 事件 Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪的 I/O 事件,即选择器键集合 for (SelectionKey key : keys){ // 遍历选择键,处理就绪事件 if(key.isAcceptable()){ // 选择键的事件的是 I/O 链接事件 SocketChannel socketChannel = serverSocketChannel.accept(); // 执行 I/O 操做,获取套接字链接通道 socketChannel.configureBlocking(false); // 配置为套接字通道为非阻塞模式 socketChannel.register(selector, SelectionKey.OP_READ); // 将套接字经过到注册到选择器,关注 READ 事件 }else if(key.isReadable()){ // 选择键的事件是 READ StringBuilder sb = new StringBuilder(); if(key.channel() instanceof DatagramChannel){ // 选择的通道为数据报通道,客户端是经过 UDP 链接过来的 sb.append("UDP Client: "); datagramChannel.receive(buf); // 最多读取 1024 字节,数据报多出的部分自动丢弃 buf.flip(); while(buf.position() < buf.limit()) { sb.append((char)buf.get()); } buf.clear(); }else{ // 选择的通道为套接字通道,客户端时经过 TCP 链接过来的 sb.append("TCP Client: "); ReadableByteChannel channel = (ReadableByteChannel) key.channel(); // 获取通道 int size; while ( (size = channel.read(buf))>0){ buf.flip(); while (buf.position() < buf.limit()) { sb.append((char)buf.get()); } buf.clear(); } if (size == -1) { sb.append("Exit"); channel.close(); } } System.out.println(sb); } } keys.clear(); // 将选择键清空,防止下次循环时被重复处理 } } }
public class TcpClient { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_WRITE); Scanner scanner = new Scanner(System.in); String line; ByteBuffer buf = ByteBuffer.allocate(1024); while (selector.select() > 0){ Set<SelectionKey> keys = selector.selectedKeys(); for(SelectionKey key : keys){ if(key.isWritable()){ if("Bye".equals( (line = scanner.nextLine()) )){ socketChannel.shutdownOutput(); socketChannel.close(); break; } buf.put(line.getBytes()); buf.flip(); socketChannel.write(buf); buf.compact(); } } keys.clear(); if(!socketChannel.isOpen()) break; } } }
public class UdpClient { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); // 获取选择器 DatagramChannel datagramChannel = DatagramChannel.open(); // 打开一个数据报通道 datagramChannel.configureBlocking(false); // 配置通道为非阻塞模式 datagramChannel.register(selector, SelectionKey.OP_WRITE); // 将通道的写事件注册到选择器 ByteBuffer buff = ByteBuffer.allocate(1024); // 分配字节缓冲区 Scanner scanner = new Scanner(System.in); // 建立扫描器,扫描控制台输入流 InetSocketAddress server = new InetSocketAddress("localhost", 9090); while (selector.select() > 0){ // 有就绪事件 Set<SelectionKey> keys = selector.selectedKeys(); // 获取选择键,即就绪的事件 for(SelectionKey key : keys){ // 遍历选择键 if(key.isWritable()){ // 若是当前选择键是读就绪 String line; if("Bye".equals( line = scanner.nextLine() )) { // 从控制台获取 1 行输入,并检查输入的是否是 Bye System.exit(0); // 正常退出 } buff.put(line.getBytes()); // 放入缓冲区 buff.flip(); // 将缓冲区置为读状态 datagramChannel.send(buff, server); // 往 I/O 写数据 buff.compact(); // 压缩缓冲区,保留没发送完的数据 } } keys.clear(); } } }
Selector 做为多路复用 I/O 模型的核心组件,可以同时监控多路 I/O 通道。选择器在 select 就绪事件地时候会阻塞,在处理 I/O 事件的时候也会阻塞,它的优点在于在阻塞的时候能够等待多路 I/O 就绪,是一种异步阻塞 I/O 模型。与多线程处理多路 I/O 相比,多路复用模型只须要单个线程便可处理万级链接,没有线程切换的开销。