目录python
操做系统(OS
)统管了计算机的全部硬件,并负责为应用程序分配和回收硬件资源。
硬件资源老是有限的,而应用程序对资源的欲望都是贪婪的。
当多个应用程序发生硬件资源争夺时,OS
负责出面调度,保证多任务的资源分配以保证系统稳定执行。
只有CPU
能够执行代码,因此应用程序(任务)执行前,必须申请到CPU
资源,同一时刻,一个CPU
只能执行一个任务代码。
计算机的CPU
数量(资源方)远远小于须要执行的任务数(需求方),操做系统将CPU
的资源按照时间片划分,并根据任务类型分配,各任务轮流使用CPU
。
CPU
的执行/切换速度很是快,对于用户而言,多任务看上去就像同时执行同样,此称为并发。ios
以下是串行和并发的对比:
算法
计算机的内存、硬盘、网卡、屏幕、键盘等硬件提供了数据交换的场所。
OS
提供了IO
接口以实现数据交换,数据交换的过程通常不须要CPU
的参与。
IO
接口有两种类型:
一、阻塞型IO
发生IO
(数据交换)的时候,调用线程没法向下执行剩余代码,意图占用CPU
但不执行任何代码,单线程阻塞型IO自身没法支持并发
二、非阻塞型IO
发生IO
(数据交换)的时候,调用线程能够向下执行剩余代码,单线程非阻塞型IO自身能够支持并发编程
以下是阻塞型IO和非阻塞型IO的对比:
windows
根据一个任务执行期间占用CPU
的比例来划分,有两种类型:
一、CPU密集型
绝大部分时间都是占用CPU
并执行代码,好比科学计算任务
二、IO密集型
绝大部分时间都未占用CPU
,而是在发生IO
操做,好比网络服务安全
OS
提供了阻塞IO和非阻塞IO两种类型的接口,应用程序能够自行选择。
Socket
模块封装了两种接口,Socket
模块提供的函数默认是阻塞IO类型。
用户能够选择手工切换至非阻塞IO类型,使用socketobj.setblocking(False)
切换至非阻塞IO模式。
下面将经过一个简单的例子程序来记录对并发的学习思考及总结。服务器
客户端:循环接收用户的输入,并发送给服务器。从服务器接收反馈并打印至屏幕。
服务器:将接收到的用户输入,变成大写并返回给客户端。网络
客户端代码固定,主要思考服务器端的代码。
通常咱们会这样写服务端代码:多线程
# 服务器端 import socket addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.listen(5) print('监听中...') while True: # 连接循环 conn, client = server.accept() print(f'一个客户端上线 -> {client}') while True: # 消息循环 try: request = conn.recv(1024) if not request: break print(f"request: {request.decode('utf-8')}") conn.send(request.upper()) except ConnectionResetError as why: print(f'客户端丢失,缘由是: {why}') break conn.close()
客户端代码保持不变:并发
# 客户端 import socket addr = ('127.0.0.1', 8080) client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(addr) print(f'服务器{addr}链接成功') while True: # 消息循环 inp = input('>>>').strip() if not inp: continue try: client.send(inp.encode('utf-8')) response = client.recv(1024) print(response.decode('utf-8')) except ConnectionResetError as why: print(f'服务端丢失,缘由是: {why}') break client.close()
这种形式的编码我称为:单线程+阻塞IO+循环串行,有以下几个特色:
一、编码简单,模型简洁,可读性强
二、串行提供服务,用户使用服务器必须一个一个排队
单一线程的阻塞IO模型是没法支持并发的,若是要支持并发,有以下两类解决方案。
单线程阻塞IO,本质上是没法实现并发的。由于一旦发生IO阻塞,线程就会阻塞,下方代码不会继续执行。若是要使用单线程阻塞IO来实现并发,须要增长线程数目或者进程数目,当某一个线程/进程发生阻塞的时候,由OS
调度至另外一个线程/进程执行。
服务器端代码 import socket from multiprocessing import Process def task(conn): """通讯循环处理函数""" while True: try: request = conn.recv(1024) if not request: break print(f"request: {request.decode('utf-8')}") conn.send(request.upper()) except ConnectionResetError as why: print(f'客户端丢失,缘由是: {why}') break if __name__ == '__main__': # windows下须要把新建进程写到main中,否则会报错 addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.listen(5) print('监听中...') while True: conn, client = server.accept() print(f'一个客户端上线 -> {client}') p = Process(target=task, args=(conn,)) # 开启子进程处理与用户的消息循环 p.start()
将服务器对用户的消息循环操做封装到进程中,单进程依然会发生阻塞。
进程之间的调度交由OS
负责(重要)。
进程过重,建立和销毁进程都须要比较大的开销,此外,一台设备所能涵盖的进程数量很是有限(通常就几百左右)。
进程之间的切换开销也不小。
当进程数小于等于CPU
核心数的时候,能够实现真正的并行,当进程数大于CPU
核心的时候,依然以并发执行。
服务器端代码 import socket from threading import Thread def task(conn): """通讯循环处理函数""" while True: try: request = conn.recv(1024) if not request: break print(f"request: {request.decode('utf-8')}") conn.send(request.upper()) except ConnectionResetError as why: print(f'客户端丢失,缘由是: {why}') break if __name__ == '__main__': addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.listen(5) print('监听中...') while True: conn, client = server.accept() print(f'一个客户端上线 -> {client}') t = Thread(target=task, args=(conn,)) # 启动多线程处理与用户的消息循环 t.start()
将服务器对用户的操做封装到线程中,单线程中依然会发生IO阻塞。
线程之间的调度交由OS负责(重要)。
线程较轻,建立和销毁的开销都比较小,可是线程数量也不会太大,一台设备通常能容纳几百至上千的线程。
注意:由于CPython的GIL的存在,使用CPython编写的多线程代码,只能使用一个CPU核心,换句话说,使用官方的解释器执行Python多线程代码,没法并行(单进程中)。
线程之间的切换开销比较小。
实际上,多线程的最大问题并非并发数太少,而是数据安全问题。
线程之间共享同一进程的数据,在频繁发生IO操做的过程当中,不免须要修改共享数据,这就须要增长额外的处理,当线程数量大量增长时,如何妥善处理数据安全的问题就会变成主要困难。
一、多线程和多进程都是基于阻塞IO模式提供的并发,二者编程模型比较简单,可读性也很高。
二、若是使用多线程/进程的方案来提供并发,当线程/进程数量不断增大时,系统稳定性将会降低。虽然可使用线程/进程池来提供必定的优化,但超过必定数量以后,池子发挥的效果也会愈来愈小。因此,二者都没法支持超大规模的并发(如C10M及以上)。
三、线程/进程切换都交由OS
调度,调度策略依据OS
的算法,应用程序没法主动控制,没法针对任务的特性作一些必要的调度算法调整。
四、编码思惟直接、易理解,学习曲线平缓。
五、多线程/进程的方案能够理解为单纯的增长资源,若是要想支持超大规模的并发,单纯的增长资源的行为并不合理(资源不可能无限或者总得考虑成本以及效率,并且数量越大,原有的缺点就会越凸显)。
六、另外一种解决方案的核心思路是:改变IO模型。
单线程非阻塞IO模型,自己就直接支持并发,为啥?请回头看看阻塞IO和非阻塞IO的流程图片。
非阻塞IO接口的核心是:调用线程一旦向OS
发起IO调用,OS
就直接返回结果,所以,调用线程不会被阻塞而能够执行下方代码。不过也正由于不会阻塞,调用线程没法判断当即返回的结果是否是指望结果,因此调用线程须要增长额外的操做对返回结果进行判断,正由于这一点,就增长了编程难度(增长的难度可不是一点啊)。
对当即返回的结果进行判断的方案有两种:
注意:非阻塞IO实现并发有多种解决方案,编程模型的可读性都不高,有些方案的编程思惟甚至晦涩、难以理解、且编码困难。
服务器端代码 import socket addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.setblocking(False) server.listen(5) print('监听中...') # 须要执行接收的conn对象放入此列表 recv_list = [] # 须要发送数据的conn对象和数据放入此列表 send_list = [] # 执行连接循环 while True: try: conn, client = server.accept() # 执行成功,说明返回值是conn,client print(f'一个客户端上线 -> {client}') # 将成功连接的conn放入列表,当accept发生错误的时候执行conn的消息接收操做 recv_list.append(conn) except BlockingIOError: # 执行accept不成功,意味着当前未有任何链接 # 在下一次执行accept以前,能够执行其余的任务(消息接收操做) # 没法对处于遍历期间的接收列表执行remove操做,使用临时列表存储须要删除的conn对象 del_recv_list = [] # 对已经成功连接的conn列表执行接收操做 for conn in recv_list: # 对每个conn对象,执行recv获取request try: # recv也是非阻塞 request = conn.recv(1024) # 执行成功,就要处理request if not request: # 当前conn连接已经失效 conn.close() # 再也不接收此conn连接的消息,将失效conn加入删除列表 del_recv_list.append(conn) # 当前conn处理完毕,切换下一个 continue # request有消息,处理,而后须要加入发送列表中 response = request.upper() # 发送列表须要存放元组,发送conn和发送的数据 send_list.append((conn, response)) except BlockingIOError: # 当前conn的数据尚未准备好,处理下一个conn continue except ConnectionResetError: # 当前conn失效,再也不接收此conn消息 conn.close() del_recv_list.append(conn) # 没法处理发送列表遍历期间的remove,使用临时列表 del_send_list = [] # 接收列表所有处理完毕,准备处理发送列表 for item in send_list: conn = item[0] response = item[1] # 执行发送 try: conn.send(response) # 发送成功,就应该从发送列表中移除此项目 del_send_list.append(item) except BlockingIOError: # 发送缓冲区有可能已经满了,留待下次发送处理 continue except ConnectionResetError: # 连接失效 conn.close() del_recv_list.append(conn) del_send_list.append(item) # 删除接收列表中已经失效的conn对象 for conn in del_recv_list: recv_list.remove(conn) # 删除发送列表中已经发送或者不须要发送的对象 for item in del_send_list: send_list.remove(item)
服务器使用单线程实现了并发。
对于accept
接收到的多个conn
对象,加入列表,并经过遍历读取列表、发送列表来提供多用户访问。
单线程中的Socket
模块提供的IO
函数都被设置成:非阻塞IO类型。
增长了额外操做:对非阻塞调用当即返回的结果,使用了Try
来判断是否为指望值。
由于不知道什么时候返回的结果是指望值,因此须要不停的发起调用,并经过Try
来判断,即,轮询。
两次轮询期间,线程能够执行其余任务。可是模型中也只是不停的发起轮询,并无利用好这些时间。
编码模型复杂,难理解。
优化:此模型中的主动轮询的工做由程序负责,其实能够交由OS
代为操做。这样的话,应用程序就不须要编写轮询的部分,能够更聚焦于业务逻辑(upper()
的部分),Python
提供了Select
模块以处理应用程序的轮询工做。
服务器端代码 import socket import select addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.setblocking(False) server.listen(5) print('监听中...') # 最开始的server对象须要被监听,一旦可读,说明能够执行accept read_list = [server,] # 须要监听的写列表,一旦wl中可写对象处理完send,应该将它也今后列表中删除 write_list = [] # 用于临时存放某一个sock对象须要发送的数据 data_dic = {} # 不停的发起select查询 while True: # 发起select查询,尝试获得能够操做的socket对象 rl, wl, xl = select.select(read_list, write_list, [], 1) # 操做可读列表 for sock in rl: # 若是可读列表中的对象是server,意味着有连接,则server可执行accept if sock is server: # 执行accept必定不会报错,因此不须要try conn, client = sock.accept() # 一旦得到conn,就须要将此conn加入可读列表 read_list.append(conn) else: # 说明可读的对象是普通的conn对象,执行recv时要处理连接失效问题 try: request = sock.recv(1024) except (ConnectionResetError, ConnectionAbortedError): # 此连接失效 sock.close() read_list.remove(sock) else: # 还须要继续判断request的内容 if not request: # 说明此conn连接失效 sock.close() # 再也不监控此conn read_list.remove(sock) continue # 处理请求 response = request.upper() # 加入发送列表 write_list.append(sock) # 保存发送的数据 data_dic[sock] = response # 操做可写列表 for sock in wl: # 执行发送操做,send也会出错 try: sock.send(data_dic[sock]) # 发送完毕后,须要移除发送列表 write_list.remove(sock) # 须要移除发送数据 data_dic.pop(sock) except (ConnectionResetError, ConnectionAbortedError): # 此连接失效 sock.close() read_list.remove(sock) write_list.remove(sock)
服务器使用单线程实现了并发。
使用了Select
模块以后,应用程序再也不须要编写主动轮询的代码,而是将此部分工做交由Select
模块的select
函数代为处理。
应用程序只须要遍历select
函数返回的可操做socket
列表,并处理相关业务逻辑便可。
虽然应用程序将轮询工做甩给了select
,本身不用编写代码。不过select
函数的底层接口效率不高,使用epoll
接口能够提高效率,此接口被封装在Selectors
模块中。
此外,select
函数是一个阻塞IO,在并发数不多的时候,线程大部分时间会阻塞在select
函数上。因此select
函数应该适用于随时随刻都有socket
准备好、大规模并发的场景。
编码困难,模型难理解。
def select(rlist, wlist, xlist, timeout=None): # real signature unknown; restored from __doc__ """ select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist) Wait until one or more file descriptors are ready for some kind of I/O. The first three arguments are sequences of file descriptors to be waited for: rlist -- wait until ready for reading wlist -- wait until ready for writing xlist -- wait for an ``exceptional condition'' If only one kind of condition is required, pass [] for the other lists. A file descriptor is either a socket or file object, or a small integer gotten from a fileno() method call on one of those. The optional 4th argument specifies a timeout in seconds; it may be a floating point number to specify fractions of seconds. If it is absent or None, the call will never time out. The return value is a tuple of three lists corresponding to the first three arguments; each contains the subset of the corresponding file descriptors that are ready. *** IMPORTANT NOTICE *** On Windows, only sockets are supported; on Unix, all file descriptors can be used. """ pass
rlist/wlist/xlist
分为是:须要监控的读列表/写列表/例外列表(第3参数暂不理解)windows
下,列表中只能放socket对
象,unix
下,能够听任何文件描述符None
(默认),则会永久阻塞,不然按照给定的值(单位是秒)发生超时,可使用小数如0.5秒轮询操做,效率不高。
轮询的工做视角是:发起者按期/不按期主动发起询问,若是数据没有准备好,就继续发起询问。若是数据准备好了,发起者就处理这些数据。
假设,调用者在第35次主动轮询的时候发现数据准备好了,那么意味着前34次主动轮询的操做是没有任何收益的。
调用者要想知道数据是否就绪,就要主动询问,而主动询问的效率又比较低。
这个矛盾的核心关键在于:如何得知数据准备就绪这件事呢?
使用回调函数+事件循环。
此种方案中,调用者不会主动发起轮询,而是被动的等待IO操做完成,并由OS
向调用者发起准备就绪的事件通知。
# 服务器端代码 import socket from selectors import DefaultSelector, EVENT_READ def recv_read(conn, mask): # recv回调函数 try: request = conn.recv(1024) if not request: # 意味着连接失效,再也不监控此socket conn.close() selector.unregister(conn) # 结束此回调的执行 return None # 连接正常,处理数据 conn.send(request.upper()) except (ConnectionResetError, ConnectionAbortedError): # 连接失效 conn.close() selector.unregister(conn) def accept_read(server, mask): # accept回调函数 conn, client = server.accept() print(f'一个客户端上线{client}') # 监听conn对象的可读事件的发生,并注册回调函数 selector.register(conn, EVENT_READ, recv_read) if __name__ == '__main__': addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.setblocking(False) server.listen(5) print('监听中...') # 获取对象 selector = DefaultSelector() # 第一个注册,监听server对象的可读事件的发生,并注册回调函数 selector.register(server, EVENT_READ, accept_read) # 执行事件循环 while True: # 循环调用select,select是阻塞调用,返回就绪事件 events = selector.select() for key, mask in events: # 获取此事件预先注册的回调函数 callback = key.data # 对此事件中准备就绪的socket对象执行回调 callback(key.fileobj, mask)
服务器使用单线程实现了并发。
OS
使用了Selectors
自行选择最优的底层接口监听socket
对象。
程序再也不须要主动发起查询,而是注册回调函数。
增长事件循环,用于处理准备就绪的socket
对象,调用预先注册的回调函数。
应用程序不用再关注如何判断非阻塞IO的返回值,而将精力聚焦于回调函数的编写。
pass
OS
提供的两种IO接口,区别在于调用时是否当即返回。以下是我根据网上的各类解释,结合本身的思考给出的一个关于同步/异步简单的例子:
同步
第一天,晚饭时间到了,你饿了,你走到你老婆面前说:老婆,我饿了,快点作饭!你老婆回答:好的,我去作饭。
你跟着老婆走到厨房,你老婆花了30分钟的时间给你作饭。这期间,你就站在身边,啥也不干,就这样注视着她,你老婆问你:你站这干吗?你说:我要等你作完饭再走。30分钟后,你吃到了晚饭。
异步+轮询
次日,晚饭时间到了,你饿了,你大喊:老婆,我饿了,快点作饭!你老婆回答:好的,我去作饭。
你老婆花了30分钟的时间给你作饭,可是你再也不跟着你老婆走到厨房。这期间,你在客厅看电视,不过你实在饿得不行了,因而你每过5分钟,就跑到厨房询问:老婆,饭作好了没?你老婆回答:还要一会。30分钟后,你吃到了晚饭。
异步+事件通知 第三天,晚饭时间到了,你饿了,你大喊:老婆,我饿了,快点作饭!你老婆回答:好的,我去作饭。 你老婆花了30分钟的时间给你作饭,你也再也不跟着你老婆走到厨房。这期间,你在客厅看电视,你知道你老婆在作饭,你也不会去催她,专心看电视。30分钟后,你老婆喊你:饭作好了。最后你吃到了晚饭。