event 事件用来控制线程的执行,mysql
由一些线程去控制另外一些线程。sql
使用threading库中的Event对象,对象中包含了一个可由线程设置的信号标志,容许线程等待某些事件的发生服务器
在初始状况下,Event对象中的信号标志位False,若是有线程等待一个Event对象,这个Event对象的标志为假,那么这个线程将会被一直阻塞直至标志为真。一个线程若是将一个Event对象信号标志设置为真,它将唤醒全部等待这个Event对象的线程。多线程
# coding=utf-8 from threading import Event from threading import Thread import time # 调用类实例化出对象 e = Event() # 若程序中有以下代码,即为False,阻塞 # e.wait() # 若程序中有以下代码,则将其余线程的False改成True,进入就绪态和运行态 # e.set() # 模拟一个红绿灯 def light(): print("红灯亮") time.sleep(5) # 开始发信号给其余线程,告诉其余线程准备执行 e.set() print("绿灯亮") # 模拟一个个汽车 def car(): print("正在等红灯") e.wait() print("汽车开始起步") t1 = Thread(target=light) t1.start() for i in range(10): t2 = Thread(target=car) t2.start() 红灯亮 正在等红灯 正在等红灯 正在等红灯 正在等红灯 正在等红灯 正在等红灯 正在等红灯 正在等红灯 正在等红灯 正在等红灯 绿灯亮 汽车开始起步 汽车开始起步 汽车开始起步 汽车开始起步 汽车开始起步 汽车开始起步 汽车开始起步 汽车开始起步 汽车开始起步 汽车开始起步
e.wait()
:False,为阻塞状态并发
e.set():True,将其余线程的False改成True,进入就绪态和运行态
app
e.clear():回复event的状态值为False
dom
e.isSet():返回event的状态值
异步
例如,有多个工做线程尝试连接MySQL,咱们想要在连接前确保MySQL服务正常才让那些工做线程去链接MySQL服务器,若是链接不成功,都会去尝试从新链接。那么咱们就能够采用threading.Event机制来协调各个工做线程的链接操做socket
from threading import Thread,Event import threading import time,random def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise TimeoutError('连接超时') print('<%s>第%s次尝试连接' % (threading.current_thread().getName(), count)) event.wait(0.5) count+=1 print('<%s>连接成功' %threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == '__main__': event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start()
在刚开始学多进程或多线程时,咱们火烧眉毛地基于多进程或多线程实现并发的套接字通讯,然而这种实现方式的致命缺陷是:服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多,这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪,因而咱们必须对服务端开启的进程数或线程数加以控制,让机器在一个本身能够承受的范围内运行,这就是进程池或线程池的用途,例如进程池,就是用来存放进程的池子,本质仍是基于多进程,只不过是对开启进程的数目加上了限制
进程池和线程池:
用来控制当前程序容许建立(进程/线程)的数量
进程池和线程池的做用:
保证在硬件容许的范围内建立(进程/线程)的数量
# coding=utf-8 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time # 进程池能够加参数 表示开启进程数 # 若不写默认以CPU的个数限制进程数 # ProcessPoolExecutor() # 线程池能够加参数,表示开启的线程数 # 若不写默认以CPU 的个数 *5 限制线程数 # ThreadPoolExecutor() # 建立5个线程 pool = ThreadPoolExecutor(5) def task(res): print("线程任务开始") time.sleep(1) # print("线程任务结束") return 123 # 回调函数 def call_back(res): # print(type(res)) # 注意 回调函数接收一个参数 是 接收线程执行完的结果,用.result()接收 # 获得的数据能够拿一个变量名保存,新的变量名不要与回调函数参数同样 res2 = res.result() print(res2) for i in range(13): # 异步提交任务,每次并发执行最多只能有5个 pool.submit(task,1).add_done_callback(call_back) # 全部线程任务结束后执行下面代码 pool.shutdown() print("线程执行完毕了")
from concurrent.futures:提供了异步调用的接口
ProcessPoolExecutor():限制开启的进程数,若不写参数默认以CPU的个数限制进程数
ThreadPoolExecutor():限制开启的线程数,若不写参数默认以CPU的个数 * 5 限制线程数
pool.submit(函数名,参数):异步提交任务,限制每次并发执行最多的线程个数
add_done_callback:回调函数,线程执行完毕的函数返回值能够传到回调函数中,.result()获取线程执行返回的结果
pool.shutdown():全部线程任务执行完毕后执行线程池关闭,执行下面的代码,至关于进程池的pool.close()+pool.join()操做
进程池:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(1) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
能够为进程/线程池内的每一个进程/线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收线程任务的返回值当作参数,该函数就叫作回调函数
from concurrent.futures import ThreadPoolExecutor import requests import re import uuid pool = ThreadPoolExecutor(200) # 1.发送请求函数 def get_page(url): response = requests.get(url) return response # 2.解析主页获取视频ID号 def parse_index(response): id_list = re.findall('<a href="video_(.*?)".*?>',response.text,re.S) return id_list # 3.解析视频详情页获取真实 视频连接 def parse_detail(res): response = res.result() movie_detail_url = re.findall('srcUrl="(.*?)"', response.text, re.S)[0] print(f'往视频连接: {movie_detail_url}发送请求...') # 异步往视频详情页连接发送请求,把结果交给 pool.submit(get_page, movie_detail_url).add_done_callback(save_movie) return movie_detail_url # 4.往真实视频连接发送请求,获取数据并保存到本地 def save_movie(res): movie_response = res.result() name = str(uuid.uuid4()) print(f'{name}.mp4视频开始保存...') with open(f'{name}.mp4', 'wb') as f: f.write(movie_response.content) print('视频下载完毕!') if __name__ == '__main__': # 1.访问主页获取数据 index_response = get_page('https://www.pearvideo.com/') # # 2.解析主页获取全部的视频id号 id_list = parse_index(index_response) # 3.循环对每一个视频详情页连接进行拼接 for id in id_list: detail_url = 'https://www.pearvideo.com/video_' + id # 异步提交爬取视频详情页,把返回的数据,交给parse_detail(回调函数) pool.submit(get_page, detail_url).add_done_callback(parse_detail)
进程:资源单位
线程:执行单位
协程:在单线程下实现并发
协程即 基于单线程来实现并发,即只用一个主线程的状况下实现并发,是一种用户态的轻量级线程,是由用户程序本身控制调度的一张程序。
并发的概念:切换 + 保存状态
cpu正在运行一个任务,会有两种状况下切走去执行其余任务(切换操做由操做系统强制控制即多道技术),一种状况是该任务发生了阻塞,另一种状况是该任务执行时间过长,cpu会把使用权切断。
yield 代码级别的控制,能够保存当前状态
# 基于yield 并发执行 import time # 任务1:接收数据,处理数据 def cousumer(): while True: x = yield def producer(): g = cousumer() next(g) for i in range(10000000): g.send(i) # time.sleep(1) # 并发去执行,可是若是遇到IO就会阻塞住 # 并不会切换到该线程内其余任务去执行 start = time.time() # 基于yield保存状态,实现两个任务直接来回切换,即并发的效果 producer() stop = time.time() print(stop - start) # 2.9251673221588135
以上例子对纯计算密集型任务来讲,对于单线程下,咱们不可避免程序中出现io操做,但若是咱们能在本身的程序中(即用户程序级别,而非操做系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另一个任务去计算,这样就保证了该线程可以最大限度地处于就绪态,即随时均可以被cpu执行的状态,至关于咱们在用户程序级别将本身的io操做最大限度地隐藏起来,从而能够迷惑操做系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给咱们的线程。
总结协程特色:
一、 必须在只有一个单线程里实现并发
二、 修改共享数据不须要枷锁
三、 用户程序里本身保存多个控制流的上下文栈
from gevent import monkey from gevent import spawn,joinall import time # gevent 是一个第三方模块,能够帮你监听IO操做,并切换 # 监听该程序下全部的IO操做 monkey.patch_all() def func1(): print("1") # 模拟IO操做 time.sleep(1) def func2(): print("2") time.sleep(2) def func3(): print("3") time.sleep(3) start = time.time() # 实现单线程下,遇到IO,保存状态 + 切换 s1 = spawn(func1) s2 = spawn(func2) s3 = spawn(func3) # 发送信号,在单线程状况下至关于等待本身执行完毕以后再退出 joinall([s1,s2,s3]) end_time = time.time() print(end_time - start) # 6.013344049453735
经过手动模拟操做系统“多道技术”,实现切换 + 保存状态
优势:在IO密集型的状况下会提升效率
缺点:在计算密集型的状况下,来回切换,反而效率更低
如何实现协程:切换 + 保存状态
yield:保存状态
并发:切换
server端: # coding=utf-8 from gevent import monkey monkey.patch_all() import socket import time from threading import Thread from gevent import spawn,joinall server = socket.socket() server.bind(("127.0.0.1",8888)) server.listen(5) print("启动服务端...") # 线程任务,接收客户端消息与发送消息给客户端 def work1(conn): while True: try: data = conn.recv(1024).decode("utf-8") if not data:break conn.send(data.encode("utf-8")) except Exception as e: print(e) break conn.close() def work2(): while True: conn,addr = server.accept() spawn(work1,conn) if __name__ == '__main__': s1 = spawn(work2) s1.join()
client端: # coding=utf-8 import socket import time from threading import Thread,current_thread def client(): client = socket.socket() client.connect(("127.0.0.1",8888)) print("启动客户端...") num = 0 while True: send_data = f"{current_thread().name} {num}" client.send(send_data.encode("utf-8")) data = client.recv(1024) print(data.decode("utf-8")) num += 1 for i in range(30): t = Thread(target=client) t.start()