# 进程 : 进行中的程序就是一个进程 占用资源 须要操做系统调度 pid : 可以惟一标识一个进程 计算机中最小的资源分配单位 # 并发 多个程序同时执行 : 只有一个cpu,多个程序轮流在一个cpu上执行 宏观上 : 多个程序在同时执行 微观上 : 多个程序轮流在一个cpu上执行 本质上仍是串行 # 并行 多个程序同时执行,而且同时在多个cpu上执行 # 同步 在作A事的时候发起B件事,必须等待B件事结束以后才能继续作A事件 # 异步 在作A事的时候发起B时间,不须要等待B事件结束就能够继续A事件 # 阻塞 若是CPU不工做 input accept recv recvfrom sleep connect # 非阻塞 CPU在工做 # 线程 线程是进程中的一个单位,不能脱离进程存在 线程是计算机中可以被CPU调度的最小单位
进程 线程 正常的开发语言 多线程能够利用多核 cpython解释器下的多个线程不能利用多核 : 规避了全部io操做的单线程 协程 是操做系统不可见的 协程本质就是一条线程 多个任务在一条线程上来回切换 利用协程这个概念实现的内容 : 来规避IO操做,就达到了咱们将一条线程中的io操做降到最低的目的 # 进程 数据隔离 数据不安全 操做系统级别 开销很是大 能利用多核 # 线程 数据共享 数据不安全 操做系统级别 开销小 不能利用多核 一些和文件操做相关的io只有操做系统能感知到 # 协程 数据共享 数据安全 用户级别 更小 不能利用多核 协程的全部的切换都基于用户,只有在用户级别可以感知到的io才会用协程模块作切换来规避(socket,请求网页的)
I/O操做 相对内存来讲 输入Input输出Output 输入是怎么输入 :键盘\input\read\recv 输出是怎么输出 :显示器 打印机 播放音乐\print\write\send 文件操做 :read write 网络操做 :send recv recvfrom 函数 :print input
计算机的工做分为两个状态
CPU工做 : 作计算(对内存中的数据进行操做)的时候工做
CPU不工做 : IO操做的时候
CPU的工做效率 500000条指令/ms
多道操做系统 :一个程序遇到IO就把CPU让给别人
顺序的一个一个执行的思路变成
共同存在在一台计算机中,其中一个程序执行让出cpu以后,另外一个程序能继续使用cpu
来提升cpu的利用率
单纯的切换会不会占用时间 : 会
可是多道操做系统的原理总体上仍是节省了时间,提升了CPU的利用率
时空复用的概念
单cpu 分时操做系统 : 把时间分红很小很小的段,每个时间都是一个时间片, 每个程序轮流执行一个时间片的时间,本身的时间片到了就轮到下一个程序执行 -- 时间片的轮转 老教授 24h全是计算 没有io 先来先服务 FCFS 研究生 5min全是计算 没有io 短做业优先 研究生2 5min全是计算 没有io 没有提升CPU的利用率 \ 提升了用户体验
html
并发:单个cpu,同时执行多个进程(来回切换的),看起来像是同时运行.python
并行:多个cpu,真正的同时运行多个进程.linux
阻塞:遇到IO才叫阻塞.git
一个cpu运行两个进程,其中一个进程彻底没有阻塞,github
非阻塞: 没有IO.web
什么是开启多个进程:socket:server,client两个进程编程
python中,若是一次想开启多个进程,必须是一个主进程,开启多个子进程json
linux,windows:有主进程开启子进程windows
相同点:主进程开启子进程,两个进程都有相互隔离的独立空间,互不影响安全
不一样点:
linux:子进程空间的初始数据彻底是从主(父)进程copy一份
windows:子进程空间初始数据彻底是从主(父)进程copy一份,可是有所不一样
就绪 运行 阻塞
# 就绪 -操做系统调度->运行 -遇到io操做-> 阻塞 -阻塞状态结束-> 就绪
-时间片到了-> 就绪
multiple 多元化的
processing 进程
multiprocessing 多元的处理进程的模块
python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。
multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。
须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
建立进程的类
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动) 强调: 1. 须要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍:
group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,'egon',) kwargs表示调用对象的字典,kwargs{'name':'egon','age':18} name为子进程的名称
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁
p.is_alive():若是p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性介绍:
p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置
p.name:进程的名称
p.pid:进程的pid
p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可)
p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功(了解便可)
注意:在windows中Process()必须放到# if __name__ == '__main__':下
函数方法
from multiprocessing import Process import time def task(name): print('%s is runing' %(name)) time.sleep(3) print('%s is done' % (name)) if __name__ == '__main__': p = Process(target=task,args=('壮壮',)) # p = Process(target=task,kwargs={'name':'壮壮'}) 两种传参方式 p.start() print('====主')
类方法
from multiprocessing import Process import time # 方式二: class MyProcess(Process): def __init__(self,name): self.name = name super().__init__() def run(self): # 必须定义一个run方法 print('%s is runing' % (self.name)) time.sleep(3) print('%s is done' % (self.name)) if __name__ == '__main__': p = MyProcess('小明') p.start() print('===主')
多进程之间的数据隔离 from multiprocessing import Process n = 0 def func(): global n n += 1 if __name__ == '__main__': p_l = [] for i in range(100): p = Process(target=func) p.start() p_l.append(p) for p in p_l:p.join() print(n)
join 主进程等待子进程结束以后,在执行
join开启一个进程:
from multiprocessing import Process import time def task(name): time.sleep(1) print(f"{name}is running") if __name__ == '__main__': p = Process(target=task,args=("海洋",)) p.start() p.join() #告知主进程,p进程结束以后,主进程在结束,join有些阻塞的意思 print("___主进程") # p1.start() # p2.start() #p1,p2,p3三个子进程前后运行顺序不定,start只是通知一下操做系统 # p3.start() #操做系统调用cpu先运行谁,谁先执行
join串行:
from multiprocessing import Process import time def task(name,sec): time.sleep(sec) print(f"{name}is running") if __name__ == '__main__': p1 = Process(target=task, args=("小明",1)) p2 = Process(target=task, args=("明明",2)) p3 = Process(target=task ,args=("大明",3)) start_time = time.time() p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() print(f"主进程{time.time() - start_time}")
join并发:
from multiprocessing import Process import time def task(sec): time.sleep(sec) print(f"is running") if __name__ == '__main__': start_time = time.time() list = [] for i in range(1,4): p = Process(target=task, args=(i,)) p.start() list.append(p) for i in list: i.join() print(f"主进程{time.time() - start_time}")
属性:
from multiprocessing import Process import time def task(name): print(f"{name}is running") time.sleep(3) print(f"{name}is done") if __name__ == '__main__': p = Process(target=task,args=("小明",),name="大明") #name给进程对象设置name属性 p.start() # print(p.pid) #获取到进程号 time.sleep(1) #睡一秒,子进程已经执行完成 p.terminate() #强制结束子进程,强制执行也会有执行时间 #terminate跟start同样工做原理,都要通知操做系统开启子进程 #内存终止或者开启都要须要时间的 time.sleep(1) #睡一秒,让terminate杀死 print(p.is_alive()) #判断子进程是否存活,只是查看内存中p子进程是否还运行 print("主进程")
server
import socket from multiprocessing import Process def talk(conn): while True: msg = conn.recv(1024).decode('utf-8') ret = msg.upper().encode('utf-8') conn.send(ret) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',9001)) sk.listen() while True: conn, addr = sk.accept() Process(target = talk,args=(conn,)).start() sk.close()
client
import time import socket sk = socket.socket() sk.connect(('127.0.0.1',9001)) while True: sk.send(b'hello') msg =sk.recv(1024).decode('utf-8') print(msg) time.sleep(0.5) sk.close()
init是全部进程的父进程: 僵尸进程,僵尸是什么,死而没有消失 主进程建立多个短暂周期的子进程,当子进程退出,是须要等待父进程处理,而父进程没有及时对子进程回收,那么子进程的进程符仍然保存在系统中,这种进程就是僵死进程 什么进程描述符:每个进程都有描述符,io请求,数据指针 from multiprocessing import Process import time import os def task(name): print(f"{name}is running") print(f"子进程开始了:{os.getpid()}") time.sleep(50) if __name__ == '__main__': for i in range(100): p = Process(target=task, args=("海洋",)) p.start() print(f"___主进程:{os.getpid()}")
孤儿进程:孤儿进程是由于主进程的退出,他下面的全部子进程都变成孤儿进程了,init会对孤儿进行回收,释 放掉占用系统的资源,这种回收也是为了节省内存。
孤儿进程无害,若是僵尸进程挂了,init会对孤儿进程回收,init是全部进程的祖进程,linux中为1,0系统
将一个子进程设置成守护进程,当父进程结束,子进程必定会结束,避免孤儿进程产生,应为回收机制
父进程不能建立子进程
函数方法:
#守护进程会在主进程代码执行结束后终止,守护进程内没法在开启子进程 from multiprocessing import Process import time import os def task(name): print(f"{name}is running") print(f"子进程开始了:{os.getpid()}") time.sleep(50) if __name__ == '__main__': p = Process(target=task,args=("海洋",)) p.daemon = True #将p子进程设置成守护进程,守护子进程,只要主进程结束 #子进程不管执行与否都立刻结束,daemon,开启在start上面 p.start() print(f"___主进程:{os.getpid()}")
from multiprocessing import Process 开启进程的另外一种方式 class 类名(Process): def __init__(self,参数): self.属性名 = 参数 super().__init__() def run(self): print('子进程要执行的代码') p = 类名() p.start() # 守护进程 : 会等待主进程代码结束以后就当即结束 p = 类名() p.daemon = True # 设置守护进程 p.start() # 通常状况下,多个进程的执行顺序,多是: # 主进程代码结束--> 守护进程结束-->子进程结束-->主进程结束 # 子进程结束 -->主进程代码结束-->守护进程结束-->主进程结束
第一种:基于文件+锁的形式:效率低,麻烦
第二种:基于队列,推荐的使用形式
第三种:基于管道,管道本身加锁,底层可能会出现数据丢失损坏,队列和管道都是将数据存放于内存中
互斥锁保证了每次只有一个线程进行写入操做,只有当这个线程解锁,在运行其余资源,上锁和解锁都须要本身添加
两种方式
from multiprocessing import Lock
第一种: lock = Lock() lock.acquire() print(1) lock.release() 第二种 with lock: buy_ticket(i)
#上锁: #必定要是同一把锁:只能按照这个规律,上锁一次,解锁一次 #互斥锁与join区别: #共同点:都是完成了进程之间的串行 #区别:join认为控制进程的串行,互斥锁是解决抢占的资源,保证公平性 from multiprocessing import Process from multiprocessing import Lock import time import os import random def task1(lock): print("test1") #验证CPU遇到IO切换 lock.acquire() print("task1 开始打印") time.sleep(random.randint(1,3)) print("task1 打印完成") lock.release() def task2(lock): print("test2") lock.acquire() #上锁 print("task2 开始打印") time.sleep(random.randint(1,3))#阻塞,cpu切换任务,别的任务都在锁,回来继续执行这个程序 print("task2 打印完成") lock.release() #解锁 def task3(lock): print("test2") lock.acquire() # lock.acquire() #死锁错误示例 print("task3 开始打印") time.sleep(random.randint(1,3)) print("task3 打印完成") lock.release() if __name__ == '__main__': lock = Lock() #一把锁 p1 = Process(target=task1,args=(lock,)) #三个进程哪一个先到先执行 p2 = Process(target=task2,args=(lock,)) p3 = Process(target=task3,args=(lock,)) p1.start() p2.start() p3.start()
互斥锁买票示例:
import json import time from multiprocessing import Process,Lock def search(i): with open('ticket',encoding='utf-8') as f: ticket = json.load(f) print('%s :当前的余票是%s张'%(i,ticket['count'])) def buy_ticket(i): with open('ticket',encoding='utf-8') as f: ticket = json.load(f) if ticket['count']>0: ticket['count'] -= 1 print('%s买到票了'%i) time.sleep(0.1) with open('ticket', mode='w',encoding='utf-8') as f: json.dump(ticket,f) def get_ticket(i,lock): search(i) with lock: # 代替acquire和release 而且在此基础上作一些异常处理,保证即使一个进程的代码出错退出了,也会归还钥匙 buy_ticket(i) if __name__ == '__main__': lock = Lock() # 互斥锁 for i in range(10): Process(target=get_ticket,args=(i,lock)).start()
1. 进程之间的通讯最好的方式是基于队列
2. 队列是实现进程之间通讯的工具,存在内存中的一个容器,最大的特色是符合先进先出的原则
from multiprocessing import Queue,Process def pro(q): for i in range(10): print(q.get()) def son(q): for i in range(10): q.put('hello%s'%i) if __name__ == '__main__': q = Queue() p = Process(target=son,args=(q,)) p.start() p = Process(target=pro, args=(q,)) p.start()
多个进程抢占一个资源:串行,有序以及数据安全,买票
多个进程实现并发的效果:生产者消费模型
import time import random from multiprocessing import Queue,Process def consumer(q,name): # 消费者:一般取到数据以后还要进行某些操做 while True: food = q.get() if food: print('%s吃了%s'%(name,food)) else:break def producer(q,name,food): # 生产者:一般在放数据以前须要先经过某些代码来获取数据 for i in range(10): foodi = '%s%s'%(food,i) print('%s生产了%s'%(name,foodi)) time.sleep(random.random()) q.put(foodi) if __name__ == '__main__': q = Queue() c1 = Process(target=consumer,args=(q,'alex')) c2 = Process(target=consumer,args=(q,'alex')) p1 = Process(target=producer,args=(q,'大壮','泔水')) p2 = Process(target=producer,args=(q,'b哥','香蕉')) c1.start() c2.start() p1.start() p2.start() p1.join() p2.join() q.put(None) q.put(None)
import requests from multiprocessing import Process,Queue url_dic = { 'cnblogs':'https://www.cnblogs.com/Eva-J/articles/8253549.html', 'douban':'https://www.douban.com/doulist/1596699/', 'baidu':'https://www.baidu.com', 'gitee':'https://gitee.com/old_boy_python_stack__22/teaching_plan/issues/IXSRZ', } def producer(name,url,q): ret = requests.get(url) q.put((name,ret.text)) def consumer(q): while True: tup = q.get() if tup is None:break with open('%s.html'%tup[0],encoding='utf-8',mode='w') as f: f.write(tup[1]) if __name__ == '__main__': q = Queue() pl = [] for key in url_dic: p = Process(target=producer,args=(key,url_dic[key],q)) p.start() pl.append(p) Process(target=consumer,args=(q,)).start() for p in pl:p.join() q.put(None) # join n 个进程 n个进程必须都执行完才继续 # for i in range(4): # print(q.get())
# Manager dict list 只要是共享的数据都存在数据不安全的现象 # 须要咱们本身加锁来解决数据安全问题 from multiprocessing import Process,Manager,Lock def change_dic(dic,lock): with lock: dic['count'] -= 1 if __name__ == '__main__': # m = Manager() with Manager() as m: lock = Lock() dic = m.dict({'count': 100}) # dic = {'count': 100} p_l = [] for i in range(100): p = Process(target=change_dic,args=(dic,lock)) p.start() p_l.append(p) for p in p_l : p.join() print(dic)
进程:进程是分配资源的基本单位,内存中开辟空间,为线程提供资源,一个程序能够开启多个进程
线程:CPU调度的最小单位,执行单位,线程也被称做为轻量级的进程,动态的
开启QQ:开启一个进程,在内存中开辟空间加载数据,启动一个线程执行代码
线程依赖进程的一个进程能够包含多个线程,可是必定有一个主线程,线程才是CPU执行的最小单元
1,开启多进程开销很是大,10-100倍,而开启线程开销很是小
2.开启多进程速度慢,开启多线程速度快
3.进程之间数据不共享,线程共享数据
在cpython解释器下 :GIL锁(全局解释器锁) 致使了同一个进程中的多个线程不能利用多核
并发:一个CPU能够来回切换(线程之间切换),多进程并发,多线程的并发
多进程并发:开启多个进程,并发的执行
多线程并发:开启线程,并发的执行
若是遇到并发:多线程居多
线程绝对要比进程开启速度快
#先打印小明,线程要比进程速度快,若是是进程先打印主线程 from threading import Thread def task(name): print(f'{name} is running') if __name__ == '__main__': t = Thread(target=task,args=("小明",)) t.start() print("主线程") #子进程睡眠3秒,先运行主进程 from threading import Thread import time x = 1000 def task(): time.sleep(3) print('子线程....') def main(): print('111') print('222') print('333') if __name__ == '__main__': t = Thread(target=task) t.start() main() # 结果是111 222 333 子线程....
from threading import Thread class MyThread(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): print(f'{self.name} is running') if __name__ == '__main__': t = MyThread("小明") t.start() print("主线程")
from threading import Thread x = 1000 def task(): global x x = 0 if __name__ == '__main__': t = Thread(target=task, ) t.start() t.join() # 告知主线程,等待子线程运行完毕在执行 print(f'主线程:{x}')
from threading import Thread import threading import time def task(name): time.sleep(1) print(f'{name} is running') if __name__ == '__main__': for i in range(5): t = Thread(target=task,args=("海洋",)) t.start() #线程对象的方法 # print(t.is_alive()) #判断线程是否存活 #threading模块的方法 print(threading.current_thread().name) #返回线程对象.name print(threading.enumerate()) #返回列表,返回的是全部线程对象 print(threading.active_count()) #获取活跃的线程数量(包括主线程) print("主线程")
守护线程必须等待主线程结束才结束,主线程必须等待全部的非守护线程结束才能结束,由于主线程的结束意味着进程的结束,这就是一个守护机制
多线程是同一个空间,同一个进程,进程表明,空间,资源,静态的:
import time from threading import Thread def son(): while True: print('in son') time.sleep(1) def son2(): for i in range(3): print('in son2 ****') time.sleep(1) # flag a 0s t = Thread(target=son) t.daemon = True t.start() Thread(target=son2).start() # flag b # 主线程会等待子线程结束以后才结束 # 主线程结束进程就会结束 # 守护线程随着主线程的结束而结束 # 守护线程会在主线程的代码结束以后继续守护其余子线程
守护进程须要主进程来回收资源 守护线程是随着进程的结束才结束的 其余子线程-->主线程结束-->主进程结束-->整个进程中全部的资源都被回收-->守护线程也会被回收 进程是资源分配单位 子进程都须要它的父进程来回收资源 线程是进程中的资源 全部的线程都会随着进程的结束而被回收的
多个线程同时操做全局变量/静态变量 会产生数据不安全现象 互斥锁 += -= 说明了线程之间数据的不安全 a = a.strip() 带返回值的都是先计算后赋值,数据不安全 a = a+1 /a+=1 数据不安全 if\while 数据不安全 append pop 说明了在线程中操做列表中的方法是数据安全的
from threading import Thread,Lock import time n = [] def append(): for i in range(500000): n.append(1) def pop(lock): for i in range(500000): with lock: if not n: time.sleep(0.0000001) # 强制CPU轮转 n.pop() t_l = [] lock = Lock() for i in range(20): t1 = Thread(target=append) t1.start() t2 = Thread(target=pop,args=(lock,)) t2.start() t_l.append(t1) t_l.append(t2) for t in t_l: t.join() print(n) # 不要操做全局变量,不要在类里操做静态变量 # += -= *= /= if while 数据不安全 # queue logging 数据安全的
import time class A: from threading import Lock __instance = None lock = Lock() def __new__(cls, *args, **kwargs): with cls.lock: if not cls.__instance: time.sleep(0.000001) # cpu轮转 cls.__instance = super().__new__(cls) return cls.__instance
# Lock 互斥锁 效率高 # RLock 递归(recursion)锁 效率相对低 l = Lock() l.acquire() print('但愿被锁住的代码') l.release() rl = RLock() # 在同一个线程中能够被acquire屡次 rl.acquire() print('但愿被锁住的代码') rl.release()
from threading import Thread,RLock as Lock def func(i,lock): lock.acquire() lock.acquire() print(i,': start') lock.release() lock.release() print(i, ': end') lock = Lock() for i in range(5): Thread(target=func,args=(i,lock)).start()
死锁现象是怎么产生的? 多把(互斥/递归)锁 而且在多个线程中 交叉使用 fork_lock.acquire() noodle_lock.acquire() fork_lock.release() noodle_lock.release() 若是是互斥锁,出现了死锁现象,最快速的解决方案把全部的互斥锁都改为一把递归锁 程序的效率会下降的 递归锁 效率低 可是解决死锁现象有奇效 互斥锁 效率高 可是多把锁容易出现死锁现象 一把互斥锁就够了
线程之间数据安全的容器队列
from queue import Empty # 不是内置的错误类型,而是queue模块中的错误 q = queue.Queue(4) # fifo 先进先出的队列 q.get() q.put(1) q.put(2) q.put(3) q.put(4) print('4 done') q.put_nowait(5) print('5 done') try: q.get_nowait() except Empty:pass print('队列为空,继续其余内容') # put_nowait: 不会等待队列有空闲位置再放入数据,若是数据放入不成功就直接崩溃 # get_nowait: 队列为空,取值的时候不等待,可是取不到值那么直接崩溃了
from queue import LifoQueue # last in first out 后进先出 栈 lq = LifoQueue() lq.put(1) lq.put(2) lq.put(3) print(lq.get()) print(lq.get()) print(lq.get())
from queue import PriorityQueue # 优先级队列 priq = PriorityQueue() priq.put((2,'alex')) priq.put((1,'wusir')) priq.put((0,'太白')) print(priq.get()) print(priq.get()) print(priq.get())
要在程序开始的时候,还没提交任务先建立几个线程或者进程
放在一个池子里,这就是池
若是先开好进程/线程,那么有任务以后就能够直接使用这个池中的数据了
而且开好的线程或者进程会一直存在在池中,能够被多个任务反复利用
这样极大的减小了开启\关闭\调度线程/进程的时间开销
池中的线程/进程个数控制了操做系统须要调度的任务个数,控制池中的单位
有利于提升操做系统的效率,减轻操做系统的负担
发展过程
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(a,b): print('start') print(a,b) print('end') if __name__ == '__main__': # p = ProcessPoolExecutor(max_workers=5) #限制进程数量,默认为cpu个数 p = ThreadPoolExecutor(4) #线程默认是CPU个数的五倍 for i in range(4): p.submit(func,1,2) #给进程池放置任务启动,1,2为传参
同步:
任务发出去以后等待,直到这个任务最终结束以后,给我一个返回值,发布下一个任务
from concurrent.futures import ProcessPoolExecutor import os import time import random def task(): print(f"{os.getpid()}is running") time.sleep(1) return f'{os.getpid()} is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) for i in range(10): obj = p.submit(task,) print(obj.result()) #同步等待一个进程内容所有执行完成在执行下一个
将任务发给进程,无论任务如何,直接运行下一个
from concurrent.futures import ProcessPoolExecutor import os import time import random def task(): print(f'{os.getpid()} is running') time.sleep(random.randint(0,2)) return f'{os.getpid()} is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) obj_l1 = [] for i in range(10): obj = p.submit(task,) # 异步发出. obj_l1.append(obj) # time.sleep(3) p.shutdown(wait=True) # 1. 阻止在向进程池投放新任务, # 2. wait = True 十个任务是10,一个任务完成了-1,直至为零.进行下一行. for i in obj_l1: print(i.result())
import os import time,random from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(a,b): print(os.getpid(),'start',a,b) time.sleep(random.randint(1,4)) print(os.getpid(),'end') return a*b if __name__ == '__main__': tp = ProcessPoolExecutor(4) futrue_l = {} for i in range(20): # 异步非阻塞的 ret = tp.submit(func,i,b=i+1) futrue_l[i] = ret # print(ret.result()) # Future将来对象 for key in futrue_l: # 同步阻塞的 print(key,futrue_l[key].result())
# map 只适合传递简单的参数,而且必须是一个可迭代的类型做为参数 import os import time,random from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(a): print(os.getpid(),'start',a[0],a[1]) time.sleep(random.randint(1,4)) print(os.getpid(),'end') return a[0]*a[1] if __name__ == '__main__': tp = ProcessPoolExecutor(4) ret = tp.map(func,((i,i+1) for i in range(20))) for key in ret: # 同步阻塞的 print(key)
# 回调函数 : 效率最高的 import time,random from threading import current_thread from concurrent.futures import ThreadPoolExecutor def func(a,b): print(current_thread().ident,'start',a,b) time.sleep(random.randint(1,4)) print(current_thread().ident,'end',a) return (a,a*b) def print_func(ret): # 异步阻塞 print(ret.result()) if __name__ == '__main__': tp = ThreadPoolExecutor(4) futrue_l = {} for i in range(20): # 异步非阻塞的 ret = tp.submit(func,i,b=i+1) ret.add_done_callback(print_func) # ret这个任务会在执行完毕的瞬间当即触发print_func函数,而且把任务的返回值对象传递到print_func作参数 # 异步阻塞 回调函数 给ret对象绑定一个回调函数,等待ret对应的任务有告终果以后当即调用print_func这个函数 # 就能够对结果当即进行处理,而不用按照顺序接收结果处理结果
from concurrent.futures import ThreadPoolExecutor import requests import os def get_page(url): # 访问网页,获取网页源代码 线程池中的线程来操做 print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): # 获取到字典结果以后,计算网页源码的长度,把https://www.baidu.com : 1929749729写到文件里 线程任务执行完毕以后绑定回调函数 res=res.result() print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # 得到一个线程池对象 = 开启线程池 tp = ThreadPoolExecutor(4) # 循环urls列表 for url in urls: # 获得一个futrue对象 = 把每个url提交一个get_page任务 ret = tp.submit(get_page,url) # 给futrue对象绑定一个parse_page回调函数 ret.add_done_callback(parse_page) # 谁先回来谁就先写结果进文件
协程:本质是一个线程 可以在一个线程内的多个任务之间来回切换 节省io操做的时间也只能是和网络操做相关的 特色:数据安全,用户级别,开销小,不能利用多核,可以识别的io操做少 gevent 第三方模块 完成并发的socket server 协程对象.spawn(func,参数) 能识别的io操做也是有限的 而且要想让gevent可以识别一些导入的模块中的io操做 from gevent import monkey;monkey.patch_all() asyncio 内置模块 await 写好的asyncio中的阻塞方法 async 标识一个函数时协程函数,await语法必须用在async函数中
切换 并 规避io 的两个模块 gevent = 利用了 greenlet 底层模块完成的切换 + 自动规避io的功能 asyncio = 利用了 yield 底层语法完成的切换 + 自动规避io的功能 tornado 异步的web框架 yield from - 更好的实现协程 send - 更好的实现协程 asyncio模块 基于python原生的协程的概念正式的被成立 特殊的在python中提供协程功能的关键字 : aysnc await # 用户级别的协程还有什么好处: # 减轻了操做系统的负担 # 一条线程若是开了多个协程,那么给操做系统的印象是线程很忙,这样能多争取一些时间片时间来被CPU执行,程序的效率就提升了
import gevent def func(): # 带有io操做的内容写在函数里,而后提交func给gevent print('start func') gevent.sleep(1) # gevent.sleep是一个特殊的,time.sleep在这里不行 # 若是想用time就要在用下面的代码 print('end func') g1 = gevent.spawn(func) g2 = gevent.spawn(func) g3 = gevent.spawn(func) gevent.joinall([g1,g2,g3])
time
import time print(time.sleep) # 这里的time和from gevent import mockey里的不一样 from gevent import monkey monkey.patch_all() import time import gevent def func(): # 带有io操做的内容写在函数里,而后提交func给gevent print('start func') time.sleep(1) print('end func') g1 = gevent.spawn(func) g2 = gevent.spawn(func) g3 = gevent.spawn(func) gevent.joinall([g1,g2,g3]) # 阻塞 直到协程g1任务执行结束 # 要有阻塞才能执行
基于gevent协程实现socket并发
import socket print(socket.socket) # 在patch all以前打印一次 from gevent import monkey # gevent 如何检测是否能规避某个模块的io操做呢? monkey.patch_all() import socket import gevent print(socket.socket) # 在patch all以后打印一次,若是两次的结果不同,那么就说明可以规避io操做 def func(conn): while True: msg = conn.recv(1024).decode('utf-8') MSG = msg.upper() conn.send(MSG.encode('utf-8')) sk = socket.socket() sk.bind(('127.0.0.1',9001)) sk.listen() while True: conn,_ = sk.accept() gevent.spawn(func,conn)
client
import time import socket from threading import Thread def client(): sk = socket.socket() sk.connect(('127.0.0.1',9001)) while True: sk.send(b'hello') msg = sk.recv(1024) print(msg) time.sleep(0.5) for i in range(500): Thread(target=client).start()
import asyncio async def func(name): print('start',name) # await 可能会发生阻塞的方法 # await 关键字必须写在一个async函数里 await asyncio.sleep(1) print('end') loop = asyncio.get_event_loop() loop.run_until_complete(func("alex")) # 单个任务 # loop.run_until_complete(asyncio.wait([func('alex'),func('太白')]))接收多个,接受列表