NIO源码详解

阻塞io和无阻塞io
阻塞io是指jdk1.4以前版本面向流的io,服务端须要对每一个请求创建一堆线程等待请求,而客户端发送请求后,先咨询服务端是否有线程相应,若是没有则会一直等待或者遭到拒 绝请求,若是有的话,客户端会线程会等待请求结束后才继续执行。
当并发量大,然后端服务或客户端处理数据慢时就会产生产生大量线程处于等待中,即上述的阻塞。java

 

无阻塞io是使用单线程或者只使用少许的多线程,每一个链接共用一个线程,当处于等待(没有事件)的时候线程资源能够释放出来处理别的请求,经过事件驱动模型当有accept/read/write等事件发生后通知(唤醒)主线程分配资源来处理相关事件。java.nio.channels.Selector就是在该模型中事件的观察者,能够将多个SocketChannel的事件注册到一个Selector上,当没有事件发生时Selector处于阻塞状态,当SocketChannel有accept/read/write等事件发生时唤醒Selector。
 

 这个Selector是使用了单线程模型,主要用来描述事件驱动模型,要优化性能须要一个好的线程模型来使用,目前比较好的nio框架有Netty,apache的mina等。线程模型这块后面再分享,这里重点研究Selector的阻塞和唤醒原理。
退出阻塞的方式有:register在selector上的socketChannel处于就绪状态(放在pollArray中的socketChannel的FD就绪) 或者 第1节中放在pollArray中的wakeupSourceFd就绪。前者(socketChannel)就绪唤醒应证了文章开始的阻塞->事件驱动->唤醒的过程,后者(wakeupSourceFd)就是下面要看的主动wakeup。apache

这里建立了一个管道pipe,并对pipe的source端的POLLIN事件感兴趣,addWakeupSocket方法将source的POLLIN事件标识为感兴趣的,当sink端有数据写入时,source对应的文件描述描wakeupSourceFd就会处于就绪状态。(事实上windows就是经过向管道中写数据来唤醒阻塞的选择器的)从以上代码能够看出:通道的打开其实是构造了一个SelectorImpl对象windows

subSelector.poll() 是select的核心,由native函数poll0实现,readFds、writeFds 和exceptFds数组用来保存底层select的结果,数组的第一个位置都是存放发生事件的socket的总数,其他位置存放发生事件的socket句柄fd

图像详解后端

1,Selector.open()数组

Pipe.open()打开一个管道 拿到wakeupSourceFd和wakeupSinkFd两个文件描述符;把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里; 上图中最下面那部分建立pipe的过程,windows下的实现是建立两个本地的socketChannel,而后链接(连接的过程经过写一个随机long作两个socket的连接校验),两个socketChannel分别实现了管道的source与sink端。
source端由前面提到的WindowsSelectorImpl放到了pollWrapper中(pollWrapper.addWakeupSocket(wakeupSourceFd, 0))数据结构

pollWrapper用Unsafe类申请一块物理内存,存放注册时的socket句柄fdVal和event的数据结构pollfd,其中pollfd共8字节,0~3字节保存socket句柄,4~7字节保存event多线程

先了解一下Unsafe的基本操做并发

//分配var1字节大小的内存,返回起始地址偏移量 
 public native long allocateMemory(long var1);
//从新给var1起始地址的内存分配长度为var3字节大小的内存,返回新的内存起始地址偏移量
 public native long reallocateMemory(long var1, long var3);
 //释放起始地址为var1的内存
 public native void freeMemory(long var1);

pollWrapper申请了一个64个字节的对外内存空间,address为内存空间的开始地址app

PollArrayWrapper pollWrapper = new PollArrayWrapper(8);
PollArrayWrapper(int var1) { int var2 = var1 * SIZE_POLLFD; 
this.pollArray = new AllocatedNativeObject(var2, true); 
this.pollArrayAddress = this.pollArray.address(); this.size = var1; }
protected NativeObject(int var1, boolean var2) {
    if(!var2) {
        this.allocationAddress = unsafe.allocateMemory((long)var1);
        this.address = this.allocationAddress;
    } else {
        int var3 = pageSize();
        long var4 = unsafe.allocateMemory((long)(var1 + var3));
        this.allocationAddress = var4;
        this.address = var4 + (long)var3 - (var4 & (long)(var3 - 1));
    }

}

pollWrapper.addWakeupSocket(wakeupSourceFd, 0),把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里,本次操做pollfd会使用6个字节框架

void addWakeupSocket(int var1, int var2) {
    this.putDescriptor(var2, var1);
    this.putEventOps(var2, Net.POLLIN);
}
void putDescriptor(int var1, int var2) {
    this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
}

void putEventOps(int var1, int var2) {
    this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
}

实际上pollfd的结构

2,Channel.Register()

3,Selector.select()

4,SelectionKey.cancel()

   上图浅蓝色部分

网上找的一些辅助理解图片

相关文章
相关标签/搜索