'''
When one thread is using a shared resource, the other threads must wait for it
to finish. This is enforced with a "lock", whose job is to keep multiple
threads from using the same piece of memory at the same time.

Process: one execution of a program.
Thread:  the basic unit of CPU scheduling.
Multithreading: good for heavy I/O-bound work -- while one thread waits for a
    response, the other threads keep working.
Multiprocessing: good for heavy CPU-bound parallel computation.
Scrapy: an asynchronous networking framework (lots of coroutines doing the work).

Pipeline:
    page-number queue --> crawl threads fetch pages (network I/O)
    --> data queue (the responses) --> parse threads parse the HTML (disk I/O)
    --> parsed data is written to storage
'''
# HTTP requests
import requests
# Queues (multiprocessing.Queue also works between threads)
from multiprocessing import Queue
from queue import Empty
# Threads
from threading import Thread
import threading
# HTML parsing
from lxml import etree
# Storage
import json


class ThreadCrawl(Thread):
    def __init__(self, threadName, pageQueue, dataQueue):
        # Call the parent class's initializer
        super(ThreadCrawl, self).__init__()
        self.threadName = threadName
        self.pageQueue = pageQueue
        self.dataQueue = dataQueue
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 "
                          "(KHTML, like Gecko) Chrome/66.0.3359.170 Safari/537.36"
        }

    # thread.start() ends up calling run()
    def run(self):
        print("Starting " + self.threadName)
        while not CRAWL_EXIT:
            try:
                # Take one page number off the queue.
                # Optional block argument (default True):
                # 1. queue empty, block=True  -> blocks until a new value arrives
                # 2. queue empty, block=False -> raises queue.Empty immediately
                page = self.pageQueue.get(False)
            except Empty:
                continue
            url = "https://www.qiushibaike.com/text/page/" + str(page) + "/"
            content = requests.get(url, headers=self.headers).text
            # Put the page source on the data queue
            self.dataQueue.put(content)
        print("Stopping " + self.threadName)


class ThreadParse(Thread):
    def __init__(self, threadName, dataQueue, filename, lock):
        super(ThreadParse, self).__init__()
        self.threadName = threadName
        self.dataQueue = dataQueue
        self.filename = filename
        self.lock = lock

    def run(self):
        while not PARSE_EXIT:
            try:
                html = self.dataQueue.get(False)
            except Empty:
                continue
            self.parse(html)

    def parse(self, html):
        html = etree.HTML(html)
        # NOTE: this XPath assumes each post's text sits in a
        # div.content > span node; adjust it to the real page structure.
        contents = html.xpath('//div[contains(@class, "content")]/span/text()')
        items = [content.strip() for content in contents]
        # `with` always runs two operations: __enter__ and __exit__,
        # no matter what happens inside the block, so the lock is
        # acquired, the content is written, and the lock is released.
        with self.lock:
            # Write out the parsed data, one JSON string per line
            for item in items:
                self.filename.write(json.dumps(item, ensure_ascii=False) + "\n")


CRAWL_EXIT = False
PARSE_EXIT = False


def main():
    # Page-number queue, holds at most 20 values
    pageQueue = Queue(20)
    # Put the numbers 1-20 in, first in, first out
    for i in range(1, 21):
        pageQueue.put(i)
    # Data queue for the HTML source; no argument means no size limit
    dataQueue = Queue()
    # Create the lock
    lock = threading.Lock()

    # Names of the crawl threads
    crawlList = ["Crawl thread 1", "Crawl thread 2", "Crawl thread 3"]
    # Keep references to the crawl threads
    thread_crawl = []
    for threadName in crawlList:
        # Create and start one crawl thread
        thread = ThreadCrawl(threadName, pageQueue, dataQueue)
        thread.start()
        thread_crawl.append(thread)

    filename = open("duanzi.json", "a", encoding="utf-8")
    # Names of the parse threads
    parseList = ["Parse thread 1", "Parse thread 2", "Parse thread 3"]
    threadparse = []
    for threadName in parseList:
        thread = ThreadParse(threadName, dataQueue, filename, lock)
        thread.start()
        threadparse.append(thread)

    # Busy-wait here as long as the page queue still has items
    while not pageQueue.empty():
        pass
    # Once the queue is empty, set CRAWL_EXIT = True so the crawl threads exit
    global CRAWL_EXIT
    CRAWL_EXIT = True
    # Block until every crawl thread has finished before the main thread goes on
    for thread in thread_crawl:
        thread.join()
        print(thread)

    while not dataQueue.empty():
        pass
    global PARSE_EXIT
    PARSE_EXIT = True
    for thread in threadparse:
        thread.join()
        print(thread)

    with lock:
        # Close the output file
        filename.close()
    print("Thanks for using!")


if __name__ == '__main__':
    main()
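

# ---------------------------------------------------------------------------
# Illustrative sketch only (never called anywhere): the same producer/consumer
# pipeline coordinated with queue.Queue's task_done()/join() bookkeeping
# instead of the busy-wait loops and global CRAWL_EXIT/PARSE_EXIT flags used
# above.  The worker names and the use of None as a "poison pill" are
# assumptions for the example, not part of the original script.
# ---------------------------------------------------------------------------
def _joinable_queue_sketch():
    import queue

    page_q = queue.Queue()
    data_q = queue.Queue()

    def crawl_worker():
        while True:
            page = page_q.get()          # blocks until a page number is available
            if page is None:             # poison pill: stop this worker
                page_q.task_done()
                break
            data_q.put("fake html for page %d" % page)
            page_q.task_done()           # mark this page number as handled

    def parse_worker():
        while True:
            html = data_q.get()
            if html is None:             # poison pill: stop this worker
                data_q.task_done()
                break
            # ... parse and store `html` here ...
            data_q.task_done()           # mark this response as handled

    for page in range(1, 21):
        page_q.put(page)

    crawlers = [Thread(target=crawl_worker) for _ in range(3)]
    parsers = [Thread(target=parse_worker) for _ in range(3)]
    for t in crawlers + parsers:
        t.start()

    page_q.join()                        # every page number has been fetched
    for _ in crawlers:
        page_q.put(None)                 # one poison pill per crawl worker
    data_q.join()                        # every response has been parsed
    for _ in parsers:
        data_q.put(None)                 # one poison pill per parse worker
    for t in crawlers + parsers:
        t.join()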