Corresponding GitHub repository: Zhihu and Lagou crawlers (css)
from selenium import webdriver
import time
import pickle
import scrapy

def start_requests(self):
    browser = webdriver.Chrome()
    browser.get('https://www.zhihu.com/signin')
    input1 = browser.find_element_by_css_selector("input[name=username]")
    input1.send_keys('xx')
    input2 = browser.find_element_by_css_selector("input[name=password]")
    input2.send_keys('xx')
    button = browser.find_element_by_class_name('SignFlow-submitButton')
    button.click()
    time.sleep(10)
    Cookies = browser.get_cookies()
    print(Cookies)
    cookie_dict = {}
    for cookie in Cookies:
        # Pickle each cookie to its own file under ./cookie/
        f = open('./cookie/' + cookie['name'] + '.zhihu', 'wb')
        pickle.dump(cookie, f)
        f.close()
        cookie_dict[cookie['name']] = cookie['value']
    browser.close()
    return [scrapy.Request(url=self.start_urls[0], dont_filter=True, cookies=cookie_dict, headers=self.headers)]

When Scrapy is supposed to reuse the locally saved cookie files, the last line above (the scrapy.Request that passes cookies=cookie_dict) is required. Also add the following in zhihu_sql.py, which ensures that every subsequent request automatically carries the cookie information:

custom_settings = {
    "COOKIES_ENABLED": True,
    "DOWNLOAD_DELAY": 1.5,
}
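The snippet above only writes the cookies out with pickle; reading them back in before starting the crawl is not shown. A minimal sketch of that direction, assuming the same ./cookie/ directory and .zhihu suffix used above, could look like this:

import os
import pickle

import scrapy

def start_requests(self):
    # Load every cookie pickled to ./cookie/ above instead of logging in with
    # Selenium again (sketch only, not code from the original post).
    cookie_dict = {}
    for file_name in os.listdir('./cookie/'):
        if file_name.endswith('.zhihu'):
            with open(os.path.join('./cookie/', file_name), 'rb') as f:
                cookie = pickle.load(f)
                cookie_dict[cookie['name']] = cookie['value']
    return [scrapy.Request(url=self.start_urls[0], dont_filter=True,
                           cookies=cookie_dict, headers=self.headers)]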
headers = {
    # HOST is the domain name being accessed: https://blog.csdn.net/zhangqi_gsts/article/details/50775341
    "HOST": "www.zhihu.com",
    # Referer says which page the request jumped from and helps prevent hotlinking: https://blog.csdn.net/shenqueying/article/details/79426884
    "Referer": "https://www.zhihu.com",
    'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/60.0.3112.113 Safari/537.36"
}
# Key point: avoid getting banned
custom_settings = {
    "COOKIES_ENABLED": True,
    "DOWNLOAD_DELAY": 1
}
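To show where these pieces sit, here is a rough spider skeleton: both headers and custom_settings live as class attributes of the spider. The spider name, allowed_domains and start_urls below are assumptions for illustration, not details from the original post:

import scrapy

class ZhihuSpider(scrapy.Spider):
    # Hypothetical skeleton showing headers and custom_settings as class attributes.
    name = "zhihu"
    allowed_domains = ["www.zhihu.com"]
    start_urls = ["https://www.zhihu.com/"]

    headers = {
        "HOST": "www.zhihu.com",
        "Referer": "https://www.zhihu.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/60.0.3112.113 Safari/537.36",
    }
    custom_settings = {
        "COOKIES_ENABLED": True,
        "DOWNLOAD_DELAY": 1,
    }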
def parse(self, response):
    all_urls = response.css("a::attr(href)").extract()
    all_urls = [parse.urljoin(response.url, url) for url in all_urls]
    # Filter every url with a lambda: a url that evaluates to True stays in the result, False is dropped
    all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls)
    for url in all_urls:
        # Both question urls and concrete answer urls need to be extracted. The "or" alternative is
        # wrapped in parentheses because a concrete answer url has no trailing slash
        match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
        if match_obj:
            # A question-related page was matched: download it and hand it to the extraction callback
            request_url = match_obj.group(1)
            yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
        else:
            # pass is kept here to make debugging easier
            pass
            # Not a question page: just keep following the link
            yield scrapy.Request(url, headers=self.headers, callback=self.parse)
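To make the two capture groups concrete, the regex above behaves like this on a typical answer URL (the URL itself is made up for illustration):

import re

pattern = r"(.*zhihu.com/question/(\d+))(/|$).*"
m = re.match(pattern, "https://www.zhihu.com/question/123456/answer/789")
print(m.group(1))  # https://www.zhihu.com/question/123456 -> the question page that gets requested
print(m.group(2))  # 123456 -> the numeric question id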
from urllib import parse
url1 = response.css("a::attr(href)").extract()
url2 = [parse.urljoin(response.url, url) for url in url1]
url3 = filter(lambda x: True if x.startswith("https") else False, url2)

If the filter expression feels hard to read, it can also be written like this:

url_list = []
for url in url2:
    if url.startswith("https"):
        url_list.append(url)
def is_odd(n):
    return n % 2 == 1

tmplist = filter(is_odd, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
newlist = list(tmplist)
print(newlist)
# With a single list
>>> list(map(lambda x: x ** 2, [1, 2, 3, 4, 5]))
[1, 4, 9, 16, 25]
# With two lists, the elements at the same positions are added together
>>> list(map(lambda x, y: x + y, [1, 3, 5, 7, 9], [2, 4, 6, 8, 10]))
[3, 7, 11, 15, 19]
# Item for a Zhihu question
class ZhihuQuestionItem(scrapy.Item):
    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    url_object_id = scrapy.Field()  # used by parse_question below
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()
    crawl_update_time = scrapy.Field()

# Item for a Zhihu answer
class ZhihuAnswerItem(scrapy.Item):
    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    author_name = scrapy.Field()  # used by parse_answer below
    content = scrapy.Field()
    praise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()
    crawl_update_time = scrapy.Field()
def parse_question(self, response):
    # Handle the new page version: the new version marks the title with the unique class
    # QuestionHeader-title, which the old version does not have
    if "QuestionHeader-title" in response.text:
        match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
        if match_obj:
            # group(2) captures what (\d+) matched
            question_id = int(match_obj.group(2))

        # Use Scrapy's built-in ItemLoader to keep the code concise; instantiate it first
        item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
        item_loader.add_value("url_object_id", get_md5(response.url))
        item_loader.add_value("zhihu_id", question_id)
        item_loader.add_css("title", "h1.QuestionHeader-title::text")
        # For reference, the commented selector below extracts the content of a single answer;
        # the "content" rule used here grabs all of the answers
        # response.css(".QuestionAnswers-answers .List-item:nth-child(1) .RichContent-inner span::text").extract()
        item_loader.add_css("content", ".QuestionAnswers-answers")
        item_loader.add_css("topics", ".QuestionHeader-topics .Tag.QuestionTopic .Popover div::text")
        item_loader.add_css("answer_num", ".List-headerText span::text")
        item_loader.add_css("comments_num", ".QuestionHeader-Comment button::text")
        # watch_user_num contains both the follower (watch) count and the view/click count; they are split apart during data cleaning
        item_loader.add_css("watch_user_num", ".NumberBoard-itemValue ::text")
        item_loader.add_value("url", response.url)
        question_item = item_loader.load_item()
        # Fire the request against the backend answer API
        yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)
        yield question_item
# For the old page version (no QuestionHeader-title class), the title is taken with XPath instead;
# the | combines the two possible title nodes
item_loader.add_xpath("title", "//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
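parse_question calls a get_md5 helper that is not shown in this excerpt. A plausible minimal version (an assumption, not code from the original post) simply hashes the url so it can serve as url_object_id:

import hashlib

def get_md5(url):
    # Return a fixed-length md5 hex digest of the url (sketch of the helper
    # referenced in parse_question above).
    if isinstance(url, str):
        url = url.encode("utf-8")
    return hashlib.md5(url).hexdigest()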
start_answer_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelevant_info%2Cquestion%2Cexcerpt%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cbadge%5B%2A%5D.topics&limit={1}&offset={2}&sort_by=default"
def parse_answer(self, response):
    # json.loads turns the JSON string into Python objects
    ans_json = json.loads(response.text)
    # Check whether more pages follow and grab the next page URL; this is the paging info
    # visible under Preview when analysing the page
    is_end = ans_json["paging"]["is_end"]
    next_url = ans_json["paging"]["next"]
    # Extract the individual answer fields
    for answer in ans_json["data"]:
        answer_item = ZhihuAnswerItem()
        #answer_item["url_object_id"] = get_md5(url=answer["url"])
        answer_item["zhihu_id"] = answer["id"]
        answer_item["question_id"] = answer["question"]["id"]
        # Answers can be anonymous; in that case the author field carries no id, so fall back to None
        answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
        answer_item["author_name"] = answer["author"]["name"] if "name" in answer["author"] else None
        answer_item["content"] = answer["content"] if "content" in answer else None
        answer_item["praise_num"] = answer["voteup_count"]
        answer_item["comments_num"] = answer["comment_count"]
        answer_item["url"] = "https://www.zhihu.com/question/{0}/answer/{1}".format(answer["question"]["id"], answer["id"])
        answer_item["create_time"] = answer["created_time"]
        answer_item["update_time"] = answer["updated_time"]
        answer_item["crawl_time"] = datetime.now()
        yield answer_item
    # If this was not the last page, request the next one
    if not is_end:
        yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
def get_insert_sql(self):
    # SQL statement for inserting into the zhihu_question table
    insert_sql = """
        insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
            watch_user_num, click_num, crawl_time)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num),
            comments_num=VALUES(comments_num), watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
    """
    # Fields populated through the ItemLoader come back as lists
    zhihu_id = self["zhihu_id"][0]
    topics = ",".join(self["topics"])
    url = self["url"][0]
    title = "".join(self["title"])
    content = "".join(self["content"])
    # extract_num is the get_nums defined earlier, just refactored into a shared utility function
    answer_num = extract_num("".join(self["answer_num"]))
    comments_num = extract_num("".join(self["comments_num"]))
    # The follower count and the view count are extracted together and the numbers contain commas,
    # so they have to be pulled out one by one
    if len(self["watch_user_num"]) == 2:
        watch_user_num_click = self["watch_user_num"]
        watch_user_num = extract_num_include_dot(watch_user_num_click[0])
        click_num = extract_num_include_dot(watch_user_num_click[1])
    else:
        watch_user_num_click = self["watch_user_num"]
        watch_user_num = extract_num_include_dot(watch_user_num_click[0])
        click_num = 0
    # The datetime has to be converted into a string
    crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)
    # The order must stay the same as in the SQL statement
    params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
              watch_user_num, click_num, crawl_time)
    return insert_sql, params
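extract_num and extract_num_include_dot are referenced here but defined elsewhere in the project. A rough sketch of what they might look like, under the assumption that they just pull the first number out of strings such as "1,234 个回答" or "1.5 万" (the exact handling in the original utils module may differ):

import re

def extract_num(text):
    # Pull the first integer out of the text, ignoring thousands separators.
    match_re = re.match(r".*?(\d+)", text.replace(",", ""))
    return int(match_re.group(1)) if match_re else 0

def extract_num_include_dot(text):
    # Same idea, but keep the decimal point and scale values written as "x.y 万".
    match_re = re.match(r".*?([\d.]+)", text.replace(",", ""))
    if not match_re:
        return 0
    num = float(match_re.group(1))
    return int(num * 10000) if "万" in text else int(num)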
def do_insert(self, cursor, item):
    # Build the SQL that matches the concrete Item type and insert it into MySQL
    insert_sql, params = item.get_insert_sql()
    cursor.execute(insert_sql, params)
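For context, do_insert is typically invoked from an asynchronous MySQL pipeline built on Twisted's adbapi, so the insert runs off the crawl's main loop. The sketch below assumes that pattern; the class name and the way the connection pool is configured are assumptions, not taken from the original post:

import MySQLdb.cursors
from twisted.enterprise import adbapi

class MysqlTwistedPipeline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        # Connection parameters are assumed to live in settings.py under these keys.
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset="utf8",
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        # runInteraction hands do_insert a cursor and runs it in a thread pool.
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)
        return item

    def handle_error(self, failure, item, spider):
        print(failure)

    def do_insert(self, cursor, item):
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)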
def get_insert_sql(self):
    # SQL statement for inserting into the zhihu_answer table
    insert_sql = """
        insert into zhihu_answer(zhihu_id, url, question_id, author_id, author_name, content,
            comments_num, create_time, update_time, crawl_time)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num),
            update_time=VALUES(update_time)
    """
    # fromtimestamp converts the int timestamp into a datetime; strftime then turns it into a string
    create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
    update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
    params = (
        self["zhihu_id"], self["url"], self["question_id"], self["author_id"], self["author_name"],
        self["content"], self["comments_num"], create_time, update_time,
        self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
    )
    return insert_sql, params
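SQL_DATETIME_FORMAT is another constant defined outside this excerpt; presumably it is just a standard datetime format string. A small assumed example of the conversion described above:

import datetime

# Assumed definition of the constant used in the params above.
SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# created_time/updated_time come back from the answer API as Unix timestamps,
# so they are turned into datetime objects first and then into strings.
create_time = datetime.datetime.fromtimestamp(1503376800).strftime(SQL_DATETIME_FORMAT)
print(create_time)  # e.g. "2017-08-22 12:00:00", depending on the local timezone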