Golang实现简单爬虫框架(3)——简单并发版

在上篇文章Golang实现简单爬虫框架(2)——单任务版爬虫中咱们实现了一个简单的单任务版爬虫,对于单任务版爬虫,每次都要请求页面,而后解析数据,而后才能请求下一个页面。整个过程当中,获取网页数据速度比较慢,那么咱们就把获取数据模块作成并发执行。在项目的基础上,实现多任务并发版爬虫。html

项目github地址:github.com/NovemberCho… 回滚到相应记录食用,效果更佳。git

一、项目架构

首先咱们把但任务版爬虫架构中的Fetcher模块和Parser模块合并成一个Worker模块,而后并发执行Worker模块github

而后获得并发版的架构图:golang

  • 在并发版爬虫中,会同时执行多个Worker,每一个Worker任务接受一个Request请求,而后请求页面解析数据,输出解析出的RequestsItembash

  • 由于又不少RequestWorker,因此还须要Scheduler模块,负责对请求任务的调度处理架构

  • Engine模块接受Worker发送的RequestsItems,当前咱们先把Items打印出,把解析出的Request发送给调度器并发

  • 其中EngineScheduler是一个goroutineWorker包含多个goroutine,各个模块之间都是用channel进行链接框架

    先放上重构后的项目文件结构:函数

二、Worker实现

咱们从engine.go中提取下面功能做为Worker模块,同时把engine.go 改名为simple.go。修改后的simple.go文件请自行调整,或者去github项目源代码回滚查看。post

engine/worker.go

package engine

import (
	"crawler/fetcher"
	"log"
)

// 输入 Request, 返回 ParseResult
func worker(request Request) (ParseResult, error) {
	log.Printf("Fetching %s\n", request.Url)
	content, err := fetcher.Fetch(request.Url)
	if err != nil {
		log.Printf("Fetch error, Url: %s %v\n", request.Url, err)
		return ParseResult{}, err
	}
	return request.ParseFunc(content), nil
}
复制代码

对于每个Worker接受一个请求,而后返回解析出的内容

三、并发引擎Concurrent实现

请你们根据架构图来看,效果会更好。

package engine

import "log"

// 并发引擎
type ConcurrendEngine struct {
   Scheduler   Scheduler	// 任务调度器
   WorkerCount int			// 任务并发数量
}

// 任务调度器
type Scheduler interface {
   Submit(request Request) // 提交任务
   ConfigMasterWorkerChan(chan Request)	// 配置初始请求任务
}

func (e *ConcurrendEngine) Run(seeds ...Request) {

   in := make(chan Request)			// scheduler的输入
   out := make(chan ParseResult)	// worker的输出
   e.Scheduler.ConfigMasterWorkerChan(in)	// 把初始请求提交给scheduler

   // 建立 goruntine
   for i := 0; i < e.WorkerCount; i++ {
      createWorker(in, out)
   }

   // engine把请求任务提交给 Scheduler
   for _, request := range seeds {
      e.Scheduler.Submit(request)
   }

   itemCount := 0
   for {
      // 接受 Worker 的解析结果
      result := <-out
      for _, item := range result.Items {
         log.Printf("Got item: #%d: %v\n", itemCount, item)
         itemCount++
      }

      // 而后把 Worker 解析出的 Request 送给 Scheduler
      for _, request := range result.Requests {
         e.Scheduler.Submit(request)
      }
   }
}

// 建立任务,调用worker,分发goroutine
func createWorker(in chan Request, out chan ParseResult) {
   go func() {
      for {
         request := <-in
         result, err := worker(request)
         if err != nil {
            continue
         }
         out <- result
      }
   }()
}
复制代码

四、任务调度器Scheduler实现

scheduler/scheduler.go

package scheduler

import "crawler/engine"

type SimpleScheduler struct {
	workerChan chan engine.Request
}

func (s *SimpleScheduler) Submit(request engine.Request) {
	// 为每个 Request 建立 goroutine
	go func() {
		s.workerChan <- request
	}()
}

// 把初始请求发送给 Scheduler
func (s *SimpleScheduler) ConfigMasterWorkerChan(in chan engine.Request) {
	s.workerChan = in
}

复制代码

五、main函数

package main

import (
	"crawler/engine"
	"crawler/scheduler"
	"crawler/zhenai/parser"
)

func main() {
	e := engine.ConcurrendEngine{	// 配置爬虫引擎
		Scheduler:   &scheduler.SimpleScheduler{},
		WorkerCount: 50,
	}
	e.Run(engine.Request{		// 配置爬虫目标信息
		Url:       "http://www.zhenai.com/zhenghun",
		ParseFunc: parser.ParseCityList,
	})
}
复制代码

六、小结

本次博客咱们实现一个最简单的并发版爬虫,调度器源源不断的接受任务,一旦有一个worker空闲,就给其分配任务。这样子有一个缺点,就是咱们不知道咱们分发出那么多worker的工做状况,对worker的控制力比较弱,因此在下次博客中会用队列来实现任务调度。

若是想获取Google工程师深度讲解go语言视频资源的,能够在评论区留下邮箱。

项目的源代码已经托管到Github上,对于各个版本都有记录,欢迎你们查看,记得给个star,在此先谢谢你们了

相关文章
相关标签/搜索