协程实际上是比起通常的子例程而言更宽泛的存在,子例程是协程的一种特例。javascript
子例程的起始处是唯一的入口点,一旦退出即完成了子例程的执行,子例程的一个实例只会返回一次。css
协程能够经过yield
来调用其它协程。经过yield
方式转移执行权的协程之间不是调用者与被调用者的关系,而是彼此对称、平等的。html
协程的起始处是第一个入口点,在协程里,返回点以后是接下来的入口点。子例程的生命期遵循后进先出(最后一个被调用的子例程最早返回);相反,协程的生命期彻底由他们的使用的须要决定。java
还记得咱们何时会用到yield
吗,就是在生成器(generator)里,在迭代的时候每次执行next(generator)生成器都会执行到下一次yield
的位置并返回,能够说生成器就是例程。python
一个生成器的例子:git
#定义生成器函数 def fib(): a, b = 0, 1 while(True): yield a a, b = b, a + b #得到生成器 fib = fib() next(fib) # >> 0 next(fib) # >> 1
在考察生成器前,咱们须要先了解通常的python函数是如何运行的。当一个函数调用子函数时,控制就会交给子函数,直到子函数返回或者抛出异常时才会将控制交还给调用函数。程序员
自定义两个函数:github
>>> def foo(): ... bar() ... >>> def bar(): ... pass
标准Python
解释器CPython
中的PyEval_EvalFrameEx
方法会取得栈帧和待运行的字节码,并在获得的栈帧的上下文环境下计算字节码的结果。如下是foo
函数的字节码:web
从字节码能够看出foo
函数加载bar
到栈上以后经过CALL_FUNCTION
调用,bar
返回后弹出bar
的返回值,加载None
到栈上并将其做为foo
的返回值返回。sql
当PyEval_EvalFrameEx
遇到CALL_FUNCTION
字节码时,它建立一个新的Python
栈帧。
须要了解的一点是Python
的栈帧是存在于堆上的。CPython
做为一个普通的C程序,它的栈帧就在栈上,可是CPython
所控制的Python
的栈帧倒是在堆上的,因此Python
的栈帧在函数调用结束后是仍可以保持存在。咱们设置一个全局变量frame
,将bar
的栈帧赋给frame
。
>>> import inspect >>> frame = None >>> def foo(): ... bar() ... >>> def bar(): ... global frame ... frame = inspect.currentframe() ... >>> foo() >>> #获得'bar'的栈帧 >>> frame.f_code.co_name 'bar' >>> # 它的返回指针指向foo的栈 >>> caller_frame = frame.f_back >>> caller_frame.f_code.co_name 'foo'
如今让咱们考察一下生成器的结构,先定义一个生成器函数:
>>> def gen_fn(): ... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done'
当 Python
将 gen_fn
编译为字节码时,它会检查有没有yield
,有的话那就是生成器函数了,编译器会在该函数的flag
上打上标识:
>>> # 生成器的标识位是第5位. >>> generator_bit = 1 << 5 >>> bool(gen_fn.__code__.co_flags & generator_bit) True
调用生成器函数时会生成一个生成器对象:
>>> gen = gen_fn() >>> type(gen) <class 'generator'>
生成器对象会封装一个栈帧和一个对代码的引用:
>>> gen.gi_code.co_name 'gen_fn'
全部来自同一个生成器函数的生成器对象都会引用同一份代码,可是却会拥有各自不一样的栈帧,生成器对象结构图以下:
帧拥有f_lasti
指针,它指向以前最新一次运行的指令。它初始化的时候为-1,说明生成器尚未开始运行。
>>> gen.gi_frame.f_lasti -1
当咱们对生成器执行send
方法时,生成器会运行到第一次yield
的位置并中止。在这里它返回1,正如咱们编写的代码预期的那样。
>>> gen.send(None) 1
如今f_lasti
指向3了,比最开始前进了4个字节码,以及能够发现这个函数一共生成了56个字节码:
>>> gen.gi_frame.f_lasti 3 >>> len(gen.gi_code.co_code) 56
生成器可以中止,也可以在任意时刻经过任意函数恢复,这也是由于栈帧是在堆上而不是栈上,因此不用遵照函数调用的先进后出原则。
咱们能够send
"hello"字符串给生成器,它会在以前中止的yield
那里获得并赋值给result
,以后生成器继续运行直到下一个yield
位置中止并返回。
>>> gen.send('hello') result of yield: hello 2
查看生成器的局部变量:
>>> gen.gi_frame.f_locals {'result': 'hello'}
当咱们再次调用send
的时候,生成器从它第二次yield的地方继续运行,到最后已经没有yield
了,因此出现了StopIteration
异常:
>>> gen.send('goodbye') result of 2nd yield: goodbye Traceback (most recent call last): File "<input>", line 1, in <module> StopIteration: done
能够看到,该异常的值是生成器最后返回的值,在这里就是字符串"done"
虽然生成器拥有一个协程该有的特性,但光这样是不够的,作异步编程还是困难的,咱们须要先用生成器实现一个协程异步编程的简单模型,它同时也是Python
标准库asyncio
的简化版,正如asyncio
的实现,咱们会用到生成器,Future
类,以及yield from
语句。
首先实现Future
类, Future
类能够认为是专门用来存储将要发送给协程的信息的类。
class Future: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self)
Future
对象最开始处在挂起状态,当调用set_result
时被激活,并运行注册的回调函数,该回调函数多半是对协程发送信息让协程继续运行下去的函数。
咱们改造一下以前从fetch
到connected
的函数,加入Future
与yield
。
这是以前回调实现的fetch
:
class Fetcher: def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): print('connected!') # ...后面省略...
改造后,咱们将链接创建后的部分也放到了fetch
中。
class Fetcher: def fetch(self): sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass f = Future() def on_connected(): #链接创建后经过set_result协程继续从yield的地方往下运行 f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield f selector.unregister(sock.fileno()) print('connected!')
fetcher
是一个生成器函数,咱们建立一个Future
实例,yield它来暂停fetch
的运行直到链接创建f.set_result(None)
的时候,生成器才继续运行。那set_result
时运行的回调函数是哪来的呢?这里引入Task
类:
class Task: def __init__(self, coro): #协程 self.coro = coro #建立并初始化一个为None的Future对象 f = Future() f.set_result(None) #步进一次(发送一次信息) #在初始化的时候发送是为了协程到达第一个yield的位置,也是为了注册下一次的步进 self.step(f) def step(self, future): try: #向协程发送消息并获得下一个从协程那yield到的Future对象 next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) fetcher = Fetcher('/') Task(fetcher.fetch()) loop()
流程大体是这样的,首先Task
初始化,向fetch
生成器发送None
信息(也能够想象成step
调用了fetch
,参数是None),fetch
得以从开头运行到第一个yield
的地方并返回了一个Future
对象给step
的next_future
,而后step
就在这个获得的Future
对象注册了step
。当链接创建时on_connected
就会被调用,再一次向协程发送信息,协程就会继续往下执行了。
yield from
分解协程一旦socket
链接创建成功,咱们发送HTTP GET
请求到服务器并在以后读取服务器响应。如今这些步骤不用再分散在不一样的回调函数里了,咱们能够将其放在同一个生成器函数中:
def fetch(self): # ... 省略链接的代码 sock.send(request.encode('ascii')) while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield f selector.unregister(sock.fileno()) if chunk: self.response += chunk else: # 完成读取 break
可是这样代码也会越积越多,可不能够分解生成器函数的代码呢,从协程中提取出子协程?Python 3
的yield from
能帮助咱们完成这部分工做。:
>>> def gen_fn(): ... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done' ...
使用yield from
在一个生成器中调用另外一个生成器:
>>> # Generator function: >>> def caller_fn(): ... gen = gen_fn() ... rv = yield from gen ... print('return value of yield-from: {}' ... .format(rv)) ... >>> # Make a generator from the >>> # generator function. >>> caller = caller_fn()
给caller
生成器发送消息,消息送到了gen
生成器那里,在gen
还没返回前caller
就停在rv = yield from gen
这条语句上了。
>>> caller.send(None) 1 >>> caller.gi_frame.f_lasti 15 >>> caller.send('hello') result of yield: hello 2 >>> caller.gi_frame.f_lasti # 能够发现指令没有前进 15 >>> caller.send('goodbye') result of 2nd yield: goodbye return value of yield-from: done Traceback (most recent call last): File "<input>", line 1, in <module> StopIteration
对于咱们来讲,于外,咱们没法判断发送消息时yield
的值是来自caller
仍是caller
内的子协程(好比gen
),于内,咱们也不用关心gen
所获得的消息是从哪里传送来的,gen
只用负责在上一次yield
时获得消息输入,运行到下一个yield时返回输出,重复这个模式到最后return
就能够了。
yield from
获得的子协程最后return
的返回值:
rv = yield from gen
想想以前咱们抱怨回调函数抛出异常时看不到上下文,这回咱们看看协程是怎么样的:
>>> def gen_fn(): ... raise Exception('my error') >>> caller = caller_fn() >>> caller.send(None) Traceback (most recent call last): File "<input>", line 1, in <module> File "<input>", line 3, in caller_fn File "<input>", line 2, in gen_fn Exception: my error
清楚多了,栈跟踪显示在gen_fn
抛出异常时消息是从caller_fn
委派到gen_fn
的。
协程处理异常的手段跟普通函数也是同样的:
>>> def gen_fn(): ... yield 1 ... raise Exception('uh oh') ... >>> def caller_fn(): ... try: ... yield from gen_fn() ... except Exception as exc: ... print('caught {}'.format(exc)) ... >>> caller = caller_fn() >>> caller.send(None) 1 >>> caller.send('hello') caught uh oh
如今让咱们从fetch
协程上分解出一些子协程。(注意分离出的子协程并非Fetcher
的成员协程)
实现read
协程接收一个数据块:
def read(sock): f = Future() def on_readable(): #在socket可读时读取消息并向协程发送一个数据快 f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) #yield f中止协程,等到可读时,从f那获得数据块。 chunk = yield f selector.unregister(sock.fileno()) return chunk
实现read_all
协程接收整个消息:
def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response)
若是将yield from
去掉,看上去就跟以前实现阻塞式I/O读取差很少呢。
如今在fetch
中调用read_all
:
class Fetcher: def fetch(self): # ... 省略链接的代码: sock.send(request.encode('ascii')) self.response = yield from read_all(sock)
嗯,如今看上去代码短多了,可是能够作的更好。 从以前的代码能够看到当咱们等一个Future
返回时使用的是yield
,而等一个子协程返回倒是使用yield from
,咱们可让二者统一块儿来。得益于生成器与迭代器在Python中的一致性,咱们实现Future
方法同时让它成为一个生成器:
def __iter__(self): yield self return self.result
这样yield f
与yield from f
的效果就一样是输入Future
返回Future
的结果了。 统一的好处是什么呢?统一以后不管你是调用一个返回Future
的协程仍是返回值的协程均可以统一用yield from
应对了,当你想改变协程的实现时也不用担忧对调用函数(or协程)产生影响。
咱们将链接的逻辑也从fetch
中分离出来:
def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno())
fetch
如今长这个样子:
def fetch(self): global stopped sock = socket.socket() yield from connect(sock, ('xkcd.com', 80)) get = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url)
将上一节课中的parse_links
更名 _process_response
并稍做修改:
def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('xkcd.com', 'www.xkcd.com'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch())
主循环部分:
start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds'.format( len(urls_seen), time.time() - start))
这里先奉上完整代码:
from selectors import * import socket import re import urllib.parse import time class Future: def __init__(self): self.result = None self._callbacks = [] def result(self): return self.result def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self) def __iter__(self): yield self return self.result class Task: def __init__(self, coro): self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) urls_seen = set(['/']) urls_todo = set(['/']) #追加了一个能够看最高并发数的变量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno()) def read(sock): f = Future() def on_readable(): f.set_result(sock.recv(4096)) # Read 4k at a time. selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield from f selector.unregister(sock.fileno()) return chunk def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response) class Fetcher: def __init__(self, url): self.response = b'' self.url = url def fetch(self): global concurrency_achieved, stopped concurrency_achieved = max(concurrency_achieved, len(urls_todo)) sock = socket.socket() yield from connect(sock, ('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch()) def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(urls_seen), time.time() - start, concurrency_achieved))
运行python3 coroutine.py
命令查看效果:
至此,咱们在学习的过程当中掌握了:
三种爬虫的实现方式中线程池是最坏的选择,由于它既占用内存,又有线程竞争的危险须要程序员本身编程解决,并且产生的I/O阻塞也浪费了CPU占用时间。再来看看回调方式,它是一种异步方法,因此I/O阻塞的问题解决了,并且它是单线程的不会产生竞争,问题好像都解决了。然而它引入了新的问题,它的问题在于以这种方式编写的代码很差维护,也不容易debug。看来协程才是最好的选择,咱们实现的协程异步编程模型使得一个单线程可以很容易地改写为协程。那是否是每一次作异步编程都要实现Task
、Future
呢?不是的,你能够直接使用asyncio
官方标准协程库,它已经帮你把Task
、Future
封装好了,你根本不会感觉到它们的存在,是否是很棒呢?若是你使用Python 3.5
那更好,已经能够用原生的协程了,Python 3.5
追加了async def
,await
等协程相关的关键词。这些会在从此的课程中再一一展开,下一个发布的课程就是asyncio库实现网络爬虫啦。