Java NIO 由如下几个核心部分组成:
一、Buffer
二、Channel
三、Selectorlinux
Buffer和Channel在深刻浅出NIO之Channel、Buffer一文中已经介绍过,本文主要讲解NIO的Selector实现原理。面试
以前进行socket编程时,accept方法会一直阻塞,直到有客户端请求的到来,并返回socket进行相应的处理。整个过程是流水线的,处理完一个请求,才能去获取并处理后面的请求,固然也能够把获取socket和处理socket的过程分开,一个线程负责accept,一个线程池负责处理请求。编程
但NIO提供了更好的解决方案,采用选择器(Selector)返回已经准备好的socket,并按顺序处理,基于通道(Channel)和缓冲区(Buffer)来进行数据的传输。windows
这里出来一个新概念,selector,具体是一个什么样的东西?数组
想一想一个场景:在一个养鸡场,有这么一我的,天天的工做就是不停检查几个特殊的鸡笼,若是有鸡进来,有鸡出去,有鸡生蛋,有鸡生病等等,就把相应的状况记录下来,若是鸡场的负责人想知道状况,只须要询问那我的便可。缓存
在这里,这我的就至关Selector,每一个鸡笼至关于一个SocketChannel,每一个线程经过一个Selector能够管理多个SocketChannel。服务器
为了实现Selector管理多个SocketChannel,必须将具体的SocketChannel对象注册到Selector,并声明须要监听的事件(这样Selector才知道须要记录什么数据),一共有4种事件:架构
一、connect:客户端链接服务端事件,对应值为SelectionKey.OP_CONNECT(8)
二、accept:服务端接收客户端链接事件,对应值为SelectionKey.OP_ACCEPT(16)
三、read:读事件,对应值为SelectionKey.OP_READ(1)
四、write:写事件,对应值为SelectionKey.OP_WRITE(4)app
这个很好理解,每次请求到达服务器,都是从connect开始,connect成功后,服务端开始准备accept,准备就绪,开始读数据,并处理,最后写回数据返回。异步
因此,当SocketChannel有对应的事件发生时,Selector均可以观察到,并进行相应的处理。
为了更好的理解,先看一段服务端的示例代码
ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); while(true){ int n = selector.select(); if (n == 0) continue; Iterator ite = this.selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = (SelectionKey)ite.next(); if (key.isAcceptable()){ SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); //将选择器注册到链接到的客户端信道, //并指定该信道key值的属性为OP_READ, //同时为该信道指定关联的附件 clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize)); } if (key.isReadable()){ handleRead(key); } if (key.isWritable() && key.isValid()){ handleWrite(key); } if (key.isConnectable()){ System.out.println("isConnectable = true"); } ite.remove(); } }
一、建立ServerSocketChannel实例,并绑定指定端口;
二、建立Selector实例;
三、将serverSocketChannel注册到selector,并指定事件OP_ACCEPT,最底层的socket经过channel和selector创建关联;
四、若是没有准备好的socket,select方法会被阻塞一段时间并返回0;
五、若是底层有socket已经准备好,selector的select方法会返回socket的个数,并且selectedKeys方法会返回socket对应的事件(connect、accept、read or write);
六、根据事件类型,进行不一样的处理逻辑;
在步骤3中,selector只注册了serverSocketChannel的OP_ACCEPT事件
一、若是有客户端A链接服务,执行select方法时,能够经过serverSocketChannel获取客户端A的socketChannel,并在selector上注册socketChannel的OP_READ事件。
二、若是客户端A发送数据,会触发read事件,这样下次轮询调用select方法时,就能经过socketChannel读取数据,同时在selector上注册该socketChannel的OP_WRITE事件,实现服务器往客户端写数据。
SocketChannel、ServerSocketChannel和Selector的实例初始化都经过SelectorProvider类实现,其中Selector是整个NIO Socket的核心实现。
public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
SelectorProvider在windows和linux下有不一样的实现,provider方法会返回对应的实现。
这里不由要问,Selector是如何作到同时管理多个socket?
下面咱们看看Selector的具体实现,Selector初始化时,会实例化PollWrapper、SelectionKeyImpl数组和Pipe。
WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); }
pollWrapper用Unsafe类申请一块物理内存pollfd,存放socket句柄fdVal和events,其中pollfd共8位,0-3位保存socket句柄,4-7位保存events。
pollWrapper提供了fdVal和event数据的相应操做,如添加操做经过Unsafe的putInt和putShort实现。
void putDescriptor(int i, int fd) { pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd); } void putEventOps(int i, int event) { pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event); }
先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)
是如何实现的
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); } k.interestOps(ops); return k; } protected void implRegister(SelectionKeyImpl ski) { synchronized (closeLock) { if (pollWrapper == null) throw new ClosedSelectorException(); growIfNeeded(); channelArray[totalChannels] = ski; ski.setIndex(totalChannels); fdMap.put(ski); keys.add(ski); pollWrapper.addEntry(totalChannels, ski); totalChannels++; } }
一、以当前channel和selector为参数,初始化SelectionKeyImpl 对象selectionKeyImpl ,并添加附件attachment。
二、若是当前channel的数量totalChannels等于SelectionKeyImpl数组大小,对SelectionKeyImpl数组和pollWrapper进行扩容操做。
三、若是totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。
四、pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到对应的pollfd。
五、k.interestOps(ops)方法最终也会把event添加到对应的pollfd。
因此,无论serverSocketChannel,仍是socketChannel,在selector注册的事件,最终都保存在pollArray中。
接着,再来看看selector中的select是如何实现一次获取多个有事件发生的channel的,底层由selector实现类的doSelect方法实现,以下:
protected int doSelect(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); this.timeout = timeout; // set selector timeout processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); return 0; } // Calculate number of helper threads needed for poll. If necessary // threads are created here and start waiting on startLock adjustThreadsCount(); finishLock.reset(); // reset finishLock // Wakeup helper threads, waiting on startLock, so they start polling. // Redundant threads will exit here after wakeup. startLock.startThreads(); // do polling in the main thread. Main thread is responsible for // first MAX_SELECTABLE_FDS entries in pollArray. try { begin(); try { subSelector.poll(); } catch (IOException e) { finishLock.setException(e); // Save this exception } // Main thread is out of poll(). Wakeup others and wait for them if (threads.size() > 0) finishLock.waitForHelperThreads(); } finally { end(); } // Done with poll(). Set wakeupSocket to nonsignaled for the next run. finishLock.checkForException(); processDeregisterQueue(); int updated = updateSelectedKeys(); // Done with poll(). Set wakeupSocket to nonsignaled for the next run. resetWakeupSocket(); return updated; }
其中 subSelector.poll() 是select的核心,由native函数poll0实现,readFds、writeFds 和exceptFds数组用来保存底层select的结果,数组的第一个位置都是存放发生事件的socket的总数,其他位置存放发生事件的socket句柄fd。
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1]; private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; private int poll() throws IOException{ // poll for the main thread return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, MAX_SELECTABLE_FDS), readFds, writeFds, exceptFds, timeout); }
执行 selector.select() ,poll0函数把指向socket句柄和事件的内存地址传给底层函数。
一、若是以前没有发生事件,程序就阻塞在select处,固然不会一直阻塞,由于epoll在timeout时间内若是没有事件,也会返回;
二、一旦有对应的事件发生,poll0方法就会返回;
三、processDeregisterQueue方法会清理那些已经cancelled的SelectionKey;
四、updateSelectedKeys方法统计有事件发生的SelectionKey数量,并把符合条件发生事件的SelectionKey添加到selectedKeys哈希表中,提供给后续使用。
在早期的JDK1.4和1.5 update10版本以前,Selector基于select/poll模型实现,是基于IO复用技术的非阻塞IO,不是异步IO。在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。
经过遍历selector中的SelectionKeyImpl数组,获取发生事件的socketChannel对象,其中保存了对应的socket,实现以下
public int read(ByteBuffer buf) throws IOException { if (buf == null) throw new NullPointerException(); synchronized (readLock) { if (!ensureReadOpen()) return -1; int n = 0; try { begin(); synchronized (stateLock) { if (!isOpen()) { return 0; } readerThread = NativeThread.current(); } for (;;) { n = IOUtil.read(fd, buf, -1, nd); if ((n == IOStatus.INTERRUPTED) && isOpen()) { // The system call was interrupted but the channel // is still open, so retry continue; } return IOStatus.normalize(n); } } finally { readerCleanup(); // Clear reader thread // The end method, which end(n > 0 || (n == IOStatus.UNAVAILABLE)); // Extra case for socket channels: Asynchronous shutdown // synchronized (stateLock) { if ((n <= 0) && (!isInputOpen)) return IOStatus.EOF; } assert IOStatus.check(n); } } }
最终经过Buffer的方式读取socket的数据。
public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { setWakeupSocket(); interruptTriggered = true; } } return this; } // Sets Windows wakeup socket to a signaled state. private void setWakeupSocket() { setWakeupSocket0(wakeupSinkFd); } private native void setWakeupSocket0(int wakeupSinkFd);
看来wakeupSinkFd这个变量是为wakeup方法使用的。
其中interruptTriggered为中断已触发标志,当pollWrapper.interrupt()以后,该标志即为true了;由于这个标志,连续两次wakeup,只会有一次效果。
epoll是Linux下的一种IO多路复用技术,能够很是高效的处理数以百万计的socket句柄。
三个epoll相关的系统调用:
epoll内部实现大概以下:
以为不错请点赞支持,欢迎留言或进个人我的群855801563领取【架构资料专题目合集90期】、【BATJTMD大厂JAVA面试真题1000+】,本群专用于学习交流技术、分享面试机会,拒绝广告,我也会在群内不按期答题、探讨。