python小白-day8 线程、进程、协程

Python线程

线程是操做系统可以进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运做单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中能够并发多个线程,每条线程并行执行不一样的任务。html

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python
import threading
import time
 
def show(arg):
     time.sleep( 1 )
     print ( 'thread' + str (arg))
 
for i in range ( 10 ):
     t = threading.Thread(target = show, args = (i,))
     t.start()
 
print ( 'main thread stop' )


上述代码建立了10个“前台”线程,而后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。python

更多方法:git

  • start            线程准备就绪,等待CPU调度程序员

  • setName      为线程设置名称github

  • getName      获取线程名称算法

  • setDaemon   设置为后台线程或前台线程(默认)
                       若是是后台线程,主线程执行过程当中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均中止
                        若是是前台线程,主线程执行过程当中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序中止编程

  • join              逐个执行每一个线程,执行完毕后继续往下执行,该方法使得多线程变得无心义windows

  • run              线程被cpu调度后执行Thread类对象的run方法数组

Python GIL(Global Interpreter Lock) 

首先须要明确的一点是GIL并非Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就比如C++是一套语言(语法)标准,可是能够用不一样的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也同样,一样一段代码能够经过CPython,PyPy,Psyco等不一样的Python执行环境来执行。像其中的JPython就没有GIL。然而由于CPython是大部分环境下默认的Python执行环境。因此在不少人的概念里CPython就是Python,也就想固然的把GIL归结为Python语言的缺陷。因此这里要先明确一点:GIL并非Python的特性,Python彻底能够不依赖于GIL。
缓存

线程锁(互斥锁Mutex)

一个进程下能够启动多个线程,多个线程共享父进程的内存空间,也就意味着每一个线程能够访问同一份数据,此时,若是2个线程同时要修改同一份数据,会出现什么情况?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import threading
  
def addNum():
     global num #在每一个线程中都获取这个全局变量
     print ( '--get num:' ,num )
     time.sleep( 1 )
     num  - = 1 #对此公共变量进行-1操做
  
num = 100  #设定一个共享变量
thread_list = []
for i in range ( 100 ):
     t = threading.Thread(target = addNum)
     t.start()
     thread_list.append(t)
  
for t in thread_list: #等待全部线程执行完毕
     t.join()
  
  
print ( 'final num:' , num )


正常来说,这个num结果应该是0, 但在python 2.7上多运行几回,会发现,最后打印出来的num结果不老是0,为何每次运行的结果不同呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操做, 因为2个线程是并发同时运行的,因此2个线程颇有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每一个线程在要修改公共数据时,为了不本身在还没改完的时候别人也来修改此数据,能够给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。 

*注:不要在3.x上运行,不知为何,3.x上的结果老是正确的,多是自动加了锁

加锁版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
import threading
  
def addNum():
     global num #在每一个线程中都获取这个全局变量
     print ( '--get num:' ,num )
     time.sleep( 1 )
     lock.acquire() #修改数据前加锁
     num  - = 1 #对此公共变量进行-1操做
     lock.release() #修改后释放
  
num = 100  #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range ( 100 ):
     t = threading.Thread(target = addNum)
     t.start()
     thread_list.append(t)
  
for t in thread_list: #等待全部线程执行完毕
     t.join()
  
print ( 'final num:' , num )

RLock(递归锁)

说白了就是在一个大锁中还要再包含子锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
 
def run1():
     print ( "grab the first part data" )
     lock.acquire()
     global num
     num + = 1
     lock.release()
     return num
def run2():
     print ( "grab the second part data" )
     lock.acquire()
     global  num2
     num2 + = 1
     lock.release()
     return num2
def run3():
     lock.acquire()
     res = run1()
     print ( '--------between run1 and run2-----' )
     res2 = run2()
     lock.release()
     print (res,res2)
 
if __name__ = = '__main__' :
 
     num,num2 = 0 , 0
     lock = threading.RLock()
     for i in range ( 10 ):
         t = threading.Thread(target = run3)
         t.start()
 
while threading.active_count() ! = 1 :
     print (threading.active_count())
else :
     print ( '----all threads done---' )
     print (num,num2)


Semaphore(信号量)

互斥锁 同时只容许一个线程更改数据,而Semaphore是同时容许必定数量的线程更改数据 ,好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
def run(n):
     semaphore.acquire()
     time.sleep( 1 )
     print ( "run the thread: %s\n" % n)
     semaphore.release()
if __name__ = = '__main__' :
     num = 0
     semaphore  = threading.BoundedSemaphore( 3 ) #最多容许5个线程同时运行
     for i in range ( 20 ):
         t = threading.Thread(target = run,args = (i,))
         t.start()
while threading.active_count() ! = 1 :
     pass #print threading.active_count()
else :
     print ( '----all threads done---' )
     print (num)


event

python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。

  • clear:将“Flag”设置为False

  • set:将“Flag”设置为True

经过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程作交通指挥灯,生成几个线程作车辆,车辆行驶按红灯停,绿灯行的规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
import random
def light():
     if not event.isSet():
         event. set () #wait就不阻塞 #绿灯状态
     count = 0
     while True :
         if count < 10 :
             print ( '\033[42;1m--green light on---\033[0m' )
         elif count < 13 :
             print ( '\033[43;1m--yellow light on---\033[0m' )
         elif count < 20 :
             if event.isSet():
                 event.clear()
             print ( '\033[41;1m--red light on---\033[0m' )
         else :
             count = 0
             event. set () #打开绿灯
         time.sleep( 1 )
         count + = 1
 
def car(n): #no bug version
     while 1 :
         time.sleep( 1 )
         if  event.isSet(): #绿灯
             print ( "car [%s] is running.." % n)
         else :
             print ( "car [%s] is waiting for the red light.." % n)
             event.wait()
 
 
def car2(n):
     while 1 :
         time.sleep(random.randrange( 10 ))
         if  event.isSet(): #绿灯
             print ( "car [%s] is running.." % n)
         else :
             print ( "car [%s] is waiting for the red light.." % n)
 
if __name__ = = '__main__' :
     event = threading.Event()
     Light = threading.Thread(target = light)
     Light.start()
     for i in range ( 3 ):
         t = threading.Thread(target = car,args = (i,))
         t.start()


queue队列 

Python中对队列和线程的操做,须要使用模块:Queue 和 threading。其中,Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,可以在多线程中直接使用。可使用队列来实现线程间的同步。

生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python
 
import queue
import time
import threading
q = queue.Queue()
 
def consumer(num):
     while True :
         time.sleep( 1 )
         print ( 'consumer %s get task:%s' % (num,q.get()))
         q.task_done()
def producer(num):
     count = 1
     while True :
         print ( 'producer %s produced a new task:%s' % (num,count))
         q.put(count)
         count + = 1
         q.join()
         print ( 'all tasks has been consumer by consumers' )
 
 
c1 = threading.Thread(target = consumer,args = [ 1 ,])
c2 = threading.Thread(target = consumer,args = [ 2 ,])
c3 = threading.Thread(target = consumer,args = [ 3 ,])
 
p1 = threading.Thread(target = producer,args = [ 'hetan' ,])
p2 = threading.Thread(target = producer,args = [ 'liuyao' ,])
p3 = threading.Thread(target = producer,args = [ 'xxxx' ,])
 
c1.start()
c2.start()
c3.start()
p1.start()
p2.start()
p3.start()



Python 进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
def f(name):
     time.sleep( 2 )
     print ( 'hello' , name)
 
if __name__ = = '__main__' :
     p = Process(target = f, args = ( 'bob' ,))
     p2 = Process(target = f, args = ( 'bob' ,))
     p.start()
     p2.start()
     p.join()


进程间通信  

不一样进程间内存是不共享的,要想实现两个进程间的数据交换,能够用如下方法:

Queues

使用方法跟threading里的queue差很少

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python
from multiprocessing import Process, Queue
 
def f(q):
     q.put([ 42 , None , 'hello' ])
 
if __name__ = = '__main__' :
     q = Queue()
     p = Process(target = f, args = (q,))
     p.start()
     print (q.get())    # prints "[42, None, 'hello']"
     p.join()


Pipes

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Pipe
 
def f(conn):
     conn.send([ 42 , None , 'hello' ])
     conn.close()
 
if __name__ = = '__main__' :
     parent_conn, child_conn = Pipe()
     p = Process(target = f, args = (child_conn,))
     p.start()
     print (parent_conn.recv())   # prints "[42, None, 'hello']"
     p.join()


Managers

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process, Manager
 
def f(d, l):
     d[ 1 ] = '1'
     d[ '2' ] = 2
     d[ 0.25 ] = None
     l.append( 1 )
     print (l)
 
if __name__ = = '__main__' :
     with Manager() as manager:
         d = manager. dict ()
 
         l = manager. list ( range ( 5 ))
         p_list = []
         for i in range ( 10 ):
             p = Process(target = f, args = (d, l))
             p.start()
             p_list.append(p)
         for res in p_list:
             res.join()
 
         print (d)
         print (l)


进程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process, Lock
  
def f(l, i):
     l.acquire()
     try :
         print ( 'hello world' , i)
     finally :
         l.release()
  
if __name__ = = '__main__' :
     lock = Lock()
  
     for num in range ( 10 ):
         Process(target = f, args = (lock, num)).start()


进程池  

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply              同步,通常不用

  • apply_async   异步,通常用这个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from  multiprocessing import Process,Pool,freeze_support
import time
  
def Foo(i):
     time.sleep( 2 )
     return i + 100
  
def Bar(arg):
     print ( '-->exec done:' ,arg)
if __name__ = = '__main__' :
     freeze_support()   #windows系统执行需加上,不然会报错
  
     pool = Pool( 5 )
  
     for i in range ( 10 ):
         pool.apply_async(func = Foo, args = (i,),callback = Bar)
         #pool.apply(func=Foo, args=(i,))
  
     print ( 'end' )
     pool.close()
     pool.join() #进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。


Python 协程

协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程

协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:

协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

 

协程的好处:

  • 无需线程上下文切换的开销

  • 无需原子操做锁定及同步的开销

  • 方便切换控制流,简化编程模型

  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。因此很适合用于高并发处理。

缺点:

  • 没法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程须要和进程配合才能运行在多CPU上.固然咱们平常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。

  • 进行阻塞(Blocking)操做(如IO时)会阻塞掉整个程序

使用yield实现协程操做例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
def consumer(name):
     print ( "--->starting eating baozi..." )
     while True :
         new_baozi = yield
         print ( "[%s] is eating baozi %s" % (name,new_baozi))
         time.sleep( 1 )
 
def producer():
 
     r = con.__next__()
     r = con2.__next__()
     n = 0
     while n < 5 :
         n + = 1
         con.send(n)
         con2.send(n)
         print ( "\033[32;1m[producer]\033[0m is making baozi %s" % n )
 
 
if __name__ = = '__main__' :
     con = consumer( "hetan" )
     con2 = consumer( "liuyao" )
     p = producer()


Greenlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/env python
 
 
from greenlet import greenlet
 
def test1():
     print ( 12 )
     gr2.switch()
     print ( 34 )
     gr2.switch()
 
 
def test2():
     print ( 56 )
     gr1.switch()
     print ( 78 )
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()


Gevent 

Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gevent
 
def foo():
     print ( 'Running in foo' )
     gevent.sleep( 0 )
     print ( 'Explicit context switch to foo again' )
 
def bar():
     print ( 'Explicit context to bar' )
     gevent.sleep( 0 )
     print ( 'Implicit context switch back to bar' )
 
gevent.joinall([
     gevent.spawn(foo),
     gevent.spawn(bar),
])


同步与异步的性能区别 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import gevent
 
def task(pid):
     """
     Some non-deterministic task
     """
     gevent.sleep( 0.5 )
     print ( 'Task %s done' % pid)
 
def synchronous():
     for i in range ( 1 , 10 ):
         task(i)
 
def asynchronous():
     threads = [gevent.spawn(task, i) for i in range ( 10 )]
     gevent.joinall(threads)
 
print ( 'Synchronous:' )
synchronous()
 
print ( 'Asynchronous:' )
asynchronous()

上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在 全部greenlet执行完后才会继续向下走。  

遇到IO阻塞时会自动切换任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from gevent import monkey; monkey.patch_all()
import gevent
from  urllib.request import urlopen
 
def f(url):
     print ( 'GET: %s' % url)
     resp = urlopen(url)
     data = resp.read()
     print ( '%d bytes received from %s.' % ( len (data), url))
 
gevent.joinall([
         gevent.spawn(f, 'https://www.python.org/' ),
         gevent.spawn(f, 'https://www.yahoo.com/' ),
         gevent.spawn(f, 'https://github.com/' ),
])


经过gevent实现单线程下的多socket并发

server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/env python
import gevent
 
import time
import sys
from gevent import socket,monkey
import socket
monkey.patch_all()
 
def server(port):
     s = socket.socket()
     s.bind(( '0.0.0.0' ,port))
     s.listen( 300 )
     while True :
         cli,addr = s.accept()
         gevent.spawn(handle_request,cli)
def handle_request(s):
     try :
         while True :
             data = s.recv( 1024 )
             print ( 'recv' ,data.decode())
             s.send(data)
             if not data:
                 s.shutdown(socket.SHUT_WR)
     except Exception as e:
         print (e)
     finally :
         s.close()
if __name__ = = '__main__' :
     server( 8000 )

client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
import socket
 
Host = 'localhost'
port = 8000
 
s = socket.socket()
s.connect((Host,port))
 
while True :
     msg = bytes( input ( '>>' ),encoding = 'utf8' )
     s.send(msg)
     data = s.recv( 1024 )
 
     print ( 'recevied' , repr (data.decode()))
s.close()

论事件驱动与异步IO

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特色是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

让咱们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展现了随着时间的推移,这三种模式下程序所作的工做。这个程序有3个任务须要完成,每一个任务都在等待I/O操做时阻塞自身。阻塞在I/O操做上所花费的时间已经用灰色框标示出来了。


在单线程同步模型中,任务按照顺序执行。若是某个任务由于I/O而阻塞,其余全部的任务都必须等待,直到它完成以后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。若是任务之间并无互相依赖的关系,但仍然须要互相等待的话这就使得程序没必要要的下降了运行速度。

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操做系统来管理,在多处理器系统上能够并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其余线程得以继续执行。与完成相似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,由于这类程序不得不经过线程同步机制如锁、可重入函数、线程局部存储或者其余机制来处理线程安全问题,若是实现不当就会致使出现微妙且使人痛不欲生的bug。

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其余昂贵的操做时,注册一个回调到事件循环中,而后当I/O操做完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询全部的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽量的得以执行而不须要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,由于程序员不须要关心线程安全问题。

当咱们面对以下的环境时,事件驱动模型一般是一个好的选择:

  1. 程序中有许多任务,并且…

  2. 任务之间高度独立(所以它们不须要互相通讯,或者等待彼此)并且…

  3. 在等待事件到来时,某些任务会阻塞。

当应用程序须要在任务间共享可变的数据时,这也是一个不错的选择,由于这里不须要采用同步处理。

网络应用程序一般都有上述这些特色,这使得它们可以很好的契合事件驱动编程模型。

首先列一下,sellect、poll、epoll三者的区别 
select 
select最先于1983年出如今4.2BSD中,它经过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程能够得到这些文件描述符从而进行后续的读写操做。

select目前几乎在全部的平台上支持,其良好跨平台支持也是它的一个优势,事实上从如今看来,这也是它所剩很少的优势之一。

select的一个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024,不过能够经过修改宏定义甚至从新编译内核的方式提高这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增加。同时,因为网络响应时间的延迟使得大量TCP链接处于非活跃状态,但调用select()会对全部socket进行一次线性扫描,因此这也浪费了必定的开销。

poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差异,可是poll没有最大文件描述符数量的限制。

poll和select一样存在一个缺点就是,包含大量文件描述符的数组被总体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增长而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用select()和poll()的时候将再次报告这些文件描述符,因此它们通常不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具有了以前所说的一切优势,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll能够同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是咱们没有采起行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,可是代码实现至关复杂。

epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了这些文件描述符在系统调用时复制的开销。

另外一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用相似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知。

 

 

Python select 

Python的select()方法直接调用操做系统的IO接口,它监控sockets,open files, and pipes(全部带fileno()方法的文件句柄)什么时候变成readable 和writeable, 或者通讯错误,select()使得同时监控多个链接变的简单,而且这比写一个长循环来等待和监控多客户端链接要高效,由于select直接经过操做系统提供的C的网络接口进行操做,而不是经过Python的解释器。

注意:Using Python’s file objects with select() works for Unix, but is not supported under Windows.

接下来经过echo server例子要以了解select 是如何经过单进程实现同时处理多个非阻塞的socket链接的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import select
import socket
import sys
import Queue
  
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking( 0 )
  
# Bind the socket to the port
server_address = ( 'localhost' , 10000 )
print >>sys.stderr, 'starting up on %s port %s' % server_address
server.bind(server_address)
  
# Listen for incoming connections
server.listen( 5 )

select()方法接收并监控3个通讯列表, 第一个是全部的输入的data,就是指外部发过来的数据,第2个是监控和接收全部要发出去的data(outgoing data),第3个监控错误信息,接下来咱们须要建立2个列表来包含输入和输出信息来传给select().

1
2
3
4
5
# Sockets from which we expect to read
inputs = [ server ]
 
# Sockets to which we expect to write
outputs = [ ]

全部客户端的进来的链接和数据将会被server的主循环程序放在上面的list中处理,咱们如今的server端须要等待链接可写(writable)以后才能过来,而后接收数据并返回(所以不是在接收到数据以后就马上返回),由于每一个链接要把输入或输出的数据先缓存到queue里,而后再由select取出来再发出去。

1
2
# Outgoing message queues (socket:Queue)
message_queues = {}

下面是此程序的主循环,调用select()时会阻塞和等待直到新的链接和数据进来

1
2
3
4
5
while inputs:
 
     # Wait for at least one of the sockets to be ready for processing
     print >>sys.stderr, '\nwaiting for the next event'
     readable, writable, exceptional = select.select(inputs, outputs, inputs)

 当你把inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,咱们上面将他们分别赋值为readable,writable,exceptional, 全部在readable list中的socket链接表明有数据可接收(recv),全部在writable list中的存放着你能够对其进行发送(send)操做的socket链接,当链接通讯出现error时会把error写到exceptional列表中。

Readable list 中的socket 能够有3种可能状态,第一种是若是这个socket是main "server" socket,它负责监听客户端的链接,若是这个main server socket出如今readable里,那表明这是server端已经ready来接收一个新的链接进来了,为了让这个main server能同时处理多个链接,在下面的代码里,咱们把这个main server的socket设置为非阻塞模式。

1
2
3
4
5
6
7
8
9
10
11
12
# Handle inputs
for s in readable:
  
     if s is server:
         # A "readable" server socket is ready to accept a connection
         connection, client_address = s.accept()
         print >>sys.stderr, 'new connection from' , client_address
         connection.setblocking( 0 )
         inputs.append(connection)
  
         # Give the connection a queue for data we want to send
         message_queues[connection] = Queue.Queue()

第二种状况是这个socket是已经创建了的链接,它把数据发了过来,这个时候你就能够经过recv()来接收它发过来的数据,而后把接收到的数据放到queue里,这样你就能够把接收到的数据再传回给客户端了。

1
2
3
4
5
6
7
8
9
else :
      data = s.recv( 1024 )
      if data:
          # A readable client socket has data
          print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
          message_queues[s].put(data)
          # Add output channel for response
          if s not in outputs:
              outputs.append(s)

第三种状况就是这个客户端已经断开了,因此你再经过recv()接收到的数据就为空了,因此这个时候你就能够把这个跟客户端的链接关闭了。

1
2
3
4
5
6
7
8
9
10
11
else :
     # Interpret empty result as closed connection
     print >>sys.stderr, 'closing' , client_address, 'after reading no data'
     # Stop listening for input on the connection
     if s in outputs:
         outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,因此这时候若是这个客户端的链接对象还在outputs列表中,就把它删掉
     inputs.remove(s)    #inputs中也删除掉
     s.close()           #把这个链接关闭掉
  
     # Remove message queue
     del message_queues[s]

对于writable list中的socket,也有几种状态,若是这个客户端链接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,不然就把这个链接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个链接,那就会认为这个链接还处于非活动状态

1
2
3
4
5
6
7
8
9
10
11
# Handle outputs
for s in writable:
     try :
         next_msg = message_queues[s].get_nowait()
     except Queue.Empty:
         # No messages waiting so stop checking for writability.
         print >>sys.stderr, 'output queue for' , s.getpeername(), 'is empty'
         outputs.remove(s)
     else :
         print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
         s.send(next_msg)

最后,若是在跟某个socket链接通讯过程当中出了错误,就把这个链接对象在inputs\outputs\message_queue中都删除,再把链接关闭掉

1
2
3
4
5
6
7
8
9
10
11
# Handle "exceptional conditions"
for s in exceptional:
     print >>sys.stderr, 'handling exceptional condition for' , s.getpeername()
     # Stop listening for input on the connection
     inputs.remove(s)
     if s in outputs:
         outputs.remove(s)
     s.close()
  
     # Remove message queue
     del message_queues[s]

最后服务器端的完整代码以下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python
import select
import socket
import sys
import queue
 
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  #生成socket句柄
server.setblocking( False )   #非阻塞模式
 
 
server_address = ( 'localhost' , 10000 #服务器端ip和端口
#输出错误信息
print (sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)    #绑定ip和端口
 
server.listen( 5 #监听端口,最大链接数为5个
 
inputs = [server]    #select接收的输入信息
 
outputs = []        #select接收的输出信息
 
message_queue = {}  #socket的连接对象和输出信息的队列字典(socket:Queue)
while inputs:  #程序主循环,由于inputs列表中有值因此为死循环
     print ( '\nwaiting for the next event' )
     '''将inputs和outputs等三个列表放进select中返回三个列表,分别赋值为readable,writeable,exception,
     全部在readable list中的socket链接表明有数据可接收(recv),全部在writable list中的存放着你能够对其
     进行发送(send)操做的socket链接,当链接通讯出现error时会把error写到exceptional列表中。'''
     readable,writeable,exception = select.select(inputs,outputs,inputs)
     for s in readable:  #遍历输入信息列表
         if s is server: #若是s是server,那表明这是server端已经ready来接收一个新的链接进来了,因此在inputs中有一个server,表明server端已经ready
             connection,client_address = s.accept()  #等待接收数据
             connection.seblocking( False #为了让这个server能同时处理多个链接,咱们把这个server的socket设置为非阻塞模式。
             inputs.append(connection) #将这个连接的对象内存地址放入inputs列表
             message_queue[connection] = queue.Queue() #将这个连接对象和其对应的队列地址组成一个字典,方便之后知道咱们要往哪里放数据
         else #第二种状况是已经创建了的链接,它把数据发了过来,这个时候你就能够经过recv()来接收它发过来的数据,而后把接收到的数据放到queue里,这样你就能够把接收到的数据再传回给客户端了。
             data = s.recv( 1024 #接收数据
             if data:  #若是接收数据不为空
                 print (sys.stderr, 'received "%s" from %s' % (data, s.getpeername()))
                 message_queue[s].put(data) #接收数据放入这个连接对应的队列中
                 if s not in outputs:  #若是s不在输出列表中就加入到列表
                     outputs.append(s)  #将对象加入信息输入列表
             #第三种状况就是这个客户端已经断开了,因此你再经过recv()接收到的数据就为空了,因此这个时候你就能够把这个跟客户端的链接关闭了。
             else :
                 if s in outputs:  #若是s在outputs列表中就删除
                     outputs.remove(s)
                 inputs.remove(s)  #把s删除出inputs列表
                 s.close()  #链接关闭
                 del message_queue[s] #在字典中删除s
     for s in writeable:  #遍历输出信息列表
         try #若是这个客户端连接对应的queue中有数据,就取出来
             next_msg = message_queue[s].get_nowait()
         #没有数据就将其从outputs列表中移除,这样下一次循环select()调用时检测到outputs list中没有这个链接,那就会认为这个链接还处于非活动状态
         except queue.Empty:
             print ( 'output queue for' , s.getpeername(), 'is empty' )
             outputs.remove(s)
         #将取出的数据在发送回客户端
         else :
             print ( 'sending "%s" to %s' % (next_msg, s.getpeername()))
             s.send(next_msg)
     #最后,若是在跟某个socket链接通讯过程当中出了错误,就把这个链接对象在inputs\outputs\message_queue中都删除,再把链接关闭掉
     for s in exception:
         print ( 'handling exceptional condition for' , s.getpeername() )
         #删除inputs中连接对象
         inputs.remove(s)
         if s in outputs:
             #删除outputs中的连接对象
             outputs.remove(s)
         s.close()  #关闭连接
         del message_queue[s]  #删除连接字典中的对象

客户端

下面的这个是客户端程序展现了如何经过select()对socket进行管理并与多个链接同时进行交互:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import socket
import sys
  
messages = [ 'This is the message. ' ,
              'It will be sent ' ,
              'in parts.' ,
              ]
server_address = ( 'localhost' , 10000 )
  
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           ]
  
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
     s.connect(server_address)

接下来经过循环经过每一个socket链接给server发送和接收数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
for message in messages:
  
     # Send messages on both sockets
     for s in socks:
         print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
         s.send(message)
  
     # Read responses on both sockets
     for s in socks:
         data = s.recv( 1024 )
         print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
         if not data:
             print >>sys.stderr, 'closing socket' , s.getsockname()

客户端完整代码以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
__author__ = 'jieli'
import socket
import sys
  
messages = [ 'This is the message. ' ,
              'It will be sent ' ,
              'in parts.' ,
              ]
server_address = ( 'localhost' , 10000 )
  
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           ]
  
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
     s.connect(server_address)
  
for message in messages:
  
     # Send messages on both sockets
     for s in socks:
         print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
         s.send(message)
  
     # Read responses on both sockets
     for s in socks:
         data = s.recv( 1024 )
         print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
         if not data:
             print >>sys.stderr, 'closing socket' , s.getsockname()
             s.close()

selectors模块(将select模块封装,调用更简洁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import selectors
import socket
  
sel = selectors.DefaultSelector()
  
def accept(sock, mask):
     conn, addr = sock.accept()  # Should be ready
     print ( 'accepted' , conn, 'from' , addr)
     conn.setblocking( False )
     sel.register(conn, selectors.EVENT_READ, read)
  
def read(conn, mask):
     data = conn.recv( 1000 # Should be ready
     if data:
         print ( 'echoing' , repr (data), 'to' , conn)
         conn.send(data)  # Hope it won't block
     else :
         print ( 'closing' , conn)
         sel.unregister(conn)
         conn.close()
  
sock = socket.socket()
sock.bind(( 'localhost' , 10000 ))
sock.listen( 100 )
sock.setblocking( False )
sel.register(sock, selectors.EVENT_READ, accept)
  
while True :
     events = sel.select()
     for key, mask in events:
         callback = key.data
         callback(key.fileobj, mask)




相关文章
相关标签/搜索