With non-blocking I/O, a call does not just sit there waiting (for the connection to be established, say, or for data to arrive). Instead it returns an error immediately. Although it is called an error, it really just means "come back in a moment", and in practice we don't treat it as one.
Non-blocking I/O looks like this:
import socket

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass
The exception raised here can simply be ignored.
With non-blocking I/O we can handle multiple sockets in a single thread. If you have done network programming in C, you probably know the select function; if not, no matter: unless you give it a timeout, select blocks until an I/O event occurs, at which point it wakes up and tells you which socket saw which event (read, write, or error). Python has select as well, along with poll, which does the same job more efficiently; both are Python wrappers around the underlying C functions.
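As a quick illustration of that idea (a minimal sketch, not part of the crawler we are building; the host is only an example), this is roughly how select.select can be used to wait for a non-blocking connect to finish:

import select
import socket

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

# With no timeout, select blocks until one of the watched sockets is ready;
# a non-blocking socket becomes writable once its connection is established.
readable, writable, errored = select.select([], [sock], [])
if sock in writable:
    print('connected!')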
Here, though, we won't use select itself but the simpler and more convenient DefaultSelector, a class from the selectors module introduced in Python 3.4. All you have to do is bind a callback to a non-blocking socket and an event.
The code:
import socket
from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('localhost', 3000))
except BlockingIOError:
    pass

def connected():
    selector.unregister(sock.fileno())
    print('connected!')

selector.register(sock.fileno(), EVENT_WRITE, connected)
Let's take a look at the signature of selector.register:

register(fileobj, events, data=None)

Here fileobj can be either a file object or a file descriptor (obtained via fileno()), events is a bitmask saying which events to watch for, and data is an arbitrary object attached to that file (our socket) and that event, handed back to us when the event fires.
As the code shows, selector.register binds the callback connected to the write event on this socket (passing it in as the data argument). The first write event on the socket means the connection has been established, and once it is, connected removes everything that was bound to the socket.
Having seen how the selector is used, you will probably agree that it lends itself naturally to an event-driven style. We can create an event loop that keeps pulling in I/O events:
def loop():
    while True:
        events = selector.select()
        # Iterate over the events and invoke the corresponding handlers
        for event_key, event_mask in events:
            callback = event_key.data
            callback()
Here event_key is a namedtuple whose fields are roughly (fileobj, fd, events, data); we pull the callback we bound earlier out of data and call it. event_mask is the bitmask of the events that occurred.
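If you want to look at the key yourself, here is a tiny throwaway sketch (the socketpair and the 'hello' payload are only illustrative) that registers a socket and prints the resulting key:

import socket
from selectors import DefaultSelector, EVENT_READ

sel = DefaultSelector()
a, b = socket.socketpair()

# register() returns the key it stored for this file object.
key = sel.register(a, EVENT_READ, data='hello')
print(key)        # SelectorKey(fileobj=<socket ...>, fd=..., events=1, data='hello')
print(key.data)   # 'hello' -- whatever was passed as data at registration time

sel.unregister(a)
sel.close()
a.close()
b.close()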
For more on selectors, see the official documentation: https://docs.python.org/3.4/library/selectors.html
Now that we understand how callback-based event-driven code works, let's finish our crawler. First, create two sets: one for the URLs still to be processed and one for the URLs already crawled, both initialized with the root URL '/':
urls_todo = set(['/'])
seen_urls = set(['/'])
Crawling a single page takes quite a few callbacks. connected, for example, sends the server a GET request for the page once the connection is established. Of course it doesn't then wait around for the response (that would block); instead it binds another callback, read_response, to receive it. If read_response cannot read the whole response when its event fires, it keeps reading on subsequent events, and only unbinds itself once the complete response has arrived.
We wrap these callbacks in a Fetcher class. It has three member variables: the url being crawled, the socket object, and the server response received so far.
class Fetcher:
    def __init__(self, url):
        self.response = b''
        self.url = url
        self.sock = None
Implement the fetch method, which binds connected:
# Implemented inside the Fetcher class
def fetch(self):
    self.sock = socket.socket()
    self.sock.setblocking(False)
    try:
        self.sock.connect(('xkcd.com', 80))
    except BlockingIOError:
        pass
    selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
Note that fetch calls connect to attempt the socket connection and binds the callback; all of the actual I/O handling is left to the event loop. The relationship between Fetcher and the event loop looks like this:
# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
fetcher.fetch()

# The event loop
while True:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)
The implementation of connected:
def connected(self, key, mask):
    print('connected!')
    # Remove everything bound to this socket
    selector.unregister(key.fd)
    request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
    self.sock.send(request.encode('ascii'))
    # Now that the connection is up, bind the callback that reads the response
    selector.register(key.fd, EVENT_READ, self.read_response)
The implementation of read_response:
def read_response(self, key, mask):
    global stopped
    chunk = self.sock.recv(4096)  # Receive up to 4 KB at a time
    if chunk:
        self.response += chunk
    else:
        selector.unregister(key.fd)  # Response fully received; unbind
        links = self.parse_links()
        # Python set-logic:
        for link in links.difference(seen_urls):
            urls_todo.add(link)
            Fetcher(link).fetch()  # Crawl the new URL
        seen_urls.update(links)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True  # Stop the event loop once the work queue is empty
parse_links is the same as in the previous lesson: it returns the set of all URLs discovered in the fetched page. After parse_links, we go through every URL that has not been crawled yet, create a new Fetcher for it, and call fetch to start crawling it.
The implementation of parse_links and the other helper methods:
def body(self):
    body = self.response.split(b'\r\n\r\n', 1)[1]
    return body.decode('utf-8')

def parse_links(self):
    if not self.response:
        print('error: {}'.format(self.url))
        return set()
    if not self._is_html():
        return set()
    urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body()))
    links = set()
    for url in urls:
        normalized = urllib.parse.urljoin(self.url, url)
        parts = urllib.parse.urlparse(normalized)
        if parts.scheme not in ('', 'http', 'https'):
            continue
        host, port = urllib.parse.splitport(parts.netloc)
        if host and host.lower() not in ('xkcd.com', 'www.xkcd.com'):
            continue
        defragmented, frag = urllib.parse.urldefrag(parts.path)
        links.add(defragmented)
    return links

def _is_html(self):
    head, body = self.response.split(b'\r\n\r\n', 1)
    headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])
    return headers.get('Content-Type', '').startswith('text/html')
Change the event loop to stop when stopped is set:
start = time.time()
stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)

print('{} URLs fetched in {:.1f} seconds'.format(
    len(seen_urls), time.time() - start))
Here is the complete code:
from selectors import *
import socket
import re
import urllib.parse
import time

urls_todo = set(['/'])
seen_urls = set(['/'])
# Added a variable to observe the highest concurrency reached
concurrency_achieved = 0
selector = DefaultSelector()
stopped = False


class Fetcher:
    def __init__(self, url):
        self.response = b''
        self.url = url
        self.sock = None

    def fetch(self):
        global concurrency_achieved
        concurrency_achieved = max(concurrency_achieved, len(urls_todo))
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('localhost', 3000))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        chunk = self.sock.recv(4096)  # 4k chunk size.
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)  # Done reading.
            links = self.parse_links()
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()
            seen_urls.update(links)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True
            print(self.url)

    def body(self):
        body = self.response.split(b'\r\n\r\n', 1)[1]
        return body.decode('utf-8')

    def parse_links(self):
        if not self.response:
            print('error: {}'.format(self.url))
            return set()
        if not self._is_html():
            return set()
        urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body()))
        links = set()
        for url in urls:
            normalized = urllib.parse.urljoin(self.url, url)
            parts = urllib.parse.urlparse(normalized)
            if parts.scheme not in ('', 'http', 'https'):
                continue
            host, port = urllib.parse.splitport(parts.netloc)
            if host and host.lower() not in ('localhost',):
                continue
            defragmented, frag = urllib.parse.urldefrag(parts.path)
            links.add(defragmented)
        return links

    def _is_html(self):
        head, body = self.response.split(b'\r\n\r\n', 1)
        headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])
        return headers.get('Content-Type', '').startswith('text/html')


start = time.time()
fetcher = Fetcher('/')
fetcher.fetch()

while not stopped:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format(
    len(seen_urls), time.time() - start, concurrency_achieved))
Run python3 callback.py to see it in action. Don't forget to start the site's server first.
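If you don't have the course's test site running, any static server on localhost:3000 should do for a quick smoke test; for example (this stand-in uses Python's built-in http.server and is not the server used in the course), save the following as a separate script and start it before the crawler:

# A stand-in static server on localhost:3000 (assumption: any text/html content is
# enough for a smoke test). Directory listings are served as text/html with relative
# links, so the crawler above can connect, parse them, and follow them.
from http.server import HTTPServer, SimpleHTTPRequestHandler

HTTPServer(('localhost', 3000), SimpleHTTPRequestHandler).serve_forever()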
Think back to how, before, everything from establishing the connection, to reading the response, to parsing new URLs into the work queue could be done inside a single function, like this:
def fetch(url):
    sock = socket.socket()
    sock.connect(('localhost', 3000))  # blocks until connected
    request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)            # blocks until data arrives
    while chunk:
        response += chunk
        chunk = sock.recv(4096)
    links = parse_links(response)      # parse_links and q as in the previous lesson
    q.add(links)
Implemented with callbacks, that whole flow is shattered: wherever something would block, the function has to be cut in two, the code gets messy, and maintenance becomes a pain. Worse, if an exception is raised inside a callback, you get almost no useful information:
Traceback (most recent call last):
  File "loop-with-callbacks.py", line 111, in <module>
    loop()
  File "loop-with-callbacks.py", line 106, in loop
    callback(event_key, event_mask)
  File "loop-with-callbacks.py", line 51, in read_response
    links = self.parse_links()
  File "loop-with-callbacks.py", line 67, in parse_links
    raise Exception('parse error')
Exception: parse error
You can't see what context the callback was running in; all you know is that it was called from the event loop. And there is nowhere outside the function where you could catch its exception. Yet this is an unavoidable flaw of the callback approach, so what should we do if we want asynchronous concurrency?