多进程、协程、事件驱动及select poll epoll

目录css

-多线程使用场景python

-多进程linux

 --简单的一个多进程例子程序员

 --进程间数据的交互实现方法web

   ---经过Queues和Pipe能够实现进程间数据的传递,可是不能实现数据的共享编程

   ---Queueswindows

    ---Pipe数组

    ---经过Manager能够不一样进程间实现数据的共享缓存

 --进程同步,即进程锁安全

 --进程池

-协程

 --先用yield实现简单的协程

 --Greenlet

 --Gevent

 --用协程gevent写一个简单并发爬网页

-事件驱动

 --IO多路复用

   ---用户空间和内核空间

    ---文件描述符fd

    ---缓存IO

 --IO模式

   ---阻塞I/O(blocking IO)

    ---非阻塞I/O

    ---I/O多路复用(IO multiplexing)

    ---异步I/O(asynchronous IO)

-关于select poll epoll

 --select

 --poll

 --epoll

 --以select方法为例子进行理解

多线程的使用场景

IO操做不占用CPU

计算占用cpu

python多线程不适合cpu密集型操做的任务,适合IO操做密集型的任务

多进程

简单的一个多进程例子:(用于理解对多线程方法的使用)

和线程的方法相似,下面是一个简单的多进程代码

 1 #AUTHOR:FAN
 2 import time,multiprocessing
 3 
 4 def run(name):
 5     time.sleep(2)
 6     print("hello",name)
 7 
 8 if __name__ =="__main__":
 9     for i in range(6):
10         p = multiprocessing.Process(target=run,args=("dean",))
11       p.start()

和以前学习的多线程结合在一块儿使用,代码以下:

 1 #AUTHOR:FAN
 2 
 3 import time,threading
 4 import multiprocessing
 5 
 6 def thread_run():
 7     print(threading.get_ident())   #这里表示获取线程id
 8 
 9 
10 def run(name):
11     time.sleep(2)
12     print("hello",name)
13     t=threading.Thread(target=thread_run)
14     t.start()
15 
16 if __name__ =="__main__":
17     for i in range(6):
18         p = multiprocessing.Process(target=run,args=("dean",))
19         p.start()

运行结果以下:

 1 D:\python35\python.exe D:/python培训/s14/day10/进程与线程结合使用.py
 2 hello dean
 3 10008
 4 hello dean
 5 9276
 6 hello dean
 7 8096
 8 hello dean
 9 1308
10 hello dean
11 hello dean
12 10112
13 8032
14 
15 Process finished with exit code 0
View Code

接着咱们查看下面代码:

 1 #AUTHOR:FAN
 2 
 3 from multiprocessing import Process
 4 import os
 5 
 6 
 7 def info(title):
 8     print(title)
 9     print('module name:', __name__)
10     print('parent process:', os.getppid())
11     print('process id:', os.getpid())
12     print("\n\n")
13 
14 
15 def f(name):
16     info('\033[31;1mcalled from child process function f\033[0m')
17     print('hello', name)
18 
19 if __name__ == '__main__':
20     info('\033[32;1mmain process line\033[0m')

运行结果以下:

 1 D:\python35\python.exe D:/python培训/s14/day10/获取进程id.py
 2 main process line
 3 module name: __main__
 4 parent process: 8368
 5 process id: 7664
 6 
 7 
 8 
 9 
10 Process finished with exit code 0
View Code

咱们这里能够看到父进程id:8368,而且会发现不管程序运行多少次都是这个,而后咱们在windows任务管理器查看发现这个是pycharm的进程id,以下图:

这里要记住:每个子进程都是由父进程启动的

咱们将上面代码中if __name__=”__main__”进行修改,以下:

1 if __name__ == '__main__':
2     info('\033[32;1mmain process line\033[0m')
3     p = Process(target=f, args=('bob',))
4     p.start()

运行结果以下:

进程间数据的交互,实现方法

经过Queues和Pipe能够实现进程间数据的传递,可是不能实现数据的共享

 

不一样进程间内存不是共享的,要想实现两个进程间的数据交换,有一下方法:

Queues

使用方法和threading里的queue使用差很少

先回忆一下线程之间的数据共享,经过下面代码理解:

 

 1 #AUTHOR:FAN
 2 import threading
 3 import queue
 4 
 5 def func():
 6     q.put([22,"dean",'hello'])
 7 
 8 if __name__=="__main__":
 9     q = queue.Queue()
10     t = threading.Thread(target=func)
11     t.start()
12     print(q.get(q))

 

运行结果:

1 D:\python35\python.exe D:/python培训/s14/day10/线程之间数据的共享.py
2 [22, 'dean', 'hello']
3 
4 Process finished with exit code 0
View Code

从上述代码能够看出线程之间的数据是共享的:父线程能够访问子线程放入的数据

若是是多进程之间呢?

将代码进行修改以下,让子进程调用父进程数据:

 

 1 from multiprocessing import Process
 2 import queue
 3 
 4 
 5 
 6 def f():
 7     q.put([11,None,"hello"])
 8 
 9 
10 if __name__=="__main__":
11     q = queue.Queue()
12     p = Process(target=f)
13     p.start()
14     print(q.get())

 

运行结果以下:

从这里咱们也能够看出子进程是访问不到父进程的数据

咱们再次将代码进行修改,写f方法的时候直接将q给线程传入,也就是,只有启动线程,就自动传入线程q,代码以下:

 

 1 #AUTHOR:FAN
 2 
 3 from multiprocessing import Process
 4 import queue
 5 
 6 
 7 def f(data):
 8     data.put([11,None,"hello"])
 9 
10 if __name__=="__main__":
11     q = queue.Queue()   #切记这里是线程q
12     p = Process(target=f,args=(q,))
13     p.start()
14     print(q.get())

 

运行结果以下:

这里咱们须要知道:进程不能访问线程q

因此咱们须要改为进程,代码以下:

 

 1 #AUTHOR:FAN
 2 
 3 from multiprocessing import Process,Queue
 4 
 5 
 6 def f(data):
 7     data.put([11,None,"hello"])
 8 
 9 if __name__=="__main__":
10     q = Queue() 这里的q是进程q 11     p = Process(target=f,args=(q,))
12     p.start()
13     print(q.get())

 

运行结果以下:

1 D:\python35\python.exe D:/python培训/s14/day10/子进程访问父进程数据.py
2 [11, None, 'hello']
3 
4 Process finished with exit code 0
View Code

此次咱们就发如今父进程里就能够调用到子进程放入的数据

这里咱们须要明白:这里的q实际上是被克隆了一个q,而后将子线程序列化的内容传入的克隆q,而后再反序列化给q,从而实现了进程之间数据的传递

 

Pipe

实现代码例子:

 1 #AUTHOR:FAN
 2 
 3 from multiprocessing import Process,Pipe
 4 
 5 def f(conn):
 6     conn.send([22,None,"hello from child"])
 7     conn.send([22,None,"hello from child2"])
 8     print(conn.recv())
 9     conn.close()
10 
11 if __name__=="__main__":
12     left_conn,right_conn = Pipe()
13     p = Process(target=f,args=(right_conn,))
14     p.start()
15     print(left_conn.recv())
16     print(left_conn.recv())
17     left_conn.send("我是left_conn")

运行结果以下:

1 D:\python35\python.exe D:/python培训/s14/day10/经过pipes实现进程间数据传递.py
2 [22, None, 'hello from child']
3 [22, None, 'hello from child2']
4 我是left_conn
5 
6 Process finished with exit code 0
View Code

对上面代码分析:pip()会生成两个值,上面的left_conn和right_conn,这就如同一条网线的两头,两头均可以发送和接收数据

 

经过Manager能够不一样进程间实现数据的共享

经过下面代码进行理解:

 1 #AUTHOR:FAN
 2 from multiprocessing import Manager,Process
 3 import os
 4 
 5 def f(d,l):
 6     d[1]="1"
 7     d["2"] = 2
 8     d[0.25] = None
 9     l.append(os.getpid())
10     print(l)
11 
12 if __name__ == "__main__":
13     with Manager() as manager:  #这种方式和直接manager=Manager()同样
14         d = manager.dict()  #生成一个字典,能够在多个进程间共享
15         l = manager.list(range(5))  #生成一个列表,能够在多个进程间共享
16         p_list = []
17         for i in range(10):
18             p = Process(target=f,args=(d,l))
19             p.start()
20             p_list.append(p)
21         for res in p_list:
22             res.join()
23 
24         print(d)
25         print(l)

运行结果以下:

 1 D:\python35\python.exe D:/python培训/s14/day10/Manager实现进程间数据的共享.py
 2 [0, 1, 2, 3, 4, 9756]
 3 [0, 1, 2, 3, 4, 9756, 3352]
 4 [0, 1, 2, 3, 4, 9756, 3352, 9220]
 5 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736]
 6 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724]
 7 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860]
 8 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084]
 9 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452]
10 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452, 7376]
11 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452, 7376, 9952]
12 {0.25: None, 1: '1', '2': 2}
13 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452, 7376, 9952]
14 
15 Process finished with exit code 0
View Code

经过结果能够看出已经实现了不一样进程间数据的共享

进程同步,即进程锁

 1 #AUTHOR:FAN
 2 from multiprocessing import Process, Lock
 3 
 4 
 5 def f(l, i):
 6     l.acquire()
 7     print('hello world', i)
 8     l.release()
 9 
10 if __name__ == '__main__':
11     lock = Lock()
12     for num in range(10):
13         Process(target=f, args=(lock, num)).start()

打印结果以下:

 1 D:\python35\python.exe D:/python培训/s14/day10/进程锁.py
 2 hello world 3
 3 hello world 2
 4 hello world 1
 5 hello world 0
 6 hello world 7
 7 hello world 6
 8 hello world 4
 9 hello world 5
10 hello world 9
11 hello world 8
12 
13 Process finished with exit code 0
View Code

可能会以为这个加锁没有上面做用,实际上是这样的,当在屏幕上打印这些内容的时候,不一样进程之间是共享这个屏幕的,锁的做用在于当一个进程开始打印的时候,其余线程不能打印,从而防止打印乱内容

在windows上可能看不到效果,当不一样进程打印的东西比较多的时候,就能够看到打印数据出现乱的状况

 

进程池

 

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

 

进程池中有两个方法:

 

apply

 

apply_async(这个就表示异步

 

从下面代码一点一点分析

 

 1 #AUTHOR:FAN
 2 
 3 from  multiprocessing import Process, Pool
 4 import time
 5 import os
 6 
 7 
 8 def Foo(i):
 9     time.sleep(2)
10     print("in the process",os.getpid())
11     return i + 100
12 
13 
14 def Bar(arg):
15     print('-->exec done:', arg)
16 
17 if __name__ == "__main__":
18     pool = Pool(5)
19 
20     for i in range(10):
21         pool.apply(func=Foo, args=(i,))
22     print('end')
23     pool.close()
24     pool.join()  # 进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。

 

这样运行结果发现,程序变成了串行了。

将上述代码中的:

pool.apply(func=Foo, args=(i,))

 

替换为:

pool.apply_async(func=Foo,args=(i,))

 

以后就解决了以前的的问题

 

这个时候咱们再次将

pool.apply_async(func=Foo,args=(i,))

 

替换为,这里的callback叫作回调函数

pool.apply_async(func=Foo, args=(i,), callback=Bar)

 

运行结果以下:

 1 D:\python35\python.exe D:/python培训/s14/day10/进程池.py
 2 end
 3 in the process 10876
 4 -->exec done: 100
 5 in the process 5084
 6 -->exec done: 101
 7 in the process 9648
 8 -->exec done: 102
 9 in the process 11028
10 -->exec done: 103
11 in the process 8528
12 -->exec done: 104
13 in the process 10876
14 -->exec done: 105
15 in the process 5084
16 -->exec done: 106
17 in the process 9648
18 -->exec done: 107
19 in the process 11028
20 -->exec done: 108
21 in the process 8528
22 -->exec done: 109
23 
24 Process finished with exit code 0
View Code

下面将代码进行修改,肯定回调函数是由子进程仍是主进程调用

 1 #AUTHOR:FAN
 2 
 3 from  multiprocessing import Process, Pool
 4 import time
 5 import os
 6 
 7 
 8 def Foo(i):
 9     time.sleep(2)
10     print("in the process",os.getpid())
11     return i + 100
12 
13 
14 def Bar(arg):
15     print('-->exec done:', arg,os.getpid())
16 
17 if __name__ == "__main__":
18     pool = Pool(5)
19     print(os.getpid())
20     for i in range(5):
21         pool.apply_async(func=Foo, args=(i,), callback=Bar)
22         #pool.apply(func=Foo, args=(i,))
23         #pool.apply_async(func=Foo,args=(i,))
24 
25     print('end')
26     pool.close()
27     pool.join()  # 进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。

运行结果以下,能够看出回调函数的pid和主进程是同样的

协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。

 

协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:

协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

 

协程的好处:

无需线程上下文切换的开销

无需原子操做锁定及同步的开销

方便切换控制流,简化编程模型

高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。因此很适合用于高并发处理。

 

缺点:

没法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程须要和进程配合才能运行在多CPU上.固然咱们平常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。

进行阻塞(Blocking)操做(如IO时)会阻塞掉整个程序

 

先用yield实现简单的协程

 1 #AUTHOR:FAN
 2 
 3 import time
 4 import queue
 5 
 6 
 7 def consumer(name):
 8     print("--->starting eating baozi...")
 9     while True:
10         new_baozi = yield
11         print("[%s] is eating baozi %s" % (name, new_baozi))
12         time.sleep(1)
13 def producer():
14     r = con.__next__()
15     r = con2.__next__()
16     n = 0
17     while n < 5:
18         n += 1
19         con.send(n)
20         con2.send(n)
21         print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
22 if __name__ == '__main__':
23     con = consumer("c1")
24     con2 = consumer("c2")
25     p = producer()

运行结果以下:

 1 D:\python35\python.exe D:/python培训/s14/day10/yield实现协程.py
 2 --->starting eating baozi...
 3 --->starting eating baozi...
 4 [c1] is eating baozi 1
 5 [c2] is eating baozi 1
 6 [producer] is making baozi 1
 7 [c1] is eating baozi 2
 8 [c2] is eating baozi 2
 9 [producer] is making baozi 2
10 [c1] is eating baozi 3
11 [c2] is eating baozi 3
12 [producer] is making baozi 3
13 [c1] is eating baozi 4
14 [c2] is eating baozi 4
15 [producer] is making baozi 4
16 [c1] is eating baozi 5
17 [c2] is eating baozi 5
18 [producer] is making baozi 5
19 
20 Process finished with exit code 0

Greenlet

 1 #AUTHOR:FAN
 2 
 3 from greenlet import greenlet
 4 
 5 def test1():
 6     print(10)
 7     gr2.switch()
 8     print(11)
 9     gr2.switch()
10 
11 
12 def test2():
13     print(12)
14     gr1.switch()
15     print(13)
16 
17 
18 gr1 = greenlet(test1) #启动一个协程
19 gr2 = greenlet(test2)
20 gr1.switch()

这里的gr1.switch()是手动切换

Gevent

Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度.

经过下面代码进行理解:

 1 import gevent
 2 
 3 
 4 def foo():
 5     print('Running in foo1')
 6     gevent.sleep(2)
 7     print('Running in foo2')
 8 
 9 def bar():
10     print('Running in bar1')
11     gevent.sleep(1)
12     print('Running in bar2')
13 
14 def func3():
15     print("running in func1")
16     gevent.sleep(0)
17     print("running in func2")
18 
19 
20 gevent.joinall([
21     gevent.spawn(foo),
22     gevent.spawn(bar),
23     gevent.spawn(func3),
24 ])

执行结果以下:

1 D:\python35\python.exe D:/python培训/s14/day10/自动IO切换.py
2 Running in foo1
3 Running in bar1
4 running in func1
5 running in func2
6 Running in bar2
7 Running in foo2
8 
9 Process finished with exit code 0
View Code

从运行结果能够看出,经过gevent.sleep()模拟执行IO操做,从而实现自动切换,程序最终花费的时间仍是2秒

 

用协程gevent写一个简单并发爬网页

 1 #AUTHOR:FAN
 2 
 3 from urllib import request
 4 import gevent,time
 5 
 6 def f(url):
 7     print("get:%s" %url)
 8     resp = request.urlopen(url)
 9     data = resp.read()
10     print("%d bytes received from %s" %(len(data),url))
11 
12 
13 urls = ["http://sina.com.cn",
14         "http://www.cnblogs.com/",
15         "https://news.cnblogs.com/"
16 ]
17 
18 time_start = time.time()
19 for url in urls:
20     f(url)
21 
22 print("同步串行cost:",time.time()-time_start)
23 
24 async_time = time.time()
25 gevent.joinall([
26     gevent.spawn(f,"http://sina.com.cn"),
27     gevent.spawn(f,"http://www.cnblogs.com/"),
28     gevent.spawn(f,"https://news.cnblogs.com/")
29 ])
30 print("异步cost:",time.time()-async_time)

这样的运行结果:

这里能够看出异步的时候和串行执行的时间基本同样,其实这里的异步并无起做用,由于这里的gevent并不能识别出urllib执行时的IO操做,想要是gevent实现异步的方法是导入模块:from gevent import monkey

将代码进行修改以下:

 1 #AUTHOR:FAN
 2 
 3 from urllib import request
 4 import gevent,time
 5 from gevent import monkey  6 
 7 monkey.patch_all()  8 def f(url):
 9     print("get:%s" %url)
10     resp = request.urlopen(url)
11     data = resp.read()
12     print("%d bytes received from %s" %(len(data),url))
13 
14 
15 urls = ["http://sina.com.cn",
16         "http://www.cnblogs.com/",
17         "https://news.cnblogs.com/"
18 ]
19 
20 time_start = time.time()
21 for url in urls:
22     f(url)
23 
24 print("同步串行cost:",time.time()-time_start)
25 
26 async_time = time.time()
27 gevent.joinall([
28     gevent.spawn(f,"http://sina.com.cn"),
29     gevent.spawn(f,"http://www.cnblogs.com/"),
30     gevent.spawn(f,"https://news.cnblogs.com/")
31 ])
32 print("异步cost:",time.time()-async_time)

而后执行,结果以下:

事件驱动

一般,咱们写服务器处理模型的程序时,有如下几种模型:

(1)每收到一个请求,建立一个新的进程,来处理该请求;

(2)每收到一个请求,建立一个新的线程,来处理该请求;

(3)每收到一个请求,放入一个事件列表,让主进程经过非阻塞I/O方式来处理请求

上面的几种方式,各有千秋,

第(1)中方法,因为建立新的进程的开销比较大,因此,会致使服务器性能比较差,但实现比较简单。

第(2)种方式,因为要涉及到线程的同步,有可能会面临死锁等问题。

第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。

综合考虑各方面因素,通常广泛认为第(3)种方式是大多数网络服务器采用的方式

目前大部分的UI编程都是事件驱动模型,如不少UI平台都会提供onClick()事件,这个事件就表明鼠标按下事件。事件驱动模型大致思路以下:

1. 有一个事件(消息)队列;

2. 鼠标按下时,往这个队列中增长一个点击事件(消息);

3. 有个循环,不断从队列取出事件,根据不一样的事件,调用不一样的函数,如onClick()、onKeyDown()等;

4. 事件(消息)通常都各自保存各自的处理函数指针,这样,每一个消息都有独立的处理函数

 

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特色是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

让咱们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展现了随着时间的推移,这三种模式下程序所作的工做。这个程序有3个任务须要完成,每一个任务都在等待I/O操做时阻塞自身。阻塞在I/O操做上所花费的时间已经用灰色框标示出来了。

在单线程同步模型中,任务按照顺序执行。若是某个任务由于I/O而阻塞,其余全部的任务都必须等待,直到它完成以后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。若是任务之间并无互相依赖的关系,但仍然须要互相等待的话这就使得程序没必要要的下降了运行速度。

 

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操做系统来管理,在多处理器系统上能够并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其余线程得以继续执行。与完成相似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,由于这类程序不得不经过线程同步机制如锁、可重入函数、线程局部存储或者其余机制来处理线程安全问题,若是实现不当就会致使出现微妙且使人痛不欲生的bug。

 

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其余昂贵的操做时,注册一个回调到事件循环中,而后当I/O操做完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询全部的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽量的得以执行而不须要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,由于程序员不须要关心线程安全问题。

当咱们面对以下的环境时,事件驱动模型一般是一个好的选择:

 

(1)程序中有许多任务

(2)任务之间高度独立(所以它们不须要互相通讯,或者等待彼此)

(3)在等待事件到来时,某些任务会阻塞。

当应用程序须要在任务间共享可变的数据时,这也是一个不错的选择,由于这里不须要采用同步处理。

网络应用程序一般都有上述这些特色,这使得它们可以很好的契合事件驱动编程模型。

IO多路复用

用户空间和内核空间

操做系统都是采用虚拟存储器,对于32位操做系统,它的寻址空间(虚拟存储空间)为4G。操做系统的核心是内核,独立于普通的应用程序,能够访问受保护内存空间,也有访问底层硬件设备的全部权限,为了保证用户进程不能直接操做内核,保证内核的安全,操做系统将虚拟空间分为两部分:一部分为内核空间,一部分是用户空间,针对linux系统而言,将最高的1G字节给内核使用,称为内核空间,将3G字节的供各个进程使用,称为用户空间

文件描述符fd

文件描述符是一个用于表述指向文件的引用的抽象化概念

文件描述符在形式上是一个非负整数,实际上,它是一个索引值,指内核为每个进程所维护的进程打开文件的记录的记录表,当程序打开一个现有文件或者建立一个新文件时,内核向进程返回一个文件描述符。

缓存IO

缓存IO,也被称为标准IO,大多数文件系统默认IO操做都是缓存IO,在Linux的缓存IO机制中,操做系统会将IO的数据缓存在文件系统的页缓存(page cache)中,也就是说,数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间

缓存IO的缺点:

数据在传输过程当中须要在应用程序地址空间和内核进行屡次数据拷贝操做,这些数据拷贝操做所带来的CPU以及内存开销是很是大的

IO模式

对于一次IO访问(以read为例子),数据会先拷贝到操做系统内核的缓冲区中,而后会从操做系统内核的缓冲区拷贝到应用程序的地址空间,也就是说当一个read操做发生时,它会经历两个阶段:

1. 等待数据准备

2. 经数据从内核拷贝到进程

正是由于这两个阶段,linux系统产生了五种网络模式的方案

1. 阻塞I/O(blocking IO)

2. 非阻塞I/O(nonblocking IO)

3. I/O多路复用(IO multiplexing)

4. 信号驱动I/O(signal driven IO)

5. 异步I/O(asynchromous IO)

注意:信号驱动I/O(signal driven IO)在实际中不经常使用

阻塞I/O(blocking IO)

在linux中,默认状况下全部的socket都是blocking,一个典型的读操做流程大概是这样:

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来讲,不少时候数据在一开始尚未到达。好比,尚未收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程须要等待,也就是说数据被拷贝到操做系统内核的缓冲区中是须要一个过程的。而在用户进程这边,整个进程会被阻塞(固然,是进程本身选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,而后kernel返回结果,用户进程才解除block的状态,从新运行起来。

 

因此,blocking IO的特色就是在IO执行的两个阶段都被block了

非阻塞I/O

linux下,能够经过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操做时,流程是这个样子:

当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,而是马上返回一个error。从用户进程角度讲 ,它发起一个read操做后,并不须要等待,而是立刻就获得了一个结果。用户进程判断结果是一个error时,它就知道数据尚未准备好,因而它能够再次发送read操做。一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,那么它立刻就将数据拷贝到了用户内存,而后返回。

 

因此,nonblocking IO的特色是用户进程须要不断的主动询问kernel数据好了没有。

I/O多路复用(IO multiplexing)

IO multiplexing就是咱们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就能够同时处理多个网络链接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的全部socket,当某个socket有数据到达了,就通知用户进程。

 

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程。

 

因此,I/O 多路复用的特色是经过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就能够返回。

 

这个图和blocking IO的图其实并无太大的不一样,事实上,还更差一些。由于这里须要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。可是,用select的优点在于它能够同时处理多个connection。

 

因此,若是处理的链接数不是很高的话,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优点并非对于单个链接能处理得更快,而是在于能处理更多的链接。)

 

在IO multiplexing Model中,实际中,对于每个socket,通常都设置成为non-blocking,可是,如上图所示,整个用户的process实际上是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

异步I/O(asynchronous IO)

Linux下的asynchronous IO其实用得不多。先看一下它的流程:

 

用户进程发起read操做以后,马上就能够开始去作其它的事。而另外一方面,从kernel的角度,当它受到一个asynchronous read以后,首先它会马上返回,因此不会对用户进程产生任何block。而后,kernel会等待数据准备完成,而后将数据拷贝到用户内存,当这一切都完成以后,kernel会给用户进程发送一个signal,告诉它read操做完成了。

关于select poll epoll

select

sekect是经过一个select()系统调用来监视多个文件描述符,当select()返回后,该数组中就绪的文件描述符便会被该内核修改标志位,使得进程能够得到这些文件描述符从而进行后续的读写操做

select的优势就是支持跨平台

缺点在于单个进程可以监视的文件描述符的数量存在最大限制

另外select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增加。同时,因为网络响应时间的延迟使得大量TCP链接处于非活跃状态,但调用select()会对全部socket进行一次线性扫描,因此这也浪费了必定的开销。

poll

和select在本质上没有多大差异,可是poll没有最大文件描述符数量的限制

poll和select一样存在一个缺点就是,包含大量文件描述符的数组被总体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增长而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用select()和poll()的时候将再次报告这些文件描述符,因此它们通常不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

 

epoll

epoll能够同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是咱们没有采起行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,可是代码实现至关复杂。

 

epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了这些文件描述符在系统调用时复制的开销。

 

另外一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用相似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知

 

以select方法为例子进行理解

Python的select()方法直接调用操做系统的IO接口,它监控sockets,open files, and pipes(全部带fileno()方法的文件句柄)什么时候变成readable 和writeable, 或者通讯错误,select()使得同时监控多个链接变的简单,而且这比写一个长循环来等待和监控多客户端链接要高效,由于select直接经过操做系统提供的C的网络接口进行操做,而不是经过Python的解释器。

接下来经过echo server例子要以了解select 是如何经过单进程实现同时处理多个非阻塞的socket链接的

代码以下:

 1 #AUTHOR:FAN
 2 
 3 import select
 4 import socket
 5 import queue
 6 server = socket.socket()
 7 server.bind(('127.0.0.1',9999))
 8 server.listen()
 9 
10 server.setblocking(False)#不阻塞
11 msg_dict = {}
12 inputs=[server,]
13 outputs=[]
14 
15 while True:
16     readable, writeable, exceptional = select.select(inputs, outputs, inputs)
17     print(readable, writeable, exceptional)
18     for r in readable:
19         if r is server:   #表明来了一个新链接
20             conn,addr = server.accept()
21             print("来了一个新链接:",addr)
22             inputs.append(conn)  #是由于这个新创建的链接还没发数据过来,如今就接收的话程序就报错了
23             #因此要想要实现这个客户端发数据来时server端能知道,就须要让select再监测这个conn
24             msg_dict[conn] = queue.Queue() #初始化一个队列,后面须要返回给这个客户端的数据
25         else:
26             data = r.recv(1024)
27             print("收到数据:",data)
28             msg_dict[r].put(data)
29             outputs.append(r)  #放入返回的链接队列里
30 
31     for w in writeable:    #要返回给客户端的链接列表
32         data_to_client = msg_dict[w].get()
33         w.send(data_to_client)  #返回给客户端源数据
34         outputs.remove(w)   #确保下次循环的时候writeable,不能返回这个已经处理完的链接了
35     for e in exceptional:
36         if e in outputs:
37             outputs.remove(e)
38         inputs.remove(e)
39         del msg_dict[e]

其实上述的代码相对来讲是比较麻烦,python已经封装了selectors模块,而且这个模块中包含了select和epoll,会根据系统自动识别(windows只支持select,linux是两者都支持),默认用epoll

若是将上述代码用selectors模块的方式写,代码以下:

 

 1 #AUTHOR:FAN
 2 
 3 
 4 import selectors
 5 import socket
 6 
 7 sel = selectors.DefaultSelector()
 8 def accept(server,mask):
 9     conn,addr = server.accept()
10     print("一个新的链接",addr)
11     print(conn)
12     conn.setblocking(False)
13     sel.register(conn,selectors.EVENT_READ,read)  #新链接注册read回调函数
14     print("done")
15 
16 def read(conn,mask):
17     print("ccc")
18     print("mask:",mask)
19     data = conn.recv(1024)
20     if data:
21         print(data)
22         conn.send(data)
23     else:
24         print("客户端断开链接")
25         sel.unregister(conn)
26         conn.close()
27 
28 server = socket.socket()
29 server.bind(('127.0.0.1',9999))
30 server.listen()
31 server.setblocking(False)
32 sel.register(server,selectors.EVENT_READ,accept)
33 
34 while True:
35     print("cccccccsssssss")
36     events = sel.select() #默认阻塞,有活动链接,有活动链接就返回活动的链接列表
37     print(events)
38     for key,mask in events:
39         print("key:%s    mask:%s"%(key,mask))
40         callback = key.data  #这里就是回调函数及上述的accept
41         print("key.data:",key.data)
42         print("key.fileobj:",key.fileobj)
43         callback(key.fileobj,mask) #key.fileobj

 

咱们用客户端模拟同时并发一万去链接服务端

客户端代码以下:

 1 #AUTHOR:FAN
 2 
 3 
 4 import socket
 5 import sys
 6 
 7 messages = [ b'This is the message. ',
 8              b'It will be sent ',
 9              b'in parts.',
10              ]
11 server_address = ('192.168.8.102', 10000)
12 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(10000)
13           ]
14 print('connecting to %s port %s' % server_address)
15 for s in socks:
16     s.connect(server_address)
17 
18 for message in messages:
19     for s in socks:
20         print('%s: sending "%s"' % (s.getsockname(), message) )
21         s.send(message)
22     for s in socks:
23         data = s.recv(1024)
24         print( '%s: received "%s"' % (s.getsockname(), data) )
25         if not data:
26             print(sys.stderr, 'closing socket', s.getsockname() )

将服务端放到linux服务端,在本机执行客户端,从而实现了上万的并发

相关文章
相关标签/搜索