I/O并发编程总结

本部分的Github地址为:这里php

Introduction

在传统的网络服务器的构建中,IO模式会按照Blocking/Non-Blocking、Synchronous/Asynchronous这两个标准进行分类,其中Blocking与Synchronous基本上一个意思,而NIO与Async的区别在于NIO强调的是Polling(轮询),而Async强调的是Notification(通知)。譬如在一个典型的单进程单线程Socket接口中,阻塞型的接口必须在上一个Socket链接关闭以后才能接入下一个Socket链接。而对于NIO的Socket而言,Server Application会从内核获取到一个特殊的"Would Block"错误信息,可是并不会阻塞到等待发起请求的Socket Client中止。通常来讲,在Linux系统中能够经过调用独立的select或者poll方法来遍历全部读取好的数据,而且进行写操做。而对于异步Socket而言(譬如Windows中的Sockets或者.Net中实现的Sockets模型),Server Application会告诉IO Framework去读取某个Socket数据,在数据读取完毕以后IO Framework会自动地调用你的回调(也就是通知应用程序自己数据已经准备好了)。以IO多路复用中的Reactor与Proactor模型为例,非阻塞的模型是须要应用程序自己处理IO的,而异步模型则是由Kernel或者Framework将数据准备好读入缓冲区中,应用程序直接从缓冲区读取数据。html

总结一下:java

  • 同步阻塞:在此种方式下,用户进程在发起一个IO操做之后,必须等待IO操做的完成,只有当真正完成了IO操做之后,用户进程才能运行。react

  • 同步非阻塞:在此种方式下,用户进程发起一个IO操做之后边可返回作其它事情,可是用户进程须要时不时的询问IO操做是否就绪,这就要求用户进程不停的去询问,从而引入没必要要的CPU资源浪费。linux

  • 异步非阻塞:在此种模式下,用户进程只须要发起一个IO操做而后当即返回,等IO操做真正的完成之后,应用程序会获得IO操做完成的通知,此时用户进程只须要对数据进行处理就行了,不须要进行实际的IO读写操做,由于真正的IO读取或者写入操做已经由内核完成了。git

而在并发IO的问题中,较常见的就是所谓的C10K问题,即有10000个客户端须要连上一个服务器并保持TCP链接,客户端会不定时的发送请求给服务器,服务器收到请求后需及时处理并返回结果。github

BIO

BIO即同步阻塞式IO,是面向流的,阻塞式的,串行的一个过程。对每个客户端的socket链接,都须要一个线程来处理,并且在此期间这个线程一直被占用,直到socket关闭。redis

采用BIO通讯模型的服务端,一般由一个独立的Acceptor线程负责监听客户端的链接,接收到客户端链接以后为客户端链接建立一个新的线程处理请求消息,处理完成以后,返回应答消息给客户端,线程销毁,这就是典型的一请求一应答模型。该架构最大的问题就是不具有弹性伸缩能力,当并发访问量增长后,服务端的线程个数和并发访问数成线性正比,因为线程是JAVA虚拟机很是宝贵的系统资源,当线程数膨胀以后,系统的性能急剧降低,随着并发量的继续增长,可能会发生句柄溢出、线程堆栈溢出等问题,并致使服务器最终宕机。编程

还有一些并非因为并发数增长而致使的系统负载增长:链接服务器的一些客户端,因为网络或者自身性能处理的问题,接收端从socket读取数据的速度跟不上发送端写入数据的速度。 而在TCP/IP网络编程过程当中,已经发送出去的数据依然须要暂存在send buffer,只有收到对方的ack,kernel才从buffer中清除这一部分数据,为后续发送数据腾出空间。接收端将收到的数据暂存在receive buffer中,自动进行确认。但若是socket所在的进程不及时将数据从receive buffer中取出,最终致使receive buffer填满,因为TCP的滑动窗口和拥塞控制,接收端会阻止发送端向其发送数据。segmentfault

做为发送端,服务器因为迟迟不能释放被占用的线程,致使内存占用率不断升高,堆回收的效率愈来愈低,致使Full GC,最终致使服务宕机。

BIO下的多进程/多线程模式

本文初说起,BIO的一个缺陷在于某个Socket在其链接到断上期间会独占线程,那么解决这个问题的一个朴素想法就是利用多进程多线程的办法,便是建立一个新的线程来处理新的链接,这样就保证了并发IO的实现。本节便是对这种思路进行分析。

最先的服务器端程序都是经过多进程、多线程来解决并发IO的问题。进程模型出现的最先,从Unix系统诞生就开始有了进程的概念。最先的服务器端程序通常都是Accept一个客户端链接就建立一个进程,而后子进程进入循环同步阻塞地与客户端链接进行交互,收发处理数据。

多线程模式出现要晚一些,线程与进程相比更轻量,并且线程之间是共享内存堆栈的,因此不一样的线程之间交互很是容易实现。好比聊天室这样的程序,客户端链接之间能够交互,比聊天室中的玩家能够任意的其余人发消息。用多线程模式实现很是简单,线程中能够直接读写某一个客户端链接。而多进程模式就要用到管道、消息队列、共享内存实现数据交互,统称进程间通讯(IPC)复杂的技术才能实现。

多进程/线程模型的流程以下:

  1. 建立一个 socket,绑定服务器端口(bind),监听端口(listen),在PHP中用stream_socket_server一个函数就能完成上面3个步骤,固然也可使用php sockets扩展分别实现。

  2. 进入while循环,阻塞在accept操做上,等待客户端链接进入。此时程序会进入随眠状态,直到有新的客户端发起connect到服务器,操做系统会唤醒此进程。accept函数返回客户端链接的socket

  3. 主进程在多进程模型下经过fork(php: pcntl_fork)建立子进程,多线程模型下使用pthread_create(php: new Thread)建立子线程。下文如无特殊声明将使用进程同时表示进程/线程。

  4. 子进程建立成功后进入while循环,阻塞在recv(php: fread)调用上,等待客户端向服务器发送数据。收到数据后服务器程序进行处理而后使用send(php: fwrite)向客户端发送响应。长链接的服务会持续与客户端交互,而短链接服务通常收到响应就会close。

  5. 当客户端链接关闭时,子进程退出并销毁全部资源。主进程会回收掉此子进程。

Leader-Follow模型

上文描述的多进程/多线程模型最大的问题是,进程/线程建立和销毁的开销很大。因此上面的模式没办法应用于很是繁忙的服务器程序。对应的改进版解决了此问题,这就是经典的Leader-Follower模型。

它的特色是程序启动后就会建立N个进程。每一个子进程进入Accept,等待新的链接进入。当客户端链接到服务器时,其中一个子进程会被唤醒,开始处理客户端请求,而且再也不接受新的TCP链接。当此链接关闭时,子进程会释放,从新进入Accept,参与处理新的链接。这个模型的优点是彻底能够复用进程,没有额外消耗,性能很是好。不少常见的服务器程序都是基于此模型的,好比Apache、PHP-FPM。

多进程模型也有一些缺点。

  1. 这种模型严重依赖进程的数量解决并发问题,一个客户端链接就须要占用一个进程,工做进程的数量有多少,并发处理能力就有多少。操做系统能够建立的进程数量是有限的。

  2. 启动大量进程会带来额外的进程调度消耗。数百个进程时可能进程上下文切换调度消耗占CPU不到1%能够忽略不接,若是启动数千甚至数万个进程,消耗就会直线上升。调度消耗可能占到CPU的百分之几十甚至100%。

另外有一些场景多进程模型没法解决,好比即时聊天程序(IM),一台服务器要同时维持上万甚至几十万上百万的链接(经典的C10K问题),多进程模型就力不从心了。还有一种场景也是多进程模型的软肋。一般Web服务器启动100个进程,若是一个请求消耗100ms,100个进程能够提供1000qps,这样的处理能力仍是不错的。可是若是请求内要调用外网Http接口,像QQ、微博登陆,耗时会很长,一个请求须要10s。那一个进程1秒只能处理0.1个请求,100个进程只能达到10qps,这样的处理能力就太差了。

Unix的5种IO模型

Asynchronous and non-blocking IO

Unix的5种IO模型:阻塞式IO, 非阻塞式IO,IO复用模型,信号驱动式IO和异步IO。

阻塞式IO

blocking_io

非阻塞式IO

nonblocking_io

IO复用(select,poll)

IO多路复用经过把多个IO的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的状况能够同时处理多个客户端请求。 目前支持IO多路复用的系统调用有select,pselect,poll,epoll,在linux网络编程过程当中,很长一段时间都是用select作轮询和网络事件通知,然而select的一些固有缺陷致使了它的应用受到了很大的限制,最终linux不得不载新的内核版本中寻找select的替代方案,最终选择了epoll。

multplexing_io

信号驱动式IO

signal_driven

异步IO

Asynchronous IO refers to an interface where you supply a callback to an IO operation, which is invoked when the operation completes. This invocation often happens to an entirely different thread to the one that originally made the request, but this is not necessarily the case. Asynchronous IO is a manifestation of the "proactor" pattern.

asynchronous_io

并发IO问 题一直是后端编程中的技术挑战,从最先的同步阻塞Fork进程,到多进程/多线程,到如今的异步IO、协程。

IO多路复用

IO多路复用技术通俗阐述,便是由一个线程轮询每一个链接,若是某个链接有请求则处理请求,没有请求则处理下一个链接。首先来看下可读事件与可写事件:

当以下任一状况发生时,会产生套接字的可读事件:

  • 该套接字的接收缓冲区中的数据字节数大于等于套接字接收缓冲区低水位标记的大小;

  • 该套接字的读半部关闭(也就是收到了FIN),对这样的套接字的读操做将返回0(也就是返回EOF);

  • 该套接字是一个监听套接字且已完成的链接数不为0;

  • 该套接字有错误待处理,对这样的套接字的读操做将返回-1。

当以下任一状况发生时,会产生套接字的可写事件:

  • 该套接字的发送缓冲区中的可用空间字节数大于等于套接字发送缓冲区低水位标记的大小;

  • 该套接字的写半部关闭,继续写会产生SIGPIPE信号;

  • 非阻塞模式下,connect返回以后,该套接字链接成功或失败;

  • 该套接字有错误待处理,对这样的套接字的写操做将返回-1。

Reactor模型

Reactor模型在Linux系统中的具体实现便是select/poll/epoll/kqueue,像Redis中便是采用了Reactor模型实现了单进程单线程高并发。Reactor模型的理论基础能够参考reactor-siemens

核心组件

  • Handles:表示操做系统管理的资源,咱们能够理解为fd。

  • Synchronous Event Demultiplexer:同步事件分离器,阻塞等待Handles中的事件发生。

  • Initiation Dispatcher:初始分派器,做用为添加Event handler(事件处理器)、删除Event handler以及分派事件给Event handler。也就是说,Synchronous Event Demultiplexer负责等待新事件发生,事件发生时通知Initiation Dispatcher,而后Initiation Dispatcher调用event handler处理事件。

  • Event Handler:事件处理器的接口

  • Concrete Event Handler:事件处理器的实际实现,并且绑定了一个Handle。由于在实际状况中,咱们每每不止一种事件处理器,所以这里将事件处理器接口和实现分开,与C++、Java这些高级语言中的多态相似。

处理逻辑

Reactor模型的基本的处理逻辑为:

  1. 咱们注册Concrete Event Handler到Initiation Dispatcher中。

  2. Initiation Dispatcher调用每一个Event Handler的get_handle接口获取其绑定的Handle。

  3. Initiation Dispatcher调用handle_events开始事件处理循环。在这里,Initiation Dispatcher会将步骤2获取的全部Handle都收集起来,使用Synchronous Event Demultiplexer来等待这些Handle的事件发生。

  4. 当某个(或某几个)Handle的事件发生时,Synchronous Event Demultiplexer通知Initiation Dispatcher。

  5. Initiation Dispatcher根据发生事件的Handle找出所对应的Handler。

  6. Initiation Dispatcher调用Handler的handle_event方法处理事件。

时序图以下:

抽象来讲,Reactor有4个核心的操做:

  1. add添加socket监听到reactor,能够是listen socket也可使客户端socket,也能够是管道、eventfd、信号等

  2. set修改事件监听,能够设置监听的类型,如可读、可写。可读很好理解,对于listen socket就是有新客户端链接到来了须要accept。对于客户端链接就是收到数据,须要recv。可写事件比较难理解一些。一个SOCKET是有缓存区的,若是要向客户端链接发送2M的数据,一次性是发不出去的,操做系统默认TCP缓存区只有256K。一次性只能发256K,缓存区满了以后send就会返回EAGAIN错误。这时候就要监听可写事件,在纯异步的编程中,必须去监听可写才能保证send操做是彻底非阻塞的。

  3. del从reactor中移除,再也不监听事件

  4. callback就是事件发生后对应的处理逻辑,通常在add/set时制定。C语言用函数指针实现,JS能够用匿名函数,PHP能够用匿名函数、对象方法数组、字符串函数名。

Reactor只是一个事件发生器,实际对socket句柄的操做,如connect/accept、send/recv、close是在callback中完成的。具体编码可参考下面的伪代码:

Reactor模型还能够与多进程、多线程结合起来用,既实现异步非阻塞IO,又利用到多核。目前流行的异步服务器程序都是这样的方式:如

  • Nginx:多进程Reactor

  • Nginx+Lua:多进程Reactor+协程

  • Golang:单线程Reactor+多线程协程

  • Swoole:多线程Reactor+多进程Worker

协程从底层技术角度看实际上仍是异步IO Reactor模型,应用层自行实现了任务调度,借助Reactor切换各个当前执行的用户态线程,但用户代码中彻底感知不到Reactor的存在。

Proactor模型

Reactor和Proactor模式的主要区别就是真正的读取和写入操做是有谁来完成的,Reactor中须要应用程序本身读取或者写入数据,而 Proactor模式中,应用程序不须要进行实际的读写过程,它只须要从缓存区读取或者写入便可,操做系统会读取缓存区或者写入缓存区到真正的IO设备。Proactor模型的基本处理逻辑以下:

  1. 应用程序初始化一个异步读取操做,而后注册相应的事件处理器,此时事件处理器不关注读取就绪事件,而是关注读取完成事件,这是区别于Reactor的关键。

  2. 事件分离器等待读取操做完成事件。

  3. 在事件分离器等待读取操做完成的时候,操做系统调用内核线程完成读取操做(异步IO都是操做系统负责将数据读写到应用传递进来的缓冲区供应用程序操做,操做系统扮演了重要角色),并将读取的内容放入用户传递过来的缓存区中。这也是区别于Reactor的一点,Proactor中,应用程序须要传递缓存区。

  4. 事件分离器捕获到读取完成事件后,激活应用程序注册的事件处理器,事件处理器直接从缓存区读取数据,而不须要进行实际的读取操做。

Linux NIO

select,poll,epoll都是IO多路复用的机制。IO多路复用就经过一种机制,能够监视多个描述符,一旦某个描述符就绪(通常是读就绪或者写就绪),可以通知程序进行相应的读写操做。但select,poll,epoll本质上都是同步IO,由于他们都须要在读写事件就绪后本身负责进行读写,也就是说这个读写过程是阻塞的,而异步IO则无需本身负责进行读写,异步IO的实现会负责把数据从内核拷贝到用户空间。

select/poll

函数分析

select(int nfds, fd_set *r, fd_set *w, fd_set *e, struct timeval *timeout)
  • maxfdp1表示该进程中描述符的总数。

  • fd_set则是配合select模型的重点数据结构,用来存放描述符的集合。

  • timeout表示select返回须要等待的时间。

对于select(),咱们须要传3个集合,r,w和e。其中,r表示咱们对哪些fd的可读事件感兴趣,w表示咱们对哪些fd的可写事件感兴趣。每一个集合实际上是一个bitmap,经过0/1表示咱们感兴趣的fd。例如,咱们对于fd为6的可读事件感兴趣,那么r集合的第6个bit须要被 设置为1。这个系统调用会阻塞,直到咱们感兴趣的事件(至少一个)发生。调用返回时,内核一样使用这3个集合来存放fd实际发生的事件信息。也就是说,调 用前这3个集合表示咱们感兴趣的事件,调用后这3个集合表示实际发生的事件。

select为最先期的UNIX系统调用,它存在4个问题:1)这3个bitmap有大小限制(FD_SETSIZE,一般为1024);2)因为 这3个集合在返回时会被内核修改,所以咱们每次调用时都须要从新设置;3)咱们在调用完成后须要扫描这3个集合才能知道哪些fd的读/写事件发生了,通常状况下全量集合比较大而实际发生读/写事件的fd比较少,效率比较低下;4)内核在每次调用都须要扫描这3个fd集合,而后查看哪些fd的事件实际发生, 在读/写比较稀疏的状况下一样存在效率问题。

因为存在这些问题,因而人们对select进行了改进,从而有了poll。

poll(struct pollfd *fds, int nfds, int timeout)
struct pollfd {
    int fd;     
    short events;
    short revents;
    }

poll调用须要传递的是一个pollfd结构的数组,调用返回时结果信息也存放在这个数组里面。 pollfd的结构中存放着fd、咱们对该fd感兴趣的事件(events)以及该fd实际发生的事件(revents)。poll传递的不是固定大小的 bitmap,所以select的问题1解决了;poll将感兴趣事件和实际发生事件分开了,所以select的问题2也解决了。但select的问题3和问题4仍然没有解决。

处理逻辑

总的来讲,Select模型的内核的处理逻辑为:

  1. 使用copy_from_user从用户空间拷贝fd_set到内核空间

  2. 注册回调函数__pollwait

  3. 遍历全部fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据状况会调用到tcp_poll,udp_poll或者datagram_poll)

  4. 以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。

  5. __pollwait的主要工做就是把current(当前进程)挂到设备的等待队列中,不一样的设备有不一样的等待队列,对于tcp_poll 来讲,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不表明进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数 据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。

  6. poll方法返回时会返回一个描述读写操做是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。

  7. 若是遍历完全部的fd,尚未返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。若是超过必定的超时时间(schedule_timeout 指定),仍是没人唤醒,则调用select的进程会从新被唤醒得到CPU,进而从新遍历fd,判断有没有就绪的fd。

  8. 把fd_set从内核空间拷贝到用户空间。

多客户端请求服务端,服务端与各客户端保持长链接而且能接收到各客户端数据大致思路以下:

  1. 初始化readset,而且将服务端监听的描述符添加到readset中去。

  2. 而后select阻塞等待readset集合中是否有描述符可读。

  3. 若是是服务端描述符可读,那么表示有新客户端链接上。经过accept接收客户端的数据,而且将客户端描述符添加到一个数组client中,以便二次遍历的时候使用。

  4. 执行第二次循环,此时经过for循环把client中的有效的描述符都添加到readset中去。

  5. select再次阻塞等待readset集合中是否有描述符可读。

  6. 若是此时已经链接上的某个客户端描述符有数据可读,则进行数据读取。

epoll/kqueue

select不足与epoll中的改进

[select、poll、epoll之间的区别总结[整理]](http://www.cnblogs.com/Anker/...

select与poll问题的关键在于无状态。对于每一次系统调用,内核不会记录下任何信息,因此每次调用都须要重复传递相同信息。总结而言,select/poll模型存在的问题便是每次调用select,都须要把fd集合从用户态拷贝到内核态,这个开销在fd不少时会很大而且每次都须要在内核遍历传递进来的全部的fd,这个开销在fd不少时候也很大。讨论epoll对于select/poll改进的时候,epoll和select和poll的调用接口上的不一样,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函数,epoll_create,epoll_ctl和epoll_wait,epoll_create是建立一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。对于上面所说的select/poll的缺点,主要是在epoll_ctl中解决的,每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把全部的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每一个fd在整个过程当中只会拷贝一次。epoll的解决方案不像select或poll同样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把 current挂一遍(这一遍必不可少)并为每一个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会 把就绪的fd加入一个就绪链表)。epoll_wait的工做实际上就是在这个就绪链表中查看有没有就绪的fd(利用 schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是相似的)。

(1)select,poll实现须要本身不断轮询全部fd集合,直到设备就绪,期间可能要睡眠和唤醒屡次交替。而epoll其实也须要调用epoll_wait不断轮询就绪链表,期间也可能屡次睡眠和唤醒交替,可是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,可是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的时候只要判断一下就绪链表是否为空就好了,这节省了大量的CPU时间。这就是回调机制带来的性能提高。

(2)select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,而且要把current往设备等待队列中挂一次,而epoll只要一次拷贝,并且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并非设备等待队列,只是一个epoll内部定义的等待队列)。这也能节省很多的开销。

函数分析

int epoll_create(int size);

建立一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个 参数不一样于select()中的第一个参数,给出最大监听的fd+1的值。须要注意的是,当建立好epoll句柄后,它就是会占用一个fd值,在 linux下若是查看/proc/进程id/fd/,是可以看到这个fd的,因此在使用完epoll后,必须调用close()关闭,不然可能致使fd被 耗尽。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不一样与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动做,用三个宏来表示:

  • EPOLL_CTL_ADD:注册新的fd到epfd中;

  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

  • EPOLL_CTL_DEL:从epfd中删除一个fd;

第三个参数是须要监听的fd,第四个参数是告诉内核须要监听什么事,struct epoll_event结构以下:

typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;

struct epoll_event {
    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
};

events能够是如下几个宏的集合:

  • EPOLLIN :表示对应的文件描述符能够读(包括对端SOCKET正常关闭);

  • EPOLLOUT:表示对应的文件描述符能够写;

  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

  • EPOLLERR:表示对应的文件描述符发生错误;

  • EPOLLHUP:表示对应的文件描述符被挂断;

  • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来讲的。

  • EPOLLONESHOT:只监听一次事件,当监听完此次事件以后,若是还须要继续监听这个socket的话,须要再次把这个socket加入到EPOLL队列里

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等 待事件的产生,相似于select()调用。参数events用来从内核获得事件的集合,maxevents告以内核这个events有多大,这个 maxevents的值不能大于建立epoll_create()时的size,参数timeout是超时时间(毫秒,0会当即返回,-1将不肯定,也有 说法说是永久阻塞)。该函数返回须要处理的事件数目,如返回0表示已超时。

处理逻辑

使用epoll 来实现服务端同时接受多客户端长链接数据时,的大致步骤以下:

(1)使用epoll_create建立一个 epoll 的句柄,下例中咱们命名为epollfd。
(2)使用epoll_ctl把服务端监听的描述符添加到epollfd指定的 epoll 内核事件表中,监听服务器端监听的描述符是否可读。
(3)使用epoll_wait阻塞等待注册的服务端监听的描述符可读事件的发生。
(4)当有新的客户端链接上服务端时,服务端监听的描述符可读,则epoll_wait返回,而后经过accept获取客户端描述符。
(5)使用epoll_ctl把客户端描述符添加到epollfd指定的 epoll 内核事件表中,监听服务器端监听的描述符是否可读。
(6)当客户端描述符有数据可读时,则触发epoll_wait返回,而后执行读取。

几乎全部的epoll模型编码都是基于如下模板:

for( ; ; )
{
    nfds = epoll_wait(epfd,events,20,500);
    for(i=0;i<nfds;++i)
    {
        if(events[i].data.fd==listenfd) //有新的链接
        {
            connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept这个链接
            ev.data.fd=connfd;
            ev.events=EPOLLIN|EPOLLET;
            epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的fd添加到epoll的监听队列中
        }
        else if( events[i].events&EPOLLIN ) //接收到数据,读socket
        {
            n = read(sockfd, line, MAXLINE)) < 0    //读
            ev.data.ptr = md;     //md为自定义类型,添加数据
            ev.events=EPOLLOUT|EPOLLET;
            epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
        }
        else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
        {
            struct myepoll_data* md = (myepoll_data*)events[i].data.ptr;    //取数据
            sockfd = md->fd;
            send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );        //发送数据
            ev.data.fd=sockfd;
            ev.events=EPOLLIN|EPOLLET;
            epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
        }
        else
        {
            //其余的处理
        }
    }
}

Demo

本部分代码实现参考多是最接地气的 IO 多路复用小结

阻塞式网络编程接口

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>

#define SERV_PORT 8031
#define BUFSIZE 1024

int main(void)
{
    int lfd, cfd;
    struct sockaddr_in serv_addr,clin_addr;
    socklen_t clin_len;
    char recvbuf[BUFSIZE];
    int len;

    lfd = socket(AF_INET,SOCK_STREAM,0);

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(SERV_PORT);

    bind(lfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));

    listen(lfd, 128);

    while(1){
        clin_len = sizeof(clin_addr);
        cfd = accept(lfd, (struct sockaddr *)&clin_addr, &clin_len);
        while(len = read(cfd,recvbuf,BUFSIZE)){
            write(STDOUT_FILENO,recvbuf,len);//把客户端输入的内容输出在终端
            // 只有当客户端输入 stop 就中止当前客户端的链接
            if (strncasecmp(recvbuf,"stop",4) == 0){
                close(cfd);
                break;
            }
        }
    }      
    close(lfd); 
    return 0;
}

编译运行以后,开启两个终端使用命令nc 10.211.55.4 8031(假如服务器的 ip 为 10.211.55.4)。若是首先连上的客户端一直不输入stop加回车,那么第二个客户端输入任何内容都不会被客户端接收。以下图所示

输入abc的是先链接上的,在其输入stop以前,后面链接上的客户端输入123并不会被服务端收到。也就是说一直阻塞在第一个客户端那里。当第一个客户端输入stop以后,服务端才收到第二个客户端的发送过来的数据。

select

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>
#include <string.h>

#define SERV_PORT     8031
#define BUFSIZE       1024
#define FD_SET_SIZE   128

int main(void) {
    int lfd, cfd, maxfd, scokfd, retval;
    struct sockaddr_in serv_addr, clin_addr;

    socklen_t clin_len; // 地址信息结构体大小

    char recvbuf[BUFSIZE];
    int len;

    fd_set read_set, read_set_init;

    int client[FD_SET_SIZE];
    int i;
    int maxi = -1;

    if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        perror("套接字描述符建立失败");
        exit(1);
    }

    int opt = 1;
    setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(SERV_PORT);

    if (bind(lfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) {
        perror("绑定失败");
        exit(1);
    }

    if (listen(lfd, FD_SET_SIZE) == -1) {
        perror("监听失败");
        exit(1);
    }

    maxfd = lfd;

    for (i = 0; i < FD_SET_SIZE; ++i) {
        client[i] = -1;
    }

    FD_ZERO(&read_set_init);
    FD_SET(lfd, &read_set_init);

    while (1) {
        // 每次循环开始时,都初始化 read_set
        read_set = read_set_init;

        // 由于上一步 read_set 已经重置,因此须要已链接上的客户端 fd (由上次循环后产生)从新添加进 read_set
        for (i = 0; i < FD_SET_SIZE; ++i) {
            if (client[i] > 0) {
                FD_SET(client[i], &read_set);
            }
        }

        printf("select 等待\n");
        // 这里会阻塞,直到 read_set 中某一个 fd 有数据可读才返回,注意 read_set 中除了客户端 fd 还有服务端监听的 fd
        retval = select(maxfd + 1, &read_set, NULL, NULL, NULL);
        if (retval == -1) {
            perror("select 错误\n");
        } else if (retval == 0) {
            printf("超时\n");
            continue;
        }
        printf("select 返回\n");

        //------------------------------------------------------------------------------------------------
        // 用 FD_ISSET 来判断 lfd (服务端监听的fd)是否可读。只有当新的客户端链接时,lfd 才可读 
        if (FD_ISSET(lfd, &read_set)) {
            clin_len = sizeof(clin_addr);
            if ((cfd = accept(lfd, (struct sockaddr *) &clin_addr, &clin_len)) == -1) {
                perror("接收错误\n");
                continue;
            }

            for (i = 0; i < FD_SET_SIZE; ++i) {
                if (client[i] < 0) {
                    // 把客户端 fd 放入 client 数组
                    client[i] = cfd;
                    printf("接收client[%d]一个请求来自于: %s:%d\n", i, inet_ntoa(clin_addr.sin_addr), ntohs(clin_addr.sin_port));
                    break;
                }
            }

            // 最大的描述符值也要从新计算
            maxfd = (cfd > maxfd) ? cfd : maxfd;
            // maxi 用于下面遍历全部有效客户端 fd 使用,以避免遍历整个 client 数组
            maxi = (i >= maxi) ? ++i : maxi;
        }
        //------------------------------------------------------------------------------------------------

        for (i = 0; i < maxi; ++i) {
            if (client[i] < 0) {
                continue;
            }

            // 若是客户端 fd 中有数据可读,则进行读取
            if (FD_ISSET(client[i], &read_set)) {
                // 注意:这里没有使用 while 循环读取,若是使用 while 循环读取,则有阻塞在一个客户端了。
                // 可能你会想到若是一次读取不完怎么办?
                // 读取不完时,在循环到 select 时 因为未读完的 fd 还有数据可读,那么当即返回,而后到这里继续读取,原来的 while 循环读取直接提到最外层的 while(1) + select 来判断是否有数据继续可读

                len = read(client[i], recvbuf, BUFSIZE);
                if (len > 0) {
                    write(STDOUT_FILENO, recvbuf, len);
                }else if (len == 0){
                    // 若是在客户端 ctrl+z
                    close(client[i]);
                    printf("clinet[%d] 链接关闭\n", i);
                    FD_CLR(client[i], &read_set);
                    client[i] = -1;
                    break;
                }
            }
        }

    }

    close(lfd);

    return 0;
}

epoll

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <string.h>

#define SERV_PORT           8031
#define MAX_EVENT_NUMBER    1024
#define BUFFER_SIZE         10

/* 将文件描述符 fd 上的 EPOLLIN 注册到 epollfd 指示的 epoll 内核事件表中 */

void addfd(int epollfd, int fd) {
    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
}

void et(struct epoll_event *events, int number, int epollfd, int listenfd) {
    char buf[BUFFER_SIZE];
    for (int i = 0; i < number; ++i) {
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            struct sockaddr_in client_address;
            socklen_t length = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr *) &client_address, &length);
            printf("接收一个请求来自于: %s:%d\n", inet_ntoa(client_address.sin_addr), ntohs(client_address.sin_port));

            addfd(epollfd, connfd);
        } else if (events[i].events & EPOLLIN) {
            /* 这段代码不会被重复触发,因此咱们循环读取数据,以确保把 socket 缓存中的全部数据读取*/
            while (1) {
                memset(buf, '\0', BUFFER_SIZE);
                int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
                if (ret < 0) {
                    /* 对非阻塞 IO ,下面的条件成立表示数据已经所有读取完毕。此后 epoll 就能再次触发 sockfd 上的 EPOLLIN 事件,以驱动下一次读操做 */
                    if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
                        printf("read later\n");
                        break;
                    }
                    close(sockfd);
                    break;
                } else if (ret == 0) {
                    printf("断开一个链接\n");
                    close(sockfd);
                } else {
                    printf("get %d bytes of content: %s\n", ret, buf);
                }
            }
        }
    }
}

int main(void) {
    int lfd, epollfd,ret;
    struct sockaddr_in serv_addr;

    if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        perror("套接字描述符建立失败");
        exit(1);
    }

    int opt = 1;
    setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(SERV_PORT);

    if (bind(lfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) {
        perror("绑定失败");
        exit(1);
    }

    if (listen(lfd, 5) == -1) {
        perror("监听失败");
        exit(1);
    }

    struct epoll_event events[MAX_EVENT_NUMBER];
    if ((epollfd = epoll_create(5)) == -1) {
        perror("建立失败");
        exit(1);
    }

    // 把服务器端 lfd 添加到 epollfd 指定的 epoll 内核事件表中,添加一个 lfd 可读的事件
    addfd(epollfd, lfd);
    while (1) {
        // 阻塞等待新客户端的链接或者客户端的数据写入,返回须要处理的事件数目
        if ((ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1)) < 0) {
            perror("epoll_wait失败");
            exit(1);
        }
        
        et(events, ret, epollfd, lfd);
    }

    close(lfd);
    return 0;
}

相关文章
相关标签/搜索