“ Epoll 是Linux内核的高性能、可扩展的I/O事件通知机制。 java
在linux2.5.44首次引入epoll,它设计的目的旨在取代既有的select、poll系统函数,让须要大量操做文件描述符的程序得以发挥更优异的性能(wikipedia example: 旧有的系统函数所花费的时间复杂度为O(n), epoll的时间复杂度O(log n))。epoll实现的功能与poll相似,都是监听多个文件描述符上的事件。linux
epoll底层是由可配置的操做系统内核对象建构而成,并以文件描述符(file descriptor)的形式呈现于用户空间(from wikipedia: 在操做系统中,虚拟内存一般会被分红用户空间,与核心空间这两个区段。这是存储器保护机制中的一环。内核**、核心扩展(kernel extensions)、以及驱动程序,运行在核心空间**上。而其余的应用程序,则运行在用户空间上。全部运行在用户空间的应用程序,都被统称为用户级(userland))。编程
它是一个用来管理软件发出的数据I/O的一个程序,并将数据交由CPU和电脑其余电子组件处理,可是直接对硬件操做是很是复杂的,一般内核提供一种硬件抽象的方法来完成(由内核决定一个程序在何时对某部分硬件操做多长时间),经过这些方法来完成进程间通讯和系统调用。windows
宏内核简单来讲,首先定义了一个高阶的抽象接口,叫系统调用(System call))来实现操做系统的功能,例如进程管理,文件系统,和存储管理等等,这些功能由多个运行在内核态的程序来完成。数组
微内核:缓存
微内核结构由硬件抽象层和系统调用组成;包括了建立一个系统必需的几个部分;如线程管理,地址空间和进程间通讯等。微核的目标是将系统服务的实现和系统的基本操做规则分离开来。服务器
linux就是使用的宏内核。由于它可以在运行时将模块调入执行,使扩充内核的功能变得更简单。markdown
epoll作了什么事?网络
epoll 经过使用红黑树(RB-tree)搜索被监视的文件描述符(file descriptor)。app
在 epoll 实例上注册事件时,epoll 会将该事件添加到 epoll 实例的红黑树上并注册一个回调函数,当事件发生时会将事件添加到就绪链表中。
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
复制代码
向内核申请空间,建立一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不一样于select()中的第一个参数,给出最大监听的fd+1的值。在最初的实现中,调用者经过 size 参数告知内核须要监听的文件描述符数量。若是监听的文件描述符数量超过 size, 则内核会自动扩容。而如今 size 已经没有这种语义了,可是调用者调用时 size 依然必须大于 0,以保证后向兼容性。须要注意的是,当建立好epoll句柄后,它就是会占用一个fd值,在linux下若是查看/proc/进程id/fd/,是可以看到这个fd的。
向 epfd 对应的内核epoll 实例添加、修改或删除对 fd 上事件 event 的监听。op 能够为 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分别对应的是添加新的事件,修改文件描述符上监听的事件类型,从实例上删除一个事件。若是 event 的 events 属性设置了 EPOLLET flag,那么监听该事件的方式是边缘触发。
events能够是如下几个宏的集合:
EPOLLIN:触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
EPOLLOUT:触发该事件,表示对应的文件描述符上能够写数据;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来讲的。
EPOLLONESHOT:只监听一次事件,当监听完此次事件以后,若是还须要继续监听这个socket的话,须要再次把这个socket加入到EPOLL队列里。
例如:
struct epoll_event ev;
//设置与要处理的事件相关的文件描述符
ev.data.fd=listenfd;
//设置要处理的事件类型
ev.events=EPOLLIN|EPOLLET;
//注册epoll事件
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
复制代码
Linux-2.6.19又引入了能够屏蔽指定信号的epoll_wait: epoll_pwait
接收发生在被侦听的描述符上的,用户感兴趣的IO事件。简单点说:经过循环,不断地监听暴露的端口,看哪个fd可读、可写~
当 timeout 为 0 时,epoll_wait 永远会当即返回。而 timeout 为 -1 时,epoll_wait 会一直阻塞直到任一已注册的事件变为就绪。当 timeout 为一正整数时,epoll 会阻塞直到计时结束或已注册的事件变为就绪。由于内核调度延迟,阻塞的时间可能会略微超过 timeout (毫秒级)。
epoll文件描述符用完后,直接用close关闭,而且会自动从被侦听的文件描述符集合中删除
说了这么多原理,脑袋怕嗡嗡的吧,来看看实战清醒下~
如上知道:每次添加/修改/删除被侦听文件描述符都须要调用epoll_ctl,因此要尽可能少地调用epoll_ctl,防止其所引来的开销抵消其带来的好处。有的时候,应用中可能存在大量的短链接(好比说Web服务器),epoll_ctl将被频繁地调用,可能成为这个系统的瓶颈。
传统的select以及poll的效率会由于在线人数的线形递增而致使呈二次乃至三次方的降低,这些直接致使了网络服务器能够支持的人数有了个比较明显的限制。这是由于他们有限的文件描述符和遍历全部的fd所带来的低效。
当你拥有一个很大的socket集合,不过因为网络延时,任一时间只有部分的socket是“活跃”的,可是select/poll每次调用都会线性扫描所有的集合,致使效率呈现线性降低。epoll不存在这个问题,它只会对“活跃”的socket进行操做---这是由于在内核实现中epoll是根据每一个fd上面的callback函数实现的。那么,只有“活跃”的socket才会主动的去调用 callback函数,其余idle(空闲)状态socket则不会,在这点上,epoll实现了一个“伪”AIO,由于这时候推进力在os内核。在一些 benchmark中,若是全部的socket基本上都是活跃的---好比一个高速LAN环境,epoll并不比select/poll有什么效率,相反,若是过多使用epoll_ctl,效率相比还有稍微的降低。可是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。
int epfd = epoll_create(POLL_SIZE);
struct epoll_event ev;
struct epoll_event *events = NULL;
nfds = epoll_wait(epfd, events, 20, 500);
{
for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listener) {
//若是是主socket的事件的话,则表示
//有新链接进入了,进行新链接的处理。
client = accept(listener, (structsockaddr *)&local, &addrlen);
if (client < 0) {
perror("accept");
continue;
}
setnonblocking(client); //将新链接置于非阻塞模式
ev.events = EPOLLIN | EPOLLET; //而且将新链接也加入EPOLL的监听队列。
//注意,这里的参数EPOLLIN|EPOLLET并无设置对写socket的监听,
//若是有写操做的话,这个时候epoll是不会返回事件的,若是要对写操做
//也监听的话,应该是EPOLLIN|EPOLLOUT|EPOLLET
ev.data.fd = client;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev) < 0) {
//设置好event以后,将这个新的event经过epoll_ctl加入到epoll的监听队列里面,
//这里用EPOLL_CTL_ADD来加一个新的epoll事件,经过EPOLL_CTL_DEL来减小一个
//epoll事件,经过EPOLL_CTL_MOD来改变一个事件的监听方式。
fprintf(stderr, "epollsetinsertionerror:fd=%d", client);
return -1;
}
}
else if(event[n].events & EPOLLIN)
{
//若是是已经链接的用户,而且收到数据,
//那么进行读入
int sockfd_r;
if ((sockfd_r = event[n].data.fd) < 0)
continue;
read(sockfd_r, buffer, MAXSIZE);
//修改sockfd_r上要处理的事件为EPOLLOUT
ev.data.fd = sockfd_r;
ev.events = EPOLLOUT | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)
}
else if(event[n].events & EPOLLOUT)
{
//若是有数据发送
int sockfd_w = events[n].data.fd;
write(sockfd_w, buffer, sizeof(buffer));
//修改sockfd_w上要处理的事件为EPOLLIN
ev.data.fd = sockfd_w;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_w, &ev)
}
do_use_fd(events[n].data.fd);
}
}
复制代码
简单说下流程:
监听到有新链接进入了,进行新链接的处理;
若是是已经链接的用户,而且收到数据,读完以后修改sockfd_r上要处理的事件为EPOLLOUT(可写);
若是有数据发送,写完以后,修改sockfd_w上要处理的事件为EPOLLIN(可读)
基础知识:
文件描述符:
(参考《Unix网络编程》译者的注释)
文件描述符是Unix系统标识文件的int,Unix的哲学一切皆文件,因此各自资源(包括常规意义的文件、目录、管道、POSIX IPC、socket)均可以当作文件。
Java NIO的世界中,Selector是中央控制器,Buffer是承载数据的容器,而Channel能够说是最基础的门面,它是本地I/O设备、网络I/O的通讯桥梁。
网络I/O设备:
DatagramChannel:读写UDP通讯的数据,对应DatagramSocket类
SocketChannel:读写TCP通讯的数据,对应Socket类
ServerSocketChannel:监听新的TCP链接,而且会建立一个可读写的SocketChannel,对应ServerSocket类
本地I/O设备:
FileChannel:读写本地文件的数据,不支持Selector控制,对应File类
ServerSocketChannel与ServerSocket同样是socket监听器,其主要区别前者能够运行在非阻塞模式下运行;
// 建立一个ServerSocketChannel,将会关联一个未绑定的ServerSocket
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
复制代码
ServerSocketChannel的建立也是依赖底层操做系统实现,其实现类主要是ServerSocketChannelImpl,咱们来看看其构造方法
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
super(var1);
// 建立一个文件操做符
this.fd = Net.serverSocket(true);
// 获得文件操做符是索引
this.fdVal = IOUtil.fdVal(this.fd);
this.state = 0;
}
复制代码
新建一个ServerSocketChannelImpl其本质是在底层操做系统建立了一个fd(即文件描述符),至关于创建了一个用于网络通讯的通道,调用socket的bind()方法绑定,经过accept()调用操做系统获取TCP链接
public SocketChannel accept() throws IOException {
// 忽略一些校验及无关代码
....
SocketChannelImpl var2 = null;
// var3的做用主要是说明当前的IO状态,主要有
/** * EOF = -1; * UNAVAILABLE = -2; * INTERRUPTED = -3; * UNSUPPORTED = -4; * THROWN = -5; * UNSUPPORTED_CASE = -6; */
int var3 = 0;
// 这里本质也是用fd来获取链接
FileDescriptor var4 = new FileDescriptor();
// 用来存储TCP链接的地址信息
InetSocketAddress[] var5 = new InetSocketAddress[1];
try {
// 这里设置了一个中断器,中断时会将链接关闭
this.begin();
// 这里当IO被中断时,会从新获取链接
do {
var3 = this.accept(this.fd, var4, var5);
} while(var3 == -3 && this.isOpen());
}finally {
// 当链接被关闭且accept失败时或抛出AsynchronousCloseException
this.end(var3 > 0);
// 验证链接是可用的
assert IOStatus.check(var3);
}
if (var3 < 1) {
return null;
} {
// 默认链接是阻塞的
IOUtil.configureBlocking(var4, true);
// 建立一个SocketChannel的引用
var2 = new SocketChannelImpl(this.provider(), var4, var5[0]);
// 下面是是否链接成功校验,这里忽略...
return var2;
}
}
// 依赖底层操做系统实现的accept0方法
private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException {
return this.accept0(var1, var2, var3);
}
复制代码
用于读写TCP通讯的数据,至关于客户端
经过open方法建立SocketChannel,
而后利用connect方法来和服务端发起创建链接,还支持了一些判断链接创建状况的方法;
read和write支持最基本的读写操做
open
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
// State, increases monotonically
private static final int ST_UNINITIALIZED = -1;
private static final int ST_UNCONNECTED = 0;
private static final int ST_PENDING = 1;
private static final int ST_CONNECTED = 2;
private static final int ST_KILLPENDING = 3;
private static final int ST_KILLED = 4;
private int state = ST_UNINITIALIZED;
SocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
// 建立一个scoket通道,即fd(fd的做用可参考上面的描述)
this.fd = Net.socket(true);
// 获得该fd的索引
this.fdVal = IOUtil.fdVal(fd);
// 设置为未链接
this.state = ST_UNCONNECTED;
}
复制代码
// 代码均来自JDK1.8 部分代码
public boolean connect(SocketAddress var1) throws IOException {
boolean var2 = false;
// 读写都锁住
synchronized(this.readLock) {
synchronized(this.writeLock) {
/****状态检查,channel和address****/
// 判断channel是否open
this.ensureOpenAndUnconnected();
InetSocketAddress var5 = Net.checkAddress(var1);
SecurityManager var6 = System.getSecurityManager();
if (var6 != null) {
var6.checkConnect(var5.getAddress().getHostAddress(), var5.getPort());
}
boolean var10000;
/****链接创建****/
// 阻塞状态变动的锁也锁住
synchronized(this.blockingLock()) {
int var8 = 0;
try {
try {
this.begin();
// 若是当前socket未绑定本地端口,则尝试着判断和服务端是否能创建链接
synchronized(this.stateLock) {
if (!this.isOpen()) {
boolean var10 = false;
return var10;
}
if (this.localAddress == null) {
// 和远程创建链接后关闭链接
NetHooks.beforeTcpConnect(this.fd, var5.getAddress(), var5.getPort());
}
this.readerThread = NativeThread.current();
}
do {
InetAddress var9 = var5.getAddress();
if (var9.isAnyLocalAddress()) {
var9 = InetAddress.getLocalHost();
}
// 创建链接
var8 = Net.connect(this.fd, var9, var5.getPort());
} while(var8 == -3 && this.isOpen());
synchronized(this.stateLock) {
this.remoteAddress = var5;
if (var8 <= 0) {
if (!this.isBlocking()) {
this.state = 1;
} else {
assert false;
}
} else {
this.state = 2;// 链接成功
if (this.isOpen()) {
this.localAddress = Net.localAddress(this.fd);
}
var10000 = true;
return var10000;
}
}
}
var10000 = false;
return var10000;
}
}
}
复制代码
在创建在绑定地址以前,咱们须要调用NetHooks.beforeTcpBind,这个方法是将fd转换为SDP(Sockets Direct Protocol,Java套接字直接协议) socket。SDP须要网卡支持InfiniBand高速网络通讯技术,windows不支持该协议。
咱们来看看在openjdk: src\solaris\classes\sun\net下的NetHooks.java
private static final Provider provider = new sun.net.sdp.SdpProvider();
public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
{
provider.implBeforeTcpBind(fdObj, address, port);
}
public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException
{
provider.implBeforeTcpConnect(fdObj, address, port);
}
复制代码
能够看到实际是调用的SdpProvider里的implBeforeTcpBind
@Override
public void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
if (enabled)
convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);
}
// converts unbound TCP socket to a SDP socket if it matches the rules
private void convertTcpToSdpIfMatch(FileDescriptor fdObj, Action action, InetAddress address, int port) throws IOException {
boolean matched = false;
// 主要是先经过规则校验器判断入参是否符合,通常有PortRangeRule校验器
// 而后再执行将fd转换为socket
for (Rule rule: rules) {
if (rule.match(action, address, port)) {
SdpSupport.convertSocket(fdObj);
matched = true;
break;
}
}
}
public static void convertSocket(FileDescriptor fd) throws IOException {
...
//获取fd索引
int fdVal = fdAccess.get(fd);
convert0(fdVal);
}
// convert0
JNIEXPORT void JNICALL Java_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd) {
// create方法实际是经过socket(AF_INET_SDP, SOCK_STREAM, 0);方法获得一个socket
int s = create(env);
if (s >= 0) {
socklen_t len;
int arg, res;
struct linger linger;
/* copy socket options that are relevant to SDP */
len = sizeof(arg);
// 重用TIME_WAIT的端口
if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0)
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len);
len = sizeof(arg);
// 紧急数据放入普通数据流
if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0)
setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len);
len = sizeof(linger);
// 延迟关闭链接
if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0)
setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len);
// 将fd也引用到s所持有的通道
RESTARTABLE(dup2(s, fd), res);
if (res < 0)
JNU_ThrowIOExceptionWithLastError(env, "dup2");
// 执行close方法,关闭s这个引用
RESTARTABLE(close(s), res);
}
}
复制代码
public int read(ByteBuffer var1) throws IOException {
// 省略一些判断
synchronized(this.readLock) {
this.begin();
synchronized(this.stateLock) {
do {
// 经过IOUtil的读取fd的数据至buf
// 这里的nd是SocketDispatcher,用于调用底层的read和write操做
var3 = IOUtil.read(this.fd, var1, -1L, nd);
} while(var3 == -3 && this.isOpen());
// 这个方法主要是将UNAVAILABLE(原为-2)这个状态返回0,不然返回n
var4 = IOStatus.normalize(var3);
var20 = false;
break label367;
}
this.readerCleanup();
assert IOStatus.check(var3);
}
}
}
}
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
if (var1.isReadOnly()) {
throw new IllegalArgumentException("Read-only buffer");
} else if (var1 instanceof DirectBuffer) {
return readIntoNativeBuffer(var0, var1, var2, var4);
} else {
// 临时缓冲区,大小为buf的remain(limit - position),堆外内存,使用ByteBuffer.allocateDirect(size)分配
// Notes:这里分配后后面有个try-finally块会释放该部份内存
ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
int var7;
try {
// 将网络中的buf读进direct buffer
int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
var5.flip();// 待读取
if (var6 > 0) {
var1.put(var5);// 成功时写入
}
var7 = var6;
} finally {
Util.offerFirstTemporaryDirectBuffer(var5);
}
return var7;
}
}
private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
// 忽略变量init
if (var2 != -1L) {
// pread方法只有在同步状态下才能使用
var9 = var4.pread(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2);
} else {
// 其调用SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法
var9 = var4.read(var0, ((DirectBuffer)var1).address() + (long)var5, var7);
}
if (var9 > 0) {
var1.position(var5 + var9);
}
return var9;
}
}
// 一样找到openjdk:src\solaris\native\sun\nio\ch
//FileDispatcherImpl.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,
jobject fdo, jlong address, jint len)
{
jint fd = fdval(env, fdo);// 获取fd索引
void *buf = (void *)jlong_to_ptr(address);
// 调用底层read方法
return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
}
复制代码
初始化一个direct buffer,若是自己的buffer就是direct的则不用初始化
调用底层read方法写入至direct buffer
最终将direct buffer写到传入的buffer对象
看完了前面的read,write整个执行流程基本同样,具体的细节参考以下
public int write(ByteBuffer var1) throws IOException {
if (var1 == null) {
throw new NullPointerException();
} else {
synchronized(this.writeLock) {
this.ensureWriteOpen();
this.begin();
synchronized(this.stateLock) {
if (!this.isOpen()) {
var5 = 0;
var20 = false;
break label310;
}
this.writerThread = NativeThread.current();
}
do {
// 经过IOUtil的读取fd的数据至buf
// 这里的nd是SocketDispatcher,用于调用底层的read和write操做
var3 = IOUtil.write(this.fd, var1, -1L, nd);
} while(var3 == -3 && this.isOpen());
var4 = IOStatus.normalize(var3);
var20 = false;
this.writerCleanup();
assert IOStatus.check(var3);
return var4;
}
}
}
}
static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
if (var1 instanceof DirectBuffer) {
return writeFromNativeBuffer(var0, var1, var2, var4);
} else {
ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
int var10;
try {
// 这里的pos为buf初始的position,意思是将buf重置为最初的状态;由于目前尚未真实的写入到channel中
var8.put(var1);
var8.flip();
var1.position(var5);
// 调用
int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
if (var9 > 0) {
var1.position(var5 + var9);
}
var10 = var9;
} finally {
Util.offerFirstTemporaryDirectBuffer(var8);
}
return var10;
}
}
IOUtil.writeFromNativeBuffer(fd , buf , position , nd)
{
// ... 忽略一些获取buf变量的代码
int written = 0;
if (position != -1) {
// pread方法只有在同步状态下才能使用
written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position);
} else {
// 其调用SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法
written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
}
//....
}
FileDispatcherImpl.write0
{
// 调用底层的write方法写入
return convertReturnVal(env, write(fd, buf, len), JNI_FALSE);
}
}
复制代码
若是buf是direct buffer则直接开始写入,不然须要初始化一个direct buffer,大小是buf的remain
将buf的内容写入到direct buffer中,并恢复buf的position
调用底层的write方法写入至channel
更新buf的position,即被direct buffer读取内容后的position
耐心一点,立刻就到Epoll了
理解了前面的一些基础知识,接下来的部分就会涉及到Java是怎么样来使用epoll的。
Selector的做用是Java NIO中管理一组多路复用的SelectableChannel对象,并可以识别通道是否为诸如读写事件作好准备的组件 --Java doc
Selector的建立过程以下:
// 1.建立Selector
Selector selector = Selector.open();
// 2.将Channel注册到选择器中
// ....... new channel的过程 ....
//Notes:channel要注册到Selector上就必须是非阻塞的,因此FileChannel是不能够
//使用Selector的,由于FileChannel是阻塞的
channel.configureBlocking(false);
// 第二个参数指定了咱们对 Channel 的什么类型的事件感兴趣
SelectionKey key = channel.register(selector , SelectionKey.OP_READ);
// 也可使用或运算|来组合多个事件,例如
SelectionKey key = channel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// 不过值得注意的是,一个 Channel 仅仅能够被注册到一个 Selector 一次,
// 若是将 Channel 注册到 Selector 屡次, 那么其实就是至关于更新 SelectionKey
//的 interest set.
复制代码
①一个Channel在Selector注册其表明的是一个SelectionKey事件,SelectionKey的类型包括:
OP_READ:可读事件;值为:1<<0
OP_WRITE:可写事件;值为:1<<2
OP_CONNECT:客户端链接服务端的事件(tcp链接),通常为建立SocketChannel客户端channel;值为:1<<3
OP_ACCEPT:服务端接收客户端链接的事件,通常为建立ServerSocketChannel服务端channel;值为:1<<4
②一个Selector内部维护了三组keys:
key set:当前channel注册在Selector上全部的key;可调用keys()获取
selected-key set:当前channel就绪的事件;可调用selectedKeys()获取
cancelled-key:主动触发SelectionKey#cancel()方法会放在该集合,前提条件是该channel没有被取消注册;不可经过外部方法调用
③Selector类中总共包含如下10个方法:
open():建立一个Selector对象
isOpen():是不是open状态,若是调用了close()方法则会返回false
provider():获取当前Selector的Provider
keys():如上文所述,获取当前channel注册在Selector上全部的key
selectedKeys():获取当前channel就绪的事件列表
selectNow():获取当前是否有事件就绪,该方法当即返回结果,不会阻塞;若是返回值>0,则表明存在一个或多个
select(long timeout):selectNow的阻塞超时方法,超时时间内,有事件就绪时才会返回;不然超过期间也会返回
select():selectNow的阻塞方法,直到有事件就绪时才会返回
wakeup():调用该方法会时,阻塞在select()处的线程会立马返回;(ps:下面一句划重点)即便当前不存在线程阻塞在select()处,那么下一个执行select()方法的线程也会当即返回结果,至关于执行了一次selectNow()方法
close(): 用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的全部SelectionKey实例无效。channel自己并不会关闭。
谈到Selector就不得不提SelectionKey,二者是紧密关联,配合使用的;如上文所示,往Channel注册Selector会返回一个SelectionKey对象, 这个对象包含了以下内容:
interest set,当前Channel感兴趣的事件集,即在调用register方法设置的interes set
ready set
channel
selector
attached object,可选的附加对象
// 返回当前感兴趣的事件列表
int interestSet = key.interestOps();
// 也可经过interestSet判断其中包含的事件
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
// 能够经过interestOps(int ops)方法修改事件列表
key.interestOps(interestSet | SelectionKey.OP_WRITE);
复制代码
int readySet = key.readyOps();
// 也可经过四个方法来分别判断不一样事件是否就绪
key.isReadable(); //读事件是否就绪
key.isWritable(); //写事件是否就绪
key.isConnectable(); //客户端链接事件是否就绪
key.isAcceptable(); //服务端链接事件是否就绪
复制代码
// 返回当前事件关联的通道,可转换的选项包括:`ServerSocketChannel`和`SocketChannel`
Channel channel = key.channel();
//返回当前事件所关联的Selector对象
Selector selector = key.selector();
复制代码
attached object 咱们能够在selectionKey中附加一个对象,或者在注册时直接附加:
key.attach(theObject);
Object attachedObj = key.attachment();
// 在注册时直接附加
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
复制代码
万丈高楼平地起,基础知识差很少了,了解了这些,能够找一些nio demo或者netty demo练练手。接下来说解本节比较重要的~epoll
前面屡次提到了openjdk,seletor的具体实现确定是跟操做系统有关的,咱们一块儿来看看。
能够看到Selector的实现是SelectorImpl, 而后SelectorImpl又将职责委托给了具体的平台,好比图中的linux2.6 EpollSelectorImpl,windows是WindowsSelectorImpl,MacOSX是KQueueSelectorImpl
根据前面咱们知道,Selector.open()能够获得一个Selector实例,怎么实现的呢?
// Selector.java
public static Selector open() throws IOException {
// 首先找到provider,而后再打开Selector
return SelectorProvider.provider().openSelector();
}
// java.nio.channels.spi.SelectorProvider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 这里就是打开Selector的真正方法
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
复制代码
在openjdk中,每一个操做系统都有一个sun.nio.ch.DefaultSelectorProvider实现,以srcsolaris\classes\sun\nio\ch下的DefaultSelectorProvider为例:
/** * Returns the default SelectorProvider. */
public static SelectorProvider create() {
// 获取OS名称
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
// 根据名称来建立不一样的Selctor
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
复制代码
打开srcsolaris\classes\sun\nio\ch下的EPollSelectorProvider.java
public class EPollSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}
复制代码
Linux平台就获得了最终的Selector实现:srcsolaris\classes\sun\nio\ch下的EPollSelectorImpl.java
来看看它实现的构造器:
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
// makePipe返回管道的2个文件描述符,编码在一个long类型的变量中
// 高32位表明读 低32位表明写
// 使用pipe为了实现Selector的wakeup逻辑
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// 新建一个EPollArrayWrapper
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
复制代码
\src\solaris\native\sun\nio\ch下的EPollArrayWrapper.c
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this) {
/* * epoll_create expects a size as a hint to the kernel about how to * dimension internal structures. We can't predict the size in advance. */
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
复制代码
调用Selector.select(返回键的数量,多是零)最后会委托给各个实现的doSelect方法,限于篇幅不贴出太详细的,这里看下EpollSelectorImpl的doSelect方法
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
//EPollArrayWrapper pollWrapper
pollWrapper.poll(timeout);//重点在这里
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();// 后面会讲到
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
int poll(long timeout) throws IOException {
updateRegistrations();// 这个代码在下面讲,涉及到epoo_ctl
// 这个epollWait是否是有点熟悉呢?
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
复制代码
看下EPollArrayWrapper.c
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) {
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
//系统调用等待内核事件
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
复制代码
能够看到在linux中Selector.select()实际上是调用了epoll_wait
JDK中对于注册到Selector上的IO事件关系是使用SelectionKey来表示,表明了Channel感兴趣的事件,如Read,Write,Connect,Accept.
调用Selector.register()时均会将事件存储到EpollArrayWrapper.java的成员变量eventsLow和eventsHigh中
// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用数组保存事件变动, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超过数组长度的事件会缓存到这个map中,等待下次处理
private Map<Integer,Byte> eventsHigh;
/** * Sets the pending update events for the given file descriptor. This * method has no effect if the update events is already set to KILLED, * unless {@code force} is {@code true}. */
private void setUpdateEvents(int fd, byte events, boolean force) {
// 判断fd和数组长度
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
/** * Returns the pending update events for the given file descriptor. */
private byte getUpdateEvents(int fd) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
return eventsLow[fd];
} else {
Byte result = eventsHigh.get(Integer.valueOf(fd));
// result should never be null
return result.byteValue();
}
复制代码
在上面poll代码中涉及到
int poll(long timeout) throws IOException {
updateRegistrations();/
/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
// 从保存的eventsLow和eventsHigh里取出事件
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
if (isRegistered) {
// 判断操做类型以传给epoll_ctl
// 没有指定EPOLLET事件类型
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 熟悉的epoll_ctl
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
private native void epollCtl(int epfd, int opcode, int fd, int events);
复制代码
能够看到epollCtl调用的native方法,咱们进入EpollArrayWrapper.c
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
// epoll_ctl这里就不用多说了吧
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
/*
* A channel may be registered with several Selectors. When each Selector
* is polled a EPOLL_CTL_DEL op will be inserted into its pending update
* list to remove the file descriptor from epoll. The "last" Selector will
* close the file descriptor which automatically unregisters it from each
* epoll descriptor. To avoid costly synchronization between Selectors we
* allow pending updates to be processed, ignoring errors. The errors are
* harmless as the last update for the file descriptor is guaranteed to
* be EPOLL_CTL_DEL.
*/
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
复制代码
在doSelect方法poll执行后,会更新EpollSelectorImpl.java里的 updateSelectedKeys,就是Selector里的三个set集合,具体可看前面。
/**
*更新已被epoll选择fd的键。
*将就绪兴趣集添加到就绪队列。
*/
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
复制代码
总结
经过本文,你应该知道Channel、Selector基本原理和在Java中怎么使用Epoll的。 (包括更细节的fd与channel和socket之间的转换关系)掌握这些基础知识,再去看NIO、netty网络框架的源码可能就没有那么吃力了。在接下来的文章里我会跟进关于Netty的文章,毕竟这已成为分布式网络通讯框架的主流了!
感谢
zh.wikipedia.org/wiki/Epoll 维基百科
真心感谢帅逼靓女们能看到这里,若是这个文章写得还不错,以为有点东西的话
求点赞👍 求关注❤️ 求分享👥 对8块腹肌的我来讲真的 很是有用!!!
若是本篇博客有任何错误,请批评指教,不胜感激 !❤️❤️❤️❤️