阻塞:程序遇到IO阻塞,程序遇到IO立马会中止(挂起),cpu立刻切换,等IO结束后再执行python
非阻塞:程序没有IO或者遇到IO经过某种手段让CPU去执行同一个线程里面的其余的任务,尽量的占用CPUgit
# 异步回收任务的方式一: 将全部任务的结果统一收回 from concurrent.futures import ProcessPoolExecutor import os import time import random def task(): print(f'{os.getpid()} is running') time.sleep(random.randint(1, 3)) return f'{os.getpid()} is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) lst = [] for i in range(10): res = p.submit(task) # 异步发出 lst.append(res) # print(res.result()) # 在这里result()就会变成同步 p.shutdown(wait=True) # 1.阻止再向进程池投放新的任务 # 2.wait=True 一个任务完成了就减一,直至为0才执行下一行 for res in lst: print(res.result())
# 浏览器作的事情很简单,封装一些头部,发一个请求到服务器,服务器拿到请求信息,分析信息,分析正确以后,给浏览器返回一个文件,浏览器将这个文件的代码渲染就成了网页 # 爬虫: 利用requests模块,模拟浏览器,封装头给服务器发送请求,骗过服务器,服务器也给你返回一个文件, # 爬虫拿到文件进行数据清洗,获取想要的信息 # 爬虫: 分两步 # 第一步: 爬取服务端的文件(IO阻塞) # 第二步: 拿到文件,进行数据清洗(非IO,极少IO) # 版本一 from concurrent.futures import ProcessPoolExecutor import requests import time import os import random def get(url): # 爬取文件 response = requests.get(url) print(os.getpid(), '正在爬取:', url) time.sleep(random.randint(1, 3)) if response.status_code == 200: return response.text def parse(text): # 对爬取回来的字符串的分析,用len模拟一下 print('分析结果:', len(text)) if __name__ == '__main__': url_list = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/', 'https://www.cnblogs.com/' ] pool = ProcessPoolExecutor(4) obj_list = [] for url in url_list: obj = pool.submit(get, url) obj_list.append(obj) pool.shutdown(wait=True) for obj in obj_list: text = obj.result() parse(text) # 问题出在哪里? # 1.分析结果的过程是串行,效率低 # 2.将全部的结果所有爬取成功以后,放在一个列表中 ------------------------------------------------------- # 版本二:异步处理,获取结果的第二种方式 # 完成一个任务,返回一个结果,并发的获取结果 from concurrent.futures import ProcessPoolExecutor import requests import time import os import random def get(url): # 爬取文件 response = requests.get(url) print(os.getpid(), '正在爬取:', url) time.sleep(random.randint(1, 3)) if response.status_code == 200: parse(response.text) # return response.text def parse(text): # 对爬取回来的字符串的分析,用len模拟一下 print('分析结果:', len(text)) if __name__ == '__main__': url_list = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/', 'https://www.cnblogs.com/' ] pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get, url) pool.shutdown(wait=True) # 问题,加强了耦合性 ------------------------------------------------------ # 版本三: 版本二,两个任务有耦合性.在上一个基础上,对其进行解耦 from concurrent.futures import ProcessPoolExecutor import requests import time import os import random def get(url): # 爬取文件 response = requests.get(url) print(os.getpid(), '正在爬取:', url) time.sleep(random.randint(1, 3)) if response.status_code == 200: return response.text def parse(obj): # 对爬取回来的字符串的分析,用len模拟一下 print(f'{os.getpid()}分析结果:', len(obj.result())) if __name__ == '__main__': url_list = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/', 'https://www.cnblogs.com/' ] pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get, url) obj.add_done_callback(parse) # 增长一个回调函数 # 如今的进程完成的仍是网络爬取的任务,拿到返回值以后,丢给回调函数, # 进程继续完成下一个任务,回调函数进行分析结果 pool.shutdown(wait=True) # 回调函数是主进程实现的,回调函数帮咱们进行分析任务 # 明确了进程的任务就是网络爬取,分析任务交给回调函数执行,对函数之间解耦 # 极值状况: 若是回调函数是IO任务,那么因为回调函数是主进程作的,因此有可能影响效率 # 回调不是万能的,若是回调的任务是IO,那么异步+回调机制很差,此时若是须要效率,只能再开一个线程或进程池 # 异步就是回调? # 这个论点是错的,异步,回调是两个概念 # 若是多个任务,多进程多线程处理的IO任务 # 1. 剩下的任务 非IO阻塞 异步+回调机制 # 2. 剩下的任务有 IO 远小于 多个任务的IO 异步+回调机制 # 3. 剩下的任务 IO 大于等于 多个任务的IO 第二种解决方式,或者开启两个进程/线程池
FIFO: 先进先出github
import queue #不须要经过threading模块里面导入,直接import queue就能够了,这是python自带的 #用法基本和咱们进程multiprocess中的queue是同样的 q = queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 结果(先进先出): first second third '''
LIFO: 后进先出(栈)数组
import queue q = queue.LifoQueue() # 队列,相似于栈 q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 结果(后进先出): third second first '''
Priority: 优先级队列浏览器
import queue q = queue.PriorityQueue() # put进入一个元组,元组的第一个元素是优先级(一般是数字,也能够是非数字之间的比较),数字越小优先级越高 q.put((-10, 'a')) q.put((-5, 'a')) #负数也能够 # q.put((20, 'ws')) #若是两个值的优先级同样,那么按照后面的值的acsii码顺序来排序,若是字符串第一个数元素相同,比较第二个元素的acsii码顺序 # q.put((20, 'wd')) # q.put((20, {'a': 11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20, ('w', 1))) #优先级相同的两个数据,他们后面的值必须是相同的数据类型才能比较,能够是元祖,也是经过元素的ascii码顺序来排序 q.put((20, 'b')) q.put((20, 'a')) q.put((0, 'b')) q.put((30, 'c')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ''' 结果(数字越小优先级越高,优先级高的优先出队): (-10, 'a') (-5, 'a') (0, 'b') (20, 'a') (20, 'b') (30, 'c') '''
方法服务器
event.isSet(): 返回event的状态值 event.wait(): 若是 event.isSet() == False将阻塞线程 event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度 event.clear(): 恢复event的状态值为False
示例网络
from threading import Thread from threading import current_thread from threading import Event import time event = Event() # 默认False def task(): print(f'{current_thread().name}检测服务器是否正常开启....') time.sleep(3) event.set() # 改为True def task1(): print(f'{current_thread().name}正在尝试链接服务器') event.wait() # 阻塞,轮询检测event是否为True,当其为True,继续下一行代码 # event.wait(1) # 超时时间,超过期间不管是否为True都继续下一行代码 print(f'{current_thread().name}链接成功') if __name__ == '__main__': t1 = Thread(target=task1) t2 = Thread(target=task1) t3 = Thread(target=task1) t = Thread(target=task) t1.start() t2.start() t3.start() t.start()
1. 概念多线程
协程本质就是一条线程,多个任务在一条线程上来回切换 协程是操做系统不可见的 协程的概念自己并 没有规避I/O操做,可是咱们能够利用协程这个概念来实现规避I/O操做,进而达到了咱们将一条线程中 的I/O操做降到最低的目的 协程可以实现的大部分I/O操做都在网络并发
2. 相关模块概览和协程的应用app
gevent:利用了greenlet底层模块(C语言写的)完成的切换 + 自动规避io的功能
3. gevent模块
import gevent def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() #或者gevent.joinall([g1,g2]) print('主') 遇到I/O切换
上例gevent.sleep(2)模拟的是gevent能够识别的io阻塞,
而time.sleep(2)或其余的阻塞,gevent是不能直接识别的须要用下面一行代码,打补丁,就能够识别了
from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块以前
或者咱们干脆记忆成:要用gevent,须要将from gevent import monkey;monkey.patch_all()放到文件的开头
from gevent import monkey;monkey.patch_all() #必须写在最上面,这句话后面的全部阻塞所有可以识别了 import gevent #直接导入便可 import time def eat(): #print() print('eat food 1') time.sleep(2) #加上mokey就可以识别到time模块的sleep了 print('eat food 2') def play(): print('play 1') time.sleep(1) #来回切换,直到一个I/O的时间结束,这里都是咱们个gevent作得,再也不是控制不了的操做系统了。 print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play_phone) gevent.joinall([g1,g2]) print('主')
咱们能够用threading.current_thread().getName()来查看每一个g1和g2,查看的结果为DummyThread-n,即假线程,虚拟线程,其实都在一个线程里面
进程线程的任务切换是由操做系统自行切换的,你本身不能控制
协程是经过本身的程序(代码)来进行切换的,本身可以控制,只有遇到协程模块可以识别的IO操做的时候,程序才会进行任务切换,实现并发效果,若是全部程序都没有IO操做,那么就基本属于串行执行了。
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(10): task(i) def asynchronous(): g_l=[spawn(task,i) for i in range(10)] joinall(g_l) if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() #上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在 全部greenlet执行完后才会继续向下走。 协程:同步异步对比