那些年,我爬过的北科(四)——爬虫进阶之极简并行爬虫框架开发

写在前面

在看过目录以后,读者可能会问为何这个教程没有讲一个框架,好比说scrapy或者pyspider。在这里,我认为理解爬虫的原理更加剧要,而不是学习一个框架。爬虫说到底就是HTTP请求,与语言无关,与框架也无关。html

在本节,咱们将用26行代码开发一个简单的并发的(甚至分布式的)爬虫框架。python

爬虫的模块

首先,咱们先来讲一下爬虫的几个模块。git

任务产生器——Producer

定义任务,如:爬取什么页面?怎么解析github

下载器——Downloader

下载器,接受任务产生器的任务,下载完成后给解析器进行解析。主要是I/O操做,受限于网速。sql

解析器——Parser

解析器,将下载器下载的内容进行解析,传给输出管道。主要是CPU操做,受限于下载器的下载速度。数据库

输出管道——Pipeline

如何展现爬取的数据,如以前咱们一直都在用print,其实也就是一个ConsolePipeline。固然你也能够定义FilePipeline、MysqlPipeline、Sqlite3Pipeline等等。json

  • ConsolePipeline: 把想要的内容直接输出到控制台。
  • FilePipeline: 把想要的内容输出到文件里保存,好比保存一个json文件。
  • MongoDBPipeline: 把想要的内容存入MongoDB数据库中。
  • 等等......

爬虫框架的结构

上面的四个模块也就构成了四个部分。安全

  • 1 . 首先,会有个初始的任务产生器产生下载任务。
  • 2 . 下载器不断从任务队列中取出任务,下载完任务后,放到网页池中。
  • 3 . 不一样的解析器取出网页进行解析,传给对应的输出管道。期间,解析器也会产生新的下载任务,放入到任务队列中。
  • 4 . 输出管道对解析的结果进行存储、显示。

简易的爬虫框架的架构

其实,咱们也能够把爬虫不要分的那么细,下载+解析+输出其实均可以归类为一个Worker。架构

就像下面同样,首先初始的任务产生器会产生一个下载任务,而后系统为下载任务建立几个Worker,Worker对任务进行下载解析输出,同时根据解析的一些连接产生新下载的任务放入任务队列。如此循环,直到没有任务。 并发

进程间通讯

下面,咱们说一下进程间通讯。

这里咱们举一个生产者消费者的例子。假设有两个进程,一个叫生产者,一个叫作消费者。生产者只负责生产一些任务,并把任务放到一个池子里面(任务队列),消费者从任务队列中拿到任务,并对完成任务(把任务消费掉)。

咱们这里的任务队列使用multiprocessing的Queue,它能够保证多进程间操做的安全。

from multiprocessing import Process, Queue
import time


def produce(q):  # 生产
    for i in range(10):
        print('Put %d to queue...' % value)
        q.put(i)
        time.sleep(1)


def consume(q):  # 消费
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 1, Get %s from queue.' % value)


if __name__ == '__main__':
    q = Queue()
    producer = Process(target=produce, args=(q,))
    consumer = Process(target=consume, args=(q,))
    producer.start()
    consumer.start()

    producer.join()  # 等待结束, 死循环使用Ctrl+C退出
    consumer.join()
复制代码

固然,也能够尝试有多个生产者,多个消费者。下面建立了两个生产者和消费者。

from multiprocessing import Process, Queue
import time


def produce(q):  # 生产
    for i in range(10000):
        if i % 2 == 0:
            print("Produce ", i)
            q.put(i)
            time.sleep(1)


def produce2(q):  # 生产
    for i in range(10000):
        if i % 2 == 1:
            print "Produce ", i
            q.put(i)
            time.sleep(1)


def consume(q):  # 消费
    while True:
        if not q.empty():
            value = q.get(True)
            print 'Consumer 1, Get %s from queue.' % value


def consume2(q):  # 消费
    while True:
        if not q.empty():
            value = q.get(True)
            print 'Consumer 2, Get %s from queue.' % value


if __name__ == '__main__':
    q = Queue(5)   # 队列最多放5个任务, 超过5个则会阻塞住
    producer = Process(target=produce, args=(q,))
    producer2 = Process(target=produce2, args=(q,))
    consumer = Process(target=consume, args=(q,))
    consumer2 = Process(target=consume2, args=(q,))

    producer.start()
    producer2.start()
    consumer.start()
    consumer2.start()

    producer.join()  # 等待结束, 死循环使用Ctrl+C退出
    producer2.join()
    consumer.join()
    consumer2.join()
复制代码

这里生产者生产的时间是每秒钟两个,消费者消费时间几乎能够忽略不计,属于“狼多肉少”系列。运行后,能够看到控制台每秒都输出两行。Consumer1和Consumer2的争抢十分激烈。

考虑一下“肉多狼少”的情形,代码以下:

from multiprocessing import Process, Queue
import time


def produce(q):  # 生产
    for i in range(10000):
        print("Produce ", i)
        q.put(i)


def consume(q):  # 消费
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 1, Get %s from queue.' % value)
            time.sleep(1)


def consume2(q):  # 消费
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 2, Get %s from queue.' % value)
            time.sleep(1)


if __name__ == '__main__':
    q = Queue(5)    # 队列最多放5个数据, 超过5个则会阻塞住
    producer = Process(target=produce, args=(q,))
    consumer = Process(target=consume, args=(q,))
    consumer2 = Process(target=consume2, args=(q,))

    producer.start()
    consumer.start()
    consumer2.start()

    producer.join()  # 等待结束, 死循环使用Ctrl+C退出
    consumer.join()
    consumer2.join()
复制代码

这里生产者不停的生产,直到把任务队列塞满。而两个消费者每秒钟消费一个,每当有任务被消费掉,生产者又会立马生产出新的任务,把任务队列塞满。

上面的说明,系统总体的运行速度其实受限于速度最慢的那个。像咱们爬虫,最耗时的操做就是下载,总体的爬取速度也就受限于网速。

以上的生产和消费者相似爬虫中的Producer和Worker。Producer扮演生产者,生成下载任务,放入任务队列中;Worker扮演消费者,拿到下载任务后,对某个网页进行下载、解析、数据;在此同时,Worker也会扮演生产者,根据解析到的连接生成新的下载任务,并放到任务队列中交给其余的Worker执行。

DIY并发框架

下面咱们来看看咱们本身的并发爬虫框架,这个爬虫框架的代码很短,只有26行,除去空行的话只有21行代码。

from multiprocessing import Manager, Pool


class SimpleCrawler:
    def __init__(self, c_num):
        self.task_queue = Manager().Queue()  # 任务队列
        self.workers = {}                    # Worker, 字典类型, 存放不一样的Worker
        self.c_num = c_num                   # 并发数,开几个进程

    def add_task(self, task):
        self.task_queue.put(task)

    def add_worker(self, identifier, worker):
        self.workers[identifier] = worker

    def start(self):
        pool = Pool(self.c_num)
        while True:
            task = self.task_queue.get(True)
            if task['id'] == "NO":  # 结束爬虫
                pool.close()
                pool.join()
                exit(0)
            else:  # 给worker完成任务
                worker = self.workers[task['id']]
                pool.apply_async(worker, args=(self.task_queue, task))
复制代码

这个类中一共就有四个方法:构造方法、添加初始任务方法、设置worker方法、开始爬取方法。

__init__方法:

在构造方法中,咱们建立了一个任务队列,(这里注意使用了Manager.Queue(),由于后面咱们要用到进程池,因此要用Manager类),workers字典,以及并发数配置。

crawler = SimpleCrawler(5)  # 并发数为5
复制代码

add_task方法:

负责添加初始任务方法,task的形式为一个字典。有id、url等字段。id负责分配给不一样的worker。以下:

crawler.add_task({
    "id": "worker",
    "url": "http://nladuo.cn/scce_site/",
    "page": 1
})
复制代码

add_worker方法:

负责配置worker,以id做为键存放在workers变量中,其中worker能够定义为一个抽象类或者一个函数。这里为了简单起见,咱们直接弄一个函数。

def worker(queue, task):
    url = task["url"]
    resp = requests.get(url)
    # ......,爬取解析网页
    queue.put(new_task) # 可能还会添加新的task
    # ......

crawler.add_worker("worker", worker)
复制代码

start方法:

start方法就是启动爬虫,这里看上面的代码,建立了一个进程池用来实现并发。而后不断的从queue中取出任务,根据任务的id分配给对应id的worker。咱们这里规定当id为“NO”时,咱们则中止爬虫。

crawler.start()
复制代码

爬取两级页面

下面,咱们来使用这个简单的爬虫框架,来实现一个两级页面的爬虫。

首先看第一级页面:nladuo.cn/scce_site/。其实就是以前的新闻列表页。咱们能够爬到新闻的标题,以及该标题对应的网页连接。

第二级页面是:nladuo.cn/scce_site/a…,也就是新闻的详情页,这里能够获取到新闻的内容以及点击数目等。

下面咱们建立两个worker,一个负责爬取列表页面,一个负责爬取新闻详情页。

def worker(queue, task):
    """ 爬取新闻列表页 """
    pass


def detail_worker(queue, task):
    """ 爬取新闻详情页 """
    pass
复制代码

主代码

对于main代码,这里首先须要建立一个crawler。而后添加两个worker,id分别为“worker”和“detail_worker”。而后添加一个初始的任务,也就是爬取新闻列表页的首页。

if __name__ == '__main__':
    crawler = SimpleCrawler(5)
    crawler.add_worker("worker", worker)
    crawler.add_worker("detail_worker", detail_worker)
    crawler.add_task({
        "id": "worker",
        "url": "http://nladuo.cn/scce_site/",
        "page": 1
    })
    crawler.start()
复制代码

worker代码编写

接下来,完成咱们的worker代码,worker接受两个参数:queue和task。

  • queue: 用于解析网页后,添加新的任务
  • task: 要完成的任务

而后worker①首先下载网页,②其次解析网页,③再根据解析的列表进一步须要爬取详情页,因此要添加爬取详情页的任务;④最后判断当前是否是最后一页,若是是就发送退出信号,不然添加下一页的新闻列表爬取任务。

def worker(queue, task):
    """ 爬取新闻列表页 """
    # 下载任务
    url = task["url"] + "%d.html" % task["page"]
    print("downloading:", url)
    resp = requests.get(url)

    # 解析网页
    soup = BeautifulSoup(resp.content, "html.parser")
    items = soup.find_all("div", {"class", "list_title"})

    for index, item in enumerate(items):
        detail_url = "http://nladuo.cn/scce_site/" + item.a['href']
        print("adding:", detail_url)
        # 添加新任务: 爬取详情页
        queue.put({
            "id": "detail_worker",
            "url": detail_url,
            "page": task["page"],
            "index": index,
            "title": item.get_text().replace("\n", "")
        })

    if task["page"] == 10:  # 添加结束信号
        queue.put({"id": "NO"})
    else:
        # 添加新任务: 爬取下一页
        queue.put({
            "id": "worker",
            "url": "http://nladuo.cn/scce_site/",
            "page": task["page"]+1
        })
复制代码

detail_worker代码编写

detail_worker的任务比较简单,只要下载任务,而后解析网页并打印便可。这里为了让屏幕输出没那么乱,咱们只获取点击数。

def detail_worker(queue, task):
    """ 爬取新闻详情页 """
    # 下载任务
    print("downloading:", task['url'])
    resp = requests.get(task['url'])
    # 解析网页
    soup = BeautifulSoup(resp.content, "html.parser")
    click_num = soup.find("div", {"class", "artNum"}).get_text()
    print(task["page"], task["index"], task['title'], click_num)
复制代码

思考

到这里,咱们就用咱们本身开发的框架实现了一个多级页面的爬虫。读者能够考虑一下如下的问题。

  • 如何实现爬虫的自动结束?考虑监控队列的状况和worker的状态。
  • 如何实现一个分布式爬虫?考虑使用分布式队列:celery
相关文章
相关标签/搜索