NIO 源码分析(05) Channel 源码分析java
Netty 系列目录(http://www.javashuo.com/article/p-hskusway-em.html)linux
功能说明:编程
InterruptibleChannel、AbstractInterruptibleChannel
提供了 Channel 响应 thread.interrupt(),支持中断操做,最重要的两个方法是 begin 和 end。SelectableChannel、AbstractSelectableChannel
提供了 Channel 注册到 Selector 的各类方法。ReadableByteChannel、ScatteringByteChannel
Channel 读数据。WritableByteChannel、GatheringByteChannel
Channel 写数据。NetworkChannel
Channel 进行端口绑定、参数设置等网络相关的操做。SocketChannel
Socket 门面,由 SelectorProvider.openSocketChannel 提供具体的实现类。ServerSocketChannel
ServerSocket 门面,由 SelectorProvider.openServerSocketChannel 提供具体的实现类。public ServerSocketChannel openServerSocketChannel() throws IOException { return new ServerSocketChannelImpl(this); } public SocketChannel openSocketChannel() throws IOException { return new SocketChannelImpl(this); }
在 SelectorProviderImpl 中默认的 ServerSocketChannel 实现类是 ServerSocketChannelImpl。windows
begin() 和 end() 老是配对使用的,Channel 和 Selector 均有本身的实现,所完成的功能也是有所区别的。一般这两个方法的使用以下:安全
boolean completed = false; try { begin(); completed = ...; // Perform blocking I/O operation return ...; // Return result } finally { end(completed); }
AbstractInterruptibleChannel 中最重要的方法是 begin 和 end,它们的功能是什么呢?网络
protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread target) { synchronized (closeLock) { if (!open) return; open = false; interrupted = target; try { // Channel 中的 begin 就作了一件事,关闭 channel AbstractInterruptibleChannel.this.implCloseChannel(); } catch (IOException x) { } } }}; } blockedOn(interruptor); // t.blockedOn(b) Thread me = Thread.currentThread(); if (me.isInterrupted()) interruptor.interrupt(me); } protected final void end(boolean completed) throws AsynchronousCloseException { blockedOn(null); Thread interrupted = this.interrupted; if (interrupted != null && interrupted == Thread.currentThread()) { interrupted = null; throw new ClosedByInterruptException(); } if (!completed && !open) throw new AsynchronousCloseException(); }
总结: Channel 中的 begin 就干了一件事,关闭 channel。咱们先试想这样一个场景,Channel 要读写数据时所在线程被中断了,会发生什么事?线程既然被中断了,Channel 总要关闭吧。事实上 Channel 的 begin 和 end 就是作这个事情的。不过要理解 begin 和 end,彷佛咱们先得弄明白 AbstractInterruptibleChannel.blockedOn 究竟在干什么:多线程
static void blockedOn(Interruptible intr) { // package-private sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr); }
其中 JavaLangAccess 接口在 java.lang.System 中被实例化,它是这样写的:app
private static void setJavaLangAccess() { // Allow privileged classes outside of java.lang sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){ public void blockedOn(Thread t, Interruptible b) { t.blockedOn(b); } } }
如今咱们发现,JavaLangAccess 的 blockedOn 实现,竟然只有这么一句 t.blockedOn(b)。继续跟踪到 java.lang.Thread 中 blockedOn 的实现了:socket
private volatile Interruptible blocker; void blockedOn(Interruptible b) { synchronized (blockerLock) { blocker = b; } }
实际上就是 Thread 线程上注册了一个钩子方法,当线程中断时,即调用 thread.interrupt() 时会回调这个 b.interrupt(this) 方法,进而关闭线程对应的 Channel。
public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); }
protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread ignore) { // 线程中断时唤醒 selector AbstractSelector.this.wakeup(); }}; } AbstractInterruptibleChannel.blockedOn(interruptor); Thread me = Thread.currentThread(); if (me.isInterrupted()) interruptor.interrupt(me); } protected final void end() { AbstractInterruptibleChannel.blockedOn(null); }
总结: Selector 中的 begin 和 end 就是在线程中断时唤醒对应的 selector,对应的使用以下:
protected int doSelect(long timeout) throws IOException { try { begin(); pollWrapper.poll(timeout); } finally { end(); } // ... }
private SelectionKey[] keys = null; private int keyCount = 0; // Lock for key set and count private final Object keyLock = new Object(); // Blocking mode, protected by regLock boolean blocking = true; // Lock for registration and configureBlocking operations private final Object regLock = new Object();
keys、keyCount
Channel 注册后的 SelectionKey 集合和数量,也就是说一个 Channel 能够注册到多个 Selector 上。keyLock 保证多线程下 keys、keyCount 操做时的数据安全。
blocking
Channel 是不是阻塞的。regLock 保证多线程下 blocking 操做的线程安全,同时注册时也须要加锁。
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); // 注册的感兴趣事件不合法 if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (blocking) throw new IllegalBlockingModeException(); // 1. 判断 Channel 是不是在这个 Selector 上是否已经注册 SelectionKey k = findKey(sel); // 2. 若是已经注册,更新感兴趣事件 if (k != null) { k.interestOps(ops); k.attach(att); } // 3. 若是没有注册,委托 seletor.register 完成注册 if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }
总结: AbstractSelectableChannel 能够注册到多个 Selector 上,keys 属性管理了全部已经注册的 SelectionKey。对于已经注册的 SelectionKey 和未注册的处理逻辑并不一样。具体步骤以下:
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) { channel = ch; selector = sel; }
SelectionKeyImpl 是对 Channel 和 Seletor 的封装,具体的事件注册仍是委托给了 channel 完成。
public SelectionKey interestOps(int ops) { ensureValid(); return nioInterestOps(ops); } public SelectionKey nioInterestOps(int ops) { if ((ops & ~channel().validOps()) != 0) throw new IllegalArgumentException(); channel.translateAndSetInterestOps(ops, this); interestOps = ops; return this; }
总结: SelectionKey.interestOps 事件注册绕了一圈,最后发现又委托给了 Channel 来完成。
// ServerSocketChannel 事件注册 public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) { int newOps = 0; // Translate ops if ((ops & SelectionKey.OP_ACCEPT) != 0) newOps |= Net.POLLIN; // Place ops into pollfd array sk.selector.putEventOps(sk, newOps); }
总结: 事件注册须要注意的是 OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT 只是 JDK 对底层 IO 事件的封装,在注册前须要将 JDK IO 事件转换成 Linux NIO 事件。最终事件的注册都是委托给了 Selector 完成,因此才说 Selector 才是 NIO 的核心。
// Net public static final short POLLIN; // 读事件 public static final short POLLOUT; // 写事件 public static final short POLLERR; public static final short POLLHUP; public static final short POLLNVAL; public static final short POLLCONN;
public SocketChannel accept() throws IOException { synchronized (lock) { // 1. 链接校验 if (!isOpen()) throw new ClosedChannelException(); if (!isBound()) throw new NotYetBoundException(); SocketChannel sc = null; // 2. newfd、isaa 若是有新的 socket 链接,则会经过 accept 赋值 int n = 0; FileDescriptor newfd = new FileDescriptor(); InetSocketAddress[] isaa = new InetSocketAddress[1]; // 3. begin、end 配套使用则能够响应线程中断,关闭 channel try { begin(); if (!isOpen()) return null; thread = NativeThread.current(); for (;;) { // 4. 接收新的链接请求 n = accept(this.fd, newfd, isaa); if ((n == IOStatus.INTERRUPTED) && isOpen()) continue; break; } } finally { thread = 0; end(n > 0); assert IOStatus.check(n); } if (n < 1) return null; // 4. socket 默认是阻塞的,因此若是是 NIO 编程须要手动设置成 false IOUtil.configureBlocking(newfd, true); InetSocketAddress isa = isaa[0]; sc = new SocketChannelImpl(provider(), newfd, isa); // 5. 返回新的 socket 请求 return sc; } } private int accept(FileDescriptor ssfd, FileDescriptor newfd, InetSocketAddress[] isaa) throws IOException { return accept0(ssfd, newfd, isaa); }
总结: ServerSocketChannel 接收新的链接请求步骤和 BIO 相似,没有什么区别,大体有如下步骤:
close() 操做限于通道,并且仍是实现了 InterruptibleChannel 接口的通道,例如 FileChannel 就没有 close 操做。
在分析 close() 具体实现以前,咱们先得理解为何要有 close() 这个操做:一个可选择的通道,在建立之初会生成一个 FileDescriptor,linux 下即为 fd,windows 下即为句柄,这些都是系统资源,不能无限占用,当在不使用的时候,就应该将其释放,close 便是完成这个工做的。
抽象类 AbstractInterruptibleChannel 实现了 InterruptibleChannel 接口,而 SelectableChannel 继承自 AbstractInterruptibleChannel,所以,可选择的通道同时也是能够 close 的。AbstractInterruptibleChannel 的 close 实现以下:
public final void close() throws IOException { synchronized (closeLock) { if (!open) return; open = false; implCloseChannel(); } }
来具体关闭逻辑就在 implCloseChannel() 中了,因而再看 AbstractSelectableChannel:
protected final void implCloseChannel() throws IOException { implCloseSelectableChannel(); synchronized (keyLock) { int count = (keys == null) ? 0 : keys.length; for (int i = 0; i < count; i++) { SelectionKey k = keys[i]; if (k != null) k.cancel(); } } }
先看 synchronized 同步块,它将当前通道保存的 SelectionKey 所有 cancel,意思就是说,当前通关闭了,与它相关的全部 SelectionKey 都没有意义了,因此要所有取消掉,以前讲解 cancel 过程已经说明了,cancel 操做只是将 SelectionKey 加入对 应选择器的 cancelKeys 集合中,在下次正式选择开始的时候再一一清除;
这么看来,仍是应该追究一下 implCloseSelectableChannel() 的实现了,下面分别从 ServerSocketChannel 和 SocketChannel 实现出发:
先看 ServerSocketChannelImpl
protected void implCloseSelectableChannel() throws IOException { synchronized (stateLock) { if (state != ST_KILLED) nd.preClose(fd); long th = thread; if (th != 0) NativeThread.signal(th); if (!isRegistered()) kill(); } }
出现了两个很奇怪的东西,看来要彻底弄懂这段代码,是得好好分析一下它们了,它们是:NativeDispatcher nd 和 NativeThread;
若是已经对 linux 信号机制很是熟悉,应该很容易猜想到 NativeThread.signal(th) 在作什么,是的,它在唤醒阻塞的线程 th,下面咱们来看看它是如何作到的:
NativeThread 类很是简单,几乎全是 native 方法:
class NativeThread { static native long current(); static native void signal(long nt); static native void init(); static { Util.load(); init(); } }
在看其本地实现:
//自定义中断信号,kill –l #define INTERRUPT_SIGNAL (__SIGRTMAX - 2) //自定义的信号处理函数,当前函数什么都不作 static void nullHandler(int sig) { } #endif //NativeThread.init()的本地实现,能够看到它用到了sigaction //sigaction用来install一个信号 JNIEXPORT void JNICALL Java_sun_nio_ch_NativeThread_init(JNIEnv *env, jclass cl) { #ifdef __linux__ sigset_t ss; // 如下这段代码是常见的信号安装过程 // 讲解这段代码的目的只是为了让你们理解NativeThread.signal // 的工做原理,故不少细节就简单带过了 struct sigaction sa, osa; // sa用于定制信号INTERRUPT_SIGNAL的处理方式的 // 如sa_handler = nullHandler即用来指定信号处理函数的 // 即线程收到信号时,为执行这个函数,nullHandler是个空壳 // 函数,因此它什么都不作 // 不用理解sa_flags各个标识表明什么 // sigemptyset顾名思义,它是初始化sigaction的sa_mask位 // sigaction(INTERRUPT_SIGNAL, &sa, &osa)执行后 // 若是成功,则表示INTERRUPT_SIGNAL这个信号安装成功了 // 为何要有这个init呢,其实不用这不操做也许不会有问题 // 但由于不能确保INTERRUPT_SIGNAL没有被其余线程install // 过,若是sa_handler对应函数不是空操做,则在使用这个信号 // 时会对当前线程有影响 sa.sa_handler = nullHandler; sa.sa_flags = 0; sigemptyset(&sa.sa_mask); if (sigaction(INTERRUPT_SIGNAL, &sa, &osa) < 0) JNU_ThrowIOExceptionWithLastError(env, "sigaction"); #endif } JNIEXPORT jlong JNICALL Java_sun_nio_ch_NativeThread_current(JNIEnv *env, jclass cl) { #ifdef __linux__ // pthread_self()便是获取当前线程ID,它与getpid()是不一样的 // 具体细节没有研究 return (long)pthread_self(); #else return -1; #endif } JNIEXPORT void JNICALL Java_sun_nio_ch_NativeThread_signal(JNIEnv *env, jclass cl, jlong thread) { #ifdef __linux__ //这个就是最关键的signal实现了,能够看到,它调用了pthread库的pthread_kill //像thread线程发送一个INTERRUPT_SIGNAL信号,这个信号就是在init中install //的,对应的处理函数是空函数,也就是说,往thread线程发送一个信号,若是该线程处于 //阻塞状态,则会由于受到信号而终止阻塞,而若是处于非阻塞,则无影响 if (pthread_kill((pthread_t)thread, INTERRUPT_SIGNAL)) JNU_ThrowIOExceptionWithLastError(env, "Thread signal failed"); #endif }
Java 的 NativeThread 作静态初始化时已经执行了 init,也就是说 INTERRUPT_SIGNAL 信号已经被安装,而 ServerSocketChannelImpl 在上述 accept 时可能赋值。
try { begin(); if (!isOpen()) return null; thread = NativeThread.current(); for (;;) { n = accept0(this.fd, newfd, isaa); if ((n == IOStatus.INTERRUPTED) && isOpen()) continue; break; } } finally { thread = 0; end(n > 0); assert IOStatus.check(n); }
try 的内部,for 循环以前,thread 被复制为 NativeThread.current() 即为当前线程 id;finally 时 thread 又被修改回 0,所以在 implCloseSelectableChannel 才有这样一段:
if (th != 0) NativeThread.signal(th);
NativeThread.signal(th) 经过像当前线程发送 INTERRUPT_SIGNAL 信号而确保 th 线程没有被阻塞,即若是阻塞就中止阻塞。
如今理解了 NativeThread 了,咱们再看 NativeDispatcher
首先咱们得知道在 ServerSocketChannelImpl 中,nd 被初始化为 SocketDispatcher,见:
static { Util.load(); initIDs(); nd = new SocketDispatcher(); }
又由于 linux 下一切皆文件的思想(现实虽然不绝对),SocketDispatcher 其实就是用 FileDispatcher 实现的,最终 FileDispatcher 也只是封装了一大堆 native 方法,一波三折,关于 FileDispatcher,这里先不详细讲解了,先针对 nd.preClose(fd) 和 kill 将 implCloseSelectableChannel 的过程说明白吧:
首先,咱们要明白这样一个道理:在多线程环境下,老是很难知道何时可安全的关闭或释放资源(如fd),当一个线程 A 使用 fd 来读写,而另外一个线程 B 关闭或释放了 fd,则 A 线程就会读写一个错误的文件或 socket;为了防止这种状况出现,因而 NIO 就采用了经典的 two-step 处理方案:
第一步:建立一个 socket pair,假设 FDs 为 sp[2],先 close 掉 sp[1],这样,该 socket pair 就成为了一个半关闭的连接;复制 (dup2)sp[0] 到 fd(即为咱们想关闭或释放的fd),这个时候,其余线程若是正在读写当即会得到 EOF 或者 Pipe Error,read 或 write 方法里会检测这些状态作相应处理;
第二步:最后一个会使用到 fd 的线程负责释放
nd.preClose(fd) 即为两步曲中的第一步,咱们先来看其实现,最终定位到 FileDispatcher.c,相关代码以下:
static int preCloseFD = -1; JNIEXPORT void JNICALL Java_sun_nio_ch_FileDispatcher_init(JNIEnv *env, jclass cl) { int sp[2]; if (socketpair(PF_UNIX, SOCK_STREAM, 0, sp) < 0) { JNU_ThrowIOExceptionWithLastError(env, "socketpair failed"); return; } preCloseFD = sp[0]; close(sp[1]); } JNIEXPORT void JNICALL Java_sun_nio_ch_FileDispatcher_preClose0(JNIEnv *env, jclass clazz, jobject fdo) { jint fd = fdval(env, fdo); if (preCloseFD >= 0) { if (dup2(preCloseFD, fd) < 0) JNU_ThrowIOExceptionWithLastError(env, "dup2 failed"); } }
从上面两个函数实现,咱们能够看到,在 init 函数中,建立了一个半关闭的 socket pair,preCloseFD 即为未关闭的一端,init 在静态初始化时就会被执行;再来看关键的 preClose0,它的确是采用 dup2 来复制 preCloseFD,这样一来,fd 就被替换成了 preCloseFD,这正是 socket pair 中未被关闭的一端。
既然 nd.preClose(fd) 只是预关闭,则真正执行关闭的逻辑确定在这个 kill 中了,从代码逻辑上仍是比较好懂的,if (!isRegistered()) 即表示该通道没有被注册,表示全部 Selector 都没有意愿关心这个通道了,则天然能够放心的关闭 fd。
果断猜想 kill 中有 nd.close(fd) 这样的代码,不信请看:
public void kill() throws IOException { synchronized (stateLock) { if (state == ST_KILLED) return; if (state == ST_UNINITIALIZED) { state = ST_KILLED; return; } assert !isOpen() && !isRegistered(); nd.close(fd); state = ST_KILLED; } }
果真如此,这样一来,关闭二步曲就可以较安全的释放咱们的fd资源了,至于 nd.close(fd)的本地实现,这里就不讲了,确定是采用了 close(fd) 的系统调用。总的来讲,通道的 close 就是为了断开它与内核 fd 的那点联系。
天天用心记录一点点。内容也许不重要,但习惯很重要!