此系列文章会详细解读NIO的功能逐步丰满的路程,为Reactor-Netty 库的讲解铺平道路。java
关于Java编程方法论-Reactor与Webflux的视频分享,已经完成了Rxjava 与 Reactor,b站地址以下:linux
Rxjava源码解读与分享:www.bilibili.com/video/av345…git
Reactor源码解读与分享:www.bilibili.com/video/av353…github
本系列源码解读基于JDK11 api细节可能与其余版本有所差异,请自行解决jdk版本问题。编程
本系列前几篇:windows
如咱们在前面内容所讲,在学生肯定以后,咱们就要对其状态进行设定,而后再交由Selector
进行管理,其状态的设定咱们就经过SelectionKey
来进行。数据结构
那这里咱们先经过以前在Channel
中并未仔细讲解的SelectableChannel
下的register
方法。咱们前面有提到过, SelectableChannel
将channel
打形成能够经过Selector
来进行多路复用。做为管理者,channel
想要实现复用,就必须在管理者这里进行注册登记。因此,SelectableChannel
下的register
方法也就是咱们值得二次关注的核心了,也是对接咱们接下来内容的切入点,对于register
方法的解读,请看咱们以前的文章BIO到NIO源码的一些事儿之NIO 上 中赋予Channel可被多路复用的能力这一节的内容。
这里要记住的是SelectableChannel
是对接channel
特征(即SelectionKey
)的关键所在,这有点相似于表设计,本来能够将特征什么的设定在一张表内,但为了操做更加具备针对性,即为了让代码功能更易于管理,就进行抽取并设计了第二张表,这个就有点像人体器官,总体上你们共同协做完成一件事,但器官内部本身专一于本身的主要特定功能,偶尔也具有其余器官的一些小功能。
由此,咱们也就能够知道,SelectionKey
表示一个SelectableChannel
与Selector
关联的标记,能够简单理解为一个token
。就比如是咱们作权限管理系统用户登陆后前台会从后台拿到的一个token
同样,用户能够凭借此token
来访问操做相应的资源信息。
//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { ...
synchronized (regLock) {
...
synchronized (keyLock) {
...
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}
复制代码
结合上下两段源码,在每次Selector
使用register
方法注册channel
时,都会建立并返回一个SelectionKey
。
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
复制代码
咱们在BIO到NIO源码的一些事儿之NIO 上 中赋予Channel可被多路复用的能力这一节的内容知道,一旦注册到Selector
上,Channel
将一直保持注册直到其被解除注册。在解除注册的时候会解除Selector
分配给Channel
的全部资源。 也就是SelectionKey
在其调用SelectionKey#channel
方法,或这个key所表明的channel
关闭,抑或此key所关联的Selector
关闭以前,都是有效。咱们在前面的文章分析中也知道,取消一个SelectionKey
,不会马上从Selector
移除,它将被添加到Selector
的cancelledKeys
这个Set
集合中,以便在下一次选择操做期间删除,咱们能够经过java.nio.channels.SelectionKey#isValid
判断一个SelectionKey
是否有效。
SelectionKey包含四个操做集,每一个操做集用一个Int来表示,int值中的低四位的bit 用于表示channel
支持的可选操做种类。
/** * Operation-set bit for read operations. */
public static final int OP_READ = 1 << 0;
/** * Operation-set bit for write operations. */
public static final int OP_WRITE = 1 << 2;
/** * Operation-set bit for socket-connect operations. */
public static final int OP_CONNECT = 1 << 3;
/** * Operation-set bit for socket-accept operations. */
public static final int OP_ACCEPT = 1 << 4;
复制代码
经过interestOps
来肯定了selector
在下一个选择操做的过程当中将测试哪些操做类别的准备状况,操做事件是不是channel
关注的。interestOps
在SelectionKey
建立时,初始化为注册Selector
时的ops值,这个值可经过sun.nio.ch.SelectionKeyImpl#interestOps(int)
来改变,这点咱们在SelectorImpl#register
能够清楚的看到。
//sun.nio.ch.SelectionKeyImpl
public final class SelectionKeyImpl extends AbstractSelectionKey {
private static final VarHandle INTERESTOPS =
ConstantBootstraps.fieldVarHandle(
MethodHandles.lookup(),
"interestOps",
VarHandle.class,
SelectionKeyImpl.class, int.class);
private final SelChImpl channel;
private final SelectorImpl selector;
private volatile int interestOps;
private volatile int readyOps;
// registered events in kernel, used by some Selector implementations
private int registeredEvents;
// index of key in pollfd array, used by some Selector implementations
private int index;
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch;
selector = sel;
}
...
}
复制代码
readyOps
表示经过Selector
检测到channel
已经准备就绪的操做事件。在SelectionKey
建立时(即上面源码所示),readyOps
值为0,在Selector
的select
操做中可能会更新,可是须要注意的是咱们不能直接调用来更新。
SelectionKey
的readyOps
表示一个channel
已经为某些操做准备就绪,但不能保证在针对这个就绪事件类型的操做过程当中不会发生阻塞,即该操做所在线程有可能会发生阻塞。在完成select
操做后,大部分状况下会当即对readyOps
更新,此时readyOps
值最准确,若是外部的事件或在该channel
有IO操做,readyOps
可能不许确。因此,咱们有看到其是volatile
类型。
SelectionKey
定义了全部的操做事件,可是具体channel
支持的操做事件依赖于具体的channel
,即具体问题具体分析。 全部可选择的channel
(即SelectableChannel
的子类)均可以经过SelectableChannel#validOps
方法,判断一个操做事件是否被channel
所支持,即每一个子类都会有对validOps
的实现,返回一个数字,仅标识channel
支持的哪些操做。尝试设置或测试一个不被channel
所支持的操做设定,将会抛出相关的运行时异常。 不一样应用场景下,其所支持的Ops
是不一样的,摘取部分以下所示:
//java.nio.channels.SocketChannel#validOps
public final int validOps() {
//即1|4|8 1101
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE
| SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps() {
// 16
return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps() {
// 1|4
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
复制代码
若是须要常常关联一些咱们程序中指定数据到SelectionKey
,好比一个咱们使用一个object表示上层的一种高级协议的状态,object用于通知实现协议处理器。因此,SelectionKey支持经过attach
方法将一个对象附加到SelectionKey
的attachment
上。attachment
能够经过java.nio.channels.SelectionKey#attachment
方法进行访问。若是要取消该对象,则能够经过该种方式:selectionKey.attach(null)
。
须要注意的是若是附加的对象再也不使用,必定要人为清除,若是没有,假如此SelectionKey
一直存在,因为此处属于强引用,那么垃圾回收器不会回收该对象,若不清除的话会成内存泄漏。
SelectionKey在由多线程并发使用时,是线程安全的。咱们只须要知道,Selector
的select
操做会一直使用在调用该操做开始时当前的interestOps
所设定的值。
到如今为止,咱们已经多多少少接触了Selector
,其是一个什么样的角色,想必都很清楚了,那咱们就在咱们已经接触到的来进一步深刻探究Selector
的设计运行机制。
从命名上就能够知道 SelectableChannel
对象是依靠Selector
来实现多路复用的。 咱们能够经过调用java.nio.channels.Selector#open
来建立一个selector
对象:
//java.nio.channels.Selector#open
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
复制代码
关于这个SelectorProvider.provider()
,其使用了根据所在系统的默认实现,我这里是windows系统,那么其默认实现为sun.nio.ch.WindowsSelectorProvider
,这样,就能够调用基于相应系统的具体实现了。
//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {
/** * Prevent instantiation. */
private DefaultSelectorProvider() { }
/** * Returns the default SelectorProvider. */
public static SelectorProvider create() {
return new sun.nio.ch.WindowsSelectorProvider();
}
}
复制代码
基于windows来说,selector这里最终会使用sun.nio.ch.WindowsSelectorImpl
来作一些核心的逻辑。
public class WindowsSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
复制代码
这里,咱们须要来看一下WindowsSelectorImpl
的构造函数:
//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
复制代码
咱们由Pipe.open()
就可知道selector
会保持打开的状态,直到其调用它的close
方法:
//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
boolean open = selectorOpen.getAndSet(false);
if (!open)
return;
implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
implClose();
synchronized (publicSelectedKeys) {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}
//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose() throws IOException {
assert !isOpen();
assert Thread.holdsLock(this);
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
pollWrapper.free();
// Make all remaining helper threads exit
for (SelectThread t: threads)
t.makeZombie();
startLock.startThreads();
}
复制代码
能够看到,前面的wakeupPipe
在close方法中关闭掉了。这里的close方法中又涉及了wakeupPipe.sink()
与wakeupPipe.source()
的关闭与pollWrapper.free()
的释放,此处也是咱们本篇的难点所在,这里,咱们来看看它们究竟是什么样的存在。 首先,咱们对WindowsSelectorImpl(SelectorProvider sp)
这个构造函数作下梳理:
PollArrayWrapper
对象(pollWrapper
);Pipe.open()
打开一个管道;wakeupSourceFd
和wakeupSinkFd
两个文件描述符;wakeupSourceFd
)放到pollWrapper
里;这里咱们会有疑惑,为何要建立一个管道,它是用来作什么的。
咱们来看Pipe.open()
源码实现:
//java.nio.channels.Pipe#open
public static Pipe open() throws IOException {
return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
try {
AccessController.doPrivileged(new Initializer(sp));
} catch (PrivilegedActionException x) {
throw (IOException)x.getCause();
}
}
private class Initializer implements PrivilegedExceptionAction<Void> {
private final SelectorProvider sp;
private IOException ioe = null;
private Initializer(SelectorProvider sp) {
this.sp = sp;
}
@Override
public Void run() throws IOException {
LoopbackConnector connector = new LoopbackConnector();
connector.run();
if (ioe instanceof ClosedByInterruptException) {
ioe = null;
Thread connThread = new Thread(connector) {
@Override
public void interrupt() {}
};
connThread.start();
for (;;) {
try {
connThread.join();
break;
} catch (InterruptedException ex) {}
}
Thread.currentThread().interrupt();
}
if (ioe != null)
throw new IOException("Unable to establish loopback connection", ioe);
return null;
}
复制代码
从上述源码咱们能够知道,建立了一个PipeImpl
对象, 在PipeImpl
的构造函数里会执行AccessController.doPrivileged
,在它调用后紧接着会执行Initializer
的run
方法:
//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {
@Override
public void run() {
ServerSocketChannel ssc = null;
SocketChannel sc1 = null;
SocketChannel sc2 = null;
try {
// Create secret with a backing array.
ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);
// Loopback address
InetAddress lb = InetAddress.getLoopbackAddress();
assert(lb.isLoopbackAddress());
InetSocketAddress sa = null;
for(;;) {
// Bind ServerSocketChannel to a port on the loopback
// address
if (ssc == null || !ssc.isOpen()) {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(lb, 0));
sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
}
// Establish connection (assume connections are eagerly
// accepted)
sc1 = SocketChannel.open(sa);
RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
do {
sc1.write(secret);
} while (secret.hasRemaining());
secret.rewind();
// Get a connection and verify it is legitimate
sc2 = ssc.accept();
do {
sc2.read(bb);
} while (bb.hasRemaining());
bb.rewind();
if (bb.equals(secret))
break;
sc2.close();
sc1.close();
}
// Create source and sink channels
source = new SourceChannelImpl(sp, sc1);
sink = new SinkChannelImpl(sp, sc2);
} catch (IOException e) {
try {
if (sc1 != null)
sc1.close();
if (sc2 != null)
sc2.close();
} catch (IOException e2) {}
ioe = e;
} finally {
try {
if (ssc != null)
ssc.close();
} catch (IOException e2) {}
}
}
}
}
复制代码
这里即为建立pipe
的过程,windows
下的实现是建立两个本地的socketChannel
,而后链接(链接的过程经过写一个随机数据作两个socket的链接校验),两个socketChannel
分别实现了管道pipe
的source
与sink
端。 而咱们依然不清楚这个pipe
到底干什么用的, 假如你们熟悉系统调用的C/C++
的话,就能够知道,一个阻塞在select
上的线程有如下三种方式能够被唤醒:
time out
。non-block
的信号。可由kill
或pthread_kill
发出。因此,Selector.wakeup()
要唤醒阻塞的select
,那么也只能经过这三种方法,其中:
select
一旦阻塞,没法修改其time out
时间。Linux
上实现,Windows
上没有这种信号通知的机制。看来只有第一种方法了。假如咱们屡次调用Selector.open()
,那么在Windows
上会每调用一次,就会创建一对本身和本身的loopback
的TCP
链接;在Linux上的话,每调用一次,会开一对pipe
(pipe在Linux下通常都成对打开),到这里,估计咱们可以猜得出来——那就是若是想要唤醒select
,只须要朝着本身的这个loopback
链接发点数据过去,因而,就能够唤醒阻塞在select
上的线程了。
咱们对上面所述作下总结:在Windows
下,Java
虚拟机在Selector.open()
时会本身和本身创建loopback
的TCP
链接;在Linux
下,Selector
会建立pipe
。这主要是为了Selector.wakeup()
能够方便唤醒阻塞在select()
系统调用上的线程(经过向本身所创建的TCP
连接和管道上随便写点什么就能够唤醒阻塞线程)。
在WindowsSelectorImpl
构造器最后,咱们看到这一句代码:pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
,即把pipe内Source端的文件描述符(wakeupSourceFd
)放到pollWrapper
里。pollWrapper
做为PollArrayWrapper
的实例,它究竟是什么,这一节,咱们就来对其探索一番。
class PollArrayWrapper {
private AllocatedNativeObject pollArray; // The fd array
long pollArrayAddress; // pollArrayAddress
@Native private static final short FD_OFFSET = 0; // fd offset in pollfd
@Native private static final short EVENT_OFFSET = 4; // events offset in pollfd
static short SIZE_POLLFD = 8; // sizeof pollfd struct
private int size; // Size of the pollArray
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
...
// Access methods for fd structures
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
...
// Adds Windows wakeup socket at a given index.
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
}
复制代码
这里将wakeupSourceFd
的POLLIN
事件标识为pollArray
的EventOps
的对应的值,这里使用的是unsafe直接操做的内存,也就是相对于这个pollArray
所在内存地址的偏移量SIZE_POLLFD * i + EVENT_OFFSET
这个位置上写入Net.POLLIN
所表明的值,即参考下面本地方法相关源码所展现的值。putDescriptor
一样是这种相似操做。当sink端
有数据写入时,source
对应的文件描述符wakeupSourceFd
就会处于就绪状态。
//java.base/windows/native/libnio/ch/nio_util.h
/* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined */
/* in Windows Vista / Windows Server 2008 and later. If we are on an */
/* older release we just use the Solaris constants as this was previously */
/* done in PollArrayWrapper.java. */
#define POLLIN 0x0001
#define POLLOUT 0x0004
#define POLLERR 0x0008
#define POLLHUP 0x0010
#define POLLNVAL 0x0020
#define POLLCONN 0x0002
复制代码
AllocatedNativeObject
这个类的父类有大量的unsafe
类的操做,这些都是直接基于内存级别的操做。从其父类的构造器中,咱们能也清楚的看到pollArray
是经过unsafe.allocateMemory(size + ps)
分配的一块系统内存。
class AllocatedNativeObject // package-private extends NativeObject {
/** * Allocates a memory area of at least {@code size} bytes outside of the * Java heap and creates a native object for that area. */
AllocatedNativeObject(int size, boolean pageAligned) {
super(size, pageAligned);
}
/** * Frees the native memory area associated with this object. */
synchronized void free() {
if (allocationAddress != 0) {
unsafe.freeMemory(allocationAddress);
allocationAddress = 0;
}
}
}
//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
if (!pageAligned) {
this.allocationAddress = unsafe.allocateMemory(size);
this.address = this.allocationAddress;
} else {
int ps = pageSize();
long a = unsafe.allocateMemory(size + ps);
this.allocationAddress = a;
this.address = a + ps - (a & (ps - 1));
}
}
复制代码
至此,咱们算是完成了对Selector.open()
的解读,其主要任务就是完成创建Pipe
,并把pipe
source
端的wakeupSourceFd
放入pollArray
中,这个pollArray
是Selector
完成其角色任务的枢纽。本篇主要围绕Windows的实现来进行分析,即在windows下经过两个链接的socketChannel
实现了Pipe
,linux
下则直接使用系统的pipe
便可。
所谓的注册,其实就是将一个对象放到注册地对象内的一个容器字段上,这个字段能够是数组,队列,也能够是一个set集合,也能够是一个list。这里,一样是这样,只不过,其须要有个返回值,那么把这个要放入集合的对象返回便可。
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}
复制代码
这段代码咱们以前已经有看过,这里咱们再次温习下。 首先会新建一个SelectionKeyImpl
对象,这个对象就是对Channel
的包装,不只如此,还顺带把当前这个Selector
对象给收了进去,这样,咱们也能够经过SelectionKey
的对象来拿到其对应的Selector
对象。
接着,基于windows
平台实现的implRegister
,先经过ensureOpen()
来确保该Selector
是打开的。接着将这个SelectionKeyImpl
加入到WindowsSelectorImpl
内针对于新注册SelectionKey进行管理的newKeys
之中,newKeys
是一个ArrayDeque
对象。对于ArrayDeque
有不懂的,能够参考Java 容器源码分析之 Deque 与 ArrayDeque这篇文章。
而后再将此这个SelectionKeyImpl
加入到sun.nio.ch.SelectorImpl#keys
中去,这个Set<SelectionKey>
集合表明那些已经注册到当前这个Selector
对象上的SelectionKey
集合。咱们来看sun.nio.ch.SelectorImpl
的构造函数:
//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = ConcurrentHashMap.newKeySet();
selectedKeys = new HashSet<>();
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
复制代码
也就是说,这里的publicKeys
就来源于keys
,只是publicKeys
属于只读的,咱们想要知道当前Selector
对象上所注册的keys
,就能够调用sun.nio.ch.SelectorImpl#keys
来获得:
//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys() {
ensureOpen();
return publicKeys;
}
复制代码
再回到这个构造函数中,selectedKeys
,顾名思义,其属于已选择Keys,即前一次操做期间,已经准备就绪的Channel
所对应的SelectionKey
。此集合为keys
的子集。经过selector.selectedKeys()
获取。
//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
ensureOpen();
return publicSelectedKeys;
}
复制代码
咱们看到其返回的是publicSelectedKeys
,针对这个字段里的元素操做能够作删除,但不能作增长。 在前面的内容中,咱们有涉及到SelectionKey
的取消,因此,咱们在java.nio.channels.spi.AbstractSelector
方法内,是有定义cancelledKeys
的,也是一个HashSet
对象。其表明已经被取消但还没有取消注册(deregister)的SelectionKey
。此Set集合没法直接访问,一样,它也是keys()的子集。
对于新的Selector
实例,上面几个集合均为空。由上面展现的源码可知,经过channel.register
将SelectionKey
添加keys
中,此为key的来源。 若是某个selectionKey.cancel()
被调用,那么此key将会被添加到cancelledKeys
这个集合中,而后在下一次调用selector select
方法期间,此时canceldKeys
不为空,将会触发此SelectionKey
的deregister
操做(释放资源,并从keys
中移除)。不管经过channel.close()
仍是经过selectionKey.cancel()
,都会致使SelectionKey
被加入到cannceldKey
中.
每次选择操做(select)期间,均可以将key添加到selectedKeys
中或者将从cancelledKeys
中移除。
了解了上面的这些,咱们来进入到select
方法中,观察下它的细节。由Selector
的api可知,select
操做有两种形式,一种为 select(),selectNow(),select(long timeout);另外一种为select(Consumer<SelectionKey> action, long timeout)
,select(Consumer<SelectionKey> action)
,selectNow(Consumer<SelectionKey> action)
。后者为JDK11新加入的api,主要针对那些准备好进行I/O操做的channels在select过程当中对相应的key进行的一个自定义操做。
须要注意的是,有Consumer<SelectionKey> action
参数的select操做是阻塞的,只有在选择了至少一个Channel的状况下,才会调用此Selector
实例的wakeup
方法来唤醒,一样,其所在线程被打断也能够。
//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout) throws IOException {
Objects.requireNonNull(action);
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
复制代码
咱们能够观察,不管哪一种,它们最后都落在了lockAndDoSelect
这个方法上,最终会执行特定系统上的doSelect(action, timeout)
实现。 这里咱们以sun.nio.ch.WindowsSelectorImpl#doSelect
为例来说述其操做执行的步骤:
// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue(); // <1>
processDeregisterQueue(); // <2>
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll(); // <3>
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue(); // <4>
int updated = updateSelectedKeys(action); // <5>
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket(); // <6>
return updated;
}
复制代码
首先经过相应操做系统实现类(此处是WindowsSelectorImpl)的具体实现咱们能够知道,经过<1>
处的 processUpdateQueue()
得到关于每一个剩余Channel
(有些Channel取消了)的在此刻的interestOps
,这里包括新注册的和updateKeys
,并对其进行pollWrapper
的管理操做。
即对于新注册的
SelectionKeyImpl
,咱们在相对于这个pollArray
所在内存地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSET
与SIZE_POLLFD * totalChannels + EVENT_OFFSET
分别存入SelectionKeyImpl
的文件描述符fd
与其对应的EventOps
(初始为0)。对
updateKeys
,由于是其以前已经在pollArray
的某个相对位置上存储过,这里咱们还须要对拿到的key的有效性进行判断,若是有效,只须要将正在操做的这个SelectionKeyImpl
对象的interestOps
写入到在pollWrapper
中的存放它的EventOps
位置上。
注意: 在对
newKeys
进行key的有效性判断以后,若是有效,会调用growIfNeeded()
方法,这里首先会判断channelArray.length == totalChannels
,此为一个SelectionKeyImpl
的数组,初始容量大小为8。channelArray
其实就是方便Selector
管理在册SelectionKeyImpl
数量的一个数组而已,经过判断它的数组长度大小,若是和totalChannels
(初始值为1)相等,不只仅是为了channelArray
扩容,更重要的是为了辅助pollWrapper
,让pollWrapper
扩容才是目的所在。而当
totalChannels % MAX_SELECTABLE_FDS == 0
时,则多开一个线程处理selector
。windows
上select
系统调用有最大文件描述符限制,一次只能轮询1024
个文件描述符,若是多于1024个,须要多线程进行轮询。同时调用pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels)
在相对于这个pollArray
所在内存地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSET
这个位置上写入wakeupSourceFd
所表明的fdVal
值。这样在新起的线程就能够经过MAX_SELECTABLE_FDS
来肯定这个用来监控的wakeupSourceFd
,方便唤醒selector
。经过ski.setIndex(totalChannels)
记录下SelectionKeyImpl
在数组中的索引位置,以待后续使用。
/** * sun.nio.ch.WindowsSelectorImpl#processUpdateQueue * Process new registrations and changes to the interest ops. */
private void processUpdateQueue() {
assert Thread.holdsLock(this);
synchronized (updateLock) {
SelectionKeyImpl ski;
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
pollWrapper.putEntry(totalChannels, ski);
totalChannels++;
MapEntry previous = fdMap.put(ski);
assert previous == null;
}
}
// changes to interest ops
while ((ski = updateKeys.pollFirst()) != null) {
int events = ski.translateInterestOps();
int fd = ski.getFDVal();
if (ski.isValid() && fdMap.containsKey(fd)) {
int index = ski.getIndex();
assert index >= 0 && index < totalChannels;
pollWrapper.putEventOps(index, events);
}
}
}
}
//sun.nio.ch.PollArrayWrapper#putEntry
// Prepare another pollfd struct for use.
void putEntry(int index, SelectionKeyImpl ski) {
putDescriptor(index, ski.getFDVal());
putEventOps(index, 0);
}
//sun.nio.ch.WindowsSelectorImpl#growIfNeeded
private void growIfNeeded() {
if (channelArray.length == totalChannels) {
int newSize = totalChannels * 2; // Make a larger array
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
channelArray = temp;
pollWrapper.grow(newSize);
}
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;
threadsCount++;
}
}
// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private static final int MAX_SELECTABLE_FDS = 1024;
// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
// The number of valid entries in poll array, including entries occupied
// by wakeup socket handle.
private int totalChannels = 1;
//sun.nio.ch.PollArrayWrapper#grow
// Grows the pollfd array to new size
void grow(int newSize) {
PollArrayWrapper temp = new PollArrayWrapper(newSize);
for (int i = 0; i < size; i++)
replaceEntry(this, i, temp, i);
pollArray.free();
pollArray = temp.pollArray;
this.size = temp.size;
pollArrayAddress = pollArray.address();
}
// Maps file descriptors to their indices in pollArray
private static final class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
private MapEntry get(int desc) {
return get(Integer.valueOf(desc));
}
private MapEntry put(SelectionKeyImpl ski) {
return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
}
private MapEntry remove(SelectionKeyImpl ski) {
Integer fd = Integer.valueOf(ski.getFDVal());
MapEntry x = get(fd);
if ((x != null) && (x.ski.channel() == ski.channel()))
return remove(fd);
return null;
}
}
// class for fdMap entries
private static final class MapEntry {
final SelectionKeyImpl ski;
long updateCount = 0;
MapEntry(SelectionKeyImpl ski) {
this.ski = ski;
}
}
private final FdMap fdMap = new FdMap();
复制代码
上面WindowsSelectorImpl#doSelect展现源码中<2>
处的 processDeregisterQueue()
。
cancelledKeys
进行清除,遍历cancelledKeys
,并对每一个key
进行deregister
操做,而后从cancelledKeys
集合中删除,从keys
集合与selectedKeys
中删除,以此来释放引用,方便gc回收,implDereg
方法,将会从channelArray
中移除对应的Channel
表明的SelectionKeyImpl
,调整totalChannels
和线程数,从map
和keys
中移除SelectionKeyImpl
,移除Channel
上的SelectionKeyImpl
并关闭Channel
。processDeregisterQueue()
方法在调用poll
方法先后都进行调用,这是确保可以正确处理在调用poll
方法阻塞的这一段时间以内取消的键能被及时清理。cancelledKey
所表明的channel
是否打开和解除注册,若是关闭并解除注册,则应该将相应的文件描述符对应占用的资源给关闭掉。/** * sun.nio.ch.SelectorImpl#processDeregisterQueue * Invoked by selection operations to process the cancelled-key set */
protected final void processDeregisterQueue() throws IOException {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicSelectedKeys);
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
i.remove();
// remove the key from the selector
implDereg(ski);
selectedKeys.remove(ski);
keys.remove(ski);
// remove from channel's key set
deregister(ski);
SelectableChannel ch = ski.channel();
if (!ch.isOpen() && !ch.isRegistered())
((SelChImpl)ch).kill();
}
}
}
}
//sun.nio.ch.WindowsSelectorImpl#implDereg
@Override
protected void implDereg(SelectionKeyImpl ski) {
assert !ski.isValid();
assert Thread.holdsLock(this);
if (fdMap.remove(ski) != null) {
int i = ski.getIndex();
assert (i >= 0);
if (i != totalChannels - 1) {
// Copy end one over it
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
}
ski.setIndex(-1);
channelArray[totalChannels - 1] = null;
totalChannels--;
if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
}
}
//sun.nio.ch.SocketChannelImpl#kill
@Override
public void kill() throws IOException {
synchronized (stateLock) {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
}
}
//C:/Program Files/Java/jdk-11.0.1/lib/src.zip!/java.base/sun/nio/ch/SocketChannelImpl.java:1126
static {
IOUtil.load();
nd = new SocketDispatcher();
}
//sun.nio.ch.SocketDispatcher#close
void close(FileDescriptor fd) throws IOException {
close0(fd);
}
复制代码
上面WindowsSelectorImpl#doSelect
展现源码中adjustThreadsCount()
方法的调用。
totalChannels % MAX_SELECTABLE_FDS == 0
,则多开一个线程处理selector
。这里就是根据分配的线程数量值来增长或减小线程,其实就是针对操做系统的最大select
操做的文件描述符限制对线程个数进行调整。SelectThread
的run
方法实现。经过观察其源码能够看到它首先是while (true)
,经过startLock.waitForStart(this)
来控制该线程是否运行仍是等待,运行状态的话,会进而调用subSelector.poll(index)
(这个咱们后面内容详细解读),poll
结束,并且相对于当前主线程假若有多条SelectThread
子线程的话,当前这条SelectThread
线程第一个结束poll
的话,就调用finishLock.threadFinished()
来通知主线程。在刚新建这个线程并调用其run
方法的时候,此时lastRun = 0
,在第一次启动的时候sun.nio.ch.WindowsSelectorImpl.StartLock#runsCounter
一样为0,因此会调用startLock.wait()
进而进入等待状态。注意:
sun.nio.ch.WindowsSelectorImpl.StartLock
一样会判断当前其所检测的线程是否废弃,废弃的话就返回true
,这样被检测线程也就能跳出其内run方法的while
循环从而结束线程运行。- 在调整线程的时候(调用
adjustThreadsCount
方法)与Selector
调用close
方法会间接调用到sun.nio.ch.WindowsSelectorImpl#implClose
,这两个方法都会涉及到Selector
线程的释放,即调用sun.nio.ch.WindowsSelectorImpl.SelectThread#makeZombie
。finishLock.threadFinished()
会调用wakeup()
方法来通知主线程,这里,咱们能够学到一个细节,若是线程正阻塞在select
方法上,就能够调用wakeup
方法会使阻塞的选择操做当即返回,经过Windows
的相关实现,原理实际上是向pipe
的sink
端写入了一个字节,source
文件描述符就会处于就绪状态,poll
方法会返回,从而致使select
方法返回。而在其余solaris或者linux系统上其实采用系统调用pipe
来完成管道的建立,至关于直接用了系统的管道。经过wakeup()
相关实现还能够看出,调用wakeup
会设置interruptTriggered
的标志位,因此连续屡次调用wakeup
的效果等同于一次调用,不会引发无所谓的bug出现。
//sun.nio.ch.WindowsSelectorImpl#adjustThreadsCount
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount() {
if (threadsCount > threads.size()) {
// More threads needed. Start more threads.
for (int i = threads.size(); i < threadsCount; i++) {
SelectThread newThread = new SelectThread(i);
threads.add(newThread);
newThread.setDaemon(true);
newThread.start();
}
} else if (threadsCount < threads.size()) {
// Some threads become redundant. Remove them from the threads List.
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
threads.remove(i).makeZombie();
}
}
//sun.nio.ch.WindowsSelectorImpl.SelectThread
// Represents a helper thread used for select.
private final class SelectThread extends Thread {
private final int index; // index of this thread
final SubSelector subSelector;
private long lastRun = 0; // last run number
private volatile boolean zombie;
// Creates a new thread
private SelectThread(int i) {
super(null, null, "SelectorHelper", 0, false);
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
void makeZombie() {
zombie = true;
}
boolean isZombie() {
return zombie;
}
public void run() {
while (true) { // poll loop
// wait for the start of poll. If this thread has become
// redundant, then exit.
if (startLock.waitForStart(this))
return;
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// notify main thread, that this thread has finished, and
// wakeup others, if this thread is the first to finish.
finishLock.threadFinished();
}
}
}
// sun.nio.ch.WindowsSelectorImpl.FinishLock#threadFinished
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished() {
if (threadsToFinish == threads.size()) { // finished poll() first
// if finished first, wakeup others
wakeup();
}
threadsToFinish--;
if (threadsToFinish == 0) // all helper threads finished poll().
notify(); // notify the main thread
}
//sun.nio.ch.WindowsSelectorImpl#wakeup
@Override
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
//sun.nio.ch.WindowsSelectorImpl#setWakeupSocket
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd) {
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
复制代码
subSelector.poll()
是select的核心,由native
函数poll0
实现,并把pollWrapper.pollArrayAddress
做为参数传给poll0
,readFds
、writeFds
和exceptFds
数组用来保存底层select
的结果,数组的第一个位置都是存放发生事件的socket
的总数,其他位置存放发生事件的socket
句柄fd
。 咱们经过下面的代码可知: 这个poll0()
会监听pollWrapper
中的FD
有没有数据进出,这里会形成IO
阻塞,直到有数据读写事件发生。因为pollWrapper
中保存的也有ServerSocketChannel
的FD
,因此只要ClientSocket
发一份数据到ServerSocket
,那么poll0()
就会返回;又因为pollWrapper
中保存的也有pipe
的write
端的FD
,因此只要pipe
的write
端向FD
发一份数据,也会形成poll0()
返回;若是这两种状况都没有发生,那么poll0()
就一直阻塞,也就是selector.select()
会一直阻塞;若是有任何一种状况发生,那么selector.select()
就会返回,全部在SelectThread
的run()
里要用while (true) {}
,这样就能够保证在selector
接收到数据并处理完后继续监听poll()
;能够看出,NIO依然是阻塞式的IO,那么它和BIO的区别究竟在哪呢。 其实它的区别在于阻塞的位置不一样,
BIO
是阻塞在read
方法(recvfrom),而NIO
阻塞在select
方法。那么这样作有什么好处呢。若是单纯的改变阻塞的位置,天然是没有什么变化的,但epoll等
的实现的巧妙之处就在于,它利用回调机制,让监听可以只须要知晓哪些socket
上的数据已经准备好了,只须要处理这些线程上面的数据就好了。采用BIO
,假设有1000
个链接,须要开1000
个线程,而后有1000
个read
的位置在阻塞(咱们在讲解BIO部分已经经过Demo体现),采用NIO
编程,只须要1个线程,它利用select
的轮询策略配合epoll
的事件机制及红黑树数据结构,下降了其内部轮询的开销,同时极大的减少了线程上下文切换的开销。
//sun.nio.ch.WindowsSelectorImpl.SubSelector
private final class SubSelector {
private final int pollArrayIndex; // starting index in pollArray to poll
// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
// 保存发生read的FD
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
// 保存发生write的FD
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
//保存发生except的FD
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private SubSelector() {
this.pollArrayIndex = 0; // main thread
}
private SubSelector(int threadIndex) { // helper threads
this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
}
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
...
}
复制代码
上面WindowsSelectorImpl#doSelect展现源码中<5>
处的 updateSelectedKeys(action)
来处理每一个channel
的 准备就绪的信息。key
还没有在selectedKeys
中存在,则将其添加到该集合中。key
已经存在selectedKeys
中,即这个channel
存在所支持的ReadyOps
就绪操做中必须包含一个这种操做(由(ski.nioReadyOps() & ski.nioInterestOps()) != 0
来肯定),此时修改其ReadyOps
为当前所要进行的操做。而咱们以前看到的Consumer<SelectionKey>
这个动做也是在此处进行。而由下面源码可知,先前记录在ReadyOps
中的任何就绪信息在调用此action
以前被丢弃掉,直接进行设定。//sun.nio.ch.WindowsSelectorImpl#updateSelectedKeys
private int updateSelectedKeys(Consumer<SelectionKey> action) {
updateCount++;
int numKeysUpdated = 0;
numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
for (SelectThread t: threads) {
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
}
return numKeysUpdated;
}
//sun.nio.ch.SelectorImpl#processReadyEvents
protected final int processReadyEvents(int rOps, SelectionKeyImpl ski, Consumer<SelectionKey> action) {
if (action != null) {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
action.accept(ski);
ensureOpen();
return 1;
}
} else {
assert Thread.holdsLock(publicSelectedKeys);
if (selectedKeys.contains(ski)) {
if (ski.translateAndUpdateReadyOps(rOps)) {
return 1;
}
} else {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
return 1;
}
}
}
return 0;
}
//sun.nio.ch.WindowsSelectorImpl.SubSelector#processSelectedKeys
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
int numKeysUpdated = 0;
numKeysUpdated += processFDSet(updateCount, action, readFds,
Net.POLLIN,
false);
numKeysUpdated += processFDSet(updateCount, action, writeFds,
Net.POLLCONN |
Net.POLLOUT,
false);
numKeysUpdated += processFDSet(updateCount, action, exceptFds,
Net.POLLIN |
Net.POLLCONN |
Net.POLLOUT,
true);
return numKeysUpdated;
}
/** * sun.nio.ch.WindowsSelectorImpl.SubSelector#processFDSet * updateCount is used to tell if a key has been counted as updated * in this select operation. * * me.updateCount <= updateCount */
private int processFDSet(long updateCount, Consumer<SelectionKey> action, int[] fds, int rOps, boolean isExceptFds) {
int numKeysUpdated = 0;
for (int i = 1; i <= fds[0]; i++) {
int desc = fds[i];
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
MapEntry me = fdMap.get(desc);
// If me is null, the key was deregistered in the previous
// processDeregisterQueue.
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
// The descriptor may be in the exceptfds set because there is
// OOB data queued to the socket. If there is OOB data then it
// is discarded and the key is not added to the selected set.
if (isExceptFds &&
(sk.channel() instanceof SocketChannelImpl) &&
discardUrgentData(desc))
{
continue;
}
//咱们应该关注的
int updated = processReadyEvents(rOps, sk, action);
if (updated > 0 && me.updateCount != updateCount) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
return numKeysUpdated;
}
复制代码
至此,关于Selector的内容就暂时告一段落,在下一篇中,我会针对Java NIO Buffer进行相关解读。