The Python version used is 2.7.10, together with two third-party modules, bs4 and happybase, both of which can be installed directly with pip.
1. logger sets up a simple log output with Python's built-in logging module.
2. getHtml fetches a site's HTML with the built-in urllib2 module. urllib2 can also be configured with a proxy, so a later improvement is to give each thread its own proxy address and avoid getting blocked when too many threads are opened (see the proxy sketch after this list).
3. get_html_class and get_html_id are two almost identical functions that use the bs4 module to pull the relevant content out of a page.
4. getcounty, getvillage and getintro filter the wanted information out of the actual pages; they have to be adapted to how each site is really laid out.
5. writetohbase uses the happybase module to write the scraped data into an HBase database. Note that HBase must have Thrift1 enabled: happybase is not compatible with Thrift2 (see the table-setup sketch after this list).
6. div_list splits one large list into n smaller lists according to the number of threads, so that each thread only handles its own list (a small usage example follows this list).
7. The URL in the code only covers Tianjin, 4 threads are started, and each thread scrapes no more than 51 records; for a real crawl these limits need to be changed.
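As mentioned in note 2, urllib2 can go through a proxy. Below is a minimal sketch of a per-thread proxy variant of getHtml; the helper name and the proxy address format are hypothetical and not part of the original script:

import urllib2

def getHtml_with_proxy(url, proxy_addr):
    # proxy_addr is a hypothetical "host:port" string, e.g. a different one per thread
    proxy = urllib2.ProxyHandler({"http": proxy_addr})
    opener = urllib2.build_opener(proxy)
    return opener.open(url, timeout=30).read()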
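Note 5 assumes the "village" table with an "info" column family already exists in HBase. A minimal one-off setup sketch with happybase, assuming the Thrift1 server listens on the default port 9090; the existence check is an addition of mine, not from the original post:

import happybase

# create the "village" table with a single "info" column family if it is missing
connection = happybase.Connection("192.168.161.101", port=9090)
if "village" not in connection.tables():
    connection.create_table("village", {"info": dict()})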
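For note 6, this is how the round-robin split in div_list (defined in the full listing below) behaves with 10 items spread over 4 threads:

# items are assigned by index modulo n, so the sub-lists differ in length by at most one
>>> div_list(range(10), 4)
[[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]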
#!/usr/local/bin/python
# -*- coding: utf8 -*-

import urllib2,threading,time,logging,msvcrt
from bs4 import BeautifulSoup
import happybase

# configure logging
def logger(messagetype,message):
    logging.basicConfig(level=logging.INFO,format='%(asctime)s [%(levelname)s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S',filename="spider.log")
    if (messagetype=="info"):
        logging.info(message)
    elif (messagetype=="warning"):
        logging.warning(message)

# URL of the home page; the addresses scraped later are relative and must be prefixed with it
def mainhtml():
    mainhtml="http://www.yigecun.com"
    return mainhtml

# fetch a page
def getHtml(url):
    page = urllib2.urlopen(url)
    html = page.read()
    return html

# get the elements of a page that match a tag and CSS class
def get_html_class(url,htmltype,htmlclass):
    html = getHtml(url)
    soup = BeautifulSoup(html,"html.parser",from_encoding="UTF-8")
    divs=soup.find_all(htmltype,class_=htmlclass)
    content=[]
    for div in divs:
        content.append(str(div).decode("utf8"))
    return content

# get the element of a page that matches a tag and id
def get_html_id(url,htmltype,htmlid):
    html = getHtml(url)
    soup = BeautifulSoup(html,"html.parser",from_encoding="UTF-8")
    content=soup.find_all(htmltype,id=htmlid)
    return str(content[0]).decode("utf8")

# county names and links
def getcounty(url):
    # get the province name from the page
    province=get_html_class(url,"li","active")[0]
    province=province.split(">")[1].split("<")[0]

    # list holding each county and its link
    list_county=[]

    # get the province / city / county info and the corresponding links from the page
    content=get_html_class(url,"div","cunnavtagguoup")
    content[0]=content[0].replace("<div class=\"cunnavtagguoup\">\n","").replace("\n</div>","")
    for item in content[0].split("</div>"):

        # name of the prefecture-level city
        if (item.split(">")[0]=="<div class=\"cunnavtaga\""):
            city=item.split(">")[1]
            # prefix the city name with the province name
            city=province+"-"+city

        # county name and link, stored in the list
        if (item.split(">")[0]=="<div class=\"cunnavtagb\""):
            # prefix the county name with the province and city names
            county=city+"-"+item.split(">")[2].replace("</a","")
            path=mainhtml()+item.split("\"")[3]
            list=[]
            list.append(county)
            list.append(path)
            list_county.append(list)

    return list_county

# village names and links
def getvillage(list_county):
    list_village=[]

    content=get_html_class(list_county[1],"div","cunnavtagguoup cunmargintd10")
    content[0]=content[0].replace("<div class=\"cunnavtagguoup cunmargintd10\">\n","").replace("\n</div>","")

    for item in content[0].split("</div>"):

        # name of the town
        if (item.split(">")[0]=="<div class=\"cunnavtaga\""):
            town=item.split(">")[1]
            # prefix the town name with the province, city and county names
            town=list_county[0]+"-"+town

        # village name and link, stored in the list
        if (item.split(">")[0]=="<div class=\"cunnavtagb\""):
            # prefix the village name with the province, city, county and town names
            village=town+"-"+item.split(">")[2].replace("</a","")
            path=mainhtml()+item.split("\"")[3]
            list=[]
            list.append(village)
            list.append(path)
            list_village.append(list)
    return list_village

# introduction text of a village
def getintro(villagelist):
    intro=get_html_id(villagelist[1],"div","cunintr")
    intro=intro.replace("<div id=\"cunintr\">","").replace("</br>","").replace("</div>","").replace("\n","").replace(" ","")
    newvillagelist=[]
    newvillagelist.append(villagelist[0])
    newvillagelist.append(intro)
    return newvillagelist

# write one village to the hbase database
def writetohbase(villagelist):

    # the village name is used as the rowkey
    rowkey=villagelist[0]

    # the remaining content goes into the corresponding columns of the "info" column family
    content=villagelist[1].split("<br>")

    # initialise every column value: some columns may be empty and referencing them directly would raise an error
    intro=""
    company=""
    farmprod=""
    resource=""
    unit=""
    other=""

    for i in range (0,len(content)):
        if (i==0):
            intro=content[i]
        elif (content[i].split(u":")[0]==u"村内企业"):
            company=content[i].split(u":")[1]
        elif (content[i].split(u":")[0]==u"主要农产品"):
            farmprod=content[i].split(u":")[1]
        elif (content[i].split(u":")[0]==u"村内资源"):
            resource=content[i].split(u":")[1]
        elif (content[i].split(u":")[0]==u"村里单位"):
            unit=content[i].split(u":")[1]
        else:
            other=content[i]

    connection=happybase.Connection("192.168.161.101")
    table=connection.table("village")
    table.put(rowkey.encode("utf8"),{"info:intro":intro.encode("utf8"),"info:company":company.encode("utf8"),"info:farmprod":farmprod.encode("utf8"),"info:resource":resource.encode("utf8"),"info:unit":unit.encode("utf8"),"info:other":other.encode("utf8")})

# worker: scrape the villages assigned to one thread
def work(thread_id,list_village):

    logger("info",u"线程"+"(thread_id="+str(thread_id)+u")已启动,总共须要爬取的数量是 "+str(len(list_village)))

    count=0
    errorlist=[]

    for village in list_village:

        # retry at most three times on errors
        error=0

        while (error<3) and (count<=50):
            try:
                newvillagelist=getintro(village)
                writetohbase(newvillagelist)
                # print "(thread_id="+str(thread_id)+")"+newvillagelist[0]+"done!"
                logger("info","(thread_id="+str(thread_id)+")"+newvillagelist[0]+"done!")
                count=count+1
                break
            except:
                error=error+1
                time.sleep(5)

        if (error>=3):
            # use village[0] here: getintro may have failed, so newvillagelist can be undefined
            # print "(thread_id="+str(thread_id)+")"+village[0]+"failed!"
            errorlist.append(village[0])
            logger("warning","(thread_id="+str(thread_id)+")"+village[0]+"failed!")

    logger("info","(thread_id="+str(thread_id)+u")工做结束,成功:"+str(count)+u",失败:"+str(len(errorlist)))

# like hash partitioning in a database: split one big list into n small lists by index modulo n,
# so that several threads can work on it concurrently
def div_list(list,n):
    divlist=[]

    for m in range (0,n):
        divlist.append([])

    for i in range (0,len(list)):
        divlist[i%n].append(list[i])

    result=[]

    for j in range (0,n):
        result.append(divlist[j])
    return result

if (__name__=="__main__"):

    # print "spider begin"
    logger("info",u"主程序启动")

    url="http://www.yigecun.com/lelist/listxian.aspx?id=0D59910890BB01DB"
    num_thread=4

    list=[]

    for list_county in getcounty(url):
        for list_village in getvillage(list_county):
            list.append(list_village)

    # print u"经统计须要爬取行政村的数量:"+str(len(list))
    logger("info",u"经统计须要爬取行政村的数量: "+str(len(list)))
    print u"按任意键继续"
    msvcrt.getch()

    newlist=[]
    newlist=div_list(list,num_thread)

    for i in range (0,num_thread):
        t=threading.Thread(target=work,args=(i,newlist[i]))
        t.start()
When the program starts:
When it finishes:
Using Hive to run MapReduce statistics over the HBase table: