事前最好了解一下什么是进程,什么是线程,什么是GIL,本文再也不赘述,直接介绍模块的使用:html
普通的python爬虫是单进程单线程的,这样在遇到大量重复的操做时就只能逐个进行,咱们就很难过了。举个栗子:你有1000个美图的连接,逐个喂给下载器(函数),看着图片只能一个个蹦出来,你不心急吗?因而咱们想,能不能同时跑多个下载器,实现多图同时下载?——答案是能够的,使用多进程/多线程,把每一个带着不一样参数下载器分给每一个进程/线程就,而后同时跑多个进程/线程就好了。python
本文就介绍如何用多线程和多进程给爬虫加速算法
补充主线程与子线程(进程同理):json
Python标准库本来有threading和multiprocessing模块编写相应的多线程/多进程代码。但从Python3.2开始,标准库为咱们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。多进程咱们介绍futures的ProcessPoolExecutor
注:python 2.7 请安装future模块,pip install future
segmentfault
ProcessPoolExecutor类是Executor类的子类,实例化ProcessPoolExecutor类以建立进程池,在实例化的过程当中应指定同时运行的最大进程数api
from concurrent.futures import ProcessPoolExecutor pool = ProcessPoolExecutor(max_workers=4) # 运行最大进程数4 #进程池的操做... pool.shutdown(wait=True) # 关闭进程池,默认等待全部进程的完成。 print('Deep') # 有shutdown的状况下全部进程完成后才会运行下面的print,没有的话会立刻运行 '建立进程也可用with,这时会自带shutdown功能 with ProcessPoolExecutor(4) as pool: #进程池的操做... '
该类有两种方法对进程池提交任务创建进程(函数及一组参数构成一个任务),分别是submit()
和map()
,若是单纯想多开进程别无他想,用哪一个都行,但submit()会有更灵活的用法网络
能够理解这是python自带map()的多进程版,他返回的是一个迭代器,包含每一个任务对应的返回值(有序的),下面用例子来分析多线程
from concurrent.futures import ProcessPoolExecutor import time def test(x): time.sleep(x) # 时间阻塞 print(str(x)+'s') return x if __name__ == '__main__': with ProcessPoolExecutor(4) as pool: p = pool.map(test,[2,3,10,5,6]) for i in p: print(i)
输出并发
2s 2 3s 3 5s 6s 10s 10 5 6
分析(下面以参数代替某个进程):app
2s,3s,5s,6s,10s
2,3,10,5,6
,是有序的,对应各任务的返回值在爬虫中,上面代码中的时间阻塞会对应着网络I/O阻塞,任务中每每包含着网络请求。好比你有不少个图片连接,就写一个下载图片的函数(接收一个图片连接的参数),把函数和图片连接的集合喂给map()就实现多进程了加速了。
该方法是往进程池中提交可回调的任务,并返回一个future实例。提交多个任务可用循环实现,返回的future实例用列表存起来,每一个future表明一个进程。关于future对象有许多方法:
from concurrent.futures import ProcessPoolExecutor,as_completed import time def test(x): time.sleep(x) print(str(x)+'s') return x if __name__ == '__main__': with ProcessPoolExecutor(4) as pool: futures = [pool.submit(test,i) for i in [2,3,10,5,6]] '''for j in futures: print(j.result()) # 对应接收参数有序输出,输出2,3,10,5,6 ''' for j in as_completed(futures): print(j.result()) # 对应进程完成顺序输出,输出2,3,5,6,10
建议当心使用,虽然多线程能实现高并发,但因为线程资源共享的特性,某个线程操做这些共享的资源时可能操到一半就中止让给另外一个线程操做,致使错乱的发生。为避免此状况发生对某些操做须要加锁,因此这里介绍对锁有支持的threading模块,python自带直接导入。
若是你确信这些操做不会发生错乱,能够直接使用concurrent.future 的 ThreadPoolExecutor,方法什么的和ProcessPoolExecutor的同样
建立线程有两种方法:
实例化 threading.Thread 类,target接收函数,arg以可迭代形式接收参数。这种方法最简单
import threading import time def test(x): time.sleep(x) print(str(x)+'s') return x t1 = threading.Thread(target=test, args=(1,)) # 建立线程 t2 = threading.Thread(target=test, args=(3,)) t1.start() # 启动线程 t2.start()
继承threading.Thread 类,重写run方法,把函数及参数接收写进本身写的多线程类中。这种方法更灵活,threading.Thread 类并无供获取线程调用函数返回值的方法,若是须要函数返回值就须要继承该类本身实现
import threading import time class TestThread(threading.Thread): def __init__(self,x): threading.Thread.__init__(self) self.x = x # 参数接收 def run(self): time.sleep(self.x) # 原来的函数写到run中 print(str(self.x)+'s') def result(self): # 实现获取调用函数的返回值的方法 return self.x t1 = TestThread(1) #建立线程 t2 = TestThread(3) t1.start() # 启动线程 t2.start() t1.join() # 等待线程结束 t2.join() print(t1.result(),t2.result())
线程相关方法和属性:
线程间资源共享,若是多个线程共同对某个数据修改,可能会出现错误,为了保证数据的正确性,须要对多个线程进行同步。这时就须要引入锁了(利用GIL),锁只有一个,一个线程在持有锁的状态下对某些数据进行操做,其余线程就没法对该数据进行操做,直至该线程释放锁让其余线程抢,谁抢到谁就有权修改。
threading提供Lock和RLock两类锁,前者一个线程只能获取获取一次锁,后者容许一个线程能重复获取锁。若是某个线程对全局数据的操做是割裂的(分块的),那就使用RLock。
一个错乱的例子及锁的使用:
import time, threading lock = threading.Lock() # rlock = threading.RLock() balance = [0] def test(n): for i in range(100000): # 理想的状况是执行了+n,-n操做后才让另外一个线程处理,结果永0 #lock.acquire() balance[0] = balance[0] + n # 某个线程可能处理到这里就终止让给另外一个线程处理了,循环一大,结果可能错乱不为0 balance[0] = balance[0] - n #lock.release() t1 = threading.Thread(target=test, args=(5,)) t2 = threading.Thread(target=test, args=(8.0,)) t1.start() t2.start() t1.join() t2.join() print(balance[0])
在不加锁的状况下多跑几回,你会的到不一样的结果。可是加了锁以后,+n,-n两个操做完整执行,不会中途中断,结果永0。
使用 threading.Semaphore 类就行,Semaphore 在内部管理着一个计数器。调用 acquire() 会使这个计数器减1,release() 则是加1。计数器的值永远不会小于 0。当计数器到 0 时,再调用 acquire() 就会阻塞,直到其余线程来调用release(),这样就限制了同时运行线程的数量。
使用上很是简单,实例化Semaphore并指定线程数后,给函数的头加个acquire(),尾加个release()就行。
import threading, time def test(x): semaphore.acquire() time.sleep(x) print(x) semaphore.release() semaphore = threading.Semaphore(4) # 最大4个线程同时进行 ts = [threading.Thread(target=test,args=(i,)) for i in [2,3,5,10,6]] [t.start() for t in ts] '输出:2,3,5,6,10 (原理和上面多进程的那个差很少)'
关于threading的其余高级用法本文并未说起,以上都是些经常使用的用法,若是有更高级的须要,能够参考这文章
讲了这么多,都是模块的用法,没怎么提到爬虫。那么最后大概的讲下如何把多进程/多线程运用到爬虫中,并给个代码实例用做参考。
下面给个多进程/多线程结合的网易云音乐评论下载器(下载某首音乐的多页评论),包含加密算法,如不清楚可看以前的文章,咱们用多进程加速加密过程,用多线程加速爬取过程。
本代码较长,长到高亮效果都没有了,所以该长代码分为两部分,前半部分是以前文章提到的加密方法,后半部分是本文的多进程多线程重点代码:
import json, re, base64, random, requests, binascii, threading from Crypto.Cipher import AES#新的加密模块只接受bytes数据,否者报错,密匙明文什么的要先转码 from concurrent.futures import ProcessPoolExecutor from math import ceil secret_key = b'0CoJUm6Qyw8W8jud'#第四参数,aes密匙 pub_key ="010001"#第二参数,rsa公匙组成 modulus = "00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7"#第三参数,rsa公匙组成 headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36'} def random_16(): return bytes(''.join(random.sample('1234567890DeepDarkFantasy',16)),'utf-8') #aes加密 def aes_encrypt(text,key): pad = 16 - len(text)%16#对长度不是16倍数的字符串进行补全,而后在转为bytes数据 try: #若是接到bytes数据(如第一次aes加密获得的密文)要解码再进行补全 text = text.decode() except: pass text = text + pad * chr(pad) try: text = text.encode() except: pass encryptor = AES.new(key,AES.MODE_CBC,b'0102030405060708') ciphertext = encryptor.encrypt(text) ciphertext = base64.b64encode(ciphertext)#获得的密文还要进行base64编码 return ciphertext #rsa加密 def rsa_encrypt(ran_16,pub_key,modulus): text = ran_16[::-1]#明文处理,反序并hex编码 rsa = int(binascii.hexlify(text), 16) ** int(pub_key, 16) % int(modulus, 16) return format(rsa, 'x').zfill(256) #返回加密后内容 def encrypt_data(data): ran_16 = random_16() text = json.dumps(data) params = aes_encrypt(text,secret_key) params = aes_encrypt(params,ran_16) encSecKey = rsa_encrypt(ran_16,pub_key,modulus) return {'params':params.decode(), 'encSecKey':encSecKey }
class OnePageComment(threading.Thread): # 下载一页评论的线程类 def __init__(self,post_url, enc_data): threading.Thread.__init__(self) self.post_url = post_url self.enc_data = enc_data self.comment = '' # 建立一个comment变量储存爬到的数据 def run(self): semaphore.acquire() content = requests.post(self.post_url, headers = headers, data = self.enc_data ).json() if 'hotComments' in content: if content['hotComments']: self.comment += '*************精彩评论\n\n' self.common(content, 'hotComments') self.comment += '\n\n*************最新评论\n\n' self.common(content, 'comments') else: self.common(content, 'comments') semaphore.release() def common(self, content,c_type): for each in content[c_type]: if each ['beReplied']: if each['beReplied'][0]['content']: self.comment += each['content'] + '\n\t回复:\n\t' + each['beReplied'][0]['content'] + '\n' + '-' * 60 + '\n' else: self.comment += each['content'] + '\n' + '-' * 60 + '\n' def get_comment(self): # 选择返回评论而不是直接写入文件,由于多个线程同时操做一个文件有风险,应先返回,后统一写入 return self.comment def get_enc_datas(pages, max_workers=4): # 多进程加密 raw_datas = [] for i in range(pages): if i == 0: raw_datas.append({'rid':"", 'offset':'0', 'total':"true", 'limit':"20", 'csrf_token':""}) else: raw_datas.append({'rid':"", 'offset':str(i*20), 'total':"false", 'limit':"20", 'csrf_token':""}) with ProcessPoolExecutor(max_workers) as pool: # 多进程适合计算密集型任务,如加密 result = pool.map(encrypt_data,raw_datas) return list(result) def one_song_comment(id_): # 爬取一首歌的评论并写入txt,网络I/O密集使用多线程 post_url = 'http://music.163.com/weapi/v1/resource/comments/R_SO_4_' + str(id_) + '?csrf_token=' ts = [OnePageComment(post_url,i) for i in enc_datas] [i.start() for i in ts] [i.join() for i in ts] comments = [i.get_comment() for i in ts] with open(id_ + '.txt', 'w', encoding='utf-8') as f: f.writelines(comments) if __name__ == '__main__': semaphore = threading.Semaphore(4) # 最大线程4 enc_datas = get_enc_datas(10) # 获取加密后的数据,对全部歌曲都是通用的,这里有十页的加密数据,对应爬十页评论 one_song_comment('29498682')
效果提高惊人!!不信你跑一下上面的程序,而后和本身写的单线程/单进程比较
cpu和网络都跑到了峰值,网络峰值在cpu峰值以后,由于是先多进程加密数据,后多线程爬取