一句话归纳本文:python
本节对queue.py模块进行了详细的讲解,写了一个实战例子: 多线程抓取半次元Cos频道的全部今日热门图片,最后分析了 一波模块的源码,了解他的实现套路。git
大蕾姆镇楼:github
引言:ajax
原本是准备写multiprocessing进程模块的,而后呢,白天的时候随手 想写一个爬半次元COS频道小姐姐的脚本,接着呢,就遇到了一个使人 很是困扰的问题:国内免费的高匿代理ip都被玩坏了(不少站点都锁了), 几千个里可能就十个不到能用的,对于这种状况,有一种应付的策略 就是:写While True死循环,一直换代理ip直到能拿到数据为止。 可是,假如是咱们以前的那种单线程的话,须要等待很是久的时间, 想一想一个个代理去试,而后哪怕你设置了5s的超时,也得花上很多 时间,而你抓取的网页不止一个的话,这个时间就不是通常的长了, 这个时候不用多线程还等什么?咱们能够把要请求的页面都丢到 一个容器里,而后加锁,而后新建页面数量 x 访问线程,而后每一个 线程领取一个访问任务,而后各自执行任访问,直到所有访问完毕, 最后反馈完成信息。在学完threading模块后,相信你第一个想到的 会是条件变量Contition,acquire对集合加锁,取出一枚页面连接, notify唤醒一枚线程,而后release锁,接着重复这个操做,直到集合 里的再也不有元素为止,大概套路就是这样,若是你有兴趣能够本身 试着去写下,在Python的**queue模块
**里已经实现了一个线程安全的 多生产者,多消费者队列,自带锁,多线程并发数据交换必备。数据库
内置三种类型的队列数组
Queue
:FIFO(先进先出);LifoQueue
:LIFO(后进先出);PriorityQueue
:优先级最小的先出;构造函数的话,都是(maxsize=0),设置队列的容量,若是 设置的maxsize小于1,则表示队列的长度无限长安全
两个异常:bash
Queue.Empty
:当调用非堵塞的get()获取空队列元素时会引起; Queue.Full
:当调用非堵塞的put()满队列里添加元素时会引起;数据结构
相关函数:多线程
qsize
():返回队列的近似大小,注意:qsize()> 0不保证随后的get()不会 阻塞也不保证qsize() < maxsize后的put()不会堵塞;empty
():判断队列是否为空,返回布尔值,若是返回True,不保证后续 调用put()不会阻塞,同理,返回False也不保证get()调用不会被阻塞;full
():判断队列是否满,返回布尔值若是返回True,不保证后续 调用get()不会阻塞,同理,返回False也不保证put()调用不会被阻塞;put
(item, block=True, timeout=None):往队列中放入元素,若是block 为True且timeout参数为None(默认),为堵塞型put(),若是timeout是 正数,会堵塞timeout时间并引起Queue.Full异常,若是block为False则 为非堵塞put()put_nowait
(item):等价于put(item, False),非堵塞put()get
(block=True, timeout=None):移除一个队列元素,并返回该元素, 若是block为True表示堵塞函数,block = False为非堵塞函数,若是设置 了timeout,堵塞时最多堵塞超过多少秒,若是这段时间内没有可用的 项,会引起Queue.Empty异常,若是为非堵塞状态,有数据可用返回数据 无数据当即抛出Queue.Empty异常;官方给出的多线程例子:
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
复制代码
关于文档的解读大概就这些了,仍是比较简单的,接下来实战 写个用到Queue队列的多线程爬虫例子~
拉到底部(中途加载了更多图片,猜想又是ajax):
嗯,直接是日期耶,应该是请求参数里的一个,F12打开开发者模式,Network 抓包开起来,随手点开个02月08日,看下打开新连接的相关信息:
打开目录结构看看,要找的元素都在这里,数了下30个:
否则得出这样的抓包信息:
抓取地址:https://bcy.net/coser/toppost100 请求方式:Get 请求参数: type(固定):lastday date:20180208
清理一波,而后滚动下,抓下加载更多的那个接口:
一样是Ajax加载技术,不过数据不是Json,直接就是XML,点击Preview看下:
好家伙,果真是XML,而后不难看出**<li class="_box">
**包着的就是 一个元素,搜了下有20个,就是每次加载20个咯,算一算每日最热 天天的图片就是30+20 = 50个咯,整理下抓包信息:
抓取地址:https://bcy.net/coser/index/ajaxloadtoppost 请求方式:Post 请求参数: p(固定):1 type(固定):lastday date:20180207
嗯,两个要抓的接口都一清二楚了,而后就是得到日期的范围了, 这个就要本身慢慢试了,二分查找套路,慢慢缩减范围,知道得 出日期的前一天和日期内容相同,日期的后一天与内容不一样为止, 这里直接给出起始时间:20150918,开始抓的时间就是这个, 截止时间就是今天,好比:2018.02.09。
分析完毕,接下来就一步步写代码了~
比较简单,利用datetime模块格式化下日期,弄个循环,轻松完成;
简单介绍下,cpn是我本身写的一个模块,**get_dx_proxy_ip()随机获取 一个大象代理的代理ip,接着的get_bs()**则是获取一个BeautifulSoup对象, write_str_data()是往文件里追加一串字符串的函数。最后还把异常给打印 出来了,运行下就知道了,这个是很是频繁的,threading.current_thread() 得到当前线程,只是方便排查,若是不想打印任何东西,这里直接改为pass就 能够了。另外,使用Θ分隔图片名与下载连接(由于还没学到数据库那里,暂时 就先写txt里...)
和2相似...
继承threading.Thread类,__init__构造函数传入一个执行函数, 重写run函数,在此处调用传入的执行函数。
循环,若是队列不为空,从里面取出一枚数据,执行两个抓数据 的函数,执行完毕后,调用queue对象的task_done()通知数目-1;
这里就是建立了和任务队列同样数目的线程,调用daemon=True是为了 避免由于线程死锁或者堵塞,而后程序没法中止的状况,保证当程序只 剩下主线程时可以正常退出。
运行截图:
是的,这种HTTPSConnectionPool的异常就是那么频发,代理ip问题,不是 你程序的缘由,打开bcycos_url.xml,验证下数据有没有问题:
(PS:这里有些重复是网站原本就重复,一开始还觉得是我程序出错... 还有,这里没有抓取全部的,只抓了:20150918到20150930的,数据多得一批...)
就是处理字符串,得到下载连接,还有图片名的拼接而已~
运行截图:
能够打开输入目录验证下:
使用Queue编写一个多线程爬虫就是那么简单~ 接下来会抠下Queue的源码,有兴趣的能够继续看,没兴趣的话直接跳过便可~
直接点进去queue.py,源码只有249行,还好,看下源码结构
点开两个异常,很是简单,继承Exception而已,咱们更关注**__all__
**
all:在模块级别暴露公共接口,好比在导库的时候不建议写 *from xxx import ,由于会把xxx模块里全部非下划线开头的成员都 引入到当前命名空间中,可能会污染当前命名空间。若是显式声明了 all,import * 就只会导入 all 列出的成员。 (不建议使用:**from xxx import *** 这种语法!!!)
接着看下Queue类结构,老规矩,先撸下**init**方法
文档注释里写了:建立一个maxsize大小的队列,若是<=0,队列大小是无穷的。 设置了maxsize,而后调用self._init(maxsize),点进去看下:
这个deque是什么?
实际上是collections模块提供的双端队列,能够从队列头部快速 增长和取出对象,对应两个方法:popleft()与appendleft(), 时间复杂度只有O(1),相比起**list对象的insert(0,v)和pop(0)**的 时间复杂度为O(N),列表元素越多,元素进出耗时会越长!
回到源码,接着还定义了: mutex:threading.Lock(),定义一个互斥锁 not_empty = threading.Condition(self.mutex):定义一个非空的条件变量 not_full = threading.Condition(self.mutex):定义一个非满的条件变量 all_tasks_done = threading.Condition(self.mutex):定义一个任务都完成的条件变量 unfinished_tasks = 0:初始化未完成的任务数量为0
接着到**task_done()**方法:
with加锁,未完成任务数量-1,判断未完成的任务数量, 小于0,抛出异常:task_done调用次数过多,等于0则唤醒 全部等待线程,修改未完成任务数量;
再接着到**join()**方法:
with加锁,若是还有未完成的任务,wait堵塞调用者进程; 接下来是qsize,empty和full函数,with加锁返回大小而已:
接着是**put()**函数:
with加锁,判断maxsize是否大于0,上面也讲了maxsize<=0表明 队列是能够无限扩展的,那就不存在队列满的状况,maxsize<=0 的话直接就往队列里放元素就能够了,同时未完成任务数+1,随机 唤醒等待线程。
若是maxsize大于0表明有固定容量,就会出现队列满的状况,就须要 进行细分了:
再接着是get()函数,和put()相似,只是抛出的异常为:Empty
这两个就不用说了,非堵塞put()和get(),最后就是操做双端队列的方法而已;
另外两种类型的队列也很是简单,继承Queue类,而后重写对应的四个 方法而已~
PriorityQueue优先级队里的heappush()和heappop()是heapq模块 提供的两个方法,heap队列,q队列,堆通常可看作是一棵树的 数组对象(二叉树堆),规则以下: 某个节点的值老是不大于或不小于其孩子节点的值 而后又分最大堆和最小堆:
(这里大概知道是二叉树就行了,笔者数据结构也学的比较烂...)
利用:heappush()能够把数据放到堆里,会自动按照二叉树的结构进行存储; 利用:heappop(heap):从heap堆中删除最小元素,并返回,heap再按彻底二叉树规范重排;
queue.py模块大概的流程就是这个样子咯,总结下套路把:
关键点核心:三个条件变量,
not_empty:get的时候,队列空或在超时时间内,堵塞读取线程,非空唤醒读取线程; not_full:put的时候,队列满或在超时时间内,堵塞写入线程,非满唤醒写入线程; all_tasks_done:未完成任务unfinished_tasks不为0的时候堵塞调用队列的线程, 未完成任务不为0时唤醒全部调用队列的线程;
大概就这样~
本节把queue模块个撸了一遍,不止是熟悉API,还把源码给撸了, 撸源码感受就是在一件件脱妹子的衣服同样,每次总能发现新大陆~ 嘿嘿,挺好玩的,就说那么多吧~
(PS:Coser的质量真是良莠不齐,大部分是靠的化妆和滤镜,我仍是喜欢素颜 小姐姐还有萌大奶~,最后来个辣眼睛的Coser给你洗洗眼。O(∩_∩)O)
本节源码下载
来啊,Py交易啊
想加群一块儿学习Py的能够加下,智障机器人小Pig,验证信息里包含: Python,python,py,Py,加群,交易,屁眼 中的一个关键词便可经过;
验证经过后回复 加群 便可得到加群连接(不要把机器人玩坏了!!!)~~~ 欢迎各类像我同样的Py初学者,Py大神加入,一块儿愉快地交流学♂习,van♂转py。