python异步爬虫

本文主要包括如下内容           html

  • 线程池实现并发爬虫
  • 回调方法实现异步爬虫
  • 协程技术的介绍
  • 一个基于协程的异步编程模型
  • 协程实现异步爬虫

线程池、回调、协程python

咱们但愿经过并发执行来加快爬虫抓取页面的速度。通常的实现方式有三种:程序员

  • 线程池方式:开一个线程池,每当爬虫发现一个新连接,就将连接放入任务队列中,线程池中的线程从任务队列获取一个连接,以后创建socket,完成抓取页面、解析、将新链接放入工做队列的步骤。
  • 回调方式:程序会有一个主循环叫作事件循环,在事件循环中会不断得到事件,经过在事件上注册解除回调函数来达到多任务并发执行的效果。缺点是一旦须要的回调操做变多,代码就会很是散,变得难以维护。
  • 协程方式:一样经过事件循环执行程序,利用了Python 的生成器特性,生成器函数可以中途中止并在以后恢复,那么本来不得不分开写的回调函数就可以写在一个生成器函数中了,这也就实现了协程。

线程池实现爬虫

Python多线程创建线程的两种方式编程

#第一种:经过函数建立线程 def 函数a(): pass t = threading.Thread(target=函数a,name=本身随便取的线程名字) #第二种:继承线程类 class Fetcher(threading.Thread): def __init__(self): Thread.__init__(self): #加这一步后主程序中断退出后子线程也会跟着中断退出 self.daemon = True def run(self): #线程运行的函数 pass t = Fetcher()

 

参见:python 多线程就这么简单 - 虫师 - 博客园服务器

多线程同步-队列网络

多线程同步就是多个线程竞争一个全局变量时按顺序读写,通常状况下要用锁,可是使用标准库里的Queue的时候它内部已经实现了锁,不用程序员本身写了。数据结构

导入队列类:多线程

from queue import Queue 建立一个队列: q = Queue(maxsize=0) maxsize为队列大小,为0默认队列大小可无穷大。 队列是先进先出的数据结构: q.put(item) #往队列添加一个item,队列满了则阻塞 q.get(item) #从队列获得一个item,队列为空则阻塞 还有相应的不等待的版本,这里略过。 队列不为空,或者为空可是取得item的线程没有告知任务完成时都是处于阻塞状态 q.join() #阻塞直到全部任务完成 线程告知任务完成使用task_done q.task_done() #在线程内调用

 

完整代码并发

from queue import Queue from threading import Thread, Lock import urllib.parse import socket import re import time seen_urls = set(['/']) lock = Lock() class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = self.parse_links(url, response) lock.acquire() for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() self.tasks.task_done() def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: normalized = urllib.parse.urljoin(fetched_url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() pool = ThreadPool(4) pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))

 

事件驱动-回调函数实现爬虫

非阻塞I/Oapp

若是使用非阻塞I/O,程序就不会傻傻地等在那里(好比等链接、等读取),而是会返回一个错误信息,虽说是说错误信息,它其实就是叫你过一会再来的意思,编程的时候都不把它当错误看。

非阻塞I/O代码以下:

sock = socket.socket()
sock.setblocking(False) try: sock.connect(('xkcd.com', 80)) except BlockingIOError: pass

 

单线程上的多I/O

有了非阻塞I/O这个特性,咱们就可以实现单线程上多个sockets的处理了,学过C语言网络编程的同窗应该都认识select这个函数吧?不认识也没关系,select函数若是你不设置它的超时时间它就是默认一直阻塞的,只有当有I/O事件发生时它才会被激活,而后告诉你哪一个socket上发生了什么事件(读|写|异常),在python中也有select,还有跟select功能相同可是更高效的poll,它们都是底层C函数的Python实现。

不过这里咱们不使用select,而是用更简单好用的DefaultSelector,是Python 3.4后才出现的一个模块里的类,你只须要在非阻塞socket和事件上绑定回调函数就能够了。

代码以下:

from selectors import DefaultSelector, EVENT_WRITE selector = DefaultSelector() sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass def connected(): selector.unregister(sock.fileno()) print('connected!') selector.register(sock.fileno(), EVENT_WRITE, connected)

 

这里看一下selector.register的原型

register(fileobj, events, data=None) 
其中fileobj能够是文件描述符也能够是文件对象(经过fileno获得),events是位掩码,指明发生的是什么事件,data 则是与指定文件(也就是咱们的socket)与指定事件绑定在一块儿的数据。

如代码所示,selector.register 在该socket的写事件上绑定了回调函数connected(这里做为数据绑定)。在该socket上第一次发生的写事件意味着链接的创建,connected函数在链接创建成功后再解除了该socket上全部绑定的数据。

事件驱动

看了以上selector的使用方式,我想你会发现它很适合写成事件驱动的形式。

咱们能够建立一个事件循环,在循环中不断得到I/O事件:

def loop(): while True: events = selector.select() #遍历事件并调用相应的处理 for event_key, event_mask in events: callback = event_key.data callback()

 

完整代码

from selectors import * import socket import re import urllib.parse import time urls_todo = set(['/']) seen_urls = set(['/']) #追加了一个能够看最高并发数的变量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False class Fetcher: def __init__(self, url): self.response = b'' self.url = url self.sock = None def fetch(self): global concurrency_achieved concurrency_achieved = max(concurrency_achieved, len(urls_todo)) self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): selector.unregister(key.fd) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) self.sock.send(get.encode('ascii')) selector.register(key.fd, EVENT_READ, self.read_response) def read_response(self, key, mask): global stopped chunk = self.sock.recv(4096) # 4k chunk size. if chunk: self.response += chunk else: selector.unregister(key.fd) # Done reading. links = self.parse_links() for link in links.difference(seen_urls): urls_todo.add(link) Fetcher(link).fetch() seen_urls.update(links) urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def parse_links(self): if not self.response: print('error: {}'.format(self.url)) return set() if not self._is_html(): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) links = set() for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') fetcher.fetch() while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback(event_key, event_mask) print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(seen_urls), time.time() - start, concurrency_achieved))

 

事件驱动-协程实现爬虫

什么是协程?

协程实际上是比起通常的子例程而言更宽泛的存在,子例程是协程的一种特例。

子例程的起始处是唯一的入口点,一旦退出即完成了子例程的执行,子例程的一个实例只会返回一次。

协程能够经过yield来调用其它协程。经过yield方式转移执行权的协程之间不是调用者与被调用者的关系,而是彼此对称、平等的。

协程的起始处是第一个入口点,在协程里,返回点以后是接下来的入口点。子例程的生命期遵循后进先出(最后一个被调用的子例程最早返回);相反,协程的生命期彻底由他们的使用的须要决定。

还记得咱们何时会用到yield吗,就是在生成器(generator)里,在迭代的时候每次执行next(generator)生成器都会执行到下一次yield的位置并返回,能够说生成器就是例程。

生成器实现协程模型

虽然生成器拥有一个协程该有的特性,但光这样是不够的,作异步编程还是困难的,咱们须要先用生成器实现一个协程异步编程的简单模型,它同时也是Python标准库asyncio的简化版,正如asyncio的实现,咱们会用到生成器,Future类,以及yield from语句。

首先实现Future类, Future类能够认为是专门用来存储将要发送给协程的信息的类。

class Future: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self)

Future对象最开始处在挂起状态,当调用set_result时被激活,并运行注册的回调函数,该回调函数多半是对协程发送信息让协程继续运行下去的函数。

咱们改造一下以前从fetch到connected的函数,加入Future与yield。

这是以前回调实现的fetch:

class Fetcher: def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): print('connected!') # ...后面省略...

 

改造后,咱们将链接创建后的部分也放到了fetch中。

class Fetcher: def fetch(self): sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass f = Future() def on_connected(): #链接创建后经过set_result协程继续从yield的地方往下运行 f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield f selector.unregister(sock.fileno()) print('connected!')

fetcher是一个生成器函数,咱们建立一个Future实例,yield它来暂停fetch的运行直到链接创建f.set_result(None)的时候,生成器才继续运行。那set_result时运行的回调函数是哪来的呢?这里引入Task类:

class Task: def __init__(self, coro): #协程 self.coro = coro #建立并初始化一个为None的Future对象 f = Future() f.set_result(None) #步进一次(发送一次信息) #在初始化的时候发送是为了协程到达第一个yield的位置,也是为了注册下一次的步进 self.step(f) def step(self, future): try: #向协程发送消息并获得下一个从协程那yield到的Future对象 next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) fetcher = Fetcher('/') Task(fetcher.fetch()) loop()

流程大体是这样的,首先Task初始化,向fetch生成器发送None信息(也能够想象成step调用了fetch,参数是None),fetch得以从开头运行到第一个yield的地方并返回了一个Future对象给step的next_future,而后step就在这个获得的Future对象注册了step。当链接创建时on_connected就会被调用,再一次向协程发送信息,协程就会继续往下执行了。

使用yield from分解协程

一旦socket链接创建成功,咱们发送HTTP GET请求到服务器并在以后读取服务器响应。如今这些步骤不用再分散在不一样的回调函数里了,咱们能够将其放在同一个生成器函数中:

def fetch(self): # ... 省略链接的代码 sock.send(request.encode('ascii')) while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield f selector.unregister(sock.fileno()) if chunk: self.response += chunk else: # 完成读取 break

可是这样代码也会越积越多,可不能够分解生成器函数的代码呢,从协程中提取出子协程?Python 3 的yield from能帮助咱们完成这部分工做。:

>>> def gen_fn():
... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done' ...

yield from获得的子协程最后return的返回值

完整代码

from selectors import * import socket import re import urllib.parse import time class Future: def __init__(self): self.result = None self._callbacks = [] def result(self): return self.result def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self) def __iter__(self): yield self return self.result class Task: def __init__(self, coro): self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) urls_seen = set(['/']) urls_todo = set(['/']) #追加了一个能够看最高并发数的变量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno()) def read(sock): f = Future() def on_readable(): f.set_result(sock.recv(4096)) # Read 4k at a time. selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield from f selector.unregister(sock.fileno()) return chunk def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response) class Fetcher: def __init__(self, url): self.response = b'' self.url = url def fetch(self): global concurrency_achieved, stopped concurrency_achieved = max(concurrency_achieved, len(urls_todo)) sock = socket.socket() yield from connect(sock, ('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch()) def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(urls_seen), time.time() - start, concurrency_achieved))

 

 

总结

至此,咱们在学习的过程当中掌握了:

  • 线程池实现并发爬虫
  • 回调方法实现异步爬虫
  • 协程技术的介绍
  • 一个基于协程的异步编程模型
  • 协程实现异步爬虫

三种爬虫的实现方式中线程池是最坏的选择,由于它既占用内存,又有线程竞争的危险须要程序员本身编程解决,并且产生的I/O阻塞也浪费了CPU占用时间。再来看看回调方式,它是一种异步方法,因此I/O阻塞的问题解决了,并且它是单线程的不会产生竞争,问题好像都解决了。然而它引入了新的问题,它的问题在于以这种方式编写的代码很差维护,也不容易debug。看来协程才是最好的选择,咱们实现的协程异步编程模型使得一个单线程可以很容易地改写为协程。那是否是每一次作异步编程都要实现Task、Future呢?不是的,你能够直接使用asyncio官方标准协程库,它已经帮你把Task、Future封装好了,你根本不会感觉到它们的存在,是否是很棒呢?若是你使用Python 3.5那更好,已经能够用原生的协程了,Python 3.5追加了async def,await等协程相关的关键词。

参考连接:Python - Python实现基于协程的异步爬虫 - 实验楼

原文地址:http://blog.csdn.net/whuhan2013/article/details/52529477

相关文章
相关标签/搜索