本篇文章主要是讲解asyncio模块的实现原理. 这个系列还有另外两篇文章:html
asyncio是python3.4开始内置的一个标准库, 能够用于编写异步的并发代码, 所以很是适合用在IO密集型操做.python
如今运行以下代码:服务器
import asyncio import time async def task(i): print('task{} start at {}'.format(i, time.ctime())) # asyncio.sleep的效果与time.sleep相似, 让程序睡眠n秒 await asyncio.sleep(3) print('task{} end at {}'.format(i, time.ctime())) tasks = asyncio.wait([task(i) for i in range(3)]) asyncio.run(tasks)
运行结果以下:并发
三个任务实际是处于同一线程的, 但它们的执行顺序不是start->end->start->end这种串行模式, 而是几乎同时开始, 同时结束, asyncio模块的做用就是, 使用异步的方式实现单线程并发的效果. 最简单的使用步骤以下:app
要理解asyncio的原理, 须要理解以下几个概念: 协程, 事件循环, future/task. 其中协程就是用户本身定义的任务, 事件循环负责监听事件和回调, future/task则主要负责管理回调, 以及驱动协程.框架
事件循环负责同时对多个事件进行监听, 当监听到事件时, 就调用对应的回调函数, 进而驱动不一样的任务. 上一节代码最后的asyncio.run, 其本质就是建立一个事件循环, 而后一直运行事件循环, 直到全部任务结束为止. 异步
首先看看上篇文章最后的爬虫代码:socket
import select import socket import time req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8') address = ('cn.bing.com', 80) db = [] class GenCrawler: ''' 这里使用一个类将生成器封装起来,若是要驱动生成器,就调用next_step方法 另外,这个类还能够获取到使用的socket对象 ''' def __init__(self): self.sock = socket.socket() self.sock.setblocking(0) self._gen = self._crawler() def next_step(self): next(self._gen) def _crawler(self): self.sock.connect_ex(address) yield self.sock.send(req) response = b'' while 1: yield chunk = self.sock.recv(1024) if chunk == b'': self.sock.close() break else: response += chunk db.append(response) def event_loop(crawlers): # 首先,创建sock与crawler对象的映射关系,便于由socket对象找到对应的crawler对象 # 创建映射的同时顺便调用crawler的next_step方法,让内部的生成器运行起来 sock_to_crawler = {} for crawler in crawlers: sock_to_crawler[crawler.sock] = crawler crawler.next_step() # select.select须要传入三个列表,分别对应要监听的可读,可写和错误事件的socket对象集合 readable = [] writeable = [crawler.sock for crawler in crawlers] errors = [] while 1: rs, ws, es = select.select(readable, writeable, errors) for sock in ws: # 当socket对象链接到服务器时,会建立可读缓冲区和可写缓冲区 # 因为可写缓冲区建立时为空,所以链接成功时,就触发可写事件 # 这时再转为监听可读事件,接收到数据时,就能够触发可读事件了 writeable.remove(sock) readable.append(sock) sock_to_crawler[sock].next_step() for sock in rs: try: sock_to_crawler[sock].next_step() except StopIteration: # 若是生成器结束了,就说明对应的爬虫任务已经结束,不须要监听事件了 readable.remove(sock) # 全部的事件都结束后,就退出循环 if not readable and not writeable: break if __name__ == '__main__': start = time.time() n = 10 print('开始爬取...') event_loop([GenCrawler() for _ in range(n)]) print('获取到{}条数据,用时{:.2f}秒'.format(len(db), time.time()-start))
这段代码使用IO多路复用对多个socket进行监听, 监听到事件时, 驱动对应的生成器运行, 运行到IO操做时, 再使用yield切换回事件循环, 从而实现并发的效果, 这个也就是asyncio中事件循环的工做原理.async
因为asyncio中的事件循环使用的是selectors模块而非select, 如今在程序的代码中改用selectors模块:ide
import socket import time from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8') address = ('cn.bing.com', 80) db = [] class EventLoop: def __init__(self): self.selector = DefaultSelector() self._stopped = False def register(self, fd, event, callback): self.selector.register(fd, event, callback) def unregister(self, fd): self.selector.unregister(fd) def run_until_complete(self,gens): for gen in gens: next(gen) while not self._stopped: try: events = self.selector.select() except OSError: # 若是当前没有注册事件, 就会引起OSError异常 continue for key, mask in events: # 这里的callback就是注册事件时传入的回调函数 callback = key.data callback(key=key, mask=mask) # 生成器的gi_frame属性对应的是其框架(其实这属性我还没搞懂) # 在生成器结束(抛出stopiteration异常)后,这个属性值就会变成None # 所以,每次循环时都删减已经结束的生成器 # 若是全部的生成器都结束了,就中止循环 gens = [gen for gen in gens if gen.gi_frame is not None] if not gens: self.stop() def stop(self): self._stopped = True self.selector.close() loop = EventLoop() class GenCrawler: def __init__(self): self.sock = socket.socket() self.sock.setblocking(0) self._fd = self.sock.fileno() self.gen = self._crawler() def _crawler(self): self.sock.connect_ex(address) loop.register(self._fd, EVENT_WRITE, self.next_step) yield loop.unregister(self._fd) self.sock.send(req) response = b'' while 1: loop.register(self._fd, EVENT_READ, self.next_step) yield loop.unregister(self._fd) chunk = self.sock.recv(1024) if chunk == b'': self.sock.close() break else: response += chunk db.append(response)def next_step(self,**kwargs): try: next(self.gen) except StopIteration: return if __name__ == '__main__': start = time.time() print('开始爬取...') n = 10 gens = [GenCrawler().gen for _ in range(n)] loop.run_until_complete(gens) print('获取到{}条数据,用时{:.2f}秒'.format(len(db), time.time()-start))
这里主要是改了EventLoop部分的代码, 使用register和unregister方法来注册和注销事件, 优势是更加灵活, 能够指定触发事件时调用的回调函数. 另外, DefaultSelector会自动选择系统中效率最高的多路复用机制, 好比kqueue和epoll.
在定义函数的时候, 在def以前加上async, 这个函数就不是普通函数了, 而是一个协程函数:
async def coro(): print('this is a coroutine')
直接调用协程函数并不能使之运行, 而是返回了一个协程对象, 若是要运行该协程, 能够调用这个协程对象的send方法:
c=coro()
c.send(None)
运行结果以下, 首先会运行协程函数内部的代码, 而后函数的代码运行结束, 抛出一个StopIteration异常:
所以, 协程函数与生成器函数是很是类似的. 可是, 协程不是可迭代对象, 所以没法使用next函数, 只能调用其自身的send方法来驱动.
从python3.6开始, 协程函数中可使用yield语句, 此时调用这个函数, 就会返回一个async_generator对象, 即异步生成器.
不过这东西我还没用过, 先挖个坑, 须要的能够看PEP525.
在第一节中讲到, 协程中可使用await语句, 后接awaitable对象, 便可等待对象. 如下几类都是可等待对象:
如今定义一个可等待对象并测试:
class AwaitableObj: def __await__(self): v = yield '来自可等待对象的yield' print('可等待对象得到的值:', v) return '来自可等待对象的return' async def coro(): v = await AwaitableObj() print('协程得到的值:', v) if __name__ == '__main__': c = coro() v = c.send(None) print('外部得到的值:', v) try: c.send('来自外部') except StopIteration: pass
这段程序有三个部分: 可等待对象, 协程和外部. 协程中使用await语句来等待可等待对象, 而外部调用send方法来驱动协程.
程序的运行结果以下:
await至关于外部与可等待对象之间的桥梁, 可等待对象中__await__方法返回的生成器, 其yield返回的值会传到外部, 而外部使用send方法传的值也会传给可等待对象的生成器. 最后__await__生成器迭代结束后, 协程得到其返回值.
这里须要说明一点: await语句自己并不能暂停和切换协程, 它只是阻塞协程直到后面接的可等待对象的__await__方法返回的可迭代对象运行完. 若是__await__里面有yield, 返回一个生成器, 协程才会由于这个yield语句暂停和切换.
future是asyncio模块中的一个可等待对象, 调用asyncio.get_event_loop获取到当前线程的事件循环loop, 而后调用loop.create_future, 就能够获得一个future对象. future的主要代码以下(有改动):
class Future: def __init__(self): self._callbacks = [] self.result = None def add_callback(self, callback): self._callbacks.append(callback) def set_result(self, result): self.result = result for callback in self._callbacks: callback(self) def __await__(self): yield self return self.result
future能够理解为协程的一次暂停. 首先, 若是一个协程须要在某处暂停, 就能够实例化一个future对象而且await这个对象, 这样就会运行future对象的__await__方法, 当运行到yield self这句话时, 协程暂停, 直到外部再使用send方法驱动协程为止. 而后, future的另外一特性是能够设置回调函数, 调用它的add_callback方法就行. 最后, future还有set_result这个接口, 一方面会运行future的回调函数, 另外一方面能够设置其result属性的值, 该值在__awaiit__方法结束以后返回给协程. 通常的用法是, 协程在事件循环中注册事件, 而后让事件循环来调用future对象的set_result方法.
有了暂停, 天然也须要有驱动, task对象负责对协程进行封装和驱动. 调用asyncio.create_task并传入协程对象, 就能够获得一个task对象. task的主要代码以下(有改动):
class Task(Future): def __init__(self, coro): super().__init__() self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(None) except StopIteration: self.set_result(future.result) return next_future.add_callback(self.step)
task和future应该是搭配使用的. 首先, task.step是负责对协程进行驱动的, 因为future.__await__方法会yield self, 所以每次驱动都会得到目前暂停点对应的future对象. 这时候将本身的step方法添加到future对象的回调中, 等到future对象调用set_result方法时, 就会回调到task.step方法, 从而驱动协程继续运行. 所以能够认为, future对象就是协程的一次暂停, 而调用其set_result方法就意味着此次暂停结束了, 可是这个过程须要task的协助.
task类是继承future类的, 这其实比较好理解, 好比一个简单的爬虫任务, 在链接服务器和接受数据等IO操做时须要使用future暂停, 并能够设置回调, 表示暂停结束的时候应该作什么. 而这个爬虫任务至关于一个大的IO操做, 所以也应该有能够设置回调以及能够await的特性. 当一个协程驱动结束, 即抛出StopIteration异常的时候, 就意味着这个task结束了, 所以此时就调用task.set_result方法, 把最后一个future对象的结果设置为task.result.
把上面讲的async/await, future/task等内容添加到以前的爬虫实例中, 最终代码以下:
import socket import time from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8') address = ('cn.bing.com', 80) db = [] class EventLoop: def __init__(self): self.selector = DefaultSelector() self._stopped = False def register(self, fd, event, callback): self.selector.register(fd, event, callback) def unregister(self, fd): self.selector.unregister(fd) def run_until_complete(self, coros): def _done_callback(fut): nonlocal ncoros ncoros -= 1 if ncoros == 0: self.stop() ncoros = len(coros) for coro in coros: task = Task(coro) task.add_callback(_done_callback) while not self._stopped: try: events = self.selector.select() except OSError: # 若是当前没有注册事件, 就会引起OSError异常 continue for key, mask in events: # 这里的callback就是注册事件时传入的回调函数 callback = key.data callback(key=key, mask=mask) def stop(self): self._stopped = True self.selector.close() loop = EventLoop() class Future: def __init__(self): self._callbacks = [] self.result = None def add_callback(self, callback): self._callbacks.append(callback) def set_result(self, result): self.result = result for callback in self._callbacks: callback(self) def __await__(self): yield self return self.result class Task(Future): def __init__(self, coro): super().__init__() self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(None) except StopIteration: self.set_result(future.result) return next_future.add_callback(self.step) class CoroCrawler: def __init__(self): self.sock = socket.socket() self.sock.setblocking(0) self._fd = self.sock.fileno() self.coro = self._crawler() async def _crawler(self): await self.connect() self.sock.send(req) response = await self.read_all() db.append(response) async def connect(self): self.sock.connect_ex(address) f = Future() def on_connect(key, mask): f.set_result(None) loop.register(self.sock.fileno(), EVENT_WRITE, on_connect) await f loop.unregister(self.sock.fileno()) async def read_all(self): response = b'' while 1: chunk = await self.read() if chunk == b'': self.sock.close() break response += chunk return response async def read(self): f = Future() def on_readable(key, mask): chunk = self.sock.recv(1024) f.set_result(chunk) loop.register(self._fd, EVENT_READ, on_readable) chunk = await f loop.unregister(self._fd) return chunk if __name__ == '__main__': start = time.time() print('开始爬取...') n = 10 coros = [CoroCrawler().coro for _ in range(n)] loop.run_until_complete(coros) print('获取到{}条数据,用时{:.2f}秒'.format(len(db), time.time()-start))
这段代码并不算复杂, 惟一须要留意的就是事件循环中的run_until_complete方法, 这个方法再也不是主动去检查任务是否结束, 而是将协程包装成task对象, 而后给task对象添加了回调函数, 来在协程所有结束时, 中止事件循环. 这也就是用task包装协程的一个方便的地方: 能够在协程结束的时候运行指定的回调.
整个代码的实现流程以下, 这也就是用asyncio运行一个协程的流程.