Python学习笔记第十周

目录:html

   1、基础概念python

    一、多进程linux

    二、进程间通讯程序员

    三、进程锁web

    四、进程池编程

    五、协程数组

      a) greenlet缓存

      b) Gevent安全

     六、论事件驱动与异步IO服务器

    七、IO多路复用

    八、Python Select

 

    

1、基础概念:

  一、多进程multiprocessing

  多进程和多线程模块同样,都是一个可使用一个API产生多个进程的模块,多进程提供了本地与远程的并发,经过使用子进程代替线程,多进程比GIL下的多线程更高效,基于这个缘由,多进程模块能够容许开发人员在一个给定的设备上运行多个进程

  备注:python多线程不适合cpu密集型操做的任务,适合io密集型操做的任务

from multiprocessing import Process



import time


def f(name):
    time.sleep(2)
    print('hello', name)



if __name__ == '__main__':
    for i in range(10):
        p = Process(target=f,args=('h_%s' %i, )) #多进程
        p.start()
        p.join() #等待进程结束后才后项执行
View Code

   显示进程ID

from multiprocessing import Process
import os



def info(title):
    print(title)
    print('module name:' , __name__)
    print('parent process:' ,os.getppid()) #获得父进程
    print('process id: ' , os.getpid())     #获得子进程
    print('\n\n')





def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)


if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f,args=('bob',))
    p.start()
    p.join()
View Code

   

  二、进程间通信

不一样进程间内存是不共享的,要想实现两个进程间的数据交换,可使用如下方法:

    Queue

    使用方法跟thread里的queue差很少,在线程Queue中,一个线程的Queue,另一个进程是访问不到的,意思就是说线程Queue只在线程间传递数据,这里就须要另一个Queue,也就是进程Queue,多进程的queue之间传递数据,从本质上来说,是父进程的queue复制了一个queue给子进程,若是在其中一个queue中放置数据,就至关于pickle了一下放置到中间位置,而后从中间位置在unpickle反序列化到另外一个queue中,因此看上去是一个共享的queue,其实是两个queue之间的pickle与unpickle

    这种通讯形式是这个进程的数据经过queue传递给另一个进程

from multiprocessing import Process, Queue



def f(qa):
    qa.put([42,None,'hello world']) #在子进程中放数据



if __name__ == '__main__':
    q = Queue() #父进程的queue
    p = Process(target=f,args=(q,)) #将父进程的queue传递给子进程
    p.start()
    print(q.get())  #父进程能够get到子进程放置到queue中的数据
    p.join()
View Code 

    Pipe

    pipe在产生后会针对这个对象产生两端,两个进程可取一端来进程通讯,也就是数据的传递

from multiprocessing import Process,Pipe



def f(conn):
    conn.send([42,None,'hello from child1'])
    conn.send([42,None,'hello from child2'])
    print('form parent:', conn.recv())#子进程收父进程的数据
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe() #一个管道就表明了两个方向
    p = Process(target=f, args=(child_conn,)) #将父进程的一端传递进去
    p.start()
    print(parent_conn.recv()) #父进程收子进程的数据
    print(parent_conn.recv())
    parent_conn.send('hello from father!.....') #父进程发送给子进程

    p.join()
View Code

     Manager

    Manager能够实现列表、字典、锁、递归锁等之间的数据共享,同时Manager已经自动加锁了,两个进程之间无法同时写一份数据,Manager与Queue同样,都是同时复制了几份数据,而后完成共享

from multiprocessing import Process,Manager
import os

def f(d,l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)






if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  #使用专门的语法生成一个可在多个进程之间资源的字典
        l = manager.list(range(5)) #使用专门方法生成一个可在多个进程之间资源共享的列表,默认里面有5个元素
        p_list = [] #记录进程,已方便join使用

        for i in range(10):
            p = Process(target=f,args=(d,l))
            p.start()
            p_list.append(p)
        for res in p_list:#等待进程完毕
            res.join()

        print(d)
        print(l)
View Code

该程序运行结果以下:

[0, 1, 2, 3, 4, 76572]
[0, 1, 2, 3, 4, 76572, 76573, 76574]
[0, 1, 2, 3, 4, 76572, 76573, 76574, 76578, 76575]
[0, 1, 2, 3, 4, 76572, 76573, 76574, 76578, 76575]
[0, 1, 2, 3, 4, 76572, 76573, 76574, 76578, 76575]
[0, 1, 2, 3, 4, 76572, 76573, 76574, 76578, 76575, 76577]
[0, 1, 2, 3, 4, 76572, 76573, 76574, 76578, 76575, 76577, 76576, 76579]
[0, 1, 2, 3, 4, 76572, 76573, 76574, 76578, 76575, 76577, 76576, 76579]
[0, 1, 2, 3, 4, 76572, 76573, 76574, 76578, 76575, 76577, 76576, 76579, 76580, 76581]
[0, 1, 2, 3, 4, 76572, 76573, 76574, 76578, 76575, 76577, 76576, 76579, 76580, 76581]
{76572: 76572, 76573: 76573, 76574: 76574, 76578: 76578, 76575: 76575, 76577: 76577, 76579: 76579, 76576: 76576, 76580: 76580, 76581: 76581}
[0, 1, 2, 3, 4, 76572, 76573, 76574, 76578, 76575, 76577, 76576, 76579, 76580, 76581]

#每一个子进程都共享同一个list与dict数据,在同一份数据中进行添加与修改,能够实现数据共享

 

    三、进程锁

    对于进程而言,若是有屏幕显示操做,没有锁的话可能致使屏显出现错乱,因此进程锁用来保证打印正确

from multiprocessing import Process,Lock

def f1(l,i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()



if __name__ == '__main__':
    lock = Lock()

    for num in range(1000):
        Process(target=f1,args=(l,num)).start()
View Code

 

    四、进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进程,那么程序会等待,直到进程池中有可又用进程为止。

    进程池有两个方法:

  •  apply        表明同步进行,也就是串行
  •  apply_async  表明异步,也就是并行
from multiprocessing import Process,Pool
import time,os

def Foo(i):
    time.sleep(2)
    print('in process:', os.getpid())
    return i+100

def Bar(arg):
    print('-->exec done:',arg)

if __name__ == '__main__':

    pool = Pool(5)  #容许进程池里同时放入5个进程

    for i in range(10):
        pool.apply(func=Foo, args=(i,)) #apply方法表明同步执行,也就是串行

    print('end')

    pool.close()
    pool.join()#进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。
apply
from  multiprocessing import Process,Pool
import time,os

def Foo(i):
    time.sleep(2)
    print('in process:', os.getpid())
    return i+100

def Bar(arg):
    print('-->exec done:',arg,os.getpid())

pool = Pool(3)
print('主进程:', os.getpid())
for i in range(10):
    pool.apply_async(func=Foo, args=(i,),callback=Bar) #callback表示回调函数,表示执行完Foo后会执行Bar

#回调函数是主进程调用的回调函数
print('end')
pool.close()
pool.join()#进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。
python_async

    

    五、协程

    协程,又称微线程,英文名Coroutine。一句话说明什么是协程:协程就是一种用户态的轻量级线程! 

    协程拥有本身的寄存器、上下文和栈。协程调用切换时,将寄存器上下文与栈保存到其余地方,在切换回来的时候恢复先前保存的寄存器上下文和栈,所以:

    协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

    协程的好处:

  •  无需线程上下文切换的开销
  •    无需原子操做锁定及同步的开销
    • 原子操做(atomic operation)是不须要synchronized,所谓原子操做就是说不会被线程调度机制打断的操做,这种操做一旦开始,就一直运行到结束,中间不会有任何context switch(切换到熬另外一个线程),原子操做能够是一个步骤,也能够是多个操做步骤,可是顺序是不能够被打乱,或者切割只执行部分,视做总体是原子性的核心
  •    方便切换控制流,简化编程模型
  •    高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题,因此很适合用于高并发处理

    

    缺点:

  •  没法利用多核资源:协程本质是个单线程,它不能同时将单个CPU的多个核用上,协程须要和进程配合才能运行在多个CPU上,固然咱们平常编程绝大部分应用没有这个必要,触发CPU密集型应用
  •    进行阻塞(Bloking)操做(如IO)会阻塞整个程序

 

 

    协程的标准定义:

  •    必须在只有一个单线程实现并发
  •    修改共享数据不须要加锁
  •    用户程序里本身保存多个控制流的上下文和栈
  •    一个协程遇到IO操做自动切换到其余协程

 

a) greenlet :greenlet仍然是手动切换,gevent达到自动切换的目标

from greenlet import  greenlet


def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr3.switch()
    print(78)
    gr3.switch()

def test3():
    print(910)
    gr1.switch()
    print(1112)



if __name__ == '__main__':
    gr1 = greenlet(test1)#启动一个协程
    gr2 = greenlet(test2)
    gr3 = greenlet(test3)
    gr1.switch()  #手动切换,目前还没到自动切换的地步
View Code
12
56
910
34
78
1112

    b) Gevent

Gevent是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它以C扩展模块形式接入Python的轻量级协程。Greenlet所有运行在主程序进程内部,但它们被协做式调度 

import gevent


def func1():
    print('\033[31;1mfunc 1 执行\033[0m')
    gevent.sleep(2)
    print('\033[31;1mfunc 1 继续执行\033[0m')

def func2():
    print('\033[32;1mfunc 2 执行\033[0m')
    gevent.sleep(3)   #根据gevent的中断时间自动判断并切换时间需求少来执行
    print('\033[32;1mfunc 2 继续执行\033[0m')



if __name__ == '__main__':
    gevent.joinall([
        gevent.spawn(func1),
        gevent.spawn(func2),
    ])
View Code
#结果
func 1 执行
func 2 执行
func 1 继续执行
func 2 继续执行

     备注:在遇到gevent.sleep后不会等待时间超时,会直接切换(遇到IO操做就切换)

    同步与异步对比:

import gevent


def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' %pid)


def synchronous():
    for i in range(1,10):
        task(i)


def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()


print('Asynchronous:')
asynchronous()
View Code

上面的程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn.初始化的greenlet列表存放在数组threads中, 此数组被传给gevnet.joinall

函数,后者阻塞当前流程,并执行全部给定的greenlet,执行流程只会在全部greenlet执行后才会继续向下走

 

    遇到IO阻塞时自动切换任务: 

    经过爬虫的方式来对比实际生产中IO操做,在实际生产中gevent是无法知道程序是否进行IO操做的,须要monkey.path来对程序作标记

from gevent import monkey
import gevent
from urllib.request import urlopen
#默认状况下,gevent是不知道urllib在进行io操做,须要打一个补丁才能让gevent知道urllib的IO操做,补丁就是monkey.path


monkey.patch_all()  #把当前程序的全部的IO操做作上标记,让gevent知道,相似于gevent.sleep()
def f(url):
    print('GET %s' % url)
    resp = urlopen(url)
    data = resp.read()
    f = open('url.html','wb')
    f.write(data)
    print('%d bytes received from %s.' % (len(data), url))





gevent.joinall([
    gevent.spawn(f,'http://www.sina.com.cn'),#加参数可使用这种方式
    gevent.spawn(f,'http://www.tianya.cn'),
    gevent.spawn(f,'http://www.mop.com'),
])

 

  

    经过单线程完成多并发的socket server

 

import gevent

from gevent import socket,monkey
monkey.patch_all()


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)



def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()
if __name__ == '__main__':
    server(8001)
server端
import socket

HOST = 'localhost'    # The remote host
PORT = 8001           # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"),encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    #print(data)

    print('Received', repr(data))
s.close()
client端

 

 

  六、论事件驱动与异步IO

  一般咱们写服务器处理模型的程序时,有如下几种模型:

  (1)每收到一个请求,建立一个新的进程,来处理该请求

  (2)每收到一个请求,建立一个新的线程,来处理该请求

  (3)每收到一个请求,放入一个事件列表,让主进程经过非阻塞IO方式来处理

  上面的几种方法,可有千秋:

  (1)中因为建立新的进程开销较大,因此会致使服务器性能比较差,但实现简单

  (2)中因为涉及到线程的同步,有可能面临死锁的问题

  (3)中在写应用代码时,逻辑比前面两种复杂

  综合考虑,通常普通认为(3)是大多数网络服务器采用的方式

 

    看图说话讲事件驱动模型

    在UI编程中,经常要对鼠标点击进行响应,首先如何得到鼠标点击呢?

    方式一:建立一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有如下几个缺点:

    一、CPU资源浪费,可能鼠标点击的频率小,可是扫描线程仍是会一直循环,这会形成CPU资源浪费,若是扫描鼠标点击接口是阻塞的呢?

    二、若是是阻塞的,又出现下面这样的问题,若是咱们不但要扫描鼠标点击,还要扫描是否按下,因为扫描鼠标被阻塞了,那么可能永远不会取扫描键盘

    三、若是一个循环须要扫描的设备不少,这会引来响应时间的问题

    全部该方式很是很差

    方式二:事件驱动模型

    目前大部分的UI编程都是事件驱动模型,如不少UI平台都会提供onClick()事件,这个事件就表明鼠标按下事件。事件驱动模型大致思路以下:
    1. 有一个事件(消息)队列;
    2. 鼠标按下时,往这个队列中增长一个点击事件(消息);
    3. 有个循环,不断从队列取出事件,根据不一样的事件,调用不一样的函数,如onClick()、onKeyDown()等;
    4. 事件(消息)通常都各自保存各自的处理函数指针,这样,每一个消息都有独立的处理函数;

    

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特色是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

让咱们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展现了随着时间的推移,这三种模式下程序所作的工做。这个程序有3个任务须要完成,每一个任务都在等待I/O操做时阻塞自身。阻塞在I/O操做上所花费的时间已经用灰色框标示出来了。

      

在单线程同步模型中,任务按照顺序执行。若是某个任务由于I/O而阻塞,其余全部的任务都必须等待,直到它完成以后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。若是任务之间并无互相依赖的关系,但仍然须要互相等待的话这就使得程序没必要要的下降了运行速度。

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操做系统来管理,在多处理器系统上能够并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其余线程得以继续执行。与完成相似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,由于这类程序不得不经过线程同步机制如锁、可重入函数、线程局部存储或者其余机制来处理线程安全问题,若是实现不当就会致使出现微妙且使人痛不欲生的bug。

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其余昂贵的操做时,注册一个回调到事件循环中,而后当I/O操做完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询全部的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽量的得以执行而不须要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,由于程序员不须要关心线程安全问题。

当咱们面对以下的环境时,事件驱动模型一般是一个好的选择:

  1. 程序中有许多任务,并且…
  2. 任务之间高度独立(所以它们不须要互相通讯,或者等待彼此)并且…
  3. 在等待事件到来时,某些任务会阻塞。

当应用程序须要在任务间共享可变的数据时,这也是一个不错的选择,由于这里不须要采用同步处理。

网络应用程序一般都有上述这些特色,这使得它们可以很好的契合事件驱动编程模型。

 

此处要提出一个问题,就是,上面的事件驱动模型中,只要一遇到IO就注册一个事件,而后主程序就能够继续干其它的事情了,只到io处理完毕后,继续恢复以前中断的任务,这本质上是怎么实现的呢?哈哈,下面咱们就来一块儿揭开这神秘的面纱。。。。

 

  七、IO多路复用

  

  同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不同的人在不一样的上下文下给出的答案是不一样的。因此先限定一下本文的上下文。

本文讨论的背景是Linux环境下的network IO
  
    一、概念说明
    在进行解释以前,首先要说明几个概念:
    - 用户空间和内核空间
    - 进程切换
    - 进程的阻塞
    - 文件描述符
    - 缓存 I/O

    用户空间与内核空间

    如今操做系统都是采用虚拟存储器,那么对32位操做系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操做系统的核心是内核,独立于普通的应用程序,能够访问受保护的内存空间,也有访问底层硬件设备的全部权限。为了保证用户进程不能直接操做内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操做系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

 

    进程切换

    为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复之前挂起的某个进程的执行。这种行为被称为进程切换。所以能够说,任何进程都是在操做系统内核的支持下运行的,是与内核紧密相关的。

    从一个进程的运行转到另外一个进程上运行,这个过程当中通过下面这些变化:
    1. 保存处理机上下文,包括程序计数器和其余寄存器。
    2. 更新PCB信息。

    3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
    4. 选择另外一个进程执行,并更新其PCB。
    5. 更新内存管理的数据结构。
    6. 恢复处理机上下文。

    总而言之就是很耗资源,具体的能够参考这篇文章:进程切换

    注:进程控制块(Processing Control Block),是操做系统核心中一种数据结构,主要表示进程状态。其做用是使一个在多道程序环境下不能独立运行的程序(含数据),成为一个能独立运行的基本单位或与其它进程并发执行的进程。或者说,OS是根据PCB来对并发执行的进程进行控制和管理的。 PCB一般是系统内存占用区中的一个连续存区,它存放着操做系统用于描述进程状况及控制进程运行所需的所有信息 

    进程的阻塞

    正在执行的进程,因为期待的某些事件未发生,如请求系统资源失败、等待某种操做的完成、新数据还没有到达或无新工做作等,则由系统自动执行阻塞原语(Block),使本身由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也所以只有处于运行态的进程(得到CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

    文件描述符fd

    文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

    文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者建立一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写每每会围绕着文件描述符展开。可是文件描述符这一律念每每只适用于UNIX、Linux这样的操做系统。

 

    缓存 I/O

    缓存 I/O 又被称做标准 I/O,大多数文件系统的默认 I/O 操做都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操做系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间。

    缓存 I/O 的缺点:
    数据在传输过程当中须要在应用程序地址空间和内核进行屡次数据拷贝操做,这些数据拷贝操做所带来的 CPU 以及内存开销是很是大的。

 

    二、IO模式

    

    刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间。因此说,当一个read操做发生时,它会经历两个阶段:
    1. 等待数据准备 (Waiting for the data to be ready)
    2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

    正式由于这两个阶段,linux系统产生了下面五种网络模式的方案。
    - 阻塞 I/O(blocking IO)
    - 非阻塞 I/O(nonblocking IO)
    - I/O 多路复用( IO multiplexing)
    - 信号驱动 I/O( signal driven IO)
    - 异步 I/O(asynchronous IO)

    注:因为signal driven IO在实际中并不经常使用,因此我这只说起剩下的四种IO Model。

 

    阻塞 I/O(blocking IO)

    在linux中,默认状况下全部的socket都是blocking,一个典型的读操做流程大概是这样:

 

 

    当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来讲,不少时候数据在一开始尚未到达。好比,尚未收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程须要等待,也就是说数据被拷贝到操做系统内核的缓冲区中是须要一个过程的。而在用户进程这边,整个进程会被阻塞(固然,是进程本身选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,而后kernel返回结果,用户进程才解除block的状态,从新运行起来。

因此,blocking IO的特色就是在IO执行的两个阶段都被block了。

    非阻塞 I/O(nonblocking IO)

    linux下,能够经过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操做时,流程是这个样子:

 

    当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,而是马上返回一个error。从用户进程角度讲 ,它发起一个read操做后,并不须要等待,而是立刻就获得了一个结果。用户进程判断结果是一个error时,它就知道数据尚未准备好,因而它能够再次发送read操做。一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,那么它立刻就将数据拷贝到了用户内存,而后返回。

因此,nonblocking IO的特色是用户进程须要不断的主动询问kernel数据好了没有。

    I/O 多路复用( IO multiplexing)

    IO multiplexing就是咱们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就能够同时处理多个网络链接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的全部socket,当某个socket有数据到达了,就通知用户进程。

 

    当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程。

因此,I/O 多路复用的特色是经过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就能够返回。

    这个图和blocking IO的图其实并无太大的不一样,事实上,还更差一些。由于这里须要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。可是,用select的优点在于它能够同时处理多个connection。

    因此,若是处理的链接数不是很高的话,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优点并非对于单个链接能处理得更快,而是在于能处理更多的链接。)

    在IO multiplexing Model中,实际中,对于每个socket,通常都设置成为non-blocking,可是,如上图所示,整个用户的process实际上是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

 

    异步 I/O(asynchronous IO)

    Linux下的asynchronous IO其实用得不多。先看一下它的流程:

    用户进程发起read操做以后,马上就能够开始去作其它的事。而另外一方面,从kernel的角度,当它受到一个asynchronous read以后,首先它会马上返回,因此不会对用户进程产生任何block。而后,kernel会等待数据准备完成,而后将数据拷贝到用户内存,当这一切都完成以后,kernel会给用户进程发送一个signal,告诉它read操做完成了。

  总结

    blocking和non-blocking的区别

 

    调用blocking IO会一直block住对应的进程直到操做完成,而non-blocking IO在kernel还准备数据的状况下会马上返回。

 

    synchronous IO和asynchronous IO的区别

 

    在说明synchronous IO和asynchronous IO的区别以前,须要先给出二者的定义。POSIX的定义是这样子的:
    - A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
    - An asynchronous I/O operation does not cause the requesting process to be blocked;

 

    二者的区别就在于synchronous IO作”IO operation”的时候会将process阻塞。按照这个定义,以前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。

 

    有人会说,non-blocking IO并无被block啊。这里有个很是“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操做,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,若是kernel的数据没有准备好,这时候不会block进程。可是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。

 

    而asynchronous IO则不同,当进程发起IO 操做以后,就直接返回不再理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程当中,进程彻底没有被block。

 

 

 

    各个IO Model的比较如图所示:

 

 

 

 

    经过上面的图片,能够发现non-blocking IO和asynchronous IO的区别仍是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,可是它仍然要求进程去主动的check,而且当数据准备完成之后,也须要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则彻底不一样。它就像是用户进程将整个IO操做交给了他人(kernel)完成,而后他人作完后发信号通知。在此期间,用户进程不须要去检查IO操做的状态,也不须要主动的去拷贝数据。

 

  八、 Python Select

   select 
  select最先于1983年出如今4.2BSD中,它经过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程能够得到这些文件描述符从而进行后续的读写操做。

  select目前几乎在全部的平台上支持,其良好跨平台支持也是它的一个优势,事实上从如今看来,这也是它所剩很少的优势之一。

  select的一个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024,不过能够经过修改宏定义甚至从新编译内核的方式提高这一限制。

  另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增加。同时,因为网络响应时间的延迟使得大量TCP链接处于非活跃状态,但调用select()会对全部socket进行一次线性扫描,因此这也浪费了必定的开销。

  poll 
  poll在1986年诞生于System V Release 3,它和select在本质上没有多大差异,可是poll没有最大文件描述符数量的限制。

  poll和select一样存在一个缺点就是,包含大量文件描述符的数组被总体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增长而线性增大。

  另外,select()和poll()将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用select()和poll()的时候将再次报告这些文件描述符,因此它们通常不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

  epoll 
  直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具有了以前所说的一切优势,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

  epoll能够同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是咱们没有采起行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,可是代码实现至关复杂。

  epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了这些文件描述符在系统调用时复制的开销。

  另外一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用相似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知。

 

 

  Python select 

  Python的select()方法直接调用操做系统的IO接口,它监控sockets,open files, and pipes(全部带fileno()方法的文件句柄)什么时候变成readable 和writeable, 或者通讯错误,select()使得同时监控多个链接变的简单,而且这比写一个长循环来等待和监控多客户端链接要高效,由于select直接经过操做系统提供的C的网络接口进行操做,而不是经过Python的解释器。

  注意:Using Python’s file objects with select() works for Unix, but is not supported under Windows.

  接下来经过echo server例子要以了解select 是如何经过单进程实现同时处理多个非阻塞的socket链接的

import select
import socket
import sys
import Queue
 
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
 
# Bind the socket to the port
server_address = ('localhost', 10000)
print >>sys.stderr, 'starting up on %s port %s' % server_address
server.bind(server_address)
 
# Listen for incoming connections
server.listen(5)

 

select()方法接收并监控3个通讯列表, 第一个是全部的输入的data,就是指外部发过来的数据,第2个是监控和接收全部要发出去的data(outgoing data),第3个监控错误信息,接下来咱们须要建立2个列表来包含输入和输出信息来传给select().

# Sockets from which we expect to read
inputs = [ server ] # Sockets to which we expect to write outputs = [ ] 

 

全部客户端的进来的链接和数据将会被server的主循环程序放在上面的list中处理,咱们如今的server端须要等待链接可写(writable)以后才能过来,而后接收数据并返回(所以不是在接收到数据以后就马上返回),由于每一个链接要把输入或输出的数据先缓存到queue里,而后再由select取出来再发出去。

Connections are added to and removed from these lists by the server main loop. Since this version of the server is going to wait for a socket to become writable before sending any data (instead of immediately sending the reply), each output connection needs a queue to act as a buffer for the data to be sent through it.

# Outgoing message queues (socket:Queue)
message_queues = {}

The main portion of the server program loops, calling select() to block and wait for network activity.

下面是此程序的主循环,调用select()时会阻塞和等待直到新的链接和数据进来

while inputs: # Wait for at least one of the sockets to be ready for processing print >>sys.stderr, '\nwaiting for the next event' readable, writable, exceptional = select.select(inputs, outputs, inputs) 

 当你把inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,咱们上面将他们分别赋值为readable,writable,exceptional, 全部在readable list中的socket链接表明有数据可接收(recv),全部在writable list中的存放着你能够对其进行发送(send)操做的socket链接,当链接通讯出现error时会把error写到exceptional列表中。

select() returns three new lists, containing subsets of the contents of the lists passed in. All of the sockets in the readable list have incoming data buffered and available to be read. All of the sockets in the writable list have free space in their buffer and can be written to. The sockets returned in exceptional have had an error (the actual definition of “exceptional condition” depends on the platform).

 

Readable list 中的socket 能够有3种可能状态,第一种是若是这个socket是main "server" socket,它负责监听客户端的链接,若是这个main server socket出如今readable里,那表明这是server端已经ready来接收一个新的链接进来了,为了让这个main server能同时处理多个链接,在下面的代码里,咱们把这个main server的socket设置为非阻塞模式。

The “readable” sockets represent three possible cases. If the socket is the main “server” socket, the one being used to listen for connections, then the “readable” condition means it is ready to accept another incoming connection. In addition to adding the new connection to the list of inputs to monitor, this section sets the client socket to not block.

for s in readable:
 
    if s is server:
        # A "readable" server socket is ready to accept a connection
        connection, client_address = s.accept()
        print >>sys.stderr, 'new connection from', client_address
        connection.setblocking(0)
        inputs.append(connection)
 
        # Give the connection a queue for data we want to send
        message_queues[connection] = Queue.Queue()

第二种状况是这个socket是已经创建了的链接,它把数据发了过来,这个时候你就能够经过recv()来接收它发过来的数据,而后把接收到的数据放到queue里,这样你就能够把接收到的数据再传回给客户端了。

The next case is an established connection with a client that has sent data. The data is read with recv(), then placed on the queue so it can be sent through the socket and back to the client.

else:
     data = s.recv(1024)
     if data:
         # A readable client socket has data
         print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
         message_queues[s].put(data)
         # Add output channel for response
         if s not in outputs:
             outputs.append(s)

 

第三种状况就是这个客户端已经断开了,因此你再经过recv()接收到的数据就为空了,因此这个时候你就能够把这个跟客户端的链接关闭了。

A readable socket without data available is from a client that has disconnected, and the stream is ready to be closed.

else:
    # Interpret empty result as closed connection
    print >>sys.stderr, 'closing', client_address, 'after reading no data'
    # Stop listening for input on the connection
    if s in outputs:
        outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,因此这时候若是这个客户端的链接对象还在outputs列表中,就把它删掉
    inputs.remove(s)    #inputs中也删除掉
    s.close()           #把这个链接关闭掉
 
    # Remove message queue
    del message_queues[s]  

对于writable list中的socket,也有几种状态,若是这个客户端链接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,不然就把这个链接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个链接,那就会认为这个链接还处于非活动状态

There are fewer cases for the writable connections. If there is data in the queue for a connection, the next message is sent. Otherwise, the connection is removed from the list of output connections so that the next time through the loop select() does not indicate that the socket is ready to send data.

 
for s in writable:
    try:
        next_msg = message_queues[s].get_nowait()
    except Queue.Empty:
        # No messages waiting so stop checking for writability.
        print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
        outputs.remove(s)
    else:
        print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
        s.send(next_msg)

最后,若是在跟某个socket链接通讯过程当中出了错误,就把这个链接对象在inputs\outputs\message_queue中都删除,再把链接关闭掉

for s in exceptional:
    print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
    # Stop listening for input on the connection
    inputs.remove(s)
    if s in outputs:
        outputs.remove(s)
    s.close()
 
    # Remove message queue
    del message_queues[s]

客户端

下面的这个是客户端程序展现了如何经过select()对socket进行管理并与多个链接同时进行交互,

The example client program uses two sockets to demonstrate how the server with select() manages multiple connections at the same time. The client starts by connecting each TCP/IP socket to the server

import socket
import sys
 
messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10000)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]
 
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)

接下来经过循环经过每一个socket链接给server发送和接收数据。

 
  

Then it sends one pieces of the message at a time via each socket, and reads all responses available after writing new data.

for message in messages:
 
    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()

最后服务器端的完整代码以下: 

import select
import socket
import sys
import queue
 
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
 
# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)
 
# Listen for incoming connections
server.listen(5)
 
# Sockets from which we expect to read
inputs = [ server ]
 
# Sockets to which we expect to write
outputs = [ ]
 
message_queues = {}
while inputs:
 
    # Wait for at least one of the sockets to be ready for processing
    print( '\nwaiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    # Handle inputs
    for s in readable:
 
        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print('new connection from', client_address)
            connection.setblocking(False)
            inputs.append(connection)
 
            # Give the connection a queue for data we want to send
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
                message_queues[s].put(data)
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)
            else:
                # Interpret empty result as closed connection
                print('closing', client_address, 'after reading no data')
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,因此这时候若是这个客户端的链接对象还在outputs列表中,就把它删掉
                inputs.remove(s)    #inputs中也删除掉
                s.close()           #把这个链接关闭掉
 
                # Remove message queue
                del message_queues[s]
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking for writability.
            print('output queue for', s.getpeername(), 'is empty')
            outputs.remove(s)
        else:
            print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
            s.send(next_msg)
    # Handle "exceptional conditions"
    for s in exceptional:
        print('handling exceptional condition for', s.getpeername() )
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
 
        # Remove message queue
        del message_queues[s]
import socket
import sys
 
messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10000)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]
 
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)
 
for message in messages:
 
    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()
            s.close()

 

epoll

epoll是在2.6内核中提出的,是以前的select和poll的加强版本。相对于select和poll来讲,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

一 epoll操做过程

epoll操做过程须要三个接口,分别以下:

int epoll_create(int size);//建立一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

1. int epoll_create(int size);
建立一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不一样于select()中的第一个参数,给出最大监听的fd+1的值,参数size并非限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
当建立好epoll句柄后,它就会占用一个fd值,在linux下若是查看/proc/进程id/fd/,是可以看到这个fd的,因此在使用完epoll后,必须调用close()关闭,不然可能致使fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数是对指定描述符fd执行op操做。
- epfd:是epoll_create()的返回值。
- op:表示op操做,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
- fd:是须要监听的fd(文件描述符)
- epoll_event:是告诉内核须要监听什么事

3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核获得事件的集合,maxevents告以内核这个events有多大,这个maxevents的值不能大于建立epoll_create()时的size,参数timeout是超时时间(毫秒,0会当即返回,-1将不肯定,也有说法说是永久阻塞)。该函数返回须要处理的事件数目,如返回0表示已超时。

import socket, logging
import select, errno

logger = logging.getLogger("network-server")

def InitLog():
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("network-server.log")
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)

    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)


if __name__ == "__main__":
    InitLog()

    try:
        # 建立 TCP socket 做为监听 socket
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error as  msg:
        logger.error("create socket failed")

    try:
        # 设置 SO_REUSEADDR 选项
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error as  msg:
        logger.error("setsocketopt SO_REUSEADDR failed")

    try:
        # 进行 bind -- 此处未指定 ip 地址,即 bind 了所有网卡 ip 上
        listen_fd.bind(('', 2003))
    except socket.error as  msg:
        logger.error("bind failed")

    try:
        # 设置 listen 的 backlog 数
        listen_fd.listen(10)
    except socket.error as  msg:
        logger.error(msg)

    try:
        # 建立 epoll 句柄
        epoll_fd = select.epoll()
        # 向 epoll 句柄中注册 监听 socket 的 可读 事件
        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
    except select.error as  msg:
        logger.error(msg)

    connections = {}
    addresses = {}
    datalist = {}
    while True:
        # epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待
        epoll_list = epoll_fd.poll()

        for fd, events in epoll_list:
            # 若为监听 fd 被激活
            if fd == listen_fd.fileno():
                # 进行 accept -- 得到链接上来 client 的 ip 和 port,以及 socket 句柄
                conn, addr = listen_fd.accept()
                logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno()))
                # 将链接 socket 设置为 非阻塞
                conn.setblocking(0)
                # 向 epoll 句柄中注册 链接 socket 的 可读 事件
                epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)
                # 将 conn 和 addr 信息分别保存起来
                connections[conn.fileno()] = conn
                addresses[conn.fileno()] = addr
            elif select.EPOLLIN & events:
                # 有 可读 事件激活
                datas = ''
                while True:
                    try:
                        # 从激活 fd 上 recv 10 字节数据
                        data = connections[fd].recv(10)
                        # 若当前没有接收到数据,而且以前的累计数据也没有
                        if not data and not datas:
                            # 从 epoll 句柄中移除该 链接 fd
                            epoll_fd.unregister(fd)
                            # server 侧主动关闭该 链接 fd
                            connections[fd].close()
                            logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
                            break
                        else:
                            # 将接收到的数据拼接保存在 datas 中
                            datas += data
                    except socket.error as  msg:
                        # 在 非阻塞 socket 上进行 recv 须要处理 读穿 的状况
                        # 这里其实是利用 读穿 出 异常 的方式跳到这里进行后续处理
                        if msg.errno == errno.EAGAIN:
                            logger.debug("%s receive %s" % (fd, datas))
                            # 将已接收数据保存起来
                            datalist[fd] = datas
                            # 更新 epoll 句柄中链接d 注册事件为 可写
                            epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                            break
                        else:
                            # 出错处理
                            epoll_fd.unregister(fd)
                            connections[fd].close()
                            logger.error(msg)
                            break
            elif select.EPOLLHUP & events:
                # 有 HUP 事件激活
                epoll_fd.unregister(fd)
                connections[fd].close()
                logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
            elif select.EPOLLOUT & events:
                # 有 可写 事件激活
                sendLen = 0
                # 经过 while 循环确保将 buf 中的数据所有发送出去
                while True:
                    # 将以前收到的数据发回 client -- 经过 sendLen 来控制发送位置
                    sendLen += connections[fd].send(datalist[fd][sendLen:])
                    # 在所有发送完毕后退出 while 循环
                    if sendLen == len(datalist[fd]):
                        break
                # 更新 epoll 句柄中链接 fd 注册事件为 可读
                epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
            else:
                # 其余 epoll 事件不进行处理
                continue

 

selectors模块

This module allows high-level and efficient I/O multiplexing, built upon the select module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.

import selectors
import socket
 
sel = selectors.DefaultSelector()
 
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
 
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
 
sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
 
while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
View Code
相关文章
相关标签/搜索