一。线程池并发
线程池是一个处理线程任务的集合,他是能够接受必定量的线程任务,并建立线程,处理该任务,处理结束后不会马上关闭池子,会继续等待提交的任务,也就是他们的进程/线程号不会改变。app
当线程池中的任务没有结束时是不会接受下一个任务的。异步
它的操做有:socket
pool = ThreadPoolExecutor()tcp
建立一个线程池,其中括号中表明的是一次能够接纳的线程任务,能够不加参数,不加参数其数量就是当前cpu的个数*5。函数
res = pool.submit(func,args)spa
提交一个任务,args表明的是函数的参数。res接受的是该submit的返回值,相似于以下的类:线程
<Future at 0x2057e656940 state=running>
state表明的是当前该线程的状态。3d
res.result()code
而使用result能够将提交的任务函数的返回值获取。
这里的result还有等待任务的返回值的做用。若是任务没结束,就会一直等待,能够将并行操做改为串行操做。
pool.shutdown()
能够将池子关闭,并等待池子终端 任务所有结束再执行下面代码。
例子:
import time from concurrent.futures import ThreadPoolExecutor import os from gevent import os pool = ThreadPoolExecutor(5) def task(n): print(n,os.getpid()) time.sleep(2) return n**2 list_1 = [] for i in range(20): res = pool.submit(task,i) #提交任务 print(res.result())#等待任务的返回值 list_1.append(res) pool.shutdown() #关闭池子,等待池子中的任务运行完毕 for j in list_1: print('>>>',j.result()) print('主')
进程池:
进程池的使用和线程池差很少,区别仅只有包名不一样,在进程池中咱们能够验证如下池中的进程/线程是不是用的一样的进程/线程,使用os。getpid()方法便可。
进程值不传值,里面的数值默认时cpu的个数。
import time from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os from gevent import os # pool = ThreadPoolExecutor(5) pool = ProcessPoolExecutor(5) def task(n): print(n,os.getpid()) time.sleep(2) return n**2 def callback(n): print(n.result()) if __name__ == '__main__': list_1 = [] for i in range(20): res = pool.submit(task,i).add_done_callback(callback) #提交任务 # print(res.result())#等待任务的返回值 list_1.append(res)
异步回调:
除了上面使用的将返回的future对象添加到列表,再调用result()方法返回其返回值之外,还能够对指派任务的返回值用
add_done_callback(callback)
方法,将该对象回调到callback(能够自定义)函数,由函数接纳处理该值,函数的参数就是任务的返回值,多个返回值要设置多个参数。
回调是在生产返回值时就运行的。
二。协程
协程就是在单线程的状况下实现并发。
通常程序的多道技术都是用 切换+保存状态 实现
在通常的cpu运算时,都是在五种状态中来回切换的,程序运行的5种状态:
1.新建。2.就绪。3.运行。4.阻塞。5.结束。
通常的,程序都是在2,3,4的状态来回切换,有2种状况。
1,程序遇到了io操做,由运行态进入到了阻塞态,直到io操做结束后再到阻塞态等待时间片。
2.程序的时间片用完,由运行态到就绪态。
协程的做用就是使得线程遇到io操做本身切换,运行的方式从1.变成2.线程持续不断的就绪,能够得到大量的cpu运算时间。
要实现这个功能须要考虑线程的保存状态问题。
这里就要用到迭代器的知识,yield,
yield能够保存上一次操做的状态,因此使用yield能够验证协程对计算密集型的线程操做后是否能加快效率。
#串行执行 0.8540799617767334 # import time # # def func1(): # for i in range(10000000): # i+1 # # def func2(): # for i in range(10000000): # i+1 # # start = time.time() # func1() # func2() # stop = time.time() # print(stop - start)
#基于yield并发执行 1.3952205181121826 # import time # def func1(): # while True: # 10000000+1 # yield # # def func2(): # g=func1() # for i in range(10000000): # time.sleep(100) # 模拟IO,yield并不会捕捉到并自动切换 # i+1 # next(g) # # start=time.time() # func2() # stop=time.time() # print(stop-start)
能够看到,在计算密集的线程中,不断切换线程是不利于程序的运行的。
而yield不能识别io操做,而进行线程之间的切换的,因此须要引入一个模块gevent。
gevent是一个能够识别io的魔块,但不能识别time.sleep,因此还要调用另一个模块识别time.sleep。
from gevent import monkey;monkey.patch_all()
# 因为该模块常常被使用 因此建议写成一行 from gevent import spawn import time
spawn()能够检测()中的全部任务
def heng(): print("哼") time.sleep(2) print('哼') def ha(): print('哈') time.sleep(3) print('哈') def heiheihei(): print('嘿嘿嘿') time.sleep(5) print('嘿嘿嘿') start = time.time() g1 = spawn(heng) g2 = spawn(ha) # spawn会检测全部的任务 g3 = spawn(heiheihei) g1.join() g2.join() g3.join() # heng() # ha() print(time.time() - start) 哼 哈 嘿嘿嘿 哼 哈 嘿嘿嘿 5.033252716064453
本来10秒钟的程序,如今须要5秒钟就能够运行结束了。
spawn能够将全部线程添加至一个列表,轮流运行其没有io操做的部分。
spawn有一个返回值g
注意,须要在程序最后等待全部程序都运行结束才结束程序,使用g.join方法。
三。使用gevent实现tcp的并发
from gevent import monkey;monkey.patch_all() import socket from gevent import spawn server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): while True: try: data = conn.recv(1024) if len(data) == 0:break print(data.decode('utf-8')) conn.send(data.upper()) except ConnectionResetError as e: print(e) break conn.close() def server1(): while True: conn, addr = server.accept() spawn(talk,conn) if __name__ == '__main__': g1 = spawn(server1) g1.join()
四。IO模型。
1.阻塞型IO
阻塞型io是在进行io操做时,先跳入阻塞态,而后等待数据。
数据得到后拷贝数据,
最后再进入就绪态,
其中等待数据和拷贝数据都是再阻塞状态:
2.非阻塞io
非阻塞io是在遇到io操做时,先发送接受数据请求,若是没有数据就返回一个没有的信号,以后会反复发送数据请求,直到有数据为止,这种模型很占cpu操做。
3.IO多路复用
这个模型中有一个select,是一个监测机制,相似于列表,管理io操做。
当须要进行io操做时,调用select寻找数据,若是找到数据就返回数据,
等待的操做所有交给select。
4.异步IO(asyn。。。。)
在遇到io操做时,有一个回调机制,当须要io操做时,回调机制(内存中)会去寻找数据,当寻找到数据后会返回数据