目录python
同步和异步关注的是程序在执行时的状态:linux
同步
,能够理解为在执行完一个函数或方法以后,一直等待系统返回值或消息,这时程序是出于阻塞的,只有接收到返回的值或消息后才往下执行其余的命令。异步
,执行完函数或方法后,没必要阻塞性地等待返回值或消息,只须要向系统委托一个异步过程,那么当系统接收到返回值或消息时,系统会自动触发委托的异步过程,从而完成一个完整的流程。
windows
异步如收发收短信,对比打电话,打电话我必定要在电话的旁边听着,保证双方都在线,而收发短信,对方不用保证此刻我必定在手机旁,同时,我也不用时刻留意手机有没有来短信。这样的话,我看着视频,而后来了短信,我就处理短信(也能够不处理),接着再看视频。网络
对于写程序,同步每每会阻塞,没有数据过来,我就等着,异步则不会阻塞,没数据来我干别的事,有数据来去处理这些数据。一句话总结一下就是:函数或方法被调用时,调用者是否获得最终结果的
。数据结构
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.架构
一句话总结一下就是:函数或方法被调用时,是否马上返回
cors
linux系统中,全部的设备读写均可以看作文件的读写来操做,对文件的读写通常要通过内核态和用户态的切换,正由于有切换才致使了IO有同步和异步的说法。异步
在i386以前,CPU工做在实模式下,以后开始支持保护模式,一般用保护环(ring)来描述特权级,分为四个运行级别:Ring0 ~ Ring3.socket
实模下,软件能够直接访问BIOS例程以及周边硬件,没有任何硬件等级的存储器保护观念或多任务。函数
CPU 在某个时刻运行在特定的特权级,等级约束了CPU了能够作什么,不能够作什么。x86(如今最流行的PC/Server CPU架构) CPU 只用了两个特权级:0 和 3:
Ring 0
: 能够执行特权指令,能够访问说有级别数据,能够访问IO设备等(级别最高)Ring 3
:只能访问本级别数据(级别最低)针对于Linux来讲,
内核态
,运行内核代码用户态
,运行用户代码当用户的应用程序想访问某些硬件资源时,就须要经过操做系统提供的 系统调用 ,系统调用可使用特权指令运行在内核空间,此时进程陷入内核态运行。系统调用完成,进程将回到用户态继续执行用户空间代码。
现代操做系统采用虚拟存储器,对于32位操做系统来讲,进程对虚拟内存地址的内存寻址空间为4G(2^32)。操做系统中,内核程序独立且运行在较高的特权级别上,它们驻留在被保护的内存空间上,拥有访问硬件设备的权限,这部份内存称为内核空间(内核态,最高1G)。
一般来说IO能够分红两种类型:
IO过程能够分为两个阶段:
主要分为同步IO和异步IO,而同步IO又能够分为:同步阻塞IO、同步非阻塞IO、IO多路复用。
进程等待(阻塞),直到读写完成。(全程等待
)
进程调用read操做,若是IO设备没准备好,当即返回ERROR,进程不阻塞。用户能够再次发起系统调用,若是内核已经准备好,就阻塞,而后复制数据到用户空间
所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不须要等了当即开始处理,提升了同时处理IO的能力。主要的IO多路复用有:
通常状况下,select最多能监听1024个fd(能够修改,但不建议),可是因为select采用轮询的方式,当管理的IO多了,每次都要遍历所有fd,效率低下。epoll没有管理的fd的上限,且是回调机制,不须要遍历,效果很高。
进程发起异步IO请求,当即返回。内核完成IO的两个阶段,后给进程发信号
Linux的aio的系统调用,内核从2.6版本开始支持。
Python的select库实现了select、poll系统调用,这个基本上操做系统都支持,部分实现了epoll,它是底层的IO多路复用模块。
epoll使用事件通知机制,使用回调机制提升效率
select/pool 还有从内核空间复制消息到用户空间,而epoll经过内核空间和用户空间共享一块内存来减小复制。
Python 3.4 提供 selectors库,高级IO复用库。它的类层次结构以下
BaseSelector +-- SelectSelector +-- PollSelector +-- EpollSelector +-- DevpollSelector +-- KqueueSelector
观察模块原码倒数几行
咱们知道,selecors.DefaultSelector会返回当前平台最有效、性能最高的实现,可是因为没有实现 Windows下的IOCP,因此windows下只能退化为select。
selectorsobj.register为当前selectors实例注册一个文件对象,监视它的IO实现,返回一个selectKey对象。它的参数以下:
register(self, fileobj, events, data=None):
经常使用的Event事件
返回的selectKey对象对象具备如下方法:
下面是代码:
import socket import selectors import logging import threading import time FORMAT = '%(asctime)s %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) class EchoServer: def __init__(self, ip, port): self.ip = ip self.port = port self.sock = socket.socket() self.selector = selectors.DefaultSelector() self.evnet = threading.Event() def start(self): self.sock.bind((self.ip, self.port)) self.sock.listen() self.sock.setblocking(False) # 事件被触发,说明有连接进来,那么不须要阻塞等待 self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept) # 注册accept函数 threading.Thread(target=self.select, name='select', daemon=True).start() # 启动监视进程 # 启动selector,用于监视事件的发生 def select(self): while not self.evnet.is_set(): events = self.selector.select() for key, event in events: key.data(key.fileobj) def accept(self, sock: socket.socket): sock, client = sock.accept() # 将client的读写也加入到监事列表中 self.selector.register(sock, selectors.EVENT_READ, self.recv) # 当对应的socket有写操做,会直接直接触发执行,因此这里根本不须要死循环 def recv(self, sock: socket.socket): client_ip = sock.getpeername() data = sock.recv(1024) if data == b'quit' or data == b'': self.evnet.set() sock.close() return msg = '{}:{} {}'.format(*client_ip, data.decode()).encode() sock.send(msg) # 关闭进程时,将selectors中注册的事件取消掉,再关闭监视器 def stop(self): self.evnet.set() event_list = set() for event in self.selector.get_map(): event_list.add(event) for event in event_list: self.selector.unregister(event) self.selector.close() if __name__ == '__main__': es = EchoServer('127.0.0.1', 9999) es.start() while True: cmd = input('>>>:').strip() if cmd == 'quit': es.stop() break
这里是每次client进来时,经过记录client的socket来完成的。
import socket import selectors import logging import threading FORMAT = '%(asctime)s %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) class ChatSocketServer: def __init__(self, ip, port): self.ip = ip self.port = port self.sock = socket.socket() self.selector = selectors.DefaultSelector() self.event = threading.Event() self.clients = {} # 用于记录链接的client def start(self): self.sock.bind((self.ip, self.port)) self.sock.listen() self.sock.setblocking(False) # 事件被触发,说明有连接进来,那么不须要阻塞等待 self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept) # 注册accept函数 threading.Thread(target=self.select, name='select', daemon=True).start() # 启动监视进程 # 启动selector,用于监视事件的发生 def select(self): while not self.event.is_set(): events = self.selector.select() for key, event in events: key.data(key.fileobj) def accept(self, sock: socket.socket): sock, client = sock.accept() # 添加已链接客户端列表 self.clients[client] = sock # 将client的读写也加入到监事列表中 self.selector.register(sock, selectors.EVENT_READ, self.recv) # 当对应的socket有写操做,会直接直接触发执行,因此这里根本不须要死循环 def recv(self, sock: socket.socket): client_ip = sock.getpeername() data = sock.recv(1024) if data == b'quit' or data == b'': self.clients.pop(client_ip) # 退出后弹出client地址 self.event.set() sock.close() return msg = '{}:{} {}'.format(*client_ip, data.decode()) logging.info(msg) for clients in self.clients.values(): clients.send(msg.encode()) # 关闭进程时,将selectors中注册的事件取消掉,再关闭监视器 def stop(self): self.event.set() event_list = set() for event in self.selector.get_map(): event_list.add(event) for event in event_list: self.selector.unregister(event) for client in self.clients.values(): # 关闭因此已链接的client的socket client.close() self.selector.close() if __name__ == '__main__': es = ChatSocketServer('127.0.0.1', 9999) es.start() while True: cmd = input('>>>:').strip() if cmd == 'quit': es.stop() break
固然还能够经过selector来处理,为何呢?由于每当有请求进来,selector都会监视当前链接的recv,那么咱们只须要在selector中的recv拿出来,就知道到底有多少链接了。
import socket import selectors import logging import threading FORMAT = '%(asctime)s %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) class ChatSocketServer: def __init__(self, ip, port): self.ip = ip self.port = port self.sock = socket.socket() self.selector = selectors.DefaultSelector() self.event = threading.Event() def start(self): self.sock.bind((self.ip, self.port)) self.sock.listen() self.sock.setblocking(False) # 事件被触发,说明有连接进来,那么不须要阻塞等待 self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept) # 注册accept函数 threading.Thread(target=self.select, name='select', daemon=True).start() # 启动监视进程 # 启动selector,用于监视事件的发生 def select(self): while not self.event.is_set(): events = self.selector.select() for key, event in events: key.data(key.fileobj) def accept(self, sock: socket.socket): sock, client = sock.accept() # 将client的读写也加入到监事列表中 self.selector.register(sock, selectors.EVENT_READ, self.recv) # 当对应的socket有写操做,会直接直接触发执行,因此这里根本不须要死循环 def recv(self, sock: socket.socket): client_ip = sock.getpeername() data = sock.recv(1024) if data == b'quit' or data == b'': self.selector.unregister(sock) # 客户端退出,则取消监控当前 socket 事件 sock.close() return msg = '{}:{} {}'.format(*client_ip, data.decode()) logging.info(msg) # 群发消息,若是data绑定的是recv(排除accept),那么就经过socket群发消息 for sock in self.selector.get_map().values(): if sock.data == self.recv: sock.fileobj.send(msg.encode()) # 关闭进程时,将selectors中注册的事件取消掉,再关闭监视器 def stop(self): self.event.set() event_list = set() for event in self.selector.get_map(): event_list.add(event) for event in event_list: self.selector.unregister(event) self.selector.close() if __name__ == '__main__': es = ChatSocketServer('127.0.0.1', 9999) es.start() while True: cmd = input('>>>:').strip() if cmd == 'quit': es.stop() break