事件驱动-协程实现爬虫

实验三:事件驱动-协程实现爬虫

什么是协程?

协程实际上是比起通常的子例程而言更宽泛的存在,子例程是协程的一种特例。javascript

子例程的起始处是唯一的入口点,一旦退出即完成了子例程的执行,子例程的一个实例只会返回一次。css

协程能够经过yield来调用其它协程。经过yield方式转移执行权的协程之间不是调用者与被调用者的关系,而是彼此对称、平等的。html

协程的起始处是第一个入口点,在协程里,返回点以后是接下来的入口点。子例程的生命期遵循后进先出(最后一个被调用的子例程最早返回);相反,协程的生命期彻底由他们的使用的须要决定。java

还记得咱们何时会用到yield吗,就是在生成器(generator)里,在迭代的时候每次执行next(generator)生成器都会执行到下一次yield的位置并返回,能够说生成器就是例程。python

一个生成器的例子:git

#定义生成器函数 def fib(): a, b = 0, 1 while(True): yield a a, b = b, a + b #得到生成器 fib = fib() next(fib) # >> 0 next(fib) # >> 1 

生成器是如何工做的?

在考察生成器前,咱们须要先了解通常的python函数是如何运行的。当一个函数调用子函数时,控制就会交给子函数,直到子函数返回或者抛出异常时才会将控制交还给调用函数。程序员

自定义两个函数:github

>>> def foo(): ... bar() ... >>> def bar(): ... pass 

标准Python解释器CPython中的PyEval_EvalFrameEx方法会取得栈帧和待运行的字节码,并在获得的栈帧的上下文环境下计算字节码的结果。如下是foo函数的字节码:web

从字节码能够看出foo函数加载bar到栈上以后经过CALL_FUNCTION调用,bar返回后弹出bar的返回值,加载None到栈上并将其做为foo的返回值返回。sql

PyEval_EvalFrameEx遇到CALL_FUNCTION字节码时,它建立一个新的Python栈帧。

须要了解的一点是Python的栈帧是存在于堆上的。CPython做为一个普通的C程序,它的栈帧就在栈上,可是CPython所控制的Python的栈帧倒是在堆上的,因此Python的栈帧在函数调用结束后是仍可以保持存在。咱们设置一个全局变量frame,将bar的栈帧赋给frame

>>> import inspect >>> frame = None >>> def foo(): ... bar() ... >>> def bar(): ... global frame ... frame = inspect.currentframe() ... >>> foo() >>> #获得'bar'的栈帧 >>> frame.f_code.co_name 'bar' >>> # 它的返回指针指向foo的栈 >>> caller_frame = frame.f_back >>> caller_frame.f_code.co_name 'foo' 

此处输入图片的描述

如今让咱们考察一下生成器的结构,先定义一个生成器函数:

>>> def gen_fn(): ... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done' 

当 Python 将 gen_fn 编译为字节码时,它会检查有没有yield,有的话那就是生成器函数了,编译器会在该函数的flag上打上标识:

>>> # 生成器的标识位是第5位. >>> generator_bit = 1 << 5 >>> bool(gen_fn.__code__.co_flags & generator_bit) True 

调用生成器函数时会生成一个生成器对象:

>>> gen = gen_fn() >>> type(gen) <class 'generator'> 

生成器对象会封装一个栈帧和一个对代码的引用:

>>> gen.gi_code.co_name 'gen_fn' 

全部来自同一个生成器函数的生成器对象都会引用同一份代码,可是却会拥有各自不一样的栈帧,生成器对象结构图以下:

此处输入图片的描述

帧拥有f_lasti指针,它指向以前最新一次运行的指令。它初始化的时候为-1,说明生成器尚未开始运行。

>>> gen.gi_frame.f_lasti -1 

当咱们对生成器执行send方法时,生成器会运行到第一次yield的位置并中止。在这里它返回1,正如咱们编写的代码预期的那样。

>>> gen.send(None) 1 

如今f_lasti指向3了,比最开始前进了4个字节码,以及能够发现这个函数一共生成了56个字节码:

>>> gen.gi_frame.f_lasti 3 >>> len(gen.gi_code.co_code) 56 

生成器可以中止,也可以在任意时刻经过任意函数恢复,这也是由于栈帧是在堆上而不是栈上,因此不用遵照函数调用的先进后出原则。

咱们能够send"hello"字符串给生成器,它会在以前中止的yield那里获得并赋值给result,以后生成器继续运行直到下一个yield位置中止并返回。

>>> gen.send('hello') result of yield: hello 2 

查看生成器的局部变量:

>>> gen.gi_frame.f_locals {'result': 'hello'} 

当咱们再次调用send的时候,生成器从它第二次yield的地方继续运行,到最后已经没有yield了,因此出现了StopIteration异常:

>>> gen.send('goodbye') result of 2nd yield: goodbye Traceback (most recent call last): File "<input>", line 1, in <module> StopIteration: done 

能够看到,该异常的值是生成器最后返回的值,在这里就是字符串"done"

生成器实现协程模型

虽然生成器拥有一个协程该有的特性,但光这样是不够的,作异步编程还是困难的,咱们须要先用生成器实现一个协程异步编程的简单模型,它同时也是Python标准库asyncio的简化版,正如asyncio的实现,咱们会用到生成器,Future类,以及yield from语句。

首先实现Future类, Future类能够认为是专门用来存储将要发送给协程的信息的类。

class Future: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self) 

Future对象最开始处在挂起状态,当调用set_result时被激活,并运行注册的回调函数,该回调函数多半是对协程发送信息让协程继续运行下去的函数。

咱们改造一下以前从fetchconnected的函数,加入Futureyield

这是以前回调实现的fetch

class Fetcher: def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): print('connected!') # ...后面省略... 

改造后,咱们将链接创建后的部分也放到了fetch中。

class Fetcher: def fetch(self): sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass f = Future() def on_connected(): #链接创建后经过set_result协程继续从yield的地方往下运行 f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield f selector.unregister(sock.fileno()) print('connected!') 

fetcher是一个生成器函数,咱们建立一个Future实例,yield它来暂停fetch的运行直到链接创建f.set_result(None)的时候,生成器才继续运行。那set_result时运行的回调函数是哪来的呢?这里引入Task类:

class Task: def __init__(self, coro): #协程 self.coro = coro #建立并初始化一个为None的Future对象 f = Future() f.set_result(None) #步进一次(发送一次信息) #在初始化的时候发送是为了协程到达第一个yield的位置,也是为了注册下一次的步进 self.step(f) def step(self, future): try: #向协程发送消息并获得下一个从协程那yield到的Future对象 next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) fetcher = Fetcher('/') Task(fetcher.fetch()) loop() 

流程大体是这样的,首先Task初始化,向fetch生成器发送None信息(也能够想象成step调用了fetch,参数是None),fetch得以从开头运行到第一个yield的地方并返回了一个Future对象给stepnext_future,而后step就在这个获得的Future对象注册了step。当链接创建时on_connected就会被调用,再一次向协程发送信息,协程就会继续往下执行了。

使用yield from分解协程

一旦socket链接创建成功,咱们发送HTTP GET请求到服务器并在以后读取服务器响应。如今这些步骤不用再分散在不一样的回调函数里了,咱们能够将其放在同一个生成器函数中:

def fetch(self): # ... 省略链接的代码 sock.send(request.encode('ascii')) while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield f selector.unregister(sock.fileno()) if chunk: self.response += chunk else: # 完成读取 break 

可是这样代码也会越积越多,可不能够分解生成器函数的代码呢,从协程中提取出子协程?Python 3yield from能帮助咱们完成这部分工做。:

>>> def gen_fn(): ... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done' ... 

使用yield from在一个生成器中调用另外一个生成器:

>>> # Generator function: >>> def caller_fn(): ... gen = gen_fn() ... rv = yield from gen ... print('return value of yield-from: {}' ... .format(rv)) ... >>> # Make a generator from the >>> # generator function. >>> caller = caller_fn() 

caller生成器发送消息,消息送到了gen生成器那里,在gen还没返回前caller就停在rv = yield from gen这条语句上了。

>>> caller.send(None) 1 >>> caller.gi_frame.f_lasti 15 >>> caller.send('hello') result of yield: hello 2 >>> caller.gi_frame.f_lasti # 能够发现指令没有前进 15 >>> caller.send('goodbye') result of 2nd yield: goodbye return value of yield-from: done Traceback (most recent call last): File "<input>", line 1, in <module> StopIteration 

对于咱们来讲,于外,咱们没法判断发送消息时yield的值是来自caller仍是caller内的子协程(好比gen),于内,咱们也不用关心gen所获得的消息是从哪里传送来的,gen只用负责在上一次yield时获得消息输入,运行到下一个yield时返回输出,重复这个模式到最后return就能够了。

yield from获得的子协程最后return的返回值:

rv = yield from gen 

想想以前咱们抱怨回调函数抛出异常时看不到上下文,这回咱们看看协程是怎么样的:

>>> def gen_fn(): ... raise Exception('my error') >>> caller = caller_fn() >>> caller.send(None) Traceback (most recent call last): File "<input>", line 1, in <module> File "<input>", line 3, in caller_fn File "<input>", line 2, in gen_fn Exception: my error 

清楚多了,栈跟踪显示在gen_fn抛出异常时消息是从caller_fn委派到gen_fn的。

协程处理异常的手段跟普通函数也是同样的:

>>> def gen_fn(): ... yield 1 ... raise Exception('uh oh') ... >>> def caller_fn(): ... try: ... yield from gen_fn() ... except Exception as exc: ... print('caught {}'.format(exc)) ... >>> caller = caller_fn() >>> caller.send(None) 1 >>> caller.send('hello') caught uh oh 

如今让咱们从fetch协程上分解出一些子协程。(注意分离出的子协程并非Fetcher的成员协程)

实现read协程接收一个数据块:

def read(sock): f = Future() def on_readable(): #在socket可读时读取消息并向协程发送一个数据快 f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) #yield f中止协程,等到可读时,从f那获得数据块。 chunk = yield f selector.unregister(sock.fileno()) return chunk 

实现read_all协程接收整个消息:

def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response) 

若是将yield from去掉,看上去就跟以前实现阻塞式I/O读取差很少呢。

如今在fetch中调用read_all

class Fetcher: def fetch(self): # ... 省略链接的代码: sock.send(request.encode('ascii')) self.response = yield from read_all(sock) 

嗯,如今看上去代码短多了,可是能够作的更好。 从以前的代码能够看到当咱们等一个Future返回时使用的是yield,而等一个子协程返回倒是使用yield from,咱们可让二者统一块儿来。得益于生成器与迭代器在Python中的一致性,咱们实现Future方法同时让它成为一个生成器:

def __iter__(self): yield self return self.result 

这样yield fyield from f的效果就一样是输入Future返回Future的结果了。 统一的好处是什么呢?统一以后不管你是调用一个返回Future的协程仍是返回值的协程均可以统一用yield from应对了,当你想改变协程的实现时也不用担忧对调用函数(or协程)产生影响。

完成后续工做

咱们将链接的逻辑也从fetch中分离出来:

def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno()) 

fetch如今长这个样子:

def fetch(self): global stopped sock = socket.socket() yield from connect(sock, ('xkcd.com', 80)) get = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) 

将上一节课中的parse_links 更名 _process_response 并稍做修改:

def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('xkcd.com', 'www.xkcd.com'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch()) 

主循环部分:

start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds'.format( len(urls_seen), time.time() - start)) 

运行效果

这里先奉上完整代码:

from selectors import * import socket import re import urllib.parse import time class Future: def __init__(self): self.result = None self._callbacks = [] def result(self): return self.result def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self) def __iter__(self): yield self return self.result class Task: def __init__(self, coro): self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) urls_seen = set(['/']) urls_todo = set(['/']) #追加了一个能够看最高并发数的变量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno()) def read(sock): f = Future() def on_readable(): f.set_result(sock.recv(4096)) # Read 4k at a time. selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield from f selector.unregister(sock.fileno()) return chunk def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response) class Fetcher: def __init__(self, url): self.response = b'' self.url = url def fetch(self): global concurrency_achieved, stopped concurrency_achieved = max(concurrency_achieved, len(urls_todo)) sock = socket.socket() yield from connect(sock, ('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch()) def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(urls_seen), time.time() - start, concurrency_achieved)) 

运行python3 coroutine.py命令查看效果:

此处输入图片的描述

7、总结

至此,咱们在学习的过程当中掌握了:

  1. 线程池实现并发爬虫
  2. 回调方法实现异步爬虫
  3. 协程技术的介绍
  4. 一个基于协程的异步编程模型
  5. 协程实现异步爬虫

三种爬虫的实现方式中线程池是最坏的选择,由于它既占用内存,又有线程竞争的危险须要程序员本身编程解决,并且产生的I/O阻塞也浪费了CPU占用时间。再来看看回调方式,它是一种异步方法,因此I/O阻塞的问题解决了,并且它是单线程的不会产生竞争,问题好像都解决了。然而它引入了新的问题,它的问题在于以这种方式编写的代码很差维护,也不容易debug。看来协程才是最好的选择,咱们实现的协程异步编程模型使得一个单线程可以很容易地改写为协程。那是否是每一次作异步编程都要实现TaskFuture呢?不是的,你能够直接使用asyncio官方标准协程库,它已经帮你把TaskFuture封装好了,你根本不会感觉到它们的存在,是否是很棒呢?若是你使用Python 3.5那更好,已经能够用原生的协程了,Python 3.5追加了async defawait等协程相关的关键词。这些会在从此的课程中再一一展开,下一个发布的课程就是asyncio库实现网络爬虫啦。

8、参考资料

相关文章
相关标签/搜索