day31 - 协程、I/O

 本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)状况下实现并发,为此咱们须要先回顾下并发的本质:切换+保存状态python

    cpu正在运行一个任务,会在两种状况下切走去执行其余的任务(切换由操做系统强制控制),一种状况是该任务发生了阻塞,另一种状况是该任务计算的时间过长或有一个优先级更高的程序替代了它linux

ps:在介绍进程理论时,说起进程的三种执行状态,而线程才是执行单位,因此也能够将上图理解为线程的三种状态 git

    一:其中第二种状况并不能提高效率,只是为了让cpu可以雨露均沾,实现看起来全部任务都被“同时”执行的效果,若是多个任务都是纯计算的,这种切换反而会下降效率。为此咱们能够基于yield来验证。yield自己就是一种在单线程下能够保存任务运行状态的方法,咱们来简单复习一下:程序员

#1 yiled能够保存状态,yield的状态保存与操做系统的保存线程状态很像,可是yield是代码级别控制的,更轻量级
#2 send能够把一个函数的结果传给另一个函数,以此实现单线程内程序之间的切换  
'''
一、协程:
    单线程实现并发
    在应用程序里控制多个任务的切换+保存状态
    优势:
        应用程序级别速度要远远高于操做系统的切换
    缺点:
        多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地
        该线程内的其余的任务都不能执行了

        一旦引入协程,就须要检测单线程下全部的IO行为,
        实现遇到IO就切换,少一个都不行,觉得一旦一个任务阻塞了,整个线程就阻塞了,
        其余的任务即使是能够计算,可是也没法运行了

二、协程序的目的:
    想要在单线程下实现并发
    并发指的是多个任务看起来是同时运行的
    并发=切换+保存状态
'''

#串行执行
import time

def func1():
    for i in range(10000000):
        i+1

def func2():
    for i in range(10000000):
        i+1

start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)


#基于yield并发执行
import time
def func1():
    while True:
        yield

def func2():
    g=func1()
    for i in range(10000000):
        i+1
        next(g)

start=time.time()
func2()
stop=time.time()
print(stop-start)
单纯地切换反而会下降运行效率

 二:第一种状况的切换。在任务一遇到io状况下,切到任务二去执行,这样就能够利用任务一阻塞的时间完成任务二的计算,效率的提高就在于此。github

import time
def func1():
    while True:
        print('func1')
        yield

def func2():
    g=func1()
    for i in range(10000000):
        i+1
        next(g)
        time.sleep(3)
        print('func2')
start=time.time()
func2()
stop=time.time()
print(stop-start)
yield不能检测IO,实现遇到IO自动切换

  对于单线程下,咱们不可避免程序中出现io操做,但若是咱们能在本身的程序中(即用户程序级别,而非操做系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另一个任务去计算,这样就保证了该线程可以最大限度地处于就绪态,即随时均可以被cpu执行的状态,至关于咱们在用户程序级别将本身的io操做最大限度地隐藏起来,从而能够迷惑操做系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给咱们的线程。web

    协程的本质就是在单线程下,由用户本身控制一个任务遇到io阻塞了就切换另一个任务去执行,以此来提高效率。为了实现它,咱们须要找寻一种能够同时知足如下条件的解决方案:数据库

#1. 能够控制多个任务之间的切换,切换以前将任务的状态保存下来,以便从新运行时,能够基于暂停的位置继续执行。

#2. 做为1的补充:能够检测io操做,在遇到io操做的状况下才发生切换

二 协程介绍

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的。、编程

须要强调的是:数组

#1. python的线程属于内核级别的,即由操做系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其余线程运行)
#2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操做系统)控制切换,以此来提高效率(!!!非io操做的切换与效率无关)

对比操做系统控制线程的切换,用户在单线程内控制协程的切换缓存

优势以下:

#1. 协程的切换开销更小,属于程序级别的切换,操做系统彻底感知不到,于是更加轻量级
#2. 单线程内就能够实现并发的效果,最大限度地利用cpu

缺点以下:

#1. 协程的本质是单线程下,没法利用多核,能够是一个程序开启多个进程,每一个进程内开启多个线程,每一个线程内开启协程
#2. 协程指的是单个线程,于是一旦协程出现阻塞,将会阻塞整个线程

总结协程特色:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里本身保存多个控制流的上下文栈
  4. 附加:一个协程遇到IO操做自动切换到其它协程(如何实现检测IO,yield、greenlet都没法实现,就用到了gevent模块(select机制))

三 Greenlet

若是咱们在单个线程内有20个任务,要想实如今多个任务之间切换,使用yield生成器的方式过于麻烦(须要先获得初始化一次的生成器,而后再调用send。。。很是麻烦),而使用greenlet模块能够很是简单地实现这20个任务直接的切换

#安装
pip3 install greenlet
from greenlet import greenlet

def eat(name):
    print('%s eat 1' %name)
    g2.switch('egon')
    print('%s eat 2' %name)
    g2.switch()
def play(name):
    print('%s play 1' %name)
    g1.switch()
    print('%s play 2' %name)

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('egon')#能够在第一次switch时传入参数,之后都不须要

单纯的切换(在没有io的状况下或者没有重复开辟内存空间的操做),反而会下降程序的执行速度

#顺序执行
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337

#切换
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524
View Code

greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时若是遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提高效率的问题。

单线程里的这20个任务的代码一般会既有计算操做又有阻塞操做,咱们彻底能够在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提升效率,这就用到了Gevent模块。

四 Gevent介绍

#安装
pip3 install gevent

Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。

#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面能够有多个参数,能够是位置实参或关键字实参,都是传给函数eat的

g2=gevent.spawn(func2)

g1.join() #等待g1结束

g2.join() #等待g2结束

#或者上述两步合做一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

遇到IO阻塞时会自动切换任务

import gevent
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('')
View Code

上例gevent.sleep(2)模拟的是gevent能够识别的io阻塞,

而time.sleep(2)或其余的阻塞,gevent是不能直接识别的须要用下面一行代码,打补丁,就能够识别了

from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块以前

或者咱们干脆记忆成:要用gevent,须要将from gevent import monkey;monkey.patch_all()放到文件的开头

from gevent import monkey;monkey.patch_all()

import gevent
import time
def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')
View Code

咱们能够用threading.current_thread().getName()来查看每一个g1和g2,查看的结果为DummyThread-n,即假线程

五 Gevent之同步与异步

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()
#上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在 全部greenlet执行完后才会继续向下走。
View Code

六 Gevent之应用举例一

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
协程应用:爬虫

七 Gevent之应用举例二

经过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()必定要放到导入socket模块以前,不然gevent没法识别socket的阻塞)

from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#若是不想用money.patch_all()打补丁,能够用gevent自带的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)

服务端
服务端
#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
客户端
from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM) #套接字对象必定要加到函数内,即局部名称空间内,放在函数外则被全部线程共享,则你们公用一个套接字对象,那么客户端端口永远同样了
    c.connect((server_ip,port))

    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8080))
        t.start()
多线程并发多个客户端

同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不一样的人给出的答案均可能不一样,好比wiki,就认为asynchronous IO和non-blocking IO是一个东西。这实际上是由于不一样的人的知识背景不一样,而且在讨论这个问题的时候上下文(context)也不相同。因此,为了更好的回答这个问题,我先限定一下本文的上下文。

    本文讨论的背景是Linux环境下的network IO。本文最重要的参考文献是Richard Stevens的“UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2节“I/O Models ”,Stevens在这节中详细说明了各类IO的特色和区别,若是英文够好的话,推荐直接阅读。Stevens的文风是有名的深刻浅出,因此不用担忧看不懂。本文中的流程图也是截取自参考文献。

    Stevens在文章中一共比较了五种IO Model:
    * blocking IO
    * nonblocking IO
    * IO multiplexing
    * signal driven IO
    * asynchronous IO
    由signal driven IO(信号驱动IO)在实际中并不经常使用,因此主要介绍其他四种IO Model。

    再说一下IO发生时涉及的对象和步骤。对于一个network IO (这里咱们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另外一个就是系统内核(kernel)。当一个read操做发生时,该操做会经历两个阶段:

#1)等待数据准备 (Waiting for the data to be ready)
#2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

    记住这两点很重要,由于这些IO模型的区别就是在两个阶段上各有不一样的状况。

    补充:

#一、输入操做:read、readv、recv、recvfrom、recvmsg共5个函数,若是会阻塞状态,则会经理wait data和copy data两个阶段,若是设置为非阻塞则在wait 不到data时抛出异常

#二、输出操做:write、writev、send、sendto、sendmsg共5个函数,在发送缓冲区满了会阻塞在原地,若是设置为非阻塞,则会抛出异常

#三、接收外来连接:accept,与输入操做相似

#四、发起外出连接:connect,与输出操做相似

二 阻塞IO(blocking IO)

    在linux中,默认状况下全部的socket都是blocking,一个典型的读操做流程大概是这样:

    当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来讲,不少时候数据在一开始尚未到达(好比,尚未收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。

    而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,而后kernel返回结果,用户进程才解除block的状态,从新运行起来。
    因此,blocking IO的特色就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

    几乎全部的程序员第一次接触到的网络编程都是从listen()、send()、recv() 等接口开始的,使用这些接口能够很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。以下图

    ps:所谓阻塞型接口是指系统调用(通常是IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用得到结果或者超时出错时才返回。

    实际上,除非特别指定,几乎全部的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将没法执行任何运算或响应任何的网络请求。

    一个简单的解决方案:

#在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每一个链接都拥有独立的线程(或进程),这样任何一个链接的阻塞都不会影响其余的链接。

    该方案的问题是:

#开启多进程或都线程的方式,在遇到要同时响应成百上千路的链接请求,则不管多线程仍是多进程都会严重占据系统资源,下降系统对外界响应效率,
并且线程与进程自己也更容易进入假死状态。

    改进方案:    

#不少程序员可能会考虑使用“线程池”或“链接池”。“线程池”旨在减小建立和销毁线程的频率,其维持必定合理数量的线程,并让空闲的线程从新承担新的执行任务。
“链接池”维持链接的缓存池,尽可能重用已有的链接、减小建立和关闭链接的频率。这两种技术均可以很好的下降系统开销,都被普遍应用不少大型系统,
如websphere、tomcat和各类数据库等。

    改进后方案其实也存在着问题:

#“线程池”和“链接池”技术也只是在必定程度上缓解了频繁调用IO接口带来的资源占用。并且,所谓“池”始终有其上限,当请求大大超过上限时,
“池”构成的系统对外界的响应并不比没有池的时候效果好多少。因此使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

    对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“链接池”或许能够缓解部分压力,可是不能解决全部问题。总之,多线程模型能够方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,能够用非阻塞接口来尝试解决这个问题。

三 非阻塞IO(non-blocking IO)

    Linux下,能够经过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操做时,流程是这个样子:

    从图中能够看出,当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,而是马上返回一个error。从用户进程角度讲 ,它发起一个read操做后,并不须要等待,而是立刻就获得了一个结果。用户进程判断结果是一个error时,它就知道数据尚未准备好,因而用户就能够在本次到下次再发起read询问的时间间隔内作其余事情,或者直接再次发送read操做。一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,那么它立刻就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),而后返回。

    也就是说非阻塞的recvform系统调用调用以后,进程并无被阻塞,内核立刻返回给进程,若是数据还没准备好,此时会返回一个error。进程在返回以后,能够干点别的事情,而后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程一般被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。须要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

    因此,在非阻塞式IO中,用户进程实际上是须要不断的主动询问kernel数据准备好了没有。

# 服务端
import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)

server.setblocking(False)
r_list=[]
w_list={}

while 1:
    try:
        conn,addr=server.accept()
        r_list.append(conn)
    except BlockingIOError:
        # 强调强调强调:!!!非阻塞IO的精髓在于彻底没有阻塞!!!
        # time.sleep(0.5) # 打开该行注释纯属为了方便查看效果
        print('在作其余的事情')
        print('rlist: ',len(r_list))
        print('wlist: ',len(w_list))


        # 遍历读列表,依次取出套接字读取内容
        del_rlist=[]
        for conn in r_list:
            try:
                data=conn.recv(1024)
                if not data:
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list[conn]=data.upper()
            except BlockingIOError: # 没有收成功,则继续检索下一个套接字的接收
                continue
            except ConnectionResetError: # 当前套接字出异常,则关闭,而后加入删除列表,等待被清除
                conn.close()
                del_rlist.append(conn)


        # 遍历写列表,依次取出套接字发送内容
        del_wlist=[]
        for conn,data in w_list.items():
            try:
                conn.send(data)
                del_wlist.append(conn)
            except BlockingIOError:
                continue


        # 清理无用的套接字,无需再监听它们的IO操做
        for conn in del_rlist:
            r_list.remove(conn)

        for conn in del_wlist:
            w_list.pop(conn)



#客户端
import socket
import os

client=socket.socket()
client.connect(('127.0.0.1',8083))

while 1:
    res=('%s hello' %os.getpid()).encode('utf-8')
    client.send(res)
    data=client.recv(1024)

    print(data.decode('utf-8'))
非阻塞IO示例

 可是非阻塞IO模型毫不被推荐。

    咱们不可否则其优势:可以在等待任务完成的时间里干其余活了(包括提交其余任务,也就是 “后台” 能够有多个任务在“”同时“”执行)。

    可是也难掩其缺点:

#1. 循环调用recv()将大幅度推高CPU占用率;这也是咱们在代码中留一句time.sleep(2)的缘由,不然在低配主机下极容易出现卡机状况
#2. 任务完成的响应延迟增大了,由于每过一段时间才去轮询一次read操做,而任务可能在两次轮询之间的任意时间完成。这会致使总体数据吞吐量的下降。

    此外,在这个方案中recv()更多的是起到检测“操做是否完成”的做用,实际操做系统提供了更为高效的检测“操做是否完成“做用的接口,例如select()多路复用模式,能够一次检测多个链接是否活跃。

四 多路复用IO(IO multiplexing)

    IO multiplexing这个词可能有点陌生,可是若是我说select/epoll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO(event driven IO)。咱们都知道,select/epoll的好处就在于单个process就能够同时处理多个网络链接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的全部socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

    当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程。
    这个图和blocking IO的图其实并无太大的不一样,事实上还更差一些。由于这里须要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。可是,用select的优点在于它能够同时处理多个connection。

    强调:

    1. 若是处理的链接数不是很高的话,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优点并非对于单个链接能处理得更快,而是在于能处理更多的链接。

    2. 在多路复用模型中,对于每个socket,通常都设置成为non-blocking,可是,如上图所示,整个用户的process实际上是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

    结论: select的优点在于能够处理多个链接,不适用于单个链接  

#服务端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
server.setblocking(False)
print('starting...')

rlist=[server,]
wlist=[]
wdata={}

while True:
    rl,wl,xl=select.select(rlist,wlist,[],0.5)
    print(wl)
    for sock in rl:
        if sock == server:
            conn,addr=sock.accept()
            rlist.append(conn)
        else:
            try:
                data=sock.recv(1024)
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                wlist.append(sock)
                wdata[sock]=data.upper()
            except Exception:
                sock.close()
                rlist.remove(sock)

    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

#客户端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8093))


while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()
select网络IO模型

select监听fd变化的过程分析:

#用户进程建立socket对象,拷贝监听的fd到内核空间,每个fd会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;
#用户进程再发送系统调用,好比(accept)将内核空间的数据copy到用户空间,同时做为接受数据端内核空间的数据清除,这样从新监听时fd再有新的数据又能够响应到了(发送端由于基于TCP协议因此须要收到应答后才会清除)。

    该模型的优势:

#相比其余模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时可以为多客户端提供服务。若是试图创建一个简单的事件驱动的服务器程序,这个模型有必定的参考价值。

    该模型的缺点:

#首先select()接口并非实现“事件驱动”的最好选择。由于当须要探测的句柄值较大时,select()接口自己须要消耗大量时间去轮询各个句柄。不少操做系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。若是须要实现更高效的服务器程序,相似epoll这样的接口更被推荐。遗憾的是不一样的操做系统特供的epoll接口有很大差别,因此使用相似于epoll的接口实现具备较好跨平台能力的服务器会比较困难。
#其次,该模型将事件探测和事件响应夹杂在一块儿,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

五 异步IO(Asynchronous I/O)

    Linux下的asynchronous IO其实用得很少,从内核2.6版本才开始引入。先看一下它的流程:

    用户进程发起read操做以后,马上就能够开始去作其它的事。而另外一方面,从kernel的角度,当它受到一个asynchronous read以后,首先它会马上返回,因此不会对用户进程产生任何block。而后,kernel会等待数据准备完成,而后将数据拷贝到用户内存,当这一切都完成以后,kernel会给用户进程发送一个signal,告诉它read操做完成了。

六 IO模型比较分析

    到目前为止,已经将四个IO Model都介绍完了。如今回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronous IO和asynchronous IO的区别在哪。
    先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这二者的区别。调用blocking IO会一直block住对应的进程直到操做完成,而non-blocking IO在kernel还准备数据的状况下会马上返回。

    再说明synchronous IO和asynchronous IO的区别以前,须要先给出二者的定义。Stevens给出的定义(实际上是POSIX的定义)是这样子的:
    A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
    An asynchronous I/O operation does not cause the requesting process to be blocked; 
    二者的区别就在于synchronous IO作”IO operation”的时候会将process阻塞。按照这个定义,四个IO模型能够分为两大类,以前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO这一类,而 asynchronous I/O后一类 。

    有人可能会说,non-blocking IO并无被block啊。这里有个很是“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操做,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,若是kernel的数据没有准备好,这时候不会block进程。可是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronous IO则不同,当进程发起IO 操做以后,就直接返回不再理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程当中,进程彻底没有被block。

    各个IO Model的比较如图所示:

    通过上面的介绍,会发现non-blocking IO和asynchronous IO的区别仍是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,可是它仍然要求进程去主动的check,而且当数据准备完成之后,也须要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则彻底不一样。它就像是用户进程将整个IO操做交给了他人(kernel)完成,而后他人作完后发信号通知。在此期间,用户进程不须要去检查IO操做的状态,也不须要主动的去拷贝数据。

七 selectors模块

IO复用:为了解释这个名词,首先来理解下复用这个概念,复用也就是共用的意思,这样理解仍是有些抽象,为此,我们来理解下复用在通讯领域的使用,在通讯领域中为了充分利用网络链接的物理介质,每每在同一条网络链路上采用时分复用或频分复用的技术使其在同一链路上传输多路信号,到这里咱们就基本上理解了复用的含义,即公用某个“介质”来尽量多的作同一类(性质)的事,那IO复用的“介质”是什么呢?为此咱们首先来看看服务器编程的模型,客户端发来的请求服务端会产生一个进程来对其进行服务,每当来一个客户请求就产生一个进程来服务,然而进程不可能无限制的产生,所以为了解决大量客户端访问的问题,引入了IO复用技术,即:一个进程能够同时对多个客户请求进行服务。也就是说IO复用的“介质”是进程(准确的说复用的是select和poll,由于进程也是靠调用select和poll来实现的),复用一个进程(select和poll)来对多个IO进行服务,虽然客户端发来的IO是并发的可是IO所需的读写数据多数状况下是没有准备好的,所以就能够利用一个函数(select和poll)来监听IO所需的这些数据的状态,一旦IO有数据能够进行读写了,进程就来对这样的IO进行服务。

  

理解完IO复用后,咱们在来看下实现IO复用中的三个API(select、poll和epoll)的区别和联系

select,poll,epoll都是IO多路复用的机制,I/O多路复用就是经过一种机制,能够监视多个描述符,一旦某个描述符就绪(通常是读就绪或者写就绪),可以通知应用程序进行相应的读写操做。但select,poll,epoll本质上都是同步I/O,由于他们都须要在读写事件就绪后本身负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需本身负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。三者的原型以下所示:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);



 1.select的第一个参数nfds为fdset集合中最大描述符值加1,fdset是一个位数组,其大小限制为__FD_SETSIZE(1024),位数组的每一位表明其对应的描述符是否须要被检查。第二三四参数表示须要关注读、写、错误事件的文件描述符位数组,这些参数既是输入参数也是输出参数,可能会被内核修改用于标示哪些描述符上发生了关注的事件,因此每次调用select前都须要从新初始化fdset。timeout参数为超时时间,该结构会被内核修改,其值为超时剩余的时间。

 select的调用步骤以下:

(1)使用copy_from_user从用户空间拷贝fdset到内核空间

(2)注册回调函数__pollwait

(3)遍历全部fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据状况会调用到tcp_poll,udp_poll或者datagram_poll)

(4)以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。

(5)__pollwait的主要工做就是把current(当前进程)挂到设备的等待队列中,不一样的设备有不一样的等待队列,对于tcp_poll 来讲,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不表明进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数 据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。

(6)poll方法返回时会返回一个描述读写操做是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。

(7)若是遍历完全部的fd,尚未返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。若是超过必定的超时时间(schedule_timeout 指定),仍是没人唤醒,则调用select的进程会从新被唤醒得到CPU,进而从新遍历fd,判断有没有就绪的fd。

(8)把fd_set从内核空间拷贝到用户空间。

总结下select的几大缺点:

(1)每次调用select,都须要把fd集合从用户态拷贝到内核态,这个开销在fd不少时会很大

(2)同时每次调用select都须要在内核遍历传递进来的全部fd,这个开销在fd不少时也很大

(3)select支持的文件描述符数量过小了,默认是1024

 

2.  poll与select不一样,经过一个pollfd数组向内核传递须要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只须要被初始化一次。

 poll的实现机制与select相似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,而后对pollfd中的每一个描述符进行poll,相比处理fdset来讲,poll效率更高。poll返回后,须要对pollfd中的每一个元素检查其revents值,来得指事件是否发生。

 

3.直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。epoll能够同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是咱们没有采起行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,可是代码实现至关复杂。epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了这些文件描述符在系统调用时复制的开销。另外一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用相似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知。

 

epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此以前,咱们先看一下epoll 和select和poll的调用接口上的不一样,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函 数,epoll_create,epoll_ctl和epoll_wait,epoll_create是建立一个epoll句柄;epoll_ctl是注 册要监听的事件类型;epoll_wait则是等待事件的产生。

  对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定 EPOLL_CTL_ADD),会把全部的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每一个fd在整个过程当中只会拷贝 一次。

  对于第二个缺点,epoll的解决方案不像select或poll同样每次都把current轮流加入fd对应的设备等待队列中,而只在 epoll_ctl时把current挂一遍(这一遍必不可少)并为每一个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调 函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工做实际上就是在这个就绪链表中查看有没有就绪的fd(利用 schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是相似的)。

  对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大能够打开文件的数目,这个数字通常远大于2048,举个例子, 在1GB内存的机器上大约是10万左右,具体数目能够cat /proc/sys/fs/file-max察看,通常来讲这个数目和系统内存关系很大。

总结:

(1)select,poll实现须要本身不断轮询全部fd集合,直到设备就绪,期间可能要睡眠和唤醒屡次交替。而epoll其实也须要调用 epoll_wait不断轮询就绪链表,期间也可能屡次睡眠和唤醒交替,可是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在 epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,可是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的 时候只要判断一下就绪链表是否为空就好了,这节省了大量的CPU时间,这就是回调机制带来的性能提高。

(2)select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,而且要把current往设备等待队列中挂一次,而epoll只要 一次拷贝,并且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并非设备等待队列,只是一个epoll内 部定义的等待队列),这也能节省很多的开销。
select,poll,epoll
#服务端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #至关于网select的读列表里append了一个文件句柄server_fileobj,而且绑定了一个回调函数accept

while True:
    events=sel.select() #检测全部的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
View Code
相关文章
相关标签/搜索