此系列文章会详细解读NIO的功能逐步丰满的路程,为Reactor-Netty 库的讲解铺平道路。java
关于Java编程方法论-Reactor与Webflux的视频分享,已经完成了Rxjava 与 Reactor,b站地址以下:linux
Rxjava源码解读与分享:www.bilibili.com/video/av345…git
Reactor源码解读与分享:www.bilibili.com/video/av353…github
本系列源码解读基于JDK11 api细节可能与其余版本有所差异,请自行解决jdk版本问题。shell
接上一篇BIO到NIO源码的一些事儿之NIO 上编程
咱们最初的目的就是为了加强Socket,基于这个基本需求,没有条件创造条件,因而为了让Channel拥有网络socket的能力,这里定义了一个java.nio.channels.NetworkChannel
接口。花很少说,咱们来看这个接口的定义:api
public interface NetworkChannel extends Channel {
NetworkChannel bind(SocketAddress local) throws IOException;
SocketAddress getLocalAddress() throws IOException;
<T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException;
<T> T getOption(SocketOption<T> name) throws IOException;
Set<SocketOption<?>> supportedOptions();
}
复制代码
经过bind(SocketAddress)
方法将socket
绑定到本地 SocketAddress
上,经过getLocalAddress()方法返回socket
绑定的地址, 经过 setOption(SocketOption,Object)
和getOption(SocketOption)
方法设置和查询socket
支持的配置选项。缓存
接下来咱们来看 java.nio.channels.ServerSocketChannel
抽象类及其实现类sun.nio.ch.ServerSocketChannelImpl
对之实现的细节。 首先咱们来看其对于bind的实现:服务器
//sun.nio.ch.ServerSocketChannelImpl#bind
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (stateLock) {
ensureOpen();
//经过localAddress判断是否已经调用过bind
if (localAddress != null)
throw new AlreadyBoundException();
//InetSocketAddress(0)表示绑定到本机的全部地址,由操做系统选择合适的端口
InetSocketAddress isa = (local == null)
? new InetSocketAddress(0)
: Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkListen(isa.getPort());
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
Net.bind(fd, isa.getAddress(), isa.getPort());
//开启监听,s若是参数backlog小于1,默认接受50个链接
Net.listen(fd, backlog < 1 ? 50 : backlog);
localAddress = Net.localAddress(fd);
}
return this;
}
复制代码
下面咱们来看看Net中的bind和listen方法是如何实现的。网络
//sun.nio.ch.Net#bind(java.io.FileDescriptor, java.net.InetAddress, int)
public static void bind(FileDescriptor fd, InetAddress addr, int port) throws IOException {
bind(UNSPEC, fd, addr, port);
}
static void bind(ProtocolFamily family, FileDescriptor fd, InetAddress addr, int port) throws IOException {
//若是传入的协议域不是IPV4并且支持IPV6,则使用ipv6
boolean preferIPv6 = isIPv6Available() &&
(family != StandardProtocolFamily.INET);
bind0(fd, preferIPv6, exclusiveBind, addr, port);
}
private static native void bind0(FileDescriptor fd, boolean preferIPv6, boolean useExclBind, InetAddress addr, int port) throws IOException;
复制代码
bind0为native方法实现:
JNIEXPORT void JNICALL Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6, jboolean useExclBind, jobject iao, int port) {
SOCKETADDRESS sa;
int sa_len = 0;
int rv = 0;
//将java的InetAddress转换为c的struct sockaddr
if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len,
preferIPv6) != 0) {
return;//转换失败,方法返回
}
//调用bind方法:int bind(int sockfd, struct sockaddr* addr, socklen_t addrlen)
rv = NET_Bind(fdval(env, fdo), &sa, sa_len);
if (rv != 0) {
handleSocketError(env, errno);
}
}
复制代码
socket是用户程序与内核交互信息的枢纽,它自身没有网络协议地址和端口号等信息,在进行网络通讯的时候,必须把一个socket与一个地址相关联。 不少时候内核会咱们自动绑定一个地址,然而有时用户可能须要本身来完成这个绑定的过程,以知足实际应用的须要; 最典型的状况是一个服务器进程须要绑定一个众所周知的地址或端口以等待客户来链接。 对于客户端,不少时候并不须要调用bind方法,而是由内核自动绑定;
这里要注意,绑定归绑定,在有链接过来的时候会建立一个新的Socket,而后服务端操做这个新的Socket便可。这里就能够关注accept方法了。由sun.nio.ch.ServerSocketChannelImpl#bind
最后,咱们知道其经过Net.listen(fd, backlog < 1 ? 50 : backlog)
开启监听,若是参数backlog小于1,默认接受50个链接。由此,咱们来关注下Net.listen
方法细节。
//sun.nio.ch.Net#listen
static native void listen(FileDescriptor fd, int backlog) throws IOException;
复制代码
能够知道,Net.listen
是native
方法,源码以下:
JNIEXPORT void JNICALL Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog) {
if (listen(fdval(env, fdo), backlog) < 0)
handleSocketError(env, errno);
}
复制代码
能够看到底层是调用listen
实现的,listen
函数在通常在调用bind
以后到调用accept
以前调用,它的函数原型是: int listen(int sockfd, int backlog)
返回值:0表示成功, -1表示失败
咱们再来关注下bind操做中的其余细节,最开始时的ensureOpen()
方法判断:
//sun.nio.ch.ServerSocketChannelImpl#ensureOpen
// @throws ClosedChannelException if channel is closed
private void ensureOpen() throws ClosedChannelException {
if (!isOpen())
throw new ClosedChannelException();
}
//java.nio.channels.spi.AbstractInterruptibleChannel#isOpen
public final boolean isOpen() {
return !closed;
}
复制代码
若是socket
关闭,则抛出ClosedChannelException
。
咱们再来看下Net#checkAddress
:
//sun.nio.ch.Net#checkAddress(java.net.SocketAddress)
public static InetSocketAddress checkAddress(SocketAddress sa) {
if (sa == null)//地址为空
throw new NullPointerException();
//非InetSocketAddress类型地址
if (!(sa instanceof InetSocketAddress))
throw new UnsupportedAddressTypeException(); // ## needs arg
InetSocketAddress isa = (InetSocketAddress)sa;
//地址不可识别
if (isa.isUnresolved())
throw new UnresolvedAddressException(); // ## needs arg
InetAddress addr = isa.getAddress();
//非ip4和ip6地址
if (!(addr instanceof Inet4Address || addr instanceof Inet6Address))
throw new IllegalArgumentException("Invalid address type");
return isa;
}
复制代码
从上面能够看出,bind首先检查ServerSocket
是否关闭,是否绑定地址, 若是既没有绑定也没关闭,则检查绑定的socketaddress
是否正确或合法; 而后经过Net工具类的bind
和listen
,完成实际的ServerSocket
地址绑定和开启监听,若是绑定是开启的参数小于1
,则默认接受50
个链接。
对照咱们以前在第一篇中接触的BIO,咱们来看些accept()
方法的实现:
//sun.nio.ch.ServerSocketChannelImpl#accept()
@Override
public SocketChannel accept() throws IOException {
acceptLock.lock();
try {
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
boolean blocking = isBlocking();
try {
begin(blocking);
do {
n = accept(this.fd, newfd, isaa);
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {
end(blocking, n > 0);
assert IOStatus.check(n);
}
if (n < 1)
return null;
//针对接受链接的处理通道socketchannelimpl,默认为阻塞模式
// newly accepted socket is initially in blocking mode
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
//构建SocketChannelImpl,这个具体在SocketChannelImpl再说
SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);
// check permitted to accept connections from the remote address
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
//检查地址和port权限
sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
//返回socketchannelimpl
return sc;
} finally {
acceptLock.unlock();
}
}
复制代码
对于accept(this.fd, newfd, isaa)
,调用accept接收socket中已创建的链接,咱们以前有在BIO中了解过,函数最终会调用:int accept(int sockfd,struct sockaddr *addr, socklen_t *addrlen);
这里begin(blocking);
与 end(blocking, n > 0);
的合做模式咱们在InterruptibleChannel 与可中断 IO这一篇文章中已经涉及过,这里再次提一下,让你们看到其应用,此处专一的是等待链接这个过程,期间能够出现异常打断,这个过程正常结束的话,就会正常往下执行逻辑,不要搞的好像这个Channel要结束了同样,end(blocking, n > 0)
的第二个参数completed也只是在判断这个等待过程是否结束而已,不要功能范围扩大化。
咱们再来看下NetworkChannel
的其余方法实现,首先来看supportedOptions
:
//sun.nio.ch.ServerSocketChannelImpl#supportedOptions
@Override
public final Set<SocketOption<?>> supportedOptions() {
return DefaultOptionsHolder.defaultOptions;
}
//sun.nio.ch.ServerSocketChannelImpl.DefaultOptionsHolder
private static class DefaultOptionsHolder {
static final Set<SocketOption<?>> defaultOptions = defaultOptions();
private static Set<SocketOption<?>> defaultOptions() {
HashSet<SocketOption<?>> set = new HashSet<>();
set.add(StandardSocketOptions.SO_RCVBUF);
set.add(StandardSocketOptions.SO_REUSEADDR);
if (Net.isReusePortAvailable()) {
set.add(StandardSocketOptions.SO_REUSEPORT);
}
set.add(StandardSocketOptions.IP_TOS);
set.addAll(ExtendedSocketOptions.options(SOCK_STREAM));
//返回不可修改的HashSet
return Collections.unmodifiableSet(set);
}
}
复制代码
对上述配置中的一些配置咱们大体来瞅眼:
//java.net.StandardSocketOptions
//socket接受缓存大小
public static final SocketOption<Integer> SO_RCVBUF =
new StdSocketOption<Integer>("SO_RCVBUF", Integer.class);
//是否可重用地址
public static final SocketOption<Boolean> SO_REUSEADDR =
new StdSocketOption<Boolean>("SO_REUSEADDR", Boolean.class);
//是否可重用port
public static final SocketOption<Boolean> SO_REUSEPORT =
new StdSocketOption<Boolean>("SO_REUSEPORT", Boolean.class);
//Internet协议(IP)标头(header)中的服务类型(ToS)。
public static final SocketOption<Integer> IP_TOS =
new StdSocketOption<Integer>("IP_TOS", Integer.class);
复制代码
知道了上面的支持配置,咱们来看下setOption
实现细节:
//sun.nio.ch.ServerSocketChannelImpl#setOption
@Override
public <T> ServerSocketChannel setOption(SocketOption<T> name, T value) throws IOException {
Objects.requireNonNull(name);
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
ensureOpen();
if (name == StandardSocketOptions.IP_TOS) {
ProtocolFamily family = Net.isIPv6Available() ?
StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
Net.setSocketOption(fd, family, name, value);
return this;
}
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
// SO_REUSEADDR emulated when using exclusive bind
isReuseAddress = (Boolean)value;
} else {
// no options that require special handling
Net.setSocketOption(fd, Net.UNSPEC, name, value);
}
return this;
}
}
复制代码
这里,你们就能看到supportedOptions().contains(name)
的做用了,首先会进行支持配置的判断,而后进行正常的设置逻辑。里面对于Socket配置设定主要执行了Net.setSocketOption
,这里,就只对其代码作中文注释就好,整个逻辑过程没有太复杂的。
static void setSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption<?> name, Object value) throws IOException {
if (value == null)
throw new IllegalArgumentException("Invalid option value");
// only simple values supported by this method
Class<?> type = name.type();
if (extendedOptions.isOptionSupported(name)) {
extendedOptions.setOption(fd, name, value);
return;
}
//非整形和布尔型,则抛出断言错误
if (type != Integer.class && type != Boolean.class)
throw new AssertionError("Should not reach here");
// special handling
if (name == StandardSocketOptions.SO_RCVBUF ||
name == StandardSocketOptions.SO_SNDBUF)
{
//判断接受和发送缓冲区大小
int i = ((Integer)value).intValue();
if (i < 0)
throw new IllegalArgumentException("Invalid send/receive buffer size");
}
//缓冲区有数据,延迟关闭socket的的时间
if (name == StandardSocketOptions.SO_LINGER) {
int i = ((Integer)value).intValue();
if (i < 0)
value = Integer.valueOf(-1);
if (i > 65535)
value = Integer.valueOf(65535);
}
//UDP单播
if (name == StandardSocketOptions.IP_TOS) {
int i = ((Integer)value).intValue();
if (i < 0 || i > 255)
throw new IllegalArgumentException("Invalid IP_TOS value");
}
//UDP多播
if (name == StandardSocketOptions.IP_MULTICAST_TTL) {
int i = ((Integer)value).intValue();
if (i < 0 || i > 255)
throw new IllegalArgumentException("Invalid TTL/hop value");
}
// map option name to platform level/name
OptionKey key = SocketOptionRegistry.findOption(name, family);
if (key == null)
throw new AssertionError("Option not found");
int arg;
//转换配置参数值
if (type == Integer.class) {
arg = ((Integer)value).intValue();
} else {
boolean b = ((Boolean)value).booleanValue();
arg = (b) ? 1 : 0;
}
boolean mayNeedConversion = (family == UNSPEC);
boolean isIPv6 = (family == StandardProtocolFamily.INET6);
//设置文件描述符的值及其余
setIntOption0(fd, mayNeedConversion, key.level(), key.name(), arg, isIPv6);
}
复制代码
接下来,咱们来看getOption
实现,源码以下:
//sun.nio.ch.ServerSocketChannelImpl#getOption
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(SocketOption<T> name) throws IOException {
Objects.requireNonNull(name);
//非通道支持选项,则抛出UnsupportedOperationException
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
ensureOpen();
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
// SO_REUSEADDR emulated when using exclusive bind
return (T)Boolean.valueOf(isReuseAddress);
}
//假如获取的不是上面的配置,则委托给Net来处理
// no options that require special handling
return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
}
}
//sun.nio.ch.Net#getSocketOption
static Object getSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption<?> name) throws IOException {
Class<?> type = name.type();
if (extendedOptions.isOptionSupported(name)) {
return extendedOptions.getOption(fd, name);
}
//只支持整形和布尔型,不然抛出断言错误
// only simple values supported by this method
if (type != Integer.class && type != Boolean.class)
throw new AssertionError("Should not reach here");
// map option name to platform level/name
OptionKey key = SocketOptionRegistry.findOption(name, family);
if (key == null)
throw new AssertionError("Option not found");
boolean mayNeedConversion = (family == UNSPEC);
//获取文件描述的选项配置
int value = getIntOption0(fd, mayNeedConversion, key.level(), key.name());
if (type == Integer.class) {
return Integer.valueOf(value);
} else {
//咱们要看到前面支持配置处的源码其支持的类型要么是Boolean,要么是Integer
//因此,返回值为Boolean.FALSE 或 Boolean.TRUE也就不足为奇了
return (value == 0) ? Boolean.FALSE : Boolean.TRUE;
}
}
复制代码
在Net.bind一节中,咱们最后说了一个注意点,每一个链接过来的时候都会建立一个Socket来供此链接进行操做,这个在accept方法中能够看到,其在获得链接以后,就 new SocketChannelImpl(provider(), newfd, isa)
这个对象。那这里,就引出一个话题,咱们在使用bind方法的时候,是否是也应该绑定到一个Socket之上呢,那以前bio是怎么作呢,咱们先来回顾一下。 咱们以前在调用java.net.ServerSocket#ServerSocket(int, int, java.net.InetAddress)
方法的时候,里面有一个setImpl()
:
//java.net.ServerSocket
public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
setImpl();
if (port < 0 || port > 0xFFFF)
throw new IllegalArgumentException(
"Port value out of range: " + port);
if (backlog < 1)
backlog = 50;
try {
bind(new InetSocketAddress(bindAddr, port), backlog);
} catch(SecurityException e) {
close();
throw e;
} catch(IOException e) {
close();
throw e;
}
}
//java.net.ServerSocket#setImpl
private void setImpl() {
if (factory != null) {
impl = factory.createSocketImpl();
checkOldImpl();
} else {
// No need to do a checkOldImpl() here, we know it's an up to date
// SocketImpl!
impl = new SocksSocketImpl();
}
if (impl != null)
impl.setServerSocket(this);
}
复制代码
可是,咱们此处的重点在bind(new InetSocketAddress(bindAddr, port), backlog);
,这里的代码以下:
//java.net.ServerSocket
public void bind(SocketAddress endpoint, int backlog) throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isBound())
throw new SocketException("Already bound");
if (endpoint == null)
endpoint = new InetSocketAddress(0);
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
if (epoint.isUnresolved())
throw new SocketException("Unresolved address");
if (backlog < 1)
backlog = 50;
try {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkListen(epoint.getPort());
//重点!!
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
} catch(SecurityException e) {
bound = false;
throw e;
} catch(IOException e) {
bound = false;
throw e;
}
}
复制代码
咱们有看到 getImpl()
我标示了重点,这里面作了什么,咱们走进去:
//java.net.ServerSocket#getImpl
SocketImpl getImpl() throws SocketException {
if (!created)
createImpl();
return impl;
}
复制代码
在整个过程当中created
仍是对象刚建立时的初始值,为false,那么,铁定会进入createImpl()
方法中:
//java.net.ServerSocket#createImpl
void createImpl() throws SocketException {
if (impl == null)
setImpl();
try {
impl.create(true);
created = true;
} catch (IOException e) {
throw new SocketException(e.getMessage());
}
}
复制代码
而此处,由于前面impl
已经赋值,因此,会走impl.create(true)
,进而将created
设定为true
。而此刻,终于到我想讲的重点了:
//java.net.AbstractPlainSocketImpl#create
protected synchronized void create(boolean stream) throws IOException {
this.stream = stream;
if (!stream) {
ResourceManager.beforeUdpCreate();
// only create the fd after we know we will be able to create the socket
fd = new FileDescriptor();
try {
socketCreate(false);
SocketCleanable.register(fd);
} catch (IOException ioe) {
ResourceManager.afterUdpClose();
fd = null;
throw ioe;
}
} else {
fd = new FileDescriptor();
socketCreate(true);
SocketCleanable.register(fd);
}
if (socket != null)
socket.setCreated();
if (serverSocket != null)
serverSocket.setCreated();
}
复制代码
能够看到,socketCreate(true);
,它的实现以下:
@Override
void socketCreate(boolean stream) throws IOException {
if (fd == null)
throw new SocketException("Socket closed");
int newfd = socket0(stream);
fdAccess.set(fd, newfd);
}
复制代码
经过本地方法socket0(stream)
获得了一个文件描述符,由此,Socket建立了出来,而后进行相应的绑定。 咱们再把眼光放回到sun.nio.ch.ServerSocketChannelImpl#accept()
中,这里new的SocketChannelImpl
对象是获得链接以后作的事情,那对于服务器来说,绑定时候用的Socket呢,这里,咱们在使用ServerSocketChannel
的时候,每每要使用JDK给咱们提供的对我统一的方法open
,也是为了下降咱们使用的复杂度,这里是java.nio.channels.ServerSocketChannel#open
:
//java.nio.channels.ServerSocketChannel#open
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
//sun.nio.ch.SelectorProviderImpl#openServerSocketChannel
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
//sun.nio.ch.ServerSocketChannelImpl#ServerSocketChannelImpl(SelectorProvider)
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
}
//sun.nio.ch.Net#serverSocket
static FileDescriptor serverSocket(boolean stream) {
return IOUtil.newFD(socket0(isIPv6Available(), stream, true, fastLoopback));
}
复制代码
能够看到,只要new了一个ServerSocketChannelImpl对象,就至关于拿到了一个socket
而后bind也就有着落了。可是,咱们要注意下细节ServerSocketChannel#open
获得的是ServerSocketChannel
类型。咱们accept到一个客户端来的链接后,应该在客户端与服务器之间建立一个Socket通道来供二者通讯操做的,因此,sun.nio.ch.ServerSocketChannelImpl#accept()
中所作的是SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);
,获得的是SocketChannel
类型的对象,这样,就能够将Socket的读写数据的方法定义在这个类里面。
关于ServerSocketChannel
,咱们还有方法须要接触一下,如socket():
//sun.nio.ch.ServerSocketChannelImpl#socket
@Override
public ServerSocket socket() {
synchronized (stateLock) {
if (socket == null)
socket = ServerSocketAdaptor.create(this);
return socket;
}
}
复制代码
咱们看到了ServerSocketAdaptor
,咱们经过此类的注释可知,这是一个和ServerSocket
调用同样,可是底层是用ServerSocketChannelImpl
来实现的一个类,其适配是的目的是适配咱们使用ServerSocket
的方式,因此该ServerSocketAdaptor
继承ServerSocket
并按顺序重写了它的方法,因此,咱们在写这块儿代码的时候也就有了新的选择。
InterruptibleChannel 与可中断 IO这一篇文章中已经涉及过java.nio.channels.spi.AbstractInterruptibleChannel#close
的实现,这里,咱们再来回顾下其中的某些细节,顺带引出咱们新的话题:
//java.nio.channels.spi.AbstractInterruptibleChannel#close
public final void close() throws IOException {
synchronized (closeLock) {
if (closed)
return;
closed = true;
implCloseChannel();
}
}
//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
// clone keys to avoid calling cancel when holding keyLock
SelectionKey[] copyOfKeys = null;
synchronized (keyLock) {
if (keys != null) {
copyOfKeys = keys.clone();
}
}
if (copyOfKeys != null) {
for (SelectionKey k : copyOfKeys) {
if (k != null) {
k.cancel(); // invalidate and adds key to cancelledKey set
}
}
}
}
//sun.nio.ch.ServerSocketChannelImpl#implCloseSelectableChannel
@Override
protected void implCloseSelectableChannel() throws IOException {
assert !isOpen();
boolean interrupted = false;
boolean blocking;
// set state to ST_CLOSING
synchronized (stateLock) {
assert state < ST_CLOSING;
state = ST_CLOSING;
blocking = isBlocking();
}
// wait for any outstanding accept to complete
if (blocking) {
synchronized (stateLock) {
assert state == ST_CLOSING;
long th = thread;
if (th != 0) {
//本地线程不为null,则本地Socket预先关闭
//并通知线程通知关闭
nd.preClose(fd);
NativeThread.signal(th);
// wait for accept operation to end
while (thread != 0) {
try {
stateLock.wait();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
}
} else {
// non-blocking mode: wait for accept to complete
acceptLock.lock();
acceptLock.unlock();
}
// set state to ST_KILLPENDING
synchronized (stateLock) {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
}
// close socket if not registered with Selector
//若是未在Selector上注册,直接kill掉
//即关闭文件描述
if (!isRegistered())
kill();
// restore interrupt status
//印证了咱们上一篇中在异步打断中如果经过线程的中断方法中断线程的话
//最后要设定该线程状态是interrupt
if (interrupted)
Thread.currentThread().interrupt();
}
@Override
public void kill() throws IOException {
synchronized (stateLock) {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
}
}
复制代码
也是由于close()
并无在InterruptibleChannel 与可中断 IO这一篇文章中进行具体的讲解应用,这里其应用的更可能是在SocketChannel
这里,其更多的涉及到客户端与服务端创建链接交换数据,因此断开链接后,将不用的Channel关闭是很正常的。 这里,在sun.nio.ch.ServerSocketChannelImpl#accept()
中的源码中:
@Override
public SocketChannel accept() throws IOException {
...
// newly accepted socket is initially in blocking mode
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);
// check permitted to accept connections from the remote address
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
return sc;
} finally {
acceptLock.unlock();
}
}
复制代码
这里经过对所接收的链接的远程地址作合法性判断,假如验证出现异常,则关闭上面建立的SocketChannel
。 还有一个关于close()的实际用法,在客户端创建链接的时候,若是链接出异常,一样是要关闭所建立的Socket:
//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
public static SocketChannel open(SocketAddress remote) throws IOException {
SocketChannel sc = open();
try {
sc.connect(remote);
} catch (Throwable x) {
try {
sc.close();
} catch (Throwable suppressed) {
x.addSuppressed(suppressed);
}
throw x;
}
assert sc.isConnected();
return sc;
}
复制代码
接着,咱们在implCloseSelectableChannel
中会发现nd.preClose(fd);
与nd.close(fd);
,这个在SocketChannelImpl
与ServerSocketChannelImpl
二者对于implCloseSelectableChannel
实现中均可以看到,这个nd是什么,这里,咱们拿ServerSocketChannelImpl
来说,在这个类的最后面有一段静态代码块(SocketChannelImpl
同理),也就是在这个类加载的时候就会执行:
//C:/Program Files/Java/jdk-11.0.1/lib/src.zip!/java.base/sun/nio/ch/ServerSocketChannelImpl.java:550
static {
//加载nio,net资源库
IOUtil.load();
initIDs();
nd = new SocketDispatcher();
}
复制代码
也就是说,在ServerSocketChannelImpl
这个类字节码加载的时候,就会建立SocketDispatcher
对象。经过SocketDispatcher
容许在不一样的平台调用不一样的本地方法进行读写操做,而后基于这个类,咱们就能够在sun.nio.ch.SocketChannelImpl
作Socket的I/O操做。
//sun.nio.ch.SocketDispatcher
class SocketDispatcher extends NativeDispatcher {
static {
IOUtil.load();
}
//读操做
int read(FileDescriptor fd, long address, int len) throws IOException {
return read0(fd, address, len);
}
long readv(FileDescriptor fd, long address, int len) throws IOException {
return readv0(fd, address, len);
}
//写操做
int write(FileDescriptor fd, long address, int len) throws IOException {
return write0(fd, address, len);
}
long writev(FileDescriptor fd, long address, int len) throws IOException {
return writev0(fd, address, len);
}
//预关闭文件描述符
void preClose(FileDescriptor fd) throws IOException {
preClose0(fd);
}
//关闭文件描述
void close(FileDescriptor fd) throws IOException {
close0(fd);
}
//-- Native methods
static native int read0(FileDescriptor fd, long address, int len) throws IOException;
static native long readv0(FileDescriptor fd, long address, int len) throws IOException;
static native int write0(FileDescriptor fd, long address, int len) throws IOException;
static native long writev0(FileDescriptor fd, long address, int len) throws IOException;
static native void preClose0(FileDescriptor fd) throws IOException;
static native void close0(FileDescriptor fd) throws IOException;
}
复制代码
咱们有看到FileDescriptor
在前面代码中有大量的出现,这里,咱们对它来专门介绍。经过FileDescriptor 这个类的实例来充当底层机器特定结构的不透明处理,表示打开文件,打开socket或其余字节源或接收器。 文件描述符的主要用途是建立一个 FileInputStream或 FileOutputStream来包含它。 注意: 应用程序不该建立本身的文件描述符。 咱们来看其部分源码:
public final class FileDescriptor {
private int fd;
private long handle;
private Closeable parent;
private List<Closeable> otherParents;
private boolean closed;
/** * true, if file is opened for appending. */
private boolean append;
static {
initIDs();
}
/** * 在未明确关闭FileDescriptor的状况下进行清理. */
private PhantomCleanable<FileDescriptor> cleanup;
/** * 构造一个无效的FileDescriptor对象,fd或handle会在以后进行设定 */
public FileDescriptor() {
fd = -1;
handle = -1;
}
/** * Used for standard input, output, and error only. * For Windows the corresponding handle is initialized. * For Unix the append mode is cached. * 仅用于标准输入,输出和错误。 * 对于Windows,初始化相应的句柄。 * 对于Unix,缓存附加模式。 * @param fd the raw fd number (0, 1, 2) */
private FileDescriptor(int fd) {
this.fd = fd;
this.handle = getHandle(fd);
this.append = getAppend(fd);
}
...
}
复制代码
咱们平时所用的标准输入,输出,错误流的句柄能够以下,一般,咱们不会直接使用它们,而是使用java.lang.System.in
,java.lang.System#out
,java.lang.System#err
:
public static final FileDescriptor in = new FileDescriptor(0);
public static final FileDescriptor out = new FileDescriptor(1);
public static final FileDescriptor err = new FileDescriptor(2);
复制代码
测试该文件描述符是否有效可使用以下方法:
//java.io.FileDescriptor#valid
public boolean valid() {
return (handle != -1) || (fd != -1);
}
复制代码
返回值为true的话,那么这个文件描述符对象所表明的socket
文件操做
或其余活动的网络链接都是有效的,反之,false则是无效。 更多内容,读者能够自行深刻源码,此处就不过多解释了。为了让你们能够更好的理解上述内容,咱们会在后面的部分还要进一步涉及一下。
在前面,咱们已经接触了SocketChannel
,这里,来接触下细节。
一样,咱们也能够经过调用此类的open
方法来建立socket channel
。这里须要注意:
socket
建立channel
。socket channel
已打开但还没有链接。channel
上调用I/O
操做将致使抛出NotYetConnectedException
。connect
方法链接socket channel
;socket channel
会保持链接状态,直到它关闭。socket channel
能够经过肯定调用其isConnected
方法。socket channel
支持 非阻塞链接:
socket channel
,而后能够经过 connect
方法创建到远程socket
的链接。finishConnect
方法来结束链接。isConnectionPending
方法来肯定。socket channel
支持异步关闭,相似于Channel
类中的异步关闭操做。
socket
的输入端被一个线程关闭而另外一个线程在此socket channel
上因在进行读操做而被阻塞,那么被阻塞线程中的读操做将不读取任何字节并将返回 -1
。socket
的输出端被一个线程关闭而另外一个线程在socket channel
上因在进行写操做而被阻塞,则被阻塞的线程将收到AsynchronousCloseException
。接下来,咱们来看其具体实现方法。
//java.nio.channels.SocketChannel#open()
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
//这个方法省的咱们再次调用connect了
public static SocketChannel open(SocketAddress remote) throws IOException {
//默认是堵塞的,这个在AbstractSelectableChannel处讨论过了
SocketChannel sc = open();
try {
sc.connect(remote);
} catch (Throwable x) {
try {
sc.close();
} catch (Throwable suppressed) {
x.addSuppressed(suppressed);
}
throw x;
}
assert sc.isConnected();
return sc;
}
//sun.nio.ch.SelectorProviderImpl#openSocketChannel
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
//sun.nio.ch.SocketChannelImpl#SocketChannelImpl(java.nio.channels.spi.SelectorProvider)
SocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
//调用socket函数,true表示TCP
this.fd = Net.socket(true);
this.fdVal = IOUtil.fdVal(fd);
}
//sun.nio.ch.Net#socket(boolean)
static FileDescriptor socket(boolean stream) throws IOException {
return socket(UNSPEC, stream);
}
//sun.nio.ch.Net#socket(java.net.ProtocolFamily, boolean)
static FileDescriptor socket(ProtocolFamily family, boolean stream) throws IOException {
boolean preferIPv6 = isIPv6Available() &&
(family != StandardProtocolFamily.INET);
return IOUtil.newFD(socket0(preferIPv6, stream, false, fastLoopback));
}
//sun.nio.ch.IOUtil#newFD
public static FileDescriptor newFD(int i) {
FileDescriptor fd = new FileDescriptor();
setfdVal(fd, i);
return fd;
}
static native void setfdVal(FileDescriptor fd, int value);
复制代码
关于Net.socket(true)
,咱们前面已经提到过了,这里,经过其底层源码来再次调教下 (此处不想看能够跳过):
JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
jboolean stream, jboolean reuse, jboolean ignored)
{
int fd;
//字节流仍是数据报,TCP对应SOCK_STREAM,UDP对应SOCK_DGRAM,此处传入的stream=true;
int type = (stream ? SOCK_STREAM : SOCK_DGRAM);
//判断是IPV6仍是IPV4
int domain = (ipv6_available() && preferIPv6) ? AF_INET6 : AF_INET;
//调用Linux的socket函数,domain为表明协议;
//type为套接字类型,protocol设置为0来表示使用默认的传输协议
fd = socket(domain, type, 0);
//出错
if (fd < 0) {
return handleSocketError(env, errno);
}
/* Disable IPV6_V6ONLY to ensure dual-socket support */
if (domain == AF_INET6) {
int arg = 0;
//arg=1设置ipv6的socket只接收ipv6地址的报文,arg=0表示也可接受ipv4的请求
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
sizeof(int)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set IPV6_V6ONLY");
close(fd);
return -1;
}
}
//SO_REUSEADDR有四种用途:
//1.当有一个有相同本地地址和端口的socket1处于TIME_WAIT状态时,而你启动的程序的socket2要占用该地址和端口,你的程序就要用到该选项。
//2.SO_REUSEADDR容许同一port上启动同一服务器的多个实例(多个进程)。但每一个实例绑定的IP地址是不能相同的。
//3.SO_REUSEADDR容许单个进程绑定相同的端口到多个socket上,但每一个socket绑定的ip地址不一样。
//4.SO_REUSEADDR容许彻底相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP;
if (reuse) {
int arg = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
sizeof(arg)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set SO_REUSEADDR");
close(fd);
return -1;
}
}
#if defined(__linux__)
if (type == SOCK_DGRAM) {
int arg = 0;
int level = (domain == AF_INET6) ? IPPROTO_IPV6 : IPPROTO_IP;
if ((setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)) < 0) &&
(errno != ENOPROTOOPT)) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set IP_MULTICAST_ALL");
close(fd);
return -1;
}
}
//IPV6_MULTICAST_HOPS用于控制多播的范围,
// 1表示只在本地网络转发,
//更多介绍请参考(http://www.ctt.sbras.ru/cgi-bin/www/unix_help/unix-man?ip6+4);
/* By default, Linux uses the route default */
if (domain == AF_INET6 && type == SOCK_DGRAM) {
int arg = 1;
if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg,
sizeof(arg)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set IPV6_MULTICAST_HOPS");
close(fd);
return -1;
}
}
#endif
return fd;
}
复制代码
Linux 3.9以后加入了SO_REUSEPORT
配置,这个配置很强大,多个socket
(无论是处于监听仍是非监听,无论是TCP仍是UDP)只要在绑定以前设置了SO_REUSEPORT
属性,那么就能够绑定到彻底相同的地址和端口。 为了阻止"port 劫持"(Port hijacking
)有一个特别的限制:全部但愿共享源地址和端口的socket都必须拥有相同的有效用户id(effective user ID
)。这样一个用户就不能从另外一个用户那里"偷取"端口。另外,内核在处理SO_REUSEPORT socket
的时候使用了其它系统上没有用到的"特殊技巧":
例如:一个简单的服务器程序的多个实例可使用SO_REUSEPORT socket
,这样就实现一个简单的负载均衡,由于内核已经把请求的分配都作了。
在前面的代码中能够看到,在这个socket
建立成功以后,调用IOUtil.newFD
建立了文件描述符 。这里,我只是想知道这个Socket是能够输入呢,仍是能够读呢,仍是有错呢,参考FileDescriptor
这一节最后那几个标准状态的设定,其实这里也是同样,由于咱们要往Socket中写和读,其标准状态无非就这三种:输入,输出,出错。而这个Socket是绑定在SocketChannel
上的,那就把FileDescriptor
也绑定到上面便可,这样咱们就能够获取到它的状态了。因为FileDescriptor没有提供外部设置fd的方法,setfdVal是经过本地方法实现的:
JNIEXPORT void JNICALL Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val) {
(*env)->SetIntField(env, fdo, fd_fdID, val);
}
复制代码
假如各位有对Linux下的shell编程或者命令有了解的话,咱们知道,shell对报错进行重定向要使用2>,也就是将错误信息由2号所指向的通道写出,这里0和1 一样指向一个通道。此处一样也表明了状态,这样就能够对表明Socket的状态进行操做了,也就是改变SelectionKey
的interest ops
,即首先对SelectionKey
按输入输出类型进行分类,而后咱们的读写状态的操做也就有着落了。此处咱们打个戳,在下一篇中会对其进行细节讲解。
咱们回归到SocketChannel
的open
方法中。咱们能够看到,SelectorProvider.provider().openSocketChannel()
返回的是SocketChannelImpl
对象实例。在SocketChannelImpl(SelectorProvider sp)
中咱们并未看到其对this.state
进行值操做,也就是其默认为0,即ST_UNCONNECTED
(未链接状态),同时Socket默认是堵塞的。 因此,通常状况下,当采用异步方式时,使用不带参数的open方法比较常见,这样,咱们会随之调用configureBlocking
来设置非堵塞。
由前面可知,咱们调用connect
方法链接到远程服务器,其源码以下:
//sun.nio.ch.SocketChannelImpl#connect
@Override
public boolean connect(SocketAddress sa) throws IOException {
InetSocketAddress isa = Net.checkAddress(sa);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
InetAddress ia = isa.getAddress();
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
try {
readLock.lock();
try {
writeLock.lock();
try {
int n = 0;
boolean blocking = isBlocking();
try {
//支持线程中断,经过设置当前线程的Interruptible blocker属性实现
beginConnect(blocking, isa);
do {
//调用connect函数实现,若是采用堵塞模式,会一直等待,直到成功或出//现异常
n = Net.connect(fd, ia, isa.getPort());
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {
endConnect(blocking, (n > 0));
}
assert IOStatus.check(n);
//链接成功
return n > 0;
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
} catch (IOException ioe) {
// connect failed, close the channel
close();
throw SocketExceptions.of(ioe, isa);
}
}
复制代码
关于beginConnect
与endConnect
,是针对AbstractInterruptibleChannel
中begin()
与end
方法的一种加强。这里咱们须要知道的是,假如是非阻塞Channel的话,咱们无须去关心链接过程的打断。顾名思义,只有阻塞等待才须要去考虑打断这一场景的出现。剩下的细节我已经在代码中进行了完整的注释,读者可自行查看。
//sun.nio.ch.SocketChannelImpl#beginConnect
private void beginConnect(boolean blocking, InetSocketAddress isa) throws IOException { //只有阻塞的时候才会进入begin
if (blocking) {
// set hook for Thread.interrupt
//支持线程中断,经过设置当前线程的Interruptible blocker属性实现
begin();
}
synchronized (stateLock) {
//默认为open, 除非调用了close方法
ensureOpen();
//检查链接状态
int state = this.state;
if (state == ST_CONNECTED)
throw new AlreadyConnectedException();
if (state == ST_CONNECTIONPENDING)
throw new ConnectionPendingException();
//断言当前的状态是不是未链接状态,若是是,赋值表示正在链接中
assert state == ST_UNCONNECTED;
//表示正在链接中
this.state = ST_CONNECTIONPENDING;
//只有未绑定本地地址也就是说未调用bind方法才执行,
//该方法在ServerSocketChannel中也见过
if (localAddress == null)
NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
remoteAddress = isa;
if (blocking) {
// record thread so it can be signalled if needed
readerThread = NativeThread.current();
}
}
}
复制代码
在链接过程当中,咱们须要注意的就是几个链接的状态:ST_UNCONNECTED
、ST_CONNECTED
、ST_CONNECTIONPENDING
、ST_CLOSING
、ST_KILLPENDING
、ST_KILLED
,也是由于其是一个公共状态,可能会有多个线程对其进行链接操做的。因此,state
被定义为一个volatile
变量,这个变量在改变的时候须要有stateLock
这个对象来做为synchronized
锁对象来控制同步操做的。
//sun.nio.ch.SocketChannelImpl#endConnect
private void endConnect(boolean blocking, boolean completed) throws IOException {
endRead(blocking, completed);
//当上面代码中n>0,说明链接成功,更新状态为ST_CONNECTED
if (completed) {
synchronized (stateLock) {
if (state == ST_CONNECTIONPENDING) {
localAddress = Net.localAddress(fd);
state = ST_CONNECTED;
}
}
}
}
//sun.nio.ch.SocketChannelImpl#endRead
private void endRead(boolean blocking, boolean completed) throws AsynchronousCloseException { //当阻塞状态下的话,才进入
if (blocking) {
synchronized (stateLock) {
readerThread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
stateLock.notifyAll();
}
}
//和begin成对出现,当线程中断时,抛出ClosedByInterruptException
// remove hook for Thread.interrupt
end(completed);
}
}
复制代码
咱们来关注connect
中的Net.connect(fd, ia, isa.getPort())
方法:
//sun.nio.ch.Net#connect
static int connect(FileDescriptor fd, InetAddress remote, int remotePort) throws IOException {
return connect(UNSPEC, fd, remote, remotePort);
}
//sun.nio.ch.Net#connect
static int connect(ProtocolFamily family, FileDescriptor fd, InetAddress remote, int remotePort) throws IOException {
boolean preferIPv6 = isIPv6Available() &&
(family != StandardProtocolFamily.INET);
return connect0(preferIPv6, fd, remote, remotePort);
}
复制代码
该方法最终会调用native方法,具体注释以下:
JNIEXPORT jint JNICALL Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6, jobject fdo, jobject iao, jint port) {
SOCKETADDRESS sa;
int sa_len = 0;
int rv;
//地址转换为struct sockaddr格式
if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len, preferIPv6) != 0) {
return IOS_THROWN;
}
//传入fd和sockaddr,与远程服务器创建链接,通常就是TCP三次握手
//若是设置了configureBlocking(false),不会堵塞,不然会堵塞一直到超时或出现异常
rv = connect(fdval(env, fdo), &sa.sa, sa_len);
//0表示链接成功,失败时经过errno获取具体缘由
if (rv != 0) {
//非堵塞,链接还未创建(-2)
if (errno == EINPROGRESS) {
return IOS_UNAVAILABLE;
} else if (errno == EINTR) {
//中断(-3)
return IOS_INTERRUPTED;
}
return handleSocketError(env, errno);
}
//链接创建,通常TCP链接链接都须要时间,所以除非是本地网络,
//通常状况下非堵塞模式返回IOS_UNAVAILABLE比较多;
return 1;
}
复制代码
从上面能够经过注释看到,若是是非堵塞,并且链接也并未立马创建成功,其返回的是-2,也就是链接未创建成功,由以前beginConnect
部分源码可知,此时状态为ST_CONNECTIONPENDING
,那么,非阻塞条件下,何时会变为ST_CONNECTED
?有什么方法能够查询状态或者等待链接完成? 那就让咱们来关注下sun.nio.ch.SocketChannelImpl#finishConnect
首先,咱们回顾下,前面咱们涉及了sun.nio.ch.ServerSocketAdaptor
的用法,方便咱们只有Socket编程习惯人群使用,这里,咱们也就能够看到基本的核心实现逻辑,那么有ServerSocketAdaptor
就有SocketAdaptor
,这里,在BIO的Socket编程中最后也是调用了connect(address)
操做:
//java.net.Socket#Socket
private Socket(SocketAddress address, SocketAddress localAddr, boolean stream) throws IOException {
setImpl();
// backward compatibility
if (address == null)
throw new NullPointerException();
try {
createImpl(stream);
if (localAddr != null)
bind(localAddr);
connect(address);
} catch (IOException | IllegalArgumentException | SecurityException e) {
try {
close();
} catch (IOException ce) {
e.addSuppressed(ce);
}
throw e;
}
}
复制代码
这里,咱们能够调用java.nio.channels.SocketChannel#open()
,而后调用所获得的SocketChannel
对象的socket()
方法,就能够获得sun.nio.ch.SocketAdaptor
对象实例了。咱们来查看SocketAdaptor
的connect实现:
//sun.nio.ch.SocketAdaptor#connect
public void connect(SocketAddress remote) throws IOException {
connect(remote, 0);
}
public void connect(SocketAddress remote, int timeout) throws IOException {
if (remote == null)
throw new IllegalArgumentException("connect: The address can't be null");
if (timeout < 0)
throw new IllegalArgumentException("connect: timeout can't be negative");
synchronized (sc.blockingLock()) {
if (!sc.isBlocking())
throw new IllegalBlockingModeException();
try {
//未设定超时则会一直在此等待直到链接或者出现异常
// no timeout
if (timeout == 0) {
sc.connect(remote);
return;
}
//有超时设定,则会将Socket给设定为非阻塞
// timed connect
sc.configureBlocking(false);
try {
if (sc.connect(remote))
return;
} finally {
try {
sc.configureBlocking(true);
} catch (ClosedChannelException e) { }
}
long timeoutNanos = NANOSECONDS.convert(timeout, MILLISECONDS);
long to = timeout;
for (;;) {
//经过计算超时时间,在容许的时间范围内无限循环来进行链接,
//若是超时,则关闭这个Socket
long startTime = System.nanoTime();
if (sc.pollConnected(to)) {
boolean connected = sc.finishConnect();
//看下文解释
assert connected;
break;
}
timeoutNanos -= System.nanoTime() - startTime;
if (timeoutNanos <= 0) {
try {
sc.close();
} catch (IOException x) { }
throw new SocketTimeoutException();
}
to = MILLISECONDS.convert(timeoutNanos, NANOSECONDS);
}
} catch (Exception x) {
Net.translateException(x, true);
}
}
}
复制代码
这里先解释下一个小注意点:在Java中,assert
关键字是从JAVA SE 1.4
引入的,为了不和老版本的Java代码中使用了assert
关键字致使错误,Java在执行的时候默认是不启动断言检查的(这个时候,全部的断言语句都 将忽略!),若是要开启断言检查,则须要用开关-enableassertions或-ea来开启。 经过上面的源码注释,相信大伙已经知道大体的流程了,那关于sun.nio.ch.SocketChannelImpl#finishConnect
到底作了什么,此处,咱们来探索一番:
//sun.nio.ch.SocketChannelImpl#finishConnect
@Override
public boolean finishConnect() throws IOException {
try {
readLock.lock();
try {
writeLock.lock();
try {
// no-op if already connected
if (isConnected())
return true;
boolean blocking = isBlocking();
boolean connected = false;
try {
beginFinishConnect(blocking);
int n = 0;
if (blocking) {
do {
//阻塞状况下,第二个参数传入true
n = checkConnect(fd, true);
} while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
} else {
//非阻塞状况下,第二个参数传入false
n = checkConnect(fd, false);
}
connected = (n > 0);
} finally {
endFinishConnect(blocking, connected);
}
assert (blocking && connected) ^ !blocking;
return connected;
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
} catch (IOException ioe) {
// connect failed, close the channel
close();
throw SocketExceptions.of(ioe, remoteAddress);
}
}
//sun.nio.ch.SocketChannelImpl#checkConnect
private static native int checkConnect(FileDescriptor fd, boolean block) throws IOException;
复制代码
关于beginFinishConnect
与endFinishConnect
和咱们以前分析的sun.nio.ch.SocketChannelImpl#beginConnect
与sun.nio.ch.SocketChannelImpl#endConnect
过程差很少,不懂读者可回看。剩下的,就是咱们关注的主要核心逻辑checkConnect(fd, true)
,它也是一个本地方法,涉及到的源码以下:
JNIEXPORT jint JNICALL Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this, jobject fdo, jboolean block) {
int error = 0;
socklen_t n = sizeof(int);
//获取FileDescriptor中的fd
jint fd = fdval(env, fdo);
int result = 0;
struct pollfd poller;
//文件描述符
poller.fd = fd;
//请求的事件为写事件
poller.events = POLLOUT;
//返回的事件
poller.revents = 0;
//-1表示阻塞,0表示当即返回,不阻塞进程
result = poll(&poller, 1, block ? -1 : 0);
//小于0表示调用失败
if (result < 0) {
if (errno == EINTR) {
return IOS_INTERRUPTED;
} else {
JNU_ThrowIOExceptionWithLastError(env, "poll failed");
return IOS_THROWN;
}
}
//非堵塞时,0表示没有准备好的链接
if (!block && (result == 0))
return IOS_UNAVAILABLE;
//准备好写或出现错误的socket数量>0
if (result > 0) {
errno = 0;
result = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &n);
//出错
if (result < 0) {
return handleSocketError(env, errno);
//发生错误,处理错误
} else if (error) {
return handleSocketError(env, error);
} else if ((poller.revents & POLLHUP) != 0) {
return handleSocketError(env, ENOTCONN);
}
//socket已经准备好,可写,即链接已经创建好
// connected
return 1;
}
return 0;
}
复制代码
具体的过程如源码注释所示,其中是否阻塞咱们在本地方法源码中和以前sun.nio.ch.SocketChannelImpl#finishConnect
的行为产生对应。另外,从上面的源码看到,底层是经过poll
查询socket
的状态,从而判断链接是否创建成功;因为在非堵塞模式下,finishConnect
方法会当即返回,根据此处sun.nio.ch.SocketAdaptor#connect
的处理,其使用循环的方式判断链接是否创建,在咱们的nio编程中,这个是不建议的,属于半成品,而是建议注册到Selector
,经过ops=OP_CONNECT
获取链接完成的SelectionKey
,而后调用finishConnect
完成链接的创建; 那么finishConnect
是否能够不调用呢?答案是否认的,由于只有finishConnect
中才会将状态更新为ST_CONNECTED
,而在调用read
和write
时都会对状态进行判断。
这里,咱们算是引出了咱们即将要涉及的Selector
和SelectionKey
,咱们会在下一篇中进行详细讲解。