Go语言在分布式系统领域有着更高的开发效率,提供了海量并行的支持。本博文介绍的是采用Go语言搭建一个并行版爬虫信息采集框架,博文中使用58同城中租房网页作案例。相比较其余爬虫程序它的优势是:git
程序源代码放到github上,连接地址是: https://github.com/GuoZhaoran/crawlergithub
下面是项目总体架构的示意图:正则表达式
该爬虫架构中Request请求能够理解为:抓取请求url的内容,例如抓取58同城北京市的租房信息时,请求的url是:https://bj.58.com/chuzu/
打开url会发现,网页页面中是房源列表信息,那么接下来要作的工做就是抓取房源详情信息和分页后的下一页房源列表信息。因而就会有新的请求Request,对应不一样的url连接地址。算法
咱们在拿到Request请求以后,抓取到网页页面内容,就须要有单独的程序去解析页面,提取相关信息,这就是worker所要作的工做。数据库
Go语言在构建并行处理程序中有着自然的优点,在该框架中处理Request请求和使用Worker提取相关信息也都是并行工做的。程序中会同时存在着不少个Request,也会有不少个Worker在处理不一样Request页面中的内容。因此分别须要一个Request队列和Worker队列来管理它们。编程
调度器的职责是将Request分配给空闲的Worker来处理,实现任务调度。由于Request和Worker分别使用队列进行管理,能够经过调度器来控制程序的运行过程,例如:分配不一样数量的Worker,将特定的Request分配给相应的Worker进行处理等。浏览器
下面咱们来看一下项目的目录结构,了解一下爬虫架构的功能模块,再详细对每个功能模块的实现过程作介绍:安全
经过上面对项目架构介绍能够看出,运行该爬虫程序,须要的数据结构体很简单,定义数据结构的程序文件是:engine/type.go数据结构
package engine //请求数据结构 type Request struct { Url string //请求url ParserFunc func([]byte) ParseResult //内容解析函数 } //通过内容解析函数解析后的返回数据结构体 type ParseResult struct { Requests []Request //请求数据结构切片 Items []interface{} //抓取到的有用信息项 }
Request(请求)所要包含的信息是请求url和解析函数,不一样的url所需的解析函数是不同的,好比咱们要提取的“58同城房源列表”和“房源详情页面”信息是不同的,所需解析函数也是不同的,接下来会对者者两个页面的解析函数进行介绍。
Worker对请求进行处理以后,返回的结果中可能有新的Request,好比从房源列表中提取出房源详情页面的连接。在房源详情页面中咱们会拿到详情信息,这些详情信息咱们经过Items进行输出便可(企业中更通用的作法是将这些信息存储到数据库,用来作数据分析,这里咱们只是对并行爬虫框架实现思路作介绍)架构
采集器实现的功能是根据url提取网页内容,使用Go语言处理很简单,只须要封装一个简单的函数便可,下面是源代码,不作过多介绍。(若是想要将采集器作的更通用一些,同城还须要对不一样网站url的编码作兼容处理),采集器相关的代码实如今:fetcher/fetcher.go
//根据网页连接获取到网页内容 func Fetch(url string) ([]byte, error) { resp, err := http.Get(url) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("wrong status code: %d", resp.StatusCode) } bodyReader := bufio.NewReader(resp.Body) return ioutil.ReadAll(bodyReader) }
解析器要作的工做是根据fetch拿到的网页内容,从中提取出有用的信息。上边咱们提到过Request结构体中,不一样的Url须要不一样的解析器,下面咱们就分别看一下房源列表解析器和房源详情页面解析器。房源列表解析器代码实现代码是:samecity/parser/city.go
package parser import ( "depthLearn/goCrawler/engine" "regexp" "strings" ) const housesRe = `<a href="(//short.58.com/[^"]*?)"[^>]*>([^<]+)</a>` const nextPage = `<a class="next" href="([^>]+)"><span>下一页</span></a>` func ParseCity(contents []byte) engine.ParseResult { re := regexp.MustCompile(housesRe) matches := re.FindAllSubmatch(contents, -1) result := engine.ParseResult{} for _, m := range matches { name := string(m[2]) //格式化抓取的url fmtUrl := strings.Replace(string(m[1]), "/", "https:/", 1) result.Items = append( result.Items, "User "+string(m[2])) result.Requests = append( result.Requests, engine.Request{ Url: fmtUrl, ParserFunc: func(c []byte) engine.ParseResult { return ParseRoomMsg(c, name) }, }) } nextRe := regexp.MustCompile(nextPage) linkMatch := nextRe.FindStringSubmatch(string(contents)) if len(linkMatch) >= 2 { result.Requests = append( result.Requests, engine.Request{ Url:linkMatch[1], ParserFunc:ParseCity, }, )} return result }
从代码中能够看出,列表解析器所作的工做是提取房源详情连接,和下一页房源列表连接。如图所示:
正则表达式定义到函数循环外部是由于提取连接所用的正则表达式都是同样的,程序只须要定义一次,检查正则表达式是否编译经过(regexp.MustCompile)就能够了。
经过浏览器工具查看源代码咱们会发现咱们提取的连接并非标准的url形式,而是以下格式的字符串://legoclick.58.com/cpaclick?target=pZwY0jCfsvFJsWN3shPf......,咱们要作的就是把字符串前边加上https://,这也很容易实现,使用Go语言标准库函数strings.Replace就能够实现。
另一个须要注意的地方就是,咱们提取到的房源列表url和房源详情url所须要的解析函数(ParseFunc)是不同的,从代码中能够看出,房源列表url的解析函数是ParseCity,而房源详情解析函数是ParseRoomMsg。咱们会发现。咱们经过解析房源列表url,会获得新的房源列表url和房源详情url,房源详情url能够经过解析函数直接拿到咱们想要的数据,而新的房源列表url须要进一步的解析,而后获得一样的内容,直到最后一页,房源列表url解析后再也没有新的房源列表url位置,数据就抓取完毕了,这种层层递进的处理数据的方法在算法上叫作:深度优先遍历算法,感兴趣的同窗能够查找资料学习一下。
上面咱们提到了解析器,信息模版代码实现文件是:/samecity/parser/profile.go,它所定以的仅仅是咱们要提取信息的一个模版struct。以下图所示是一个房源详情页面,红圈部分是咱们要提取的数据信息:
咱们再来对比一下profile.go信息模版中所定义的数据结构:
package model //成员信息结构体 type Profile struct { Title string //标题 Price int //价格 LeaseStyle string //租赁方式 HouseStyle string //房屋类型 Community string //所在小区 Address string //详细地址 }
将信息模版单独定义一个文件也是为了可以使程序更加模块化,模块化带来的好处是代码易于维护,假如咱们想要抓取其余网站的信息,就能够经过修改解析器的规则,配置信息模版来使用。正如前边提到的咱们的爬虫框架比较通用。
“调度器”是整个框架中最核心的部分,它实现了将请求分配到worker的调度。为了让数据爬取工做可以顺利进行,咱们将Worker和每个Request都使用队列进行管理。咱们先来看一个调度器的接口和实现。
调度器的接口定义是这样的:
type Scheduler interface { Submit(Request) ConfigureWorkerMasterChan(chan chan Request) WorkerReady(chan Request) Run() }
下面咱们看一下这些方法的具体实现(/scheduler/queue.go)
package scheduler import "depthLearn/goCrawler/engine" //队列调度器 type QueuedScheduler struct { requestChan chan engine.Request workerChan chan chan engine.Request } //将任务提交 func (s *QueuedScheduler) Submit(r engine.Request) { s.requestChan <- r } //当有worker能够接收新的任务时 func (s *QueuedScheduler) WorkerReady(w chan engine.Request) { s.workerChan <- w } //将request的channel送给调度器 func (s *QueuedScheduler) ConfigureWorkerMasterChan(c chan chan engine.Request) { s.workerChan = c } func (s *QueuedScheduler) Run(){ s.workerChan = make(chan chan engine.Request) s.requestChan = make(chan engine.Request) go func() { //创建request队列和worker队列 var requestQ []engine.Request var workerQ []chan engine.Request for { //查看是否既存在request又存在worker,取出做为活动的request和worker var activeRequest engine.Request var activeWorker chan engine.Request if len(requestQ) > 0 && len(workerQ) > 0 { activeWorker = workerQ[0] activeRequest = requestQ[0] } select { //调度器中有请求时,将请求加入到请求队列 case r := <-s.requestChan: requestQ = append(requestQ, r) //调度器中有能够接收任务的worker时,将请求加入到worker中 case w := <-s.workerChan: workerQ = append(workerQ, w) //当同时有请求又有worker时,将请求分配给worker执行,从队列中移除 case activeWorker <- activeRequest: workerQ = workerQ[1:] requestQ = requestQ[1:] } } }() }
咱们重点看一下Run方法,首先创建好两个队列(workerChan和requestChan),而后开启一个协程挂起任务,当有request时,加入request队列;当有worker时,加入worker队列;当worker和request同时存在时,就将第一个request分配给第一个worker。这样咱们就实现了调度器,worker和解析器并行工做了。
全部工做都作完以后,咱们就能够经过ConcurrentEngine,实现程序了,ConcurrentEngine所作的工做就是配置worker数量,接收一个种子url,将调度器,采集器和worker都发动起来工做了,代码的实现文件是:/engine/concurrent.go
package engine import "fmt" type ConcurrentEngine struct { Scheduler Scheduler WorkerCount int } type Scheduler interface { Submit(Request) ConfigureMasterWorkerChan(chan chan Request) WorkerReady(chan Request) Run() } func (e *ConcurrentEngine) Run(seeds ...Request) { out := make(chan ParseResult) e.Scheduler.Run() for i := 0; i < e.WorkerCount; i++ { createWorker(out, e.Scheduler) } for _, r := range seeds { e.Scheduler.Submit(r) } for { result := <- out for _, item := range result.Items { fmt.Printf("Got item: %v", item) } for _, request := range result.Requests { e.Scheduler.Submit(request) } } } func createWorker(out chan ParseResult, s Scheduler) { go func() { in := make(chan Request) for { s.WorkerReady(in) // tell scheduler i'm ready request := <- in result, err := worker(request) if err != nil { continue } out <- result } }() }
配置worker数量,让worker工做起来,createWorker就是当worker接收到Request以后开始工做,工做完成以后告诉调度器(经过WorkerReady方法)。worker的实现也很简单,以下所示:
func worker(r Request) (ParseResult, error){ log.Printf("Fetching %s", r.Url) body, err := fetcher.Fetch(r.Url) if err != nil { log.Printf("Fetcher: error " + "fetching url %s: %v", r.Url, err) return ParseResult{}, err } return r.ParserFunc(body), nil }
至此,全部的工做都准备好了,就能够开始工做了,入口文件crawler.go:
package main import ( "depthLearn/ConcurrentCrawler/engine" "depthLearn/ConcurrentCrawler/scheduler" "depthLearn/ConcurrentCrawler/zhenai/parser" ) func main() { e := engine.ConcurrentEngine{ Scheduler: &scheduler.QueuedScheduler{}, WorkerCount: 100, } e.Run(engine.Request{ Url: "http://www.samecity.com/zhenghun", ParserFunc: parser.ParseCityList, }) }
下面是命令行打印出来的效果图:
能够看到,咱们抓取到数据了。
咱们的爬虫程序功能还算完备,当时还有不少能够改进优化的地方,我以为最主要的有三点:
整体来讲咱们的并行爬虫框架仍是挺不错的,其中涉及到的模块化编程,队列管理,调度器等在工做中仍是值得借鉴的。固然,笔者水平有限,语言组织能力也不是太好,虽然参考了不少其余资料,代码中存在不少值得优化的地方,但愿你们可以留言指正。谢谢你们!