直奔主题,在用多线程写一个爬虫项目时,很天然的想到用“生产者-消费者”模式,可是又想实现一个额外的需求,即若是爬虫爬到了队列中已存在的url,则不进行入队操做。想实现这个,大概面临两个问题:多线程
Queue
对象不支持in
操做Queue
对象外部经过if item in q
这种形式也行不通,由于该过程无锁,多线程状况下就没法保证查询结果的准确性进而观察了一下queue模块的源代码,由于Queue类是基于deque类实现的,那么就好办了:对put
方法稍加改造,增长一个unique
参数,依据此参数是否为True,在底层的deque中判断要加入的元素的存在性,以此可实现相似set的特性。ide
put方法源代码:url
def put(self, item, block=True, timeout=None): with self.not_full: if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: raise Full elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while self._qsize() >= self.maxsize: remaining = endtime - time() if remaining <= 0.0: raise Full self.not_full.wait(remaining) self._put(item) self.unfinished_tasks += 1 self.not_empty.notify()
修改后的put方法线程
def put(self, item, block=True, timeout=None, unique=False): """增长了unique参数""" with self.not_full: #----- 如下三行为新增代码 -----# if unique: if item in self.queue: return #----- 新增代码结束 -----# if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: raise Full elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while self._qsize() >= self.maxsize: remaining = endtime - time() if remaining <= 0.0: raise Full self.not_full.wait(remaining) self._put(item) self.unfinished_tasks += 1 self.not_empty.notify()
固然也能够仿照put_nowait
方法增长一个快捷方法put_unique
,以下code
def put_unique(self, item, block=True, timeout=None): return self.put(item, block, timeout, unique=True)
Have fun!对象