Python之路,Day9 , IO多路复用(番外篇)

同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不一样的人在不一样的上下文下给出的答案是不一样的。因此先限定一下本文的上下文。html

本文讨论的背景是Linux环境下的network IO。

一 概念说明

在进行解释以前,首先要说明几个概念:
- 用户空间和内核空间
- 进程切换
- 进程的阻塞
- 文件描述符
- 缓存 I/Opython

 

 

用户空间与内核空间

如今操做系统都是采用虚拟存储器,那么对32位操做系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操做系统的核心是内核,独立于普通的应用程序,能够访问受保护的内存空间,也有访问底层硬件设备的全部权限。为了保证用户进程不能直接操做内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操做系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。linux

 

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复之前挂起的某个进程的执行。这种行为被称为进程切换。所以能够说,任何进程都是在操做系统内核的支持下运行的,是与内核紧密相关的。web

从一个进程的运行转到另外一个进程上运行,这个过程当中通过下面这些变化:
1. 保存处理机上下文,包括程序计数器和其余寄存器。
2. 更新PCB信息。segmentfault

3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
4. 选择另外一个进程执行,并更新其PCB。
5. 更新内存管理的数据结构。
6. 恢复处理机上下文。缓存

总而言之就是很耗资源,具体的能够参考这篇文章:进程切换安全

注:进程控制块(Processing Control Block),是操做系统核心中一种数据结构,主要表示进程状态。其做用是使一个在多道程序环境下不能独立运行的程序(含数据),成为一个能独立运行的基本单位或与其它进程并发执行的进程。或者说,OS是根据PCB来对并发执行的进程进行控制和管理的。 PCB一般是系统内存占用区中的一个连续存区,它存放着操做系统用于描述进程状况及控制进程运行所需的所有信息 网络

进程的阻塞

正在执行的进程,因为期待的某些事件未发生,如请求系统资源失败、等待某种操做的完成、新数据还没有到达或无新工做作等,则由系统自动执行阻塞原语(Block),使本身由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也所以只有处于运行态的进程(得到CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的数据结构

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。并发

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者建立一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写每每会围绕着文件描述符展开。可是文件描述符这一律念每每只适用于UNIX、Linux这样的操做系统。

 

缓存 I/O

缓存 I/O 又被称做标准 I/O,大多数文件系统的默认 I/O 操做都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操做系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点:
数据在传输过程当中须要在应用程序地址空间和内核进行屡次数据拷贝操做,这些数据拷贝操做所带来的 CPU 以及内存开销是很是大的。

 

二 IO模式

刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间。因此说,当一个read操做发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式由于这两个阶段,linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)

注:因为signal driven IO在实际中并不经常使用,因此我这只说起剩下的四种IO Model。

 

阻塞 I/O(blocking IO)

在linux中,默认状况下全部的socket都是blocking,一个典型的读操做流程大概是这样:

 

 

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来讲,不少时候数据在一开始尚未到达。好比,尚未收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程须要等待,也就是说数据被拷贝到操做系统内核的缓冲区中是须要一个过程的。而在用户进程这边,整个进程会被阻塞(固然,是进程本身选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,而后kernel返回结果,用户进程才解除block的状态,从新运行起来。

因此,blocking IO的特色就是在IO执行的两个阶段都被block了。

非阻塞 I/O(nonblocking IO)

linux下,能够经过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操做时,流程是这个样子:

 

当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,而是马上返回一个error。从用户进程角度讲 ,它发起一个read操做后,并不须要等待,而是立刻就获得了一个结果。用户进程判断结果是一个error时,它就知道数据尚未准备好,因而它能够再次发送read操做。一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,那么它立刻就将数据拷贝到了用户内存,而后返回。

因此,nonblocking IO的特色是用户进程须要不断的主动询问kernel数据好了没有。

I/O 多路复用( IO multiplexing)

IO multiplexing就是咱们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就能够同时处理多个网络链接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的全部socket,当某个socket有数据到达了,就通知用户进程。

 

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程。

因此,I/O 多路复用的特色是经过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就能够返回。

这个图和blocking IO的图其实并无太大的不一样,事实上,还更差一些。由于这里须要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。可是,用select的优点在于它能够同时处理多个connection。

因此,若是处理的链接数不是很高的话,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优点并非对于单个链接能处理得更快,而是在于能处理更多的链接。)

在IO multiplexing Model中,实际中,对于每个socket,通常都设置成为non-blocking,可是,如上图所示,整个用户的process实际上是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

 

异步 I/O(asynchronous IO)

inux下的asynchronous IO其实用得不多。先看一下它的流程:

用户进程发起read操做以后,马上就能够开始去作其它的事。而另外一方面,从kernel的角度,当它受到一个asynchronous read以后,首先它会马上返回,因此不会对用户进程产生任何block。而后,kernel会等待数据准备完成,而后将数据拷贝到用户内存,当这一切都完成以后,kernel会给用户进程发送一个signal,告诉它read操做完成了。

 

总结

blocking和non-blocking的区别

调用blocking IO会一直block住对应的进程直到操做完成,而non-blocking IO在kernel还准备数据的状况下会马上返回。

synchronous IO和asynchronous IO的区别

在说明synchronous IO和asynchronous IO的区别以前,须要先给出二者的定义。POSIX的定义是这样子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;

二者的区别就在于synchronous IO作”IO operation”的时候会将process阻塞。按照这个定义,以前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。

有人会说,non-blocking IO并无被block啊。这里有个很是“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操做,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,若是kernel的数据没有准备好,这时候不会block进程。可是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。

而asynchronous IO则不同,当进程发起IO 操做以后,就直接返回不再理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程当中,进程彻底没有被block。

 

各个IO Model的比较如图所示:

 

经过上面的图片,能够发现non-blocking IO和asynchronous IO的区别仍是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,可是它仍然要求进程去主动的check,而且当数据准备完成之后,也须要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则彻底不一样。它就像是用户进程将整个IO操做交给了他人(kernel)完成,而后他人作完后发信号通知。在此期间,用户进程不须要去检查IO操做的状态,也不须要主动的去拷贝数据。

 

三 I/O 多路复用之select、poll、epoll详解

select,poll,epoll都是IO多路复用的机制。I/O多路复用就是经过一种机制,一个进程能够监视多个描述符,一旦某个描述符就绪(通常是读就绪或者写就绪),可以通知程序进行相应的读写操做。但select,poll,epoll本质上都是同步I/O,由于他们都须要在读写事件就绪后本身负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需本身负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。(这里啰嗦下)

select

?
1
select(rlist, wlist, xlist, timeout = None )

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,若是当即返回设为null便可),函数返回。当select函数返回后,能够 经过遍历fdset,来找到就绪的描述符。

select目前几乎在全部的平台上支持,其良好跨平台支持也是它的一个优势。select的一 个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024,能够经过修改宏定义甚至从新编译内核的方式提高这一限制,但 是这样也会形成效率的下降。

poll

?
1
int poll (struct pollfd * fds, unsigned int nfds, int timeout);

不一样与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。

struct pollfd { int fd; /* file descriptor */ short events; /* requested events to watch */ short revents; /* returned events witnessed */ }; 

pollfd结构包含了要监视的event和发生的event,再也不使用select“参数-值”传递的方式。同时,pollfd并无最大数量限制(可是数量过大后性能也是会降低)。 和select函数同样,poll返回后,须要轮询pollfd来获取就绪的描述符。

从上面看,select和poll都须要在返回后,经过遍历文件描述符来获取已经就绪的socket。事实上,同时链接的大量客户端在一时刻可能只有不多的处于就绪状态,所以随着监视的描述符数量的增加,其效率也会线性降低。

  

epoll

epoll是在2.6内核中提出的,是以前的select和poll的加强版本。相对于select和poll来讲,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

一 epoll操做过程

epoll操做过程须要三个接口,分别以下:

 

?
1
2
3
int epoll_create( int size); //建立一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl( int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait( int epfd, struct epoll_event * events, int maxevents, int timeout);

1. int epoll_create(int size);
建立一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不一样于select()中的第一个参数,给出最大监听的fd+1的值,参数size并非限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
当建立好epoll句柄后,它就会占用一个fd值,在linux下若是查看/proc/进程id/fd/,是可以看到这个fd的,因此在使用完epoll后,必须调用close()关闭,不然可能致使fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数是对指定描述符fd执行op操做。
- epfd:是epoll_create()的返回值。
- op:表示op操做,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
- fd:是须要监听的fd(文件描述符)
- epoll_event:是告诉内核须要监听什么事

3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核获得事件的集合,maxevents告以内核这个events有多大,这个maxevents的值不能大于建立epoll_create()时的size,参数timeout是超时时间(毫秒,0会当即返回,-1将不肯定,也有说法说是永久阻塞)。该函数返回须要处理的事件数目,如返回0表示已超时。

 

  

#_*_coding:utf-8_*_
__author__ = 'Alex Li'

import socket, logging
import select, errno

logger = logging.getLogger("network-server")

def InitLog():
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("network-server.log")
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)

    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)


if __name__ == "__main__":
    InitLog()

    try:
        # 建立 TCP socket 做为监听 socket
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error as  msg:
        logger.error("create socket failed")

    try:
        # 设置 SO_REUSEADDR 选项
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error as  msg:
        logger.error("setsocketopt SO_REUSEADDR failed")

    try:
        # 进行 bind -- 此处未指定 ip 地址,即 bind 了所有网卡 ip 上
        listen_fd.bind(('', 2003))
    except socket.error as  msg:
        logger.error("bind failed")

    try:
        # 设置 listen 的 backlog 数
        listen_fd.listen(10)
    except socket.error as  msg:
        logger.error(msg)

    try:
        # 建立 epoll 句柄
        epoll_fd = select.epoll()
        # 向 epoll 句柄中注册 监听 socket 的 可读 事件
        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
    except select.error as  msg:
        logger.error(msg)

    connections = {}
    addresses = {}
    datalist = {}
    while True:
        # epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待
        epoll_list = epoll_fd.poll()

        for fd, events in epoll_list:
            # 若为监听 fd 被激活
            if fd == listen_fd.fileno():
                # 进行 accept -- 得到链接上来 client 的 ip 和 port,以及 socket 句柄
                conn, addr = listen_fd.accept()
                logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno()))
                # 将链接 socket 设置为 非阻塞
                conn.setblocking(0)
                # 向 epoll 句柄中注册 链接 socket 的 可读 事件
                epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)
                # 将 conn 和 addr 信息分别保存起来
                connections[conn.fileno()] = conn
                addresses[conn.fileno()] = addr
            elif select.EPOLLIN & events:
                # 有 可读 事件激活
                datas = ''
                while True:
                    try:
                        # 从激活 fd 上 recv 10 字节数据
                        data = connections[fd].recv(10)
                        # 若当前没有接收到数据,而且以前的累计数据也没有
                        if not data and not datas:
                            # 从 epoll 句柄中移除该 链接 fd
                            epoll_fd.unregister(fd)
                            # server 侧主动关闭该 链接 fd
                            connections[fd].close()
                            logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
                            break
                        else:
                            # 将接收到的数据拼接保存在 datas 中
                            datas += data
                    except socket.error as  msg:
                        # 在 非阻塞 socket 上进行 recv 须要处理 读穿 的状况
                        # 这里其实是利用 读穿 出 异常 的方式跳到这里进行后续处理
                        if msg.errno == errno.EAGAIN:
                            logger.debug("%s receive %s" % (fd, datas))
                            # 将已接收数据保存起来
                            datalist[fd] = datas
                            # 更新 epoll 句柄中链接d 注册事件为 可写
                            epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                            break
                        else:
                            # 出错处理
                            epoll_fd.unregister(fd)
                            connections[fd].close()
                            logger.error(msg)
                            break
            elif select.EPOLLHUP & events:
                # 有 HUP 事件激活
                epoll_fd.unregister(fd)
                connections[fd].close()
                logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
            elif select.EPOLLOUT & events:
                # 有 可写 事件激活
                sendLen = 0
                # 经过 while 循环确保将 buf 中的数据所有发送出去
                while True:
                    # 将以前收到的数据发回 client -- 经过 sendLen 来控制发送位置
                    sendLen += connections[fd].send(datalist[fd][sendLen:])
                    # 在所有发送完毕后退出 while 循环
                    if sendLen == len(datalist[fd]):
                        break
                # 更新 epoll 句柄中链接 fd 注册事件为 可读
                epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
            else:
                # 其余 epoll 事件不进行处理
                continue
epoll socket echo server

 

 

 

参考:

https://segmentfault.com/a/1190000003063859

https://my.oschina.net/moooofly/blog/147297

 

原文:http://www.cnblogs.com/alex3714/articles/5876749.html

相关文章
相关标签/搜索