Selector 容许一个单一的线程来操做多个 Channel. 若是咱们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 可是由于在一个线程中使用了多个 Channel, 所以也会形成了每一个 Channel 传输效率的下降.
使用 Selector 的图解以下:segmentfault
为了使用 Selector, 咱们首先须要将 Channel 注册到 Selector 中, 随后调用 Selector 的 select()方法, 这个方法会阻塞, 直到注册在 Selector 中的 Channel 发送可读写事件. 当这个方法返回后, 当前的这个线程就能够处理 Channel 的事件了.服务器
经过 Selector.open()方法, 咱们能够建立一个选择器:socket
Selector selector = Selector.open();
为了使用选择器管理 Channel, 咱们须要将 Channel 注册到选择器中:spa
channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
注意
, 若是一个 Channel 要注册到 Selector 中, 那么这个 Channel 必须是非阻塞的, 即channel.configureBlocking(false);
由于 Channel 必需要是非阻塞的, 所以 FileChannel 是不可以使用选择器的, 由于 FileChannel 都是阻塞的.线程
注意到, 在使用 Channel.register()方法时, 第二个参数指定了咱们对 Channel 的什么类型的事件感兴趣, 这些事件有:rest
Connect, 即链接事件(TCP 链接), 对应于SelectionKey.OP_CONNECTcode
Accept, 即确认事件, 对应于SelectionKey.OP_ACCEPTserver
Read, 即读事件, 对应于SelectionKey.OP_READ, 表示 buffer 可读.对象
Write, 即写事件, 对应于SelectionKey.OP_WRITE, 表示 buffer 可写.blog
一个 Channel发出一个事件也能够称为 对于某个事件, Channel 准备好了. 所以一个 Channel 成功链接到了另外一个服务器也能够被称为 connect ready.
咱们可使用或运算|来组合多个事件, 例如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
注意, 一个 Channel 仅仅能够被注册到一个 Selector 一次, 若是将 Channel 注册到 Selector 屡次, 那么其实就是至关于更新 SelectionKey 的 interest set
. 例如:
channel.register(selector, SelectionKey.OP_READ); channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
上面的 channel 注册到同一个 Selector 两次了, 那么第二次的注册其实就是至关于更新这个 Channel 的 interest set 为 SelectionKey.OP_READ | SelectionKey.OP_WRITE.
如上所示, 当咱们使用 register 注册一个 Channel 时, 会返回一个 SelectionKey 对象, 这个对象包含了以下内容:
interest set, 即咱们感兴趣的事件集, 即在调用 register 注册 channel 时所设置的 interest set.
ready set
channel
selector
attached object, 可选的附加对象
咱们能够经过以下方式获取 interest set:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
表明了 Channel 所准备好了的操做.
咱们能够像判断 interest set 同样操做 Ready set, 可是咱们还可使用以下方法进行判断:
int readySet = selectionKey.readyOps(); selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
咱们能够经过 SelectionKey 获取相对应的 Channel 和 Selector:
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();
咱们能够在selectionKey中附加一个对象:
selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment();
或者在注册时直接附加:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
咱们能够经过 Selector.select()方法获取对某件事件准备好了的 Channel, 即若是咱们在注册 Channel 时, 对其的可写事件感兴趣, 那么当 select()返回时, 咱们就能够获取 Channel 了.
注意
, select()方法返回的值表示有多少个 Channel 可操做.
若是 select()方法返回值表示有多个 Channel 准备好了, 那么咱们能够经过 Selected key set 访问这个 Channel:
Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }
注意, 在每次迭代时, 咱们都调用 "keyIterator.remove()" 将这个 key 从迭代器中删除, 由于 select() 方法仅仅是简单地将就绪的 IO 操做放到 selectedKeys 集合中, 所以若是咱们从 selectedKeys 获取到一个 key, 可是没有将它删除, 那么下一次 select 时, 这个 key 所对应的 IO 事件还在 selectedKeys 中.
例如此时咱们收到 OP_ACCEPT 通知, 而后咱们进行相关处理, 可是并无将这个 Key 从 SelectedKeys 中删除, 那么下一次 select() 返回时 咱们还能够在 SelectedKeys 中获取到 OP_ACCEPT 的 key.注意, 咱们能够动态更改 SekectedKeys 中的 key 的 interest set.
例如在 OP_ACCEPT 中, 咱们能够将 interest set 更新为 OP_READ, 这样 Selector 就会将这个 Channel 的 读 IO 就绪事件包含进来了.
经过 Selector.open() 打开一个 Selector.
将 Channel 注册到 Selector 中, 并设置须要监听的事件(interest set)
不断重复:
调用 select() 方法
调用 selector.selectedKeys() 获取 selected keys
迭代每一个 selected key:
*从 selected key 中获取 对应的 Channel 和附加信息(若是有的话)
*判断是哪些 IO 事件已经就绪了, 而后处理它们. 若是是 OP_ACCEPT 事件, 则调用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 获取 SocketChannel, 并将它设置为 非阻塞的, 而后将这个 Channel 注册到 Selector 中.
*根据须要更改 selected key 的监听事件.
*将已经处理过的 key 从 selected keys 集合中删除.
当调用了 Selector.close()方法时, 咱们实际上是关闭了 Selector 自己而且将全部的 SelectionKey 失效, 可是并不会关闭 Channel.
public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000; public static void main(String args[]) throws Exception { // 打开服务端 Socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开 Selector Selector selector = Selector.open(); // 服务端 Socket 监听8080端口, 并配置为非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); // 将 channel 注册到 selector 中. // 一般咱们都是先注册一个 OP_ACCEPT 事件, 而后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ // 注册到 Selector 中. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 经过调用 select 方法, 阻塞地等待 channel I/O 可操做 if (selector.select(TIMEOUT) == 0) { System.out.print("."); continue; } // 获取 I/O 操做就绪的 SelectionKey, 经过 SelectionKey 能够知道哪些 Channel 的哪类 I/O 操做已经就绪. Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); // 当获取一个 SelectionKey 后, 就要将它删除, 表示咱们已经对这个 IO 事件进行了处理. keyIterator.remove(); if (key.isAcceptable()) { // 当 OP_ACCEPT 事件到来时, 咱们就有从 ServerSocketChannel 中获取一个 SocketChannel, // 表明客户端的链接 // 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel. // 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel. SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中. // 注意, 这里咱们若是没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回. clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } } if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.write(buf); if (!buf.hasRemaining()) { key.interestOps(OP_READ); } buf.compact(); } } } } }
本文由 yongshun 发表于我的博客, 采用署名-非商业性使用-相同方式共享 3.0 中国大陆许可协议.
非商业转载请注明做者及出处. 商业转载请联系做者本人
Email: yongshun1228@gmail .com本文标题为: Java NIO 的前生今世 之四 NIO Selector 详解本文连接为: segmentfault.com/a/1190000006824196