一个使用 asyncio 协程的网络爬虫(一)

A. Jesse Jiryu Davis 是纽约 MongoDB 的工程师。他编写了异步 MongoDB Python 驱动程序 Motor,也是 MongoDB C 驱动程序的开发领袖和 PyMongo 团队成员。 他也为 asyncio 和 Tornado 作了贡献,在 http://emptysqua.re 上写做。

一个使用 asyncio 协程的网络爬虫(一)一个使用 asyncio 协程的网络爬虫(一)

介绍html

Guido van Rossum 是主流编程语言 Python 的创造者,Python 社区称他为 BDFL(仁慈的终生大独裁者)——这是一个来自 Monty Python 短剧的称号。他的主页是 http://www.python.org/~guido/ python

经典的计算机科学强调高效的算法,尽量快地完成计算。可是不少网络程序的时间并非消耗在计算上,而是在等待许多慢速的链接或者低频事件的发生。这些程序暴露出一个新的挑战:如何高效的等待大量网络事件。一个现代的解决方案是异步 I/O。linux

这一章咱们将实现一个简单的网络爬虫。这个爬虫只是一个原型式的异步应用,由于它等待许多响应而只作少许的计算。一次爬的网页越多,它就能越快的完成任务。若是它为每一个动态的请求启动一个线程的话,随着并发请求数量的增长,它会在耗尽套接字以前,耗尽内存或者线程相关的资源。使用异步 I/O 能够避免这个的问题。git

咱们将分三个阶段展现这个例子。首先,咱们会实现一个事件循环并用这个事件循环和回调来勾画出一只网络爬虫。它颇有效,可是当把它扩展成更复杂的问题时,就会致使没法管理的混乱代码。而后,因为 Python 的协程不只有效并且可扩展,咱们将用 Python 的生成器函数实现一个简单的协程。在最后一个阶段,咱们将使用 Python 标准库“asyncio”中功能完整的协程, 并经过异步队列完成这个网络爬虫。(在 PyCon 2013 上,Guido 介绍了标准的 asyncio 库,当时称之为“Tulip”。)github

任务web

网络爬虫寻找并下载一个网站上的全部网页,也许还会把它们存档,为它们创建索引。从根 URL 开始,它获取每一个网页,解析出没有遇到过的连接加到队列中。当网页没有未见到过的连接而且队列为空时,它便中止运行。算法

咱们能够经过同时下载大量的网页来加快这一过程。当爬虫发现新的连接,它使用一个新的套接字并行的处理这个新连接,解析响应,添加新连接到队列。当并发很大时,可能会致使性能降低,因此咱们会限制并发的数量,在队列保留那些未处理的连接,直到一些正在执行的任务完成。数据库

传统方式编程

怎么使一个爬虫并发?传统的作法是建立一个线程池,每一个线程使用一个套接字在一段时间内负责一个网页的下载。好比,下载 xkcd.com 网站的一个网页:缓存

def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)
    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

套接字操做默认是阻塞的:当一个线程调用一个相似 connect 和 recv 方法时,它会阻塞,直到操做完成。(即便是 send 也能被阻塞,好比接收端在接受外发消息时缓慢而系统的外发数据缓存已经满了的状况下)所以,为了同一时间内下载多个网页,咱们须要不少线程。一个复杂的应用会经过线程池保持空闲的线程来分摊建立线程的开销。一样的作法也适用于套接字,使用链接池。

到目前为止,使用线程的是成本昂贵的,操做系统对一个进程、一个用户、一台机器能使用线程作了不一样的硬性限制。在 做者 Jesse 的系统中,一个 Python 线程须要 50K 的内存,开启上万个线程就会失败。每一个线程的开销和系统的限制就是这种方式的瓶颈所在。

在 Dan Kegel 那一篇颇有影响力的文章“The C10K problem”中,它提出了多线程方式在 I/O 并发上的局限性。他在开始写道,

网络服务器到了要同时处理成千上万的客户的时代了,你不这样认为么?毕竟,如今网络规模很大了。

Kegel 在 1999 年创造出“C10K”这个术语。一万个链接在今天看来仍是可接受的,可是问题依然存在,只不过大小不一样。回到那时候,对于 C10K 问题,每一个链接启一个线程是不切实际的。如今这个限制已经成指数级增加。确实,咱们的玩具网络爬虫使用线程也能够工做的很好。可是,对于有着千万级链接的大规模应用来讲,限制依然存在:它会消耗掉全部线程,即便套接字还够用。那么咱们该如何解决这个问题?

异步

异步 I/O 框架在一个线程中完成并发操做。让咱们看看这是怎么作到的。

异步框架使用非阻塞套接字。异步爬虫中,咱们在发起到服务器的链接前把套接字设为非阻塞:

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

对一个非阻塞套接字调用 connect 方法会当即抛出异常,即便它能够正常工做。这个异常复现了底层 C 语言函数使人厌烦的行为,它把 errno 设置为 EINPROGRESS,告诉你操做已经开始。

如今咱们的爬虫须要一种知道链接什么时候创建的方法,这样它才能发送 HTTP 请求。咱们能够简单地使用循环来重试:

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
encoded = request.encode('ascii')
while True:
    try:
        sock.send(encoded)
        break  # Done.
    except OSError as e:
        pass
print('sent')

这种方法不只消耗 CPU,也不能有效的等待多个套接字。在远古时代,BSD Unix 的解决方法是 select,这是一个 C 函数,它在一个或一组非阻塞套接字上等待事件发生。如今,互联网应用大量链接的需求,致使 select 被 poll 所代替,在 BSD 上的实现是 kqueue ,在 Linux 上是 epoll。它们的 API 和 select 类似,但在大数量的链接中也能有较好的性能。

Python 3.4 的 DefaultSelector 会使用你系统上最好的 select 类函数。要注册一个网络 I/O 事件的提醒,咱们会建立一个非阻塞套接字,并使用默认 selector 注册它。

from selectors import DefaultSelector, EVENT_WRITE
selector = DefaultSelector()
sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass
def connected():
    selector.unregister(sock.fileno())
    print('connected!')
selector.register(sock.fileno(), EVENT_WRITE, connected)

咱们不理会这个伪造的错误,调用 selector.register,传递套接字文件描述符和一个表示咱们想要监听什么事件的常量表达式。为了当链接创建时收到提醒,咱们使用 EVENT_WRITE :它表示何时这个套接字可写。咱们还传递了一个 Python 函数 connected,当对应事件发生时被调用。这样的函数被称为回调。

在一个循环中,selector 接收到 I/O 提醒时咱们处理它们。

def loop():
    while True:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

connected 回调函数被保存在 event_key.data 中,一旦这个非阻塞套接字创建链接,它就会被取出来执行。

不像咱们前面那个快速轮转的循环,这里的 select 调用会暂停,等待下一个 I/O 事件,接着执行等待这些事件的回调函数。没有完成的操做会保持挂起,直到进到下一个事件循环时执行。

到目前为止咱们展示了什么?咱们展现了如何开始一个 I/O 操做和当操做准备好时调用回调函数。异步框架,它在单线程中执行并发操做,其创建在两个功能之上,非阻塞套接字和事件循环。

咱们这里达成了“并发性concurrency”,但不是传统意义上的“并行性parallelism”。也就是说,咱们构建了一个能够进行重叠 I/O 的微小系统,它能够在其它操做还在进行的时候就开始一个新的操做。它实际上并无利用多核来并行执行计算。这个系统是用于解决 I/O 密集I/O-bound问题的,而不是解决 CPU 密集CPU-bound问题的。(Python 的全局解释器锁禁止在一个进程中以任何方式并行执行 Python 代码。在 Python 中并行化 CPU 密集的算法须要多个进程,或者以将该代码移植为 C 语言并行版本。可是这是另一个话题了。)

因此,咱们的事件循环在并发 I/O 上是有效的,由于它并不用为每一个链接拨付线程资源。可是在咱们开始前,咱们须要澄清一个常见的误解:异步比多线程快。一般并非这样的,事实上,在 Python 中,在处理少许很是活跃的链接时,像咱们这样的事件循环是慢于多线程的。在运行时环境中是没有全局解释器锁的,在一样的负载下线程会执行的更好。异步 I/O 真正适用于事件不多、有许多缓慢或睡眠的链接的应用程序。(Jesse 在“什么是异步,它如何工做,何时该用它?”一文中指出了异步所适用和不适用的场景。Mike Bayer 在“异步 Python 和数据库”一文中比较了不一样负载状况下异步 I/O 和多线程的不一样。)

回调

用咱们刚刚创建的异步框架,怎么才能完成一个网络爬虫?即便是一个简单的网页下载程序也是很难写的。

首先,咱们有一个还没有获取的 URL 集合,和一个已经解析过的 URL 集合。

urls_todo = set(['/'])
seen_urls = set(['/'])

seen_urls 集合包括 urls_todo 和已经完成的 URL。用根 URL / 初始化它们。

获取一个网页须要一系列的回调。在套接字链接创建时会触发 connected 回调,它向服务器发送一个 GET 请求。可是它要等待响应,因此咱们须要注册另外一个回调函数;当该回调被调用,它仍然不能读取到完整的请求时,就会再一次注册回调,如此反复。

让咱们把这些回调放在一个 Fetcher 对象中,它须要一个 URL,一个套接字,还须要一个地方保存返回的字节:

class Fetcher:
    def __init__(self, url):
        self.response = b''  # Empty array of bytes.
        self.url = url
        self.sock = None

咱们的入口点在 Fetcher.fetch:

# Method on Fetcher class.
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass
        # Register next callback.
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

fetch 方法从链接一个套接字开始。可是要注意这个方法在链接创建前就返回了。它必须将控制返回到事件循环中等待链接创建。为了理解为何要这样作,假设咱们程序的总体结构以下:

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
fetcher.fetch()
while True:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

当调用 select 函数后,全部的事件提醒才会在事件循环中处理,因此 fetch 必须把控制权交给事件循环,这样咱们的程序才能知道何时链接已创建,接着循环调用 connected 回调,它已经在上面的 fetch 方法中注册过。

这里是咱们的 connected 方法的实现:

# Method on Fetcher class.
    def connected(self, key, mask):
        print('connected!')
        selector.unregister(key.fd)
        request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
        self.sock.send(request.encode('ascii'))
        # Register the next callback.
        selector.register(key.fd,
                          EVENT_READ,
                          self.read_response)

这个方法发送一个 GET 请求。一个真正的应用会检查 send 的返回值,以防全部的信息没能一次发送出去。可是咱们的请求很小,应用也不复杂。它只是简单的调用 send,而后等待响应。固然,它必须注册另外一个回调并把控制权交给事件循环。接下来也是最后一个回调函数 read_response,它处理服务器的响应:

# Method on Fetcher class.
    def read_response(self, key, mask):
        global stopped
        chunk = self.sock.recv(4096)  # 4k chunk size.
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)  # Done reading.
            links = self.parse_links()
            # Python set-logic:
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()  # 
            seen_urls.update(links)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

这个回调在每次 selector 发现套接字可读时被调用,可读有两种状况:套接字接受到数据或它被关闭。

这个回调函数从套接字读取 4K 数据。若是不到 4k,那么有多少读多少。若是比 4K 多,chunk 中只包 4K 数据而且这个套接字保持可读,这样在事件循环的下一个周期,会再次回到这个回调函数。当响应完成时,服务器关闭这个套接字,chunk 为空。

这里没有展现的 parse_links 方法,它返回一个 URL 集合。咱们为每一个新的 URL 启动一个 fetcher。注意一个使用异步回调方式编程的好处:咱们不须要为共享数据加锁,好比咱们往 seen_urls 增长新连接时。这是一种非抢占式的多任务,它不会在咱们代码中的任意一个地方被打断。

咱们增长了一个全局变量 stopped,用它来控制这个循环:

stopped = False
def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

一旦全部的网页被下载下来,fetcher 中止这个事件循环,程序退出。

这个例子让异步编程的一个问题明显的暴露出来:意大利面代码。

咱们须要某种方式来表达一系列的计算和 I/O 操做,而且可以调度多个这样的系列操做让它们并发的执行。可是,没有线程你不能把这一系列操做写在一个函数中:当函数开始一个 I/O 操做,它明确的把将来所需的状态保存下来,而后返回。你须要考虑如何写这个状态保存的代码。

让咱们来解释下这究竟是什么意思。先来看一下在线程中使用一般的阻塞套接字来获取一个网页时是多么简单。

# Blocking version.
def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)
    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

在一个套接字操做和下一个操做之间这个函数到底记住了什么状态?它有一个套接字,一个 URL 和一个可增加的 response。运行在线程中的函数使用编程语言的基本功能来在栈中的局部变量保存这些临时状态。这样的函数也有一个“continuation”——它会在 I/O 结束后执行这些代码。运行时环境经过线程的指令指针来记住这个 continuation。你没必要考虑怎么在 I/O 操做后恢复局部变量和这个 continuation。语言自己的特性帮你解决。

可是用一个基于回调的异步框架时,这些语言特性不能提供一点帮助。当等待 I/O 操做时,一个函数必须明确的保存它的状态,由于它会在 I/O 操做完成以前返回并清除栈帧。在咱们基于回调的例子中,做为局部变量的替代,咱们把 sock 和 response 做为 Fetcher 实例 self 的属性来存储。而做为指令指针的替代,它经过注册 connected 和 read_response 回调来保存它的 continuation。随着应用功能的增加,咱们须要手动保存的回调的复杂性也会增长。如此繁复的记帐式工做会让编码者感到头痛。

更糟糕的是,当咱们的回调函数抛出异常会发生什么?假设咱们没有写好 parse_links 方法,它在解析 HTML 时抛出异常:

Traceback (most recent call last):
  File "loop-with-callbacks.py", line 111, in 
    loop()
  File "loop-with-callbacks.py", line 106, in loop
    callback(event_key, event_mask)
  File "loop-with-callbacks.py", line 51, in read_response
    links = self.parse_links()
  File "loop-with-callbacks.py", line 67, in parse_links
    raise Exception('parse error')
Exception: parse error

这个堆栈回溯只能显示出事件循环调用了一个回调。咱们不知道是什么致使了这个错误。这条链的两边都被破坏:不知道从哪来也不知到哪去。这种丢失上下文的现象被称为“堆栈撕裂stack ripping”,常常会致使没法分析缘由。它还会阻止咱们为回调链设置异常处理,即那种用“try / except”块封装函数调用及其调用树。(对于这个问题的更复杂的解决方案,参见 http://www.tornadoweb.org/en/stable/stack_context.html )

因此,除了关于多线程和异步哪一个更高效的长期争议以外,还有一个关于这二者之间的争论:谁更容易跪了。若是在同步上出现失误,线程更容易出现数据竞争的问题,而回调由于"堆栈撕裂stack ripping"问题而很是难于调试。

via: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html

做者:A. Jesse Jiryu Davis , Guido van Rossum 译者:qingyunha 校对:wxy

本文由 LCTT 原创翻译,Linux中国 荣誉推出

相关文章
相关标签/搜索