原本是想学习Netty的,可是Netty是一个NIO框架,所以在学习netty以前,仍是先梳理一下NIO的知识。经过剖析源码理解NIO的设计原理。html
本系列文章针对的是JDK1.8.0.161的源码。java
NIO-Selector源码分析对Selector
的功能和建立过程进行了分析,本篇对Windows下的WindowsSelectorImpl
源码实现进行详细讲解。windows
上一篇文章提到,若没有进行配置时,默认经过sun.nio.ch.DefaultSelectorProvider.create()
建立SelectorProvider
。 Windows下的代码路径在jdk\src\windows\classes\sun\nio\ch\DefaultSelectorProvider.java
。在其内部经过实际是建立了一个WindowsSelectorProvider)
。数组
WindowsSelectorProvider
是用于建立WindowsSelectorImpl
的。缓存
Selector.Open()->
SelectorProvider.provider()->
sun.nio.ch.DefaultSelectorProvider.create()->
new WindowsSelectorImpl(this)->
WindowsSelectorProvider.openSelector()
public class WindowsSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
复制代码
在详细讲解WindowsSelectorImpl
源码以前,先了解WindowsSelectorImpl
的大体代码结构。微信
在其内部有几个主要的数据结构和属性。数据结构
名称 | 做用 |
---|---|
SelectionKeyImpl[] channelArray | 存放注册的SelectionKey |
PollArrayWrapper pollWrapper | 底层的本机轮询数组包装对象,用于存放Socket文件描述符和事件掩码 |
List<SelectThread> threads | 辅助线程,多个线程有助于提升高并发时的性能 |
Pipe wakeupPipe | 用于唤醒辅助线程 |
FdMap fdMap | 保存文件描述符和SelectionKey的映射关系 |
SubSelector subSelector | 调用JNI的poll和处理就绪的SelectionKey |
StartLock startLock | 新增的辅助线程使用该锁等待主线程的开始信号 |
FinishLock finishLock | 主线程用该锁等待全部辅助线程执行完毕 |
用于存放Channel
,Selector
以及存放Channel注册时的事件掩码。多线程
SelectionKeyImpl
SelectionKeyImpl
加入到SelectionKeyImpl[] channelArray
SelectionKeyImpl
的对应关系加入到FdMap fdMap
PollArrayWrapper pollWrapper
中。PollArrayWrapper
用于存放文件描述符的文件描述符和事件掩码的native数组。相关的文件描述符的结构以下图:并发
其中每项的结构以下:app
名称 | 大小 | 说明 |
---|---|---|
SOCKET fd | 4字节 | 存放Socket文件句柄 |
short events | 2字节 | 等待的事件掩码 |
short reevents | 2字节 | 实际发生的事件掩码,暂时美有用到 |
如上所示,每项为8字节,即为SIZE_POLLFD
的值,目前NIO实际只用前两个字段。
class PollArrayWrapper {
private AllocatedNativeObject pollArray; // The fd array
long pollArrayAddress; // pollArrayAddress
static short SIZE_POLLFD = 8; // sizeof pollfd struct
private int size; // Size of the pollArray
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
...
}
复制代码
在 PollArrayWrapper
内部使用 AllocatedNativeObject
对象建立的堆外(native)内存对象。 将数组的首地址保存到pollArrayAddress
中,在调用Poll
的时候须要传递该参数给JNI。
PollArrayWrapper
暴露了读写FD和Event的方法供WindowsSelectorImpl
使用。
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
int getEventOps(int i) {
return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
}
int getDescriptor(int i) {
return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
}
复制代码
因为select最大一次性获取1024个文件描述符。所以为了提升poll的性能 WindowsSelectorImpl
底层 经过引入多个辅助线程的方式实现多线程poll以提升高并发时的性能问题。 咱们先看一下注册的逻辑
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
//判断是否须要扩容队列以及添加辅助线程
growIfNeeded();
//保存到缓存中
channelArray[totalChannels] = ski;
//保存在数组中的位置
ski.setIndex(totalChannels);
//保存文件描述符和SelectionKeyImpl的映射关系到FDMap
fdMap.put(ski);
//保存到keys中
keys.add(ski);
//保存文件描述符和事件到native数组中
pollWrapper.addEntry(totalChannels, ski);
totalChannels++;
}
}
复制代码
在注册以前会先会判断当前注册的Channel
数量 是否达到须要启动辅助线程的阈值。若是达到阈值则须要扩容pollWrapper
数组,同时还要 将wakeupSourceFd
加入到扩容后的第一个位置 (具体做用下面会讲解)。
private void growIfNeeded() {
if (channelArray.length == totalChannels) {
//channel数组已满,扩容两倍
int newSize = totalChannels * 2; // Make a larger array
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
channelArray = temp;
//文件描述符数组扩容
pollWrapper.grow(newSize);
}
//达到最大文件描述符数量时添加辅助线程
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
//将唤醒的文件描述符加入到扩容后的第一个位置。
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;
//添加线程数
threadsCount++;
}
}
复制代码
扩容PollArrayWrapper
pollWrapper.grow(newSize);
void grow(int newSize) {
//建立新的数组
PollArrayWrapper temp = new PollArrayWrapper(newSize);
for (int i = 0; i < size; i++)
//将原来的数组的内容存放到新的数组中
replaceEntry(this, i, temp, i);
//释放原来的数组
pollArray.free();
//更新引用
pollArray = temp.pollArray;
//更新大小
this.size = temp.size;
//更新地址
pollArrayAddress = pollArray.address();
}
复制代码
扩容完成时,须要添加一个辅助线程以并行的处理全部文件描述符。主线程处理前1024个文件描述符,第二个辅助线程处理1025到2048的文件描述符,以此类推。 这样使得主线程调用poll的时候,经过多线程并行执行一次性获取到全部的已就绪的文件描述符,从而提升在高并发时的poll的性能。
每1024个PollFD的第一个句柄都要设置为wakeupSourceFd
,所以在扩容的时候也须要将新的位置的第一个设置为wakeupSourceFd
,该线程的目的是为了唤醒辅助线程 。当多个线程阻塞在Poll
,若此时主线程已经处理完成,则须要等待全部辅助线程完成,经过向wakeupSourceFd
发送信号以激活Poll
不在阻塞。
如今咱们知道了windows下poll多线程的使用方法,由于多线程poll还须要其余的数据结构支持同步,具体的多线程执行逻辑咱们下面再讨论。
FDMap只是为了保存文件描述符句柄和SelectionKey
的关系,前面咱们提到了PollFD的数据结构包含了文件描述符句柄信息,所以咱们能够经过文件描述符句柄从FdMap中获取到对应的SelectionKey
。
private final static class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
private MapEntry get(int desc) {
return get(new Integer(desc));
}
private MapEntry put(SelectionKeyImpl ski) {
return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
}
private MapEntry remove(SelectionKeyImpl ski) {
Integer fd = new Integer(ski.channel.getFDVal());
MapEntry x = get(fd);
if ((x != null) && (x.ski.channel == ski.channel))
return remove(fd);
return null;
}
}
复制代码
SubSelector
封装了调用JNI poll的逻辑,以及获取就绪SelectionKey
的方法。
主线程和每个子线程都有一个SubSelector
,其内存保存了poll获取到的可读文件描述符,可写文件描述符以及异常的文件描述符。这样每一个线程就有本身单独的就绪文件描述符数组。
private final int pollArrayIndex;
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
复制代码
pollArrayIndex
记录了当前SubSelector
的序号,在调用poll的时候,须要将文件描述符数组的地址传递给JNI中,因为咱们有多个线程一块儿调用poll,且每一个线程处理1024个Channel
。经过序号和数组的地址计算当前SubSelector
所负责哪些通道。
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
复制代码
在主线程调用poll以后,会获取到已就绪的文件描述符(包含可读、可写、异常)。经过调用processSelectedKeys
将就绪的文件描述符对应的SelectorKey
加入到selectedKeys
中。这样咱们外部就能够调用到全部就绪的SelectorKey
进行遍历处理。
private int processSelectedKeys(long updateCount) {
int numKeysUpdated = 0;
numKeysUpdated += processFDSet(updateCount, readFds,
Net.POLLIN,
false);
numKeysUpdated += processFDSet(updateCount, writeFds,
Net.POLLCONN |
Net.POLLOUT,
false);
numKeysUpdated += processFDSet(updateCount, exceptFds,
Net.POLLIN |
Net.POLLCONN |
Net.POLLOUT,
true);
return numKeysUpdated;
}
复制代码
可读文件描述符,可写文件描述符以及异常文件描述符的处理逻辑都是同样的,调用processFDSet
处理更新SelectorKey
的就绪事件。这里会传入文件描述符的数组。须要注意的是文件描述符第一个元素是数组的长度。
private int processFDSet(long updateCount, int[] fds, int rOps, boolean isExceptFds) {
int numKeysUpdated = 0;
//1. 遍历文件描述符数组
for (int i = 1; i <= fds[0]; i++) {
//获取文件描述符句柄值
int desc = fds[i];
//2. 判断当前文件描述符是不是用于唤醒的文件描述
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
//3. 获取文件描述符句柄对应的SelectionKey的映射值
MapEntry me = fdMap.get(desc);
// 4. 若为空,则表示已经被取消。
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
// 5. 丢弃OOD数据(紧急数据)
if (isExceptFds &&
(sk.channel() instanceof SocketChannelImpl) &&
discardUrgentData(desc))
{
continue;
}
//6. 判断key是否已经就绪,若已就绪,则将当前操做累加到原来的操做上,好比原来写事件就绪,如今读事件就绪,就须要更新该key读写就绪
if (selectedKeys.contains(sk)) { // Key in selected set
//clearedCount 和 updateCount用于避免同一个key的事件设置屡次,由于同一个文件描述符可能在可读文件描述符数组也可能在异常文件描述符数组中。
if (me.clearedCount != updateCount) {
if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
} else { // Key is not in selected set yet
//key原来未就绪,将key加入selectedKeys中
if (me.clearedCount != updateCount) {
sk.channel.translateAndSetReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
sk.channel.translateAndUpdateReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
}
}
return numKeysUpdated;
}
复制代码
wakeupSourceFd
,前面说了该文件描述符用于唤醒。discardUrgentData
读取并忽略。如今大部分数据结构都已经介绍了,在谈论Pipe、StartLock和FinishLock以前,是时候引入多线程Poll功能了,在谈论多线程时,会对上述三个数据结构和功能进行详细说明。
首先咱们先看一下建立WindowsSelectorImpl
作了什么
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
复制代码
PollArrayWrapper
wakeupSourceFd
存到PollArrayWapper
每1024个元素的第一个位置。使得每一个线程都能被wakeupSourceFd
唤醒。因为select最大支持1024个句柄,这里第一个文件描述符是wakeupSourceFd
,因此一个线程实际最多并发处理1023个socket文件描述符。
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
复制代码
如今咱们看一下doSelect逻辑
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
//1. 删除取消的key
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
//2. 调整线程数 ,等待运行
adjustThreadsCount();
//3. 设置辅助线程数
finishLock.reset();
//4. 开始运行新增的辅助线程
startLock.startThreads();
try {
begin();
try {
//5. 获取就绪文件描述符
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
//6. 等待全部辅助线程完成
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
//7. 再次检查删除取消的key
processDeregisterQueue();
//8. 将就绪的key加入到selectedKeys中
int updated = updateSelectedKeys();
// 完成,重置唤醒标记下次在运行。
resetWakeupSocket();
return updated;
}
复制代码
cancelledKeys
中。protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
k.cancel();
}
}
}
public final void cancel() {
...
((AbstractSelector)selector()).cancel(this);
...
}
void cancel(SelectionKey k) { // package-private
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}
复制代码
调用processDeregisterQueue
进行注销。
processDeregisterQueue();
//遍历全部已取消的key,取消他们
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
//遍历全部key
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
//注销key
implDereg(ski);
} catch (SocketException se) {
throw new IOException("Error deregistering key", se);
} finally {
i.remove();
}
}
}
}
}
protected void implDereg(SelectionKeyImpl ski) throws IOException{
int i = ski.getIndex();
assert (i >= 0);
synchronized (closeLock) {
if (i != totalChannels - 1) {
// 把最后一个通道复制到取消key所在的位置。
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
pollWrapper, i);
}
ski.setIndex(-1);
}
//将最后一个通道清空。
channelArray[totalChannels - 1] = null;
totalChannels--;
//判断是否须要减小一个辅助线程。
if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
//清除对应的缓存。
fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
keys.remove(ski);
selectedKeys.remove(ski);
//设置key无效
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
//关闭文件描述符
((SelChImpl)selch).kill();
}
//将全部key都设置为无效
protected final void deregister(AbstractSelectionKey key) {
((AbstractSelectableChannel)key.channel()).removeKey(key);
}
void removeKey(SelectionKey k) { // package-private
synchronized (keyLock) {
for (int i = 0; i < keys.length; i++)
if (keys[i] == k) {
keys[i] = null;
keyCount--;
}
//将key设置为无效
((AbstractSelectionKey)k).invalidate();
}
}
复制代码
channelArray
中删除。fdMap
、keys
、selectedKeys
数据缓存。((SelChImpl)selch).kill();
是在各个Channel中实现的,以SocketChannel为例,最终会调用nd.close(fd);
关闭对应的文件描述符
private void adjustThreadsCount() {
//当线程大于实际线程,建立更多线程
if (threadsCount > threads.size()) {
// More threads needed. Start more threads.
for (int i = threads.size(); i < threadsCount; i++) {
SelectThread newThread = new SelectThread(i);
threads.add(newThread);
//设置为守护线程
newThread.setDaemon(true);
newThread.start();
}
} else if (threadsCount < threads.size()) {
// 当线程小于实际线程,移除线程。
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
threads.remove(i).makeZombie();
}
}
复制代码
在建立新的线程时,会记录上一次运行的数量保存到lastRun
变量中
private SelectThread(int i) {
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
复制代码
当线程启动时会等待主线程激活
public void run() {
while (true) { // poll loop
//等待主线程信号激活
if (startLock.waitForStart(this))
return;
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// 通知主线程完成.
finishLock.threadFinished();
}
}
复制代码
经过startLock
等待主线程的开始信号。若当前线程是新启动的线程,则runsCounter == thread.lastRun
为真,此时新的线程须要等待主线程调用启动。
startLock.waitForStart(this)
private synchronized boolean waitForStart(SelectThread thread) {
while (true) {
while (runsCounter == thread.lastRun) {
try {
startLock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (thread.isZombie()) { // redundant thread
return true; // will cause run() to exit.
} else {
thread.lastRun = runsCounter; // update lastRun
return false; // will cause run() to poll.
}
}
}
}
复制代码
记录当前辅助线程数量,下次新增的辅助线程须要等待主线程通知启动。
finishLock.reset();
private void reset() {
threadsToFinish = threads.size(); // helper threads
}
复制代码
startLock.startThreads();
private synchronized void startThreads() {
runsCounter++; // next run
notifyAll(); // 通知全部辅助线程继续执行,
}
复制代码
subSelector.poll();
//主线程调用
private int poll() throws IOException{
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
//辅助线程调用
private int poll(int index) throws IOException {
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
复制代码
辅助线程和主线程调用的区别就是存放PollFD的位置变化,每一个线程会有1024个PollFD(8B)的位置存放PollFD。这样使得多个线程的数据内存分离互不影响。 下面看一下JNI的poll0
作了什么处理。下面罗略了主要的逻辑
typedef struct {
jint fd;
jshort events;
} pollfd;
Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
jlong pollAddress, jint numfds,
jintArray returnReadFds, jintArray returnWriteFds,
jintArray returnExceptFds, jlong timeout)
{
DWORD result = 0;
pollfd *fds = (pollfd *) pollAddress;
int i;
FD_SET readfds, writefds, exceptfds;
struct timeval timevalue, *tv;
static struct timeval zerotime = {0, 0};
...
/* Call select */
if ((result = select(0 , &readfds, &writefds, &exceptfds, tv))
== SOCKET_ERROR) {
//当出现错误时,变量每一个socket获取它的就绪状态
FD_SET errreadfds, errwritefds, errexceptfds;
...
for (i = 0; i < numfds; i++) {
errreadfds.fd_count = 0;
errwritefds.fd_count = 0;
if (fds[i].events & POLLIN) {
errreadfds.fd_array[0] = fds[i].fd;
errreadfds.fd_count = 1;
}
if (fds[i].events & (POLLOUT | POLLCONN))
{
errwritefds.fd_array[0] = fds[i].fd;
errwritefds.fd_count = 1;
}
errexceptfds.fd_array[0] = fds[i].fd;
errexceptfds.fd_count = 1;
//遍历每一个socket,探测它的状态
/* call select on the i-th socket */
if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime)
== SOCKET_ERROR) {
/* This socket causes an error. Add it to exceptfds set */
exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
exceptfds.fd_count++;
} else {
...
}
}
}
}
/* Return selected sockets. */
/* Each Java array consists of sockets count followed by sockets list */
...
(*env)->SetIntArrayRegion(env, returnReadFds, 0,
readfds.fd_count + 1, (jint *)&readfds);
(*env)->SetIntArrayRegion(env, returnWriteFds, 0,
writefds.fd_count + 1, (jint *)&writefds);
(*env)->SetIntArrayRegion(env, returnExceptFds, 0,
exceptfds.fd_count + 1, (jint *)&exceptfds);
return 0;
}
复制代码
pollfd *fds = (pollfd *) pollAddress;
将pollAddress的地址转换为polldf的数组结构。这里会自动内存对齐,pollfd一共只有6个字节,第一个是int类型的文件描述符句柄,第二个是short类型的等待事件掩码值。第二个short后会填充2B,所以每一个pollFD是8B。而实际后面2字节用于存放实际发生事件的事件掩码。
discardUrgentData
进行清理。JNIEXPORT jboolean JNICALL Java_sun_nio_ch_WindowsSelectorImpl_discardUrgentData(JNIEnv* env, jobject this, jint s) {
char data[8];
jboolean discarded = JNI_FALSE;
int n;
do {
//读取MSG_OOB数据
n = recv(s, (char*)&data, sizeof(data), MSG_OOB);
if (n > 0) {
//读取到设置标记为true
discarded = JNI_TRUE;
}
} while (n > 0);
return discarded;
}
复制代码
若是timeval为{0,0},则select()当即返回,这可用于探询所选套接口的状态。若是处于这种状态,则select()调用可认为是非阻塞的,且一切适用于非阻塞调用的假设都适用于它。
当获取到全部的就绪的文件描述符时,须要保存到返回结果中,同时读写和异常的返回结果的数组第一个为就绪的长度值。
等待全部辅助线程完成,当主线程完成时会当即调用wakeup
向wakeupSourceFd
发生数据以触发辅助线程唤醒。辅助线程唤醒后也会调用wakeup
一次。当辅助线程都被唤醒后就会通知主线程。
if (threads.size() > 0)
finishLock.waitForHelperThreads();
private synchronized void waitForHelperThreads() {
if (threadsToFinish == threads.size()) {
// no helper threads finished yet. Wakeup them up.
wakeup();
}
while (threadsToFinish != 0) {
try {
finishLock.wait();
} catch (InterruptedException e) {
// Interrupted - set interrupted state.
Thread.currentThread().interrupt();
}
}
}
private synchronized void threadFinished() {
if (threadsToFinish == threads.size()) { // finished poll() first
// if finished first, wakeup others
wakeup();
}
threadsToFinish--;
if (threadsToFinish == 0) // all helper threads finished poll().
notify(); // notify the main thread
}
复制代码
若辅助线接收到数据,则它须要调用wakeup
来唤醒其余辅助线程,这样使得主线程火辅助线程至少能调用一次wakeup
激活其余辅助线程。wakeup
内部会调用setWakeupSocket
向wakeupSourceFd
发生一个信号。
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
//发生一个字节数据唤醒wakeupsocket
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
jint scoutFd)
{
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
复制代码
当主线被激活时,须要调用resetWakeupSocket
将wakeupSourceFd
的数据读取出来。
private void resetWakeupSocket() {
synchronized (interruptLock) {
if (interruptTriggered == false)
return;
resetWakeupSocket0(wakeupSourceFd);
interruptTriggered = false;
}
}
//读取wakeupsocket的数据。
Java_sun_nio_ch_WindowsSelectorImpl_resetWakeupSocket0(JNIEnv *env, jclass this,
jint scinFd)
{
char bytes[WAKEUP_SOCKET_BUF_SIZE];
long bytesToRead;
/* 获取数据大小 */
ioctlsocket (scinFd, FIONREAD, &bytesToRead);
if (bytesToRead == 0) {
return;
}
/* 从缓冲区读取全部数据 */
if (bytesToRead > WAKEUP_SOCKET_BUF_SIZE) {
char* buf = (char*)malloc(bytesToRead);
recv(scinFd, buf, bytesToRead, 0);
free(buf);
} else {
recv(scinFd, bytes, WAKEUP_SOCKET_BUF_SIZE, 0);
}
}
复制代码
ioctlsocket()是一个计算机函数,功能是控制套接口的模式。可用于任一状态的任一套接口。它用于获取与套接口相关的操做参数,而与具体协议或通信子系统无关。第二个参数时对socket的操做命令
int updated = updateSelectedKeys();
private int updateSelectedKeys() {
updateCount++;
int numKeysUpdated = 0;
numKeysUpdated += subSelector.processSelectedKeys(updateCount);
for (SelectThread t: threads) {
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
}
return numKeysUpdated;
}
复制代码
若key首次被加入,则会调用translateAndSetReadyOps
,若key已经在selectKeys中,则会调用translateAndUpdateReadyOps
。这两个方法都是调用translateReadyOps
,translateReadyOps
操做会将已就绪的操做保存。
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, sk.nioReadyOps(), sk);
}
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, 0, sk);
}
复制代码
关闭WindowsSelectorImpl时会将全部注册的通道一同关闭
protected void implClose() throws IOException {
synchronized (closeLock) {
if (channelArray != null) {
if (pollWrapper != null) {
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
//关闭全部channel
for(int i = 1; i < totalChannels; i++) { // Deregister channels
if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
deregister(channelArray[i]);
SelectableChannel selch = channelArray[i].channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
}
//释放数据
pollWrapper.free();
pollWrapper = null;
selectedKeys = null;
channelArray = null;
//释放辅助线程
for (SelectThread t: threads)
t.makeZombie();
//唤醒辅助线程使其退出。
startLock.startThreads();
}
}
}
}
复制代码
本文对WindowsSelectorImpl
的代码实现进行详细解析。下一篇将对Linux下的EpollSelectorImpl
的实现继续讲解。
![]()
- 微信扫一扫二维码关注订阅号杰哥技术分享
- 出处:www.cnblogs.com/Jack-Blog/p…
- 做者:杰哥很忙
- 本文使用「CC BY 4.0」创做共享协议。欢迎转载,请在明显位置给出出处及连接。