BIO到NIO源码的一些事儿之NIO 上

前言

此篇文章会详细解读NIO的功能逐步丰满的路程,为Reactor-Netty 库的讲解铺平道路。java

关于Java编程方法论-Reactor与Webflux的视频分享,已经完成了Rxjava 与 Reactor,b站地址以下:git

Rxjava源码解读与分享:www.bilibili.com/video/av345…github

Reactor源码解读与分享:www.bilibili.com/video/av353…编程

场景代入

接上一篇 BIO到NIO源码的一些事儿之BIO,咱们来接触NIO的一些事儿。安全

在上一篇中,咱们能够看到,咱们要作到异步非阻塞,咱们本身进行的是建立线程池同时对部分代码作timeout的修改来对接客户端,可是弊端也很清晰,咱们转换下思惟,这里举个场景例子,A班同窗要和B班同窗一块儿一对一完成任务,每对人拿到的任务是不同的,消耗的时间有长有短,任务由于有奖励因此同窗们会抢,传统模式下,A班同窗和B班同窗不经管理话,即使只是一个心跳检测的任务都得一块儿,在这种状况下,客户端根本不会有数据要发送,只是想告诉服务器本身还活着,这种状况下,假如B班再来一个同窗作对接的话,就颇有问题了,B班的每个同窗均可以当作服务器端的一个线程。因此,咱们须要一个管理者,因而Selector就出现了,做为管理者,这里,咱们每每须要管理同窗们的状态,是否在等待任务,是否在接收信息,是否在输出信息等等,Selector更侧重于动做,针对于这些状态标签来作事情就能够了,那这些状态标签其实也是须要管理的,因而SelectionKey也就应运而生。接着咱们须要对这些同窗进行包装加强,使之携带这样的标签。一样,对于同窗咱们应该进一步解放双手的,好比给其配台电脑,这样,同窗是否是能够作更多的事情了,那这个电脑在此处就是Buffer的存在了。 因而在NIO中最主要是有三种角色的,Buffer缓冲区,Channel通道,Selector选择器,咱们都涉及到了,接下来,咱们对其源码一步步分析解读。服务器

Channel解读

赋予Channel可异步可中断的能力

有上可知,同窗其实都是表明着一个个的Socket的存在,那么这里Channel就是对其进行的加强包装,也就是Channel的具体实现里应该有Socket这个字段才行,而后具体实现类里面也是牢牢围绕着Socket具有的功能来作文章的。那么,咱们首先来看java.nio.channels.Channel接口的设定:并发

public interface Channel extends Closeable {

    /** * Tells whether or not this channel is open. * * @return {@code true} if, and only if, this channel is open */
    public boolean isOpen();

    /** * Closes this channel. * * <p> After a channel is closed, any further attempt to invoke I/O * operations upon it will cause a {@link ClosedChannelException} to be * thrown. * * <p> If this channel is already closed then invoking this method has no * effect. * * <p> This method may be invoked at any time. If some other thread has * already invoked it, however, then another invocation will block until * the first invocation is complete, after which it will return without * effect. </p> * * @throws IOException If an I/O error occurs */
    public void close() throws IOException;

}
复制代码

此处就是很直接的设定,判断Channel是不是open状态,关闭Channel的动做,咱们在接下来会讲到ClosedChannelException是如何具体在代码中发生的。 有时候,一个Channel可能会被异步关闭和中断,这也是咱们所需求的。那么要实现这个效果咱们须得设定一个能够进行此操做效果的接口。达到的具体的效果应该是若是线程在实现这个接口的的Channel中进行IO操做的时候,另外一个线程能够调用该Channel的close方法。致使的结果就是,进行IO操做的那个阻塞线程会收到一个AsynchronousCloseException异常。异步

一样,咱们应该考虑到另外一种状况,若是线程在实现这个接口的的Channel中进行IO操做的时候,另外一个线程可能会调用被阻塞线程的interrupt方法(Thread#interrupt()),从而致使Channel关闭,那么这个阻塞的线程应该要收到ClosedByInterruptException异常,同时将中断状态设定到该阻塞线程之上。socket

这时候,若是中断状态已经在该线程设定完毕,此时在其之上的有Channel又调用了IO阻塞操做,那么,这个Channel会被关闭,同时,该线程会当即受到一个ClosedByInterruptException异常,它的interrupt状态仍然保持不变。 这个接口定义以下:ide

public interface InterruptibleChannel extends Channel {

    /** * Closes this channel. * * <p> Any thread currently blocked in an I/O operation upon this channel * will receive an {@link AsynchronousCloseException}. * * <p> This method otherwise behaves exactly as specified by the {@link * Channel#close Channel} interface. </p> * * @throws IOException If an I/O error occurs */
    public void close() throws IOException;

}
复制代码

其针对上面所提到逻辑的具体实现是在java.nio.channels.spi.AbstractInterruptibleChannel进行的,关于这个类的解析,咱们来参考这篇文章InterruptibleChannel 与可中断 IO

赋予Channel可被多路复用的能力

咱们在前面有说到,Channel能够被Selector进行使用,而Selector是根据Channel的状态来分配任务的,那么Channel应该提供一个注册到Selector上的方法,来和Selector进行绑定。也就是说Channel的实例要调用register(Selector,int,Object)。注意,由于Selector是要根据状态值进行管理的,因此此方法会返回一个SelectionKey对象来表示这个channelselector上的状态。关于SelectionKey,它是包含不少东西的,这里暂不提。

//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        if (!isOpen())
            throw new ClosedChannelException();
        synchronized (regLock) {
            if (isBlocking())
                throw new IllegalBlockingModeException();
            synchronized (keyLock) {
                // re-check if channel has been closed
                if (!isOpen())
                    throw new ClosedChannelException();
                SelectionKey k = findKey(sel);
                if (k != null) {
                    k.attach(att);
                    k.interestOps(ops);
                } else {
                    // New registration
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
                return k;
            }
        }
    }
//java.nio.channels.spi.AbstractSelectableChannel#addKey
    private void addKey(SelectionKey k) {
        assert Thread.holdsLock(keyLock);
        int i = 0;
        if ((keys != null) && (keyCount < keys.length)) {
            // Find empty element of key array
            for (i = 0; i < keys.length; i++)
                if (keys[i] == null)
                    break;
        } else if (keys == null) {
            keys = new SelectionKey[2];
        } else {
            // Grow key array
            int n = keys.length * 2;
            SelectionKey[] ks =  new SelectionKey[n];
            for (i = 0; i < keys.length; i++)
                ks[i] = keys[i];
            keys = ks;
            i = keyCount;
        }
        keys[i] = k;
        keyCount++;
    }
复制代码

一旦注册到Selector上,Channel将一直保持注册直到其被解除注册。在解除注册的时候会解除Selector分配给Channel的全部资源。 也就是Channel并无直接提供解除注册的方法,那咱们换一个思路,咱们将Selector上表明其注册的Key取消不就能够了。这里能够经过调用SelectionKey#cancel()方法来显式的取消key。而后在Selector下一次选择操做期间进行对Channel的取消注册。

//java.nio.channels.spi.AbstractSelectionKey#cancel
    /** * Cancels this key. * * <p> If this key has not yet been cancelled then it is added to its * selector's cancelled-key set while synchronized on that set. </p> */
    public final void cancel() {
        // Synchronizing "this" to prevent this key from getting canceled
        // multiple times by different threads, which might cause race
        // condition between selector's select() and channel's close().
        synchronized (this) {
            if (valid) {
                valid = false;
                //仍是调用Selector的cancel方法
                ((AbstractSelector)selector()).cancel(this);
            }
        }
    }


//java.nio.channels.spi.AbstractSelector#cancel
    void cancel(SelectionKey k) {                       
        synchronized (cancelledKeys) {
            cancelledKeys.add(k);
        }
    }


//在下一次select操做的时候来解除那些要求cancel的key,即解除Channel注册
//sun.nio.ch.SelectorImpl#select(long)
    @Override
    public final int select(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
            //重点关注此方法
        return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
    }
//sun.nio.ch.SelectorImpl#lockAndDoSelect
    private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
        synchronized (this) {
            ensureOpen();
            if (inSelect)
                throw new IllegalStateException("select in progress");
            inSelect = true;
            try {
                synchronized (publicSelectedKeys) {
                    //重点关注此方法
                    return doSelect(action, timeout);
                }
            } finally {
                inSelect = false;
            }
        }
    }
//sun.nio.ch.WindowsSelectorImpl#doSelect
    protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
        assert Thread.holdsLock(this);
        this.timeout = timeout; // set selector timeout
        processUpdateQueue();
        //重点关注此方法
        processDeregisterQueue();
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        ...
    }

     /** * sun.nio.ch.SelectorImpl#processDeregisterQueue * Invoked by selection operations to process the cancelled-key set */
    protected final void processDeregisterQueue() throws IOException {
        assert Thread.holdsLock(this);
        assert Thread.holdsLock(publicSelectedKeys);

        Set<SelectionKey> cks = cancelledKeys();
        synchronized (cks) {
            if (!cks.isEmpty()) {
                Iterator<SelectionKey> i = cks.iterator();
                while (i.hasNext()) {
                    SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                    i.remove();

                    // remove the key from the selector
                    implDereg(ski);

                    selectedKeys.remove(ski);
                    keys.remove(ski);

                    // remove from channel's key set
                    deregister(ski);

                    SelectableChannel ch = ski.channel();
                    if (!ch.isOpen() && !ch.isRegistered())
                        ((SelChImpl)ch).kill();
                }
            }
        }
    }
复制代码

这里,当Channel关闭时,不管是经过调用Channel#close仍是经过打断线程的方式来对Channel进行关闭,其都会隐式的取消关于这个Channel的全部的keys,其内部也是调用了k.cancel()

//java.nio.channels.spi.AbstractInterruptibleChannel#close
    /** * Closes this channel. * * <p> If the channel has already been closed then this method returns * immediately. Otherwise it marks the channel as closed and then invokes * the {@link #implCloseChannel implCloseChannel} method in order to * complete the close operation. </p> * * @throws IOException * If an I/O error occurs */
    public final void close() throws IOException {
        synchronized (closeLock) {
            if (closed)
                return;
            closed = true;
            implCloseChannel();
        }
    }
//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
     protected final void implCloseChannel() throws IOException {
        implCloseSelectableChannel();

        // clone keys to avoid calling cancel when holding keyLock
        SelectionKey[] copyOfKeys = null;
        synchronized (keyLock) {
            if (keys != null) {
                copyOfKeys = keys.clone();
            }
        }

        if (copyOfKeys != null) {
            for (SelectionKey k : copyOfKeys) {
                if (k != null) {
                    k.cancel();   // invalidate and adds key to cancelledKey set
                }
            }
        }
    }
复制代码

若是Selector自身关闭掉,那么Channel也会被解除注册,同时表明Channel注册的key也将变得无效:

//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
        boolean open = selectorOpen.getAndSet(false);
        if (!open)
            return;
        implCloseSelector();
    }
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}
复制代码

一个channel所支持的Ops中,假如支持多个Ops,在特定的selector注册一次以后便没法在该selector上重复注册,也就是在二次调用java.nio.channels.spi.AbstractSelectableChannel#register方法获得时候,只会进行Ops的改变,并不会从新注册,由于注册会产生一个全新的SelectionKey对象。咱们能够经过调用java.nio.channels.SelectableChannel#isRegistered的方法来肯定是否向一个或多个Selector注册了channel

//java.nio.channels.spi.AbstractSelectableChannel#isRegistered
 // -- Registration --

    public final boolean isRegistered() {
        synchronized (keyLock) {
            //咱们在以前往Selector上注册的时候调用了addKey方法,即每次往//一个Selector注册一次,keyCount就要自增一次。
            return keyCount != 0;
        }
    }
复制代码

至此,继承了SelectableChannel这个类以后,这个channel就能够安全的由多个并发线程来使用。 这里,要注意的是,继承了AbstractSelectableChannel这个类以后,新建立的channel始终处于阻塞模式。然而与Selector的多路复用有关的操做必须基于非阻塞模式,因此在注册到Selector以前,必须将channel置于非阻塞模式,而且在取消注册以前,channel可能不会返回到阻塞模式。 这里,咱们涉及了Channel的阻塞模式与非阻塞模式。在阻塞模式下,在Channel上调用的每一个I/O操做都将阻塞,直到完成为止。 在非阻塞模式下,I/O操做永远不会阻塞,而且能够传输比请求的字节更少的字节,或者根本不传输任何字节。 咱们能够经过调用channel的isBlocking方法来肯定其是否为阻塞模式。

//java.nio.channels.spi.AbstractSelectableChannel#register
 public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        if (!isOpen())
            throw new ClosedChannelException();
        synchronized (regLock) {
     //此处会作判断,假如是阻塞模式,则会返回true,而后就会抛出异常
            if (isBlocking())
                throw new IllegalBlockingModeException();
            synchronized (keyLock) {
                // re-check if channel has been closed
                if (!isOpen())
                    throw new ClosedChannelException();
                SelectionKey k = findKey(sel);
                if (k != null) {
                    k.attach(att);
                    k.interestOps(ops);
                } else {
                    // New registration
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
                return k;
            }
        }
    }
复制代码

因此,咱们在使用的时候能够基于如下的例子做为参考:

public NIOServerSelectorThread(int port) {
		try {
			//打开ServerSocketChannel,用于监听客户端的链接,他是全部客户端链接的父管道
			serverSocketChannel = ServerSocketChannel.open();
			//将管道设置为非阻塞模式
			serverSocketChannel.configureBlocking(false);
			//利用ServerSocketChannel建立一个服务端Socket对象,即ServerSocket
			serverSocket = serverSocketChannel.socket();
			//为服务端Socket绑定监听端口
			serverSocket.bind(new InetSocketAddress(port));
			//建立多路复用器
			selector = Selector.open();
			//将ServerSocketChannel注册到Selector多路复用器上,而且监听ACCEPT事件
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("The server is start in port: "+port);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
复制代码

因时间关系,本篇暂时到这里,剩下的会在下一篇中进行讲解。

相关文章
相关标签/搜索