原本是想学习Netty的,可是Netty是一个NIO框架,所以在学习netty以前,仍是先梳理一下NIO的知识。经过剖析源码理解NIO的设计原理。html
本系列文章针对的是JDK1.8.0.161的源码。java
上一篇介绍了Channel的接口,本篇对SocektChannel的源码进行解析。linux
咱们经过ServerSocketChannel.open()
建立一个ServerSocketChannel
,它实际经过provider
建立。c++
public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel {
protected ServerSocketChannel(SelectorProvider provider) {
super(provider);
}
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
...
}
复制代码
在首次建立时,建立初始化SelectorProvider
对象。git
public static SelectorProvider provider() {
synchronized(lock) {
return provider != null ? provider : (SelectorProvider)AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (SelectorProvider.loadProviderFromProperty()) {
return SelectorProvider.provider;
} else if (SelectorProvider.loadProviderAsService()) {
return SelectorProvider.provider;
} else {
SelectorProvider.provider = DefaultSelectorProvider.create();
return SelectorProvider.provider;
}
}
});
}
}
复制代码
具体
SelcetProvider
实如今《NIO-Selector》一节讲解。github
privoder
建立完成后经过openServerSocketChannel
建立ServiceSocketChannel
。编程
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
复制代码
在ServerSocketChannelImpl
静态构造函数中会进行一些初始化操做。windows
class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl {
private static NativeDispatcher nd;
static {
IOUtil.load();
initIDs();
nd = new SocketDispatcher();
}
private static native void initIDs();
...
}
复制代码
经过静态构造函数在首次建立时经过IOUtil.load()
初始化一些必要的参数。IOUtil的静态构造方法来加载net和nio的类库。数组
public static void load() { }
static {
java.security.AccessController.doPrivileged(
new java.security.PrivilegedAction<Void>() {
public Void run() {
//加载net和nio的库
//net主要是应用层的一些协议实现,如FTP,Http
System.loadLibrary("net");
System.loadLibrary("nio");
return null;
}
});
initIDs();
IOV_MAX = iovMax();
}
复制代码
经过System.loadLibrary
加载后就能够调用该类的native方法。缓存
initIDs
是一个native方法,这里对windows下的native源码进行说明。在native\sun\nio\ch\ServerSocketChannelImpl.c
能够看到ServerSocketChannelImpl的initIDs的代码
Java_sun_nio_ch_ServerSocketChannelImpl_initIDs(JNIEnv *env, jclass cls)
{
//查找类型FileDescriptor
cls = (*env)->FindClass(env, "java/io/FileDescriptor");
//获取class的字段fd,I是int类型的意思,将字段id保存到fd_fdID,之后要获取该字段就能够直接用fd_fdID去获取
fd_fdID = (*env)->GetFieldID(env, cls, "fd", "I");
//查找InetSocketAddress类型
cls = (*env)->FindClass(env, "java/net/InetSocketAddress");
//建立一个引用指向cls,后面能够经过cls操做这个java对象
isa_class = (*env)->NewGlobalRef(env, cls);
//获取构造函数的方法id,(Ljava/net/InetAddress;I)V 是反汇编后的构造函数签名
isa_ctorID = (*env)->GetMethodID(env, cls, "<init>",
"(Ljava/net/InetAddress;I)V");
}
复制代码
将fd_fdID
、isa_ctorID
和isa_class
保存起来后面都用获得。
FindClass
:是查找指定的类型GetFieldID
:获取类型字段idNewGlobalRef
:建立一个引用GetMethodID
:获取方法id关于JNI的字段和方法解释能够看JNI GetFieldID和GetMethodID函数解释及方法签名
咱们能够经过javap
查看类的方法签名,经过-p
显示全部类和成员,-l
显示行号和本地变量表,-c
对代码进行反汇编。
C:\Program Files\Java\jdk1.8.0_161\bin>javap 用法: javap <options> <classes> 其中, 可能的选项包括: -help --help -? 输出此用法消息 -version 版本信息 -v -verbose 输出附加信息 -l 输出行号和本地变量表 -public 仅显示公共类和成员 -protected 显示受保护的/公共类和成员 -package 显示程序包/受保护的/公共类 和成员 (默认) -p -private 显示全部类和成员 -c 对代码进行反汇编 -s 输出内部类型签名 -sysinfo 显示正在处理的类的 系统信息 (路径, 大小, 日期, MD5 散列) -constants 显示最终常量 -classpath <path> 指定查找用户类文件的位置 -cp <path> 指定查找用户类文件的位置 -bootclasspath <path> 覆盖引导类文件的位置 复制代码
在命令行输入javap -p -c -l java.net.InetSocketAddress
查看构造函数的方法签名为Method "<init>":(Ljava/net/InetAddress;I)V
C:\Program Files\Java\jdk1.8.0_161\bin>javap -p -c -l java.net.InetSocketAddress Compiled from "InetSocketAddress.java" public class java.net.InetSocketAddress extends java.net.SocketAddress { private final transient java.net.InetSocketAddress$InetSocketAddressHolder holder; private static final long serialVersionUID; ... public java.net.InetSocketAddress(int); Code: 0: aload_0 1: invokestatic #215 // Method java/net/InetAddress.anyLocalAddress:()Ljava/net/InetAddress; 4: iload_1 5: invokespecial #219 // Method "<init>":(Ljava/net/InetAddress;I)V 8: return LineNumberTable: line 166: 0 line 167: 8 ... 复制代码
IOV_MAX用于获取最大的可一次性写入的缓存个数,当咱们经过IOUtils.write一次性向Channel写入多个Buffer时,会有Buffer最大数量限制的。
在IOUtil.load
初始化完成,则会建立SocketDispatcher
,它提供了Socket的native方法,不一样平台对于SocketDispatcher实现不同,最终都是调用FileDispatcherImpl
执行相关的文件操做。
class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl {
private final FileDescriptor fd;
private int fdVal;
private static final int ST_INUSE = 0;
private int state = -1;
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
//首先经过Net.serverSocket(true)建立Socket并建立一个文件描述符与其关联。
this.fd = Net.serverSocket(true);
//在注册selector的时候须要获取到文件描述符的值。
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
ServerSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) throws IOException {
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
//已绑定则直接获取地址
if (bound)
localAddress = Net.localAddress(fd);//获取传入的文件描述符的socket地址
}
...
}
复制代码
文件描述符简称fd,它是一个抽象概念,在C库编程中能够叫作文件流或文件流指针,在其它语言中也能够叫作文件句柄(handler),并且这些不一样名词的隐含意义多是不彻底相同的。不过在系统层,咱们统一把它叫作文件描述符。
咱们经过channel.bind
能够将socket绑定到一个端口上。
public final ServerSocketChannel bind(SocketAddress local) throws IOException {
return bind(local, 0);
}
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (lock) {
...
InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
//检查是否端口已被监听
if (sm != null)
sm.checkListen(isa.getPort());
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
Net.bind(fd, isa.getAddress(), isa.getPort());
//默认tcp待链接队列长度最小为50
Net.listen(fd, backlog < 1 ? 50 : backlog);
synchronized (stateLock) {
//从文件描述符中获取地址信息
localAddress = Net.localAddress(fd);
}
}
return this;
}
复制代码
首先会作一下基本校验,包括检查端口是否被占用。最终绑定并监听端口。
文件描述符能够关联到一个文件设备,最终能够关联到socket结构体,经过socket结构体能够提取出地址信息。如何获取地址信息能够看下根据文件描述符fd获取socket结构体,socket结构体能够看下struct socket结构体详解 关于backlog,因为TCP链接时会有3次握手,当服务端接收到
SYN
包时,会将该请求socket加入到待链接队列中,而后返回SYN+ACK
继续进行链接握手。若链接并发量很高,而服务端来不及accept致使待链接队列满了,则后续的链接请求将会被拒绝。
在创建在绑定地址以前,咱们须要调用NetHooks.beforeTcpBind
,这个方法是将fd转换为SDP(Sockets Direct Protocol,Java套接字直接协议) socket。
SDP须要网卡支持InfiniBand高速网络通讯技术,windows不支持该协议。
public final class NetHooks {
public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
}
public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
}
}
复制代码
public final class NetHooks {
public static abstract class Provider {
protected Provider() {}
public abstract void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException;
public abstract void implBeforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException;
}
private static final Provider provider = new sun.net.sdp.SdpProvider();
public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
provider.implBeforeTcpBind(fdObj, address, port);
}
public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
provider.implBeforeTcpConnect(fdObj, address, port);
}
}
复制代码
实际调用了SdpProvider的方法
provider.implBeforeTcpBind(fdObj, address, port);
public void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
if (enabled)
convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);
}
private void convertTcpToSdpIfMatch(FileDescriptor fdObj, Action action, InetAddress address, int port) throws IOException {
boolean matched = false;
//匹配规则,通常有PortRangeRule校验器校验端口范围是否符合规则
for (Rule rule: rules) {
if (rule.match(action, address, port)) {
//将fd转换为socket
SdpSupport.convertSocket(fdObj);
matched = true;
break;
}
}
...
}
public static void convertSocket(FileDescriptor fd) throws IOException {
...
//获取fd索引
int fdVal = fdAccess.get(fd);
convert0(fdVal);
}
//调用native方法转换
private static native int create0() throws IOException;
//rules 是在初始化SdpProvider加载的
public SdpProvider() {
String file = AccessController.doPrivileged(
new GetPropertyAction("com.sun.sdp.conf"));
...
list = loadRulesFromFile(file);
...
this.rules = list;
this.log = out;
}
复制代码
咱们除了使用chennel.bind
绑定地址之外,还能够经过channel.socket().bind
绑定。channel.socket()
会建立一个内部的ServerSocket,ServerSocketAdaptor
把实现了SocketServer的配置、绑定Socket Server的功能抽出提取到了ServerSocket
中。
class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl {
...
ServerSocket socket;
public ServerSocket socket() {
synchronized(this.stateLock) {
if (this.socket == null) {
this.socket = ServerSocketAdaptor.create(this);
}
return this.socket;
}
}
...
}
复制代码
ServerSocketAdaptor
实现了SocketServer的绑定,监听等功能,但实际仍是调用ServerSocketChannelImpl
的方法。
public class ServerSocketAdaptor extends ServerSocket {
private final ServerSocketChannelImpl ssc;
public static ServerSocket create(ServerSocketChannelImpl ssc) {
try {
return new ServerSocketAdaptor(ssc);
} catch (IOException x) {
throw new Error(x);
}
}
private ServerSocketAdaptor(ServerSocketChannelImpl ssc) throws IOException {
this.ssc = ssc;
}
public void bind(SocketAddress local, int backlog) throws IOException {
...
ssc.bind(local, backlog);
...
}
public Socket accept() throws IOException {
synchronized (ssc.blockingLock()) {
...
SocketChannel sc;
if ((sc = ssc.accept()) != null)
...
}
}
public void close() throws IOException {
ssc.close();
}
复制代码
咱们能够经过channel.accept
接收一个新的通道。
public SocketChannel accept() throws IOException {
synchronized (lock) {
//基本的校验
if (!isOpen())
throw new ClosedChannelException();
if (!isBound())
throw new NotYetBoundException();
SocketChannel sc = null;
int n = 0;
//建立文件描述符和接收到的客户端socket进行绑定
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
try {
//I/O开始前调用begin
begin();
if (!isOpen())
return null;
thread = NativeThread.current();
for (;;) {
//接收新链接,将给定的文件描述符与socket客户端绑定,并设置套接字客户端的地址。
n = accept(this.fd, newfd, isaa);
//返回 1 成功
//若返回IOStaus.INTERRUPTED表示系统调用了中断操做,继续等待接收。
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
break;
}
} finally {
thread = 0;
//I/O结束调用end
end(n > 0);
assert IOStatus.check(n);
}
//返回 1 成功
if (n < 1)
return null;
//默认都是阻塞模式
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
//初始化客户端socketchannel
sc = new SocketChannelImpl(provider(), newfd, isa);
//检查权限
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
return sc;
}
}
复制代码
SecurityManager是安全管理器,用于权限校验的类,若权限校验不经过则会抛出
AccessControlException
异常。
SocketChannel
和ServerSocketChannel
相似,经过SocketChannel.open
建立channel,和服务端相似也是经过provider调用SocketChannle的构造函数。
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
复制代码
咱们也能够经过open(SocketAddress remote)
传入链接地址,在建立后直接。
public static SocketChannel open(SocketAddress remote) throws IOException {
SocketChannel sc = open();
try {
sc.connect(remote);
}
...
return sc;
}
public boolean connect(SocketAddress sa) throws IOException {
...
begin();
if (localAddress == null) {
NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
}
...
//创建链接
n = Net.connect(fd, ia, isa.getPort());
...
end((n > 0) || (n == IOStatus.UNAVAILABLE));
...
//更新状态
state = ST_CONNECTED;
// 非阻塞socket则更新状态为ST_PENDING
if (!isBlocking())
state = ST_PENDING;
...
}
复制代码
NetHooks.beforeTcpConnect
和NetHooks.implBeforeTcpBind
同样,最终都是调用SdpProvider.convertTcpToSdpIfMatch
将数据写入channel时会调用IOUtil.write
public int write(ByteBuffer buf) throws IOException {
...
begin();
...
IOUtil.write(fd, buf, -1, nd);
...
end(n > 0 || (n == IOStatus.UNAVAILABLE));
...
}
static int write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd) throws IOException {
//使用直接缓冲区,则直接写入到缓冲区中
if (src instanceof DirectBuffer)
return writeFromNativeBuffer(fd, src, position, nd);
// 不是直接缓冲区
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
//获取一个临时的直接缓冲区地址。
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
//写入到临时的直接缓冲区中
bb.put(src);
bb.flip();
src.position(pos);
//将直接缓冲区数据写入
int n = writeFromNativeBuffer(fd, bb, position, nd);
if (n > 0) {
//更新实际写入量
src.position(pos + n);
}
return n;
} finally {
//使用完缓冲释放
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd) throws IOException {
...
if (position != -1) {
//socket不支持,文件支持
written = nd.pwrite(fd, ((DirectBuffer)bb).address() + pos, rem, position);
} else {
//调用native方法
written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
}
...
}
int write(FileDescriptor fd, long address, int len) throws IOException {
return write0(fd, address, len, append);
}
static native int write0(FileDescriptor fd, long address, int len, boolean append) throws IOException;
复制代码
当咱们使用堆缓冲区时,须要从线程本地缓冲区申请一块临时的直接缓冲区用于存放临时数据,会多一次内存复制。
关于为何须要使用一块临时缓冲区作中间处理能够看下《Java NIO为何须要DirectByteBuffer做为中间缓冲区》
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
private static ThreadLocal<BufferCache> bufferCache = new ThreadLocal<BufferCache>()
{
@Override
protected BufferCache initialValue() {
return new BufferCache();
}
};
public static ByteBuffer getTemporaryDirectBuffer(int size) {
//从线程缓冲区获取一个缓冲区
BufferCache cache = bufferCache.get();
//获取能容纳的下的地址
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
} else {
//当没有合适的缓冲区时,
if (!cache.isEmpty()) {
//删除第一个未用的缓冲区
buf = cache.removeFirst();
//释放
free(buf);
}
//从新分配一个合适大小的缓冲区
return ByteBuffer.allocateDirect(size);
}
}
复制代码
ThreadLocal是利用空间换时间的一种方案性能优化方案,在每一个线程会有一个ThreadLocalMap用于存放数据。这样每一个线程就访问本身的数据,从而避免了非线程安全的数据因为并发问题须要经过加锁的方式同步带来的性能损耗。
当咱们一次性向Channel写入多个Buffer时,会有最大Buffer数量限制,也就是在Channel静态构造函数获取的IOV_MAX
的值。一次性写入多个Buffer在Linux中称之为汇集写,即将内存中分散在的若干缓冲区中的数据按顺序写至文件中。
调用Channel的write(ByteBuffer[] srcs, int offset, int length)
一次性写入多个Buffer。
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
...
begin();
...
IOUtil.write(fd, srcs, offset, length, nd);
...
end((n > 0) || (n == IOStatus.UNAVAILABLE));
...
}
write(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length, NativeDispatcher nd)
{
//获取或建立一个指定长度的IOVecWrapper结构,它能够存放多个Buffer数据。
IOVecWrapper vec = IOVecWrapper.get(length);
...
while (i < count && iov_len < IOV_MAX) {
//遍历每一块待写入缓冲区
ByteBuffer buf = bufs[i];
//计算buffer的可读大小存放rem中
...
//将buffer放入IOVecWrapper中
if (rem > 0) {
vec.setBuffer(iov_len, buf, pos, rem);
if (!(buf instanceof DirectBuffer)) {
//若buf不是直接缓冲区则须要建立一个临时的直接缓冲区
ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
...
vec.setShadow(iov_len, shadow);
...
//更新待写入缓冲区引用指向直接缓冲区。
buf = shadow;
}
//设置当前缓冲区的起始地址
vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
//设置当前缓冲区的长度
vec.putLen(iov_len, rem);
iov_len++;
}
i++;
}
//调用writev一次性写入多个缓冲区的数据。
long bytesWritten = nd.writev(fd, vec.address, iov_len);
...
long left = bytesWritten;
//清理工做
for (int j=0; j<iov_len; j++) {
if (left > 0) {
ByteBuffer buf = vec.getBuffer(j);
int pos = vec.getPosition(j);
int rem = vec.getRemaining(j);
int n = (left > rem) ? rem : (int)left;
buf.position(pos + n);
left -= n;
}
ByteBuffer shadow = vec.getShadow(j);
if (shadow != null)
//将已写入完成的缓冲区放回到临时直接缓冲区中
Util.offerLastTemporaryDirectBuffer(shadow);
// 清楚IOVcMrapper的缓存
vec.clearRefs(j);
}
return bytesWritten;
}
复制代码
与汇集写对应的还有
readv()
称为分散(散布)读,即将文件中若干连续的数据块读入内存分散的缓冲区中。 关于linux的汇集写和散步读具体内容能够阅读readv()和writev()函数和分散读取与汇集写入
经过IOVecWrapper
结构构建了一个字节缓冲区数组用于存放多个Buffer,其内部实际维护了一个Native数组结构。
class IOVecWrapper {
...
private final AllocatedNativeObject vecArray;
final long address;
static int addressSize;
private IOVecWrapper(int size) {
...
//false表示无需页面对齐
this.vecArray = new AllocatedNativeObject(size * SIZE_IOVEC, false);
this.address = vecArray.address();
}
...
static {
//获取本机指针的大小
addressSize = Util.unsafe().addressSize();
//保存每一个指针偏移量
LEN_OFFSET = addressSize;
//用于保存每一个AllocatedNativeObject对象的元素的大小
//每一个NativeObject有两个long属性,所以须要×2
SIZE_IOVEC = (short) (addressSize * 2);
}
}
复制代码
AllocatedNativeObject
在堆外申请一片内存存放本机对象。
class AllocatedNativeObject extends NativeObject {
AllocatedNativeObject(int size, boolean pageAligned) {
super(size, pageAligned);
}
}
class NativeObject {
// native 分配地址,可能小于基地址,因为页面大小会对齐,因此实际的及地址时页面对其后的地址。
protected long allocationAddress;
// native 基地址
private final long address;
}
复制代码
经过上面咱们知道当一次性批量写入缓存时会调用native的writev
汇集写方法,调用以前会申请一块地址用于存放可写入的多块缓冲区数据。如今咱们在回过头看看IOVecWrapper
具体是怎么作的。
class IOVecWrapper {
...
private final AllocatedNativeObject vecArray;
private final ByteBuffer[] buf;
private final int[] position;
private final int[] remaining;
private final ByteBuffer[] shadow;
//每一个线程保存一份IOVecWrapper缓存
private static final ThreadLocal<IOVecWrapper> cached =
new ThreadLocal<IOVecWrapper>();
//经过get获取一块适合大小的空间
static IOVecWrapper get(int size) {
IOVecWrapper wrapper = cached.get();
if (wrapper != null && wrapper.size < size) {
//若获取到空间不够大,则从新初始化一个空间。
wrapper.vecArray.free();
wrapper = null;
}
if (wrapper == null) {
wrapper = new IOVecWrapper(size);
//native资源,当对象释放时使得操做系统能够释放内存,在将Buffer的时候关于直接缓冲区提到过相关的知识
Cleaner.create(wrapper, new Deallocator(wrapper.vecArray));
cached.set(wrapper);
}
return wrapper;
}
//将buffer保存起来
void setBuffer(int i, ByteBuffer buf, int pos, int rem) {
this.buf[i] = buf;
this.position[i] = pos;
this.remaining[i] = rem;
}
...
}
复制代码
因为IOVecWrapper内部使用的是直接缓冲区,所以将它存放于ThreadLocalMap中复用以提升性能。
从channel读取数据时会调用IOUtil.read
public int read(ByteBuffer buf) throws IOException {
...
begin();
...
n = IOUtil.read(fd, buf, -1, nd);
...
end(n > 0 || (n == IOStatus.UNAVAILABLE));
...
}
static int read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd) throws IOException {
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
if (dst instanceof DirectBuffer)
return readIntoNativeBuffer(fd, dst, position, nd);//使用直接缓冲区
//当不是使用直接内存时,则从线程本地缓冲获取一块临时的直接缓冲区存放待读取的数据
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try {
//读取
int n = readIntoNativeBuffer(fd, bb, position, nd);
bb.flip();
if (n > 0)
//将直接缓冲区的数据写入到堆缓冲区中
dst.put(bb);
return n;
} finally {
//释放临时的直接缓冲区
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd) throws IOException {
...
if (position != -1) {
//socket不支持,文件支持
n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
rem, position);
} else {
n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
}
...
}
复制代码
前面说到,Channel也支持分散读取。
调用Channel的read(ByteBuffer[] dsts, int offset, int length)
读取到多个Buffer中。
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
...
begin();
...
n = IOUtil.read(fd, dsts, offset, length, nd);
...
end(n > 0 || (n == IOStatus.UNAVAILABLE));
...
}
static long read(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd) throws IOException {
return read(fd, bufs, 0, bufs.length, nd);
}
static long read(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length, NativeDispatcher nd) throws IOException {
IOVecWrapper vec = IOVecWrapper.get(length);
...
while (i < count && iov_len < IOV_MAX) {
ByteBuffer buf = bufs[i];
...
vec.setBuffer(iov_len, buf, pos, rem);
if (!(buf instanceof DirectBuffer)) {
ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
vec.setShadow(iov_len, shadow);
...
}
...
long bytesRead = nd.readv(fd, vec.address, iov_len);
//清理工做
...
return bytesRead;
//清理工做
...
}
复制代码
分散读和汇集写逻辑相似,都须要经过IOVecWrapper进行一个“中转”。
经过调用channel.close
关闭通道,在接口实现里咱们看过它实际是AbstractInterruptibleChannel
声明的。
public final void close() throws IOException {
synchronized(this.closeLock) {
if (this.open) {
this.open = false;
this.implCloseChannel();
}
}
}
复制代码
在AbstractSelectableChannel
实现了implCloseChannel
protected final void implCloseChannel() throws IOException {
//关闭当前channel
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
//注册的channel都须要取消
k.cancel();
}
}
}
复制代码
当channel注册到selector时会建立SelectionKey保存到keys中。
this.implCloseSelectableChannel();
protected void implCloseSelectableChannel() throws IOException {
synchronized(this.stateLock) {
this.isInputOpen = false;
this.isOutputOpen = false;
if (this.state != 4) {
//windows不作处理
//linux和Solaris须要,关闭前将fd复制到另外一个待关闭fd中,以防止被fd回收
nd.preClose(this.fd);
}
if (this.readerThread != 0L) {
//发送信号给读线程,将其从阻塞I/O中释放,避免一直被阻塞。
NativeThread.signal(this.readerThread);
}
if (this.writerThread != 0L) {
//发送信号给写线程,将其从阻塞I/O中释放,避免一直被阻塞。
NativeThread.signal(this.writerThread);
}
//若还有注册的channel,则不处理,等待key所有注销后再kill
//若没有的话能够直接kill当前channel
if (!this.isRegistered()) {
this.kill();
}
}
}
复制代码
kill
方法是在具体Channel中实现的,最终调用nd的close方法关闭文件描述符。
//SocketChannel
public void kill() throws IOException {
synchronized(this.stateLock) {
if (this.state != 4) {
if (this.state == -1) {
this.state = 4;
} else {
assert !this.isOpen() && !this.isRegistered();
//若仍有线程还没释放,则等线程I/O执行完后再kill
if (this.readerThread == 0L && this.writerThread == 0L) {
nd.close(this.fd);
this.state = 4;
} else {
this.state = 3;
}
}
}
}
}
//ServerSocketChannel
public void kill() throws IOException {
synchronized(this.stateLock) {
if (this.state != 1) {
if (this.state == -1) {
this.state = 1;
} else {
assert !this.isOpen() && !this.isRegistered();
nd.close(this.fd);
this.state = 1;
}
}
}
}
复制代码
ServerSocketChannel的仅有-1(未初始化)、0(使用中)和1(已释放)三个状态。
NIO支持tcp的半链接,因为TCP是全双工的,即有输入流和输出流。在某些时候咱们能够中断其中一个流,而另外一个流仍然能够继续工做。好比做为客户端咱们能够关闭输出流,可是仍然能继续接收到服务端发送的数据。
//关闭输入流
client.socket().shutdownInput();
//关闭输出流
client.socket().shutdownOutput();
复制代码
当客户端关闭了输出流,实际上会送FIN
包到服务端,服务端接收到后响应ACK
,若服务端不发送FIN
包,关闭服务端的输出流(客户端的输入流)时,则服务端仍然能继续发送(响应)数据给客户端,客户端也仍然能够继续接收到数据。NIO的Pipe就是经过两个socket的半链接实现的单项数据传输。
本篇对客户端和服务端的socket,绑定、链接、读、写以及关闭等经常使用操做进行源码分析,下一篇将继续对FileChannel源码进行探究。
![]()
- 微信扫一扫二维码关注订阅号杰哥技术分享
- 出处:www.cnblogs.com/Jack-Blog/p…
- 做者:杰哥很忙
- 本文使用「CC BY 4.0」创做共享协议。欢迎转载,请在明显位置给出出处及连接。