进程html
进程 : 什么是进程? 是操做系统的发展过程当中,为了提升CPU的利用率,在操做系统同时运行多个程序的时候,为了数据的安全\代码不混乱而被创造出来的一个概念 每个程序运行起来都至少是一个进程. 进程是计算机中最小的资源分配单位 进程被操做系统调度的,有不少相关的算法 - 早期的操做系统 进程之间是数据隔离的 进程的三状态 就绪 运行 阻塞 同步异步 同步 : 一个任务的执行依赖另外一个事务的结束 join lock 异步 : 一个任务的执行不依赖另外一个事务的结束 start terminate 阻塞非阻塞 阻塞 : accept recv recvfrom queue.get join 非阻塞 : setblocking = False 并发并行 并行是特殊的并发 并行就是 同一时刻 两个以上的程序同时在cpu上执行 并发就是 同一时段 两个以上的程序看起来在同时执行 IO概念 : 文件操做 数据库操做 网络传输 用户输入输出 Input 获得bytes/str Output 发送数据/输出数据 由于进程与进程之间本质上是异步且数据隔离 守护进程 : 子进程等待主进程的代码结束就结束了 同步控制 join 锁 - 互斥锁 : 多个进程同时对一个数据进行操做的时候 操做同一个文件/数据库/管道/Manager.dict 信号量 事件 数据共享 - 数据不安全 Manager IPC-进程之间通讯 管道 队列 - 生产者消费者模型(为了解决数据的生产和处理的效率问题) 第三方工具(消息队列,消息中间件) 进程池 解决大量任务 开启多个进程的开销过大的问题 节省资源,提升并发效率的 通常开进程数 cpu_count * 1 or 2
Python并不支持真正意义上的多线程。Python中提供了多线程模块,但若是想经过多线程提升代码的速度,并不推荐使用多线程模块。Python中有一个全局锁Global Interpreter Lock(GIL),全局锁会确保任什么时候候多个线程中只有一个会被执行。线程的执行速度很是快,会误觉得线程是并行执行的,但实际上都是轮流执行。通过GIL处理后,会增长线程执行的开销。
全局锁 GIL(Global interpreter lock) 并非 Python 的特性,而是在实现 Python 解析器(CPython)时所引入的一个概念。Python有CPython,PyPy,Psyco 等不一样的 Python 执行环境,其中 JPython 没有GIL。CPython 是大部分环境下默认的 Python 执行环境,GIL 并非 Python 的特性,Python 彻底能够不依赖于 GIL。
GIL 限制了同一时刻只能有一个线程运行,没法发挥多核 CPU 的优点。GIL 本质是互斥锁,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。在一个 Python 的进程内,不只有主线程或者由主线程开启的其它线程,还有解释器开启的垃圾回收等解释器级别的线程。进程内,全部数据都是共享的,代码做为一种数据也会被全部线程共享,多个线程先访问到解释器的代码,即拿到执行权限,而后将 target 的代码交给解释器的代码去执行,解释器的代码是全部线程共享的,因此垃圾回收线程也可能访问到解释器的代码而去执行,所以为了保证数据安全须要加锁处理,即 GIL。
因为GIL 的存在,同一时刻同一进程中只有一个线程被执行。多核 CPU能够并行完成计算,所以多核能够提高计算性能,但 CPU 一旦遇到 I/O 阻塞,仍然须要等待,因此多核CPU对 I/O 密集型任务提高不明显。根据执行任务是计算密集型仍是I/O 密集型,不一样场景使用不一样的方法,对于计算密集型任务,多进程占优点,对于 I/O 密集型任务,多线程占优点
全部的程序 - 任务 全部的任务 - 进程 进程 ? 进行中的程序 PID进程ID 进程是计算机中资源分配的最小单位 进程间数据隔离 要通讯的话运用socket
进程调度:--先来先服务调度算法,短做业优先,时间片轮转法,多级反馈机制
并发 资源有限的状况下,AB程序交替使用cpu目的是提升效率 并行 同一时刻都在执行 多核 (是并发里的一种特殊状况) 更苛刻的条件 同步: 程序顺序执行,多个任务之间串行执行 (洗衣完--作饭完--洗碗) 异步: 多个任务同时运行 (在同一时间内洗衣作饭洗碗) 阻塞: 程序因为不符合某个条件或者等待某个条件知足 而在某一个地方进入等待状态 非阻塞: 程序正常执行
同步阻塞
一件事儿一件事儿的作
中间还要被阻塞
同步非阻塞 : 费力不讨好
一件事儿一件事儿的作
可是不阻塞
异步阻塞
同时进行的
每一件事儿都会遇到阻塞事件
异步非阻塞
几个事情同时进行
每一件事都不阻塞
import os,time print(os.getpid()) #获取当前进程号 print(os.getppid()) #获取当前父进程id time.sleep(5) print(os.getpid())
import multiprocessing #是个包 from multiprocessing import Process import os def son_process(): # 这个函数的代码实在子进程执行的 print('执行我啦',os.getpid(),os.getppid()) print() if __name__ == '__main__': print('1-->',os.getpid()) #主进程 p = Process(target=son_process) #实例化 p.start() #进程开始 下面是打印结果 # 1--> 6416 # 执行我啦 7388 6416
import time from multiprocessing import Process # 经过并发实现一个有并发效果的socket server # 1.开启了一个子进程就已经实现了并发: 父进程(主进程)和子进程并发(同时执行) def son_process(): print('son start') time.sleep(1) print('son end') if __name__ == '__main__': p = Process(target=son_process) #建立子进程 p.start() #通知操做系统开启一个子进程 os响应需求 分配资源 执行进程中的代码 print('主进程')
import time from multiprocessing import Process # 经过并发实现一个有并发效果的socket server # 1.开启了一个子进程就已经实现了并发: 父进程(主进程)和子进程并发(同时执行) def son_process(): print('son start') time.sleep(1) print('son end') if __name__ == '__main__': p = Process(target=son_process) #建立子进程 p.start() #通知操做系统开启一个子进程 os响应需求 分配资源 执行进程中的代码 for i in range(5): print('主进程') time.sleep(0.3)
import time from multiprocessing import Process def son_process(): print('son start') time.sleep(1) print('son end') if __name__ == '__main__': for i in range(3): p = Process(target=son_process) p.start()
import time from multiprocessing import Process def son_process(i): print('son start',i) time.sleep(1) print('son end',i) if __name__ == '__main__': for i in range(10): p = Process(target=son_process,args=(i,)) #args必须元组 p.start() # 通知操做系统 start并不意味着子进程已经开始了
import time from multiprocessing import Process def son_process(i): print('son start',i) time.sleep(1) print('son end',i) if __name__ == '__main__': for i in range(10): p = Process(target=son_process,args=(i,)) #子进程不支持返回值 p.start() print('主进程的代码执行完毕') # 主进程会等待子进程结束以后才结束 # 为何? # 父进程负责建立子进程,也负责回收子进程的资源
- server.py- import socket,time from multiprocessing import Process def talk(conn): conn, addr = sk.accept() print(conn) while True: msg = conn.recv(1024).decode() time.sleep(10) conn.send(msg.upper().encode()) if __name__ == '__main__': # 这句话下面的全部代码都只在主进程中执行 sk = socket.socket() sk.bind(('127.0.0.1',9000)) sk.listen() while True: conn,addr = sk.accept() Process(target=talk,args=(sk,)).start() # 卡 大量的while True 而且代码中并无太多的其余操做 # 若是咱们使用socketserver,不会这么卡 # 多进程确实能够帮助咱们实现并发效果,可是还不够完美 # 操做系统没开启一个进程要消耗大量的资源 # 操做系统要负责调度进程 进程越多 调度起来就越吃力 - clitent.py- import socket sk = socket.socket() sk.connect(('127.0.0.1',9000)) while True: sk.send(b'hello') print(sk.recv(1024))
def son_process(i): while True: print('son start',i) time.sleep(0.5) print('son end',i) if __name__ == '__main__': p = Process(target=son_process, args=(1,)) p.start() # 开启一个子进程,异步的 print('主进程的代码执行完毕') print(p.is_alive()) # 子进程还活着 p.terminate() # 结束一个子进程,异步的 print(p.is_alive()) # 子进程还在活着 time.sleep(0.1) print(p.is_alive()) # False
n = [100] def sub_n(): # 减法 global n # 子进程对于主进程中的全局变量的修改是不生效的 n.append(1) print('子进程n : ',n) #子进程n : [100, 1] if __name__ == '__main__': p = Process(target = sub_n) p.start() p.join() # 阻塞 直到子进程p结束 print('主进程n : ',n) #主进程n : [100]
# 主进程里的print('主进程n : ',n)这句话在十个子进程执行完毕以后才执行 n = [100] import random def sub_n(): global n # 子进程对于主进程中的全局变量的修改是不生效的 time.sleep(random.random()) n.append(1) print('子进程n : ',n) if __name__ == '__main__': p_lst = [] #进程添加进列表 for i in range(10): p = Process(target = sub_n) #建立 p.start() #通知os 开启 p_lst.append(p) for p in p_lst:p.join() # 阻塞 只有一个条件是可以让我继续执行 这个条件就是子进程结束 print('主进程n : ',n)
n = [100] def sub_n(): global n # 子进程对于主进程中的全局变量的修改是不生效的 n.append(1) print('子进程n : ',n) time.sleep(10) print('子进程结束') if __name__ == '__main__': p = Process(target = sub_n) p.start() p.join(timeout = 5) # 若是不设置超时时间 join会阻塞直到子进程p结束 # # timeout超时 # # 若是设置的超时时间,那么意味着若是不足5s子进程结束了,程序结束阻塞 # # 若是超过5s尚未结束,那么也结束阻塞 print('主进程n : ',n) p.terminate() # 也能够强制结束一个子进程
# 设置子进程为守护进程,守护进程会随着主进程代码的结束而结束 # 因为主进程要负责给全部的子进程收尸,因此主进程必须是最后结束,守护进程只能在主进程的代码结束以后就认为主进程结束了 # 守护进程在主进程的代码结束以后就结束了,不会等待其余子进程结束 # # 但愿守护进程必须等待全部的子进程结束以后才结束 # ???? # import time # from multiprocessing import Process # def alive(): # while True: # print('链接监控程序,而且发送报活信息') # time.sleep(0.6) # # def func(): # '主进程中的核心代码' # while True: # print('选择的项目') # time.sleep(1) # print('根据用户的选择作一些事儿') # # if __name__ == '__main__': # p = Process(target=alive) # p.daemon = True # 设置子进程为守护进程,守护进程会随着主进程代码的结束而结束 # p.start() # p = Process(target=func) # p.start() # p.join() # 在主进程中等待子进程结束,守护进程就能够帮助守护其余子进程了 # 守护进程 # 1.守护进程会等待主进程的代码结束而结束,不会等待其余子进程的结束 # 2.要想守护进程等待其余子进程,只须要在主进程中加上join
for i in range(5): pass print(i) # i=4 lst = [] for i in range(5): p = Process() lst.append(p) p.start() for p in lst: p.join() p.terminate()
import os from multiprocessing import Process class MyProcess(Process): #继承 def __init__(self,参数): super().__init__() #父类 初始化 self.一个属性 = 参数 def run(self): print('子进程中要执行的代码') if __name__ == '__main__': conn = '一个连接' mp = MyProcess(conn) mp.start()
当多个进程使用同一份数据资源的时候,就会引起数据安全或顺序混乱问题。 接下来,咱们以 模拟抢票 为例,来看看数据安全的重要性。 import json import time from multiprocessing import Process,Lock def search(name): '''查询余票的功能''' with open('ticket') as f: # 'r' dic = json.load(f) # 读取 dict print(name , dic['count']) def buy(name): # 买票 with open('ticket') as f: dic = json.load(f) time.sleep(0.1) if dic['count'] > 0: print(name,'买到票了') dic['count'] -= 1 time.sleep(0.1) with open('ticket','w') as f: json.dump(dic,f) # 写进去 def get_ticket(name,lock): #整个操做 search(name) # 先查 lock.acquire() # 只有第一个到达的进程才能获取锁,剩下的其余人都须要在这里阻塞 上锁 buy(name) # 再买 lock.release() # 有一我的还锁,会有一我的再结束阻塞拿到钥匙 还锁 if __name__ == '__main__': lock = Lock() # 实例化锁 for i in range(10): # 10个进程 p = Process(target=get_ticket,args=('name%s'%i,lock)) # 建立 p.start() # 通知os 开启 # tips : ticket 里面的数据结构 {"count": 0} # 模拟过程描述: # 第一个来的人 取钥匙 开门 进门 关门 带着钥匙反锁 # 第一个拿到钥匙的人 开门 出门 锁门 挂钥匙
进程 状态码 Z/z 僵尸进程 linux命令 主进程中控制子进程的方法: p = Process(target,args) #建立这一刻 根本没有通知操做系统 p.start() #通知os 开启子进程 异步非阻塞 p.terminate() #通知os,关闭子进程,异步非阻塞 p.is_alive() # 查看子进程是否还活着 p.join(timeout=10) # 阻塞 直到子进程结束 超时时间理解 # 守护进程 # 守护进程是一个子进程 # 守护进程会在主进程代码结束以后才结束 # 为何会这样? # 因为主进程必需要回收全部的子进程的资源 # 因此主进程必须在子进程结束以后才能结束 # 而守护进程就是为了守护主进程存在的 # 不能守护到主进程结束,就只能退而求其次,守护到代码结束了 # 守护到主进程的代码结束,意味着若是有其余子进程没有结束,守护进程没法继续守护 # 解决方案 : 在主进程中加入对其余子进程的join操做,来保证守护进程能够守护全部主进程和子进程的执行 # 如何设置守护进程 # 子进程对象.daemon = True 这句话写在start以前 # 锁 # 为何要用锁? # 因为多个进程的并发,致使不少数据的操做都在同时进行 # 因此就有可能产生多个进程同时操做 : 文件\数据库 中的数据 # 致使数据不安全 # 因此给某一段修改数据的程序加上锁,就能够控制这段代码永远不会被多个进程同时执行 # 保证了数据的安全 # Lock 锁(互斥锁) # 锁其实是把你的某一段程序变成同步的了,下降了程序运行的速度,为了保证数据的安全性 # 没有数据安全的效率都是耍流氓
# 对于锁 保证一段代码同一时刻只能有一个进程执行 # 对于信号量 保证一段代码同一时刻只能有n个进程执行 # 流量控制 import time import random from multiprocessing import Process,Semaphore def ktv(name,sem): sem.acquire() #拿锁 print("%s走进了ktv"%name) time.sleep(random.randint(5,10)) print("%s走出了ktv" % name) sem.release() #还锁 if __name__ == '__main__': sem = Semaphore(4) # 同时只能有4个进程执行 for i in range(25): p = Process(target=ktv,args = ('name%s'%i,sem)) p.start() # 信号量原理 : 锁 + 计数器实现的 # 普通的锁 acquire 1次 # 信号量 acquire 屡次 # count计数 # count = 4 # acquire count -= 1 # 当count减到0的时候 就阻塞 # release count + = 1 # 只要count不为0,你就能够继续acquire
# from multiprocessing import Event # Event 事件类 # e = Event() # e 事件对象 # 事件自己就带着标识 : False # wait 阻塞 # 它的阻塞条件是 对象标识为False # 结束阻塞条件是 对象标识为True # 对象的标识相关的 : # set 将对象的标识设置为True # clear 将对象的标识设置为False # is_set 查看对象的标识是否为True import time import random from multiprocessing import Event,Process def traffic_light(e): print('\033[1;31m红灯亮\033[0m') while True: time.sleep(2) if e.is_set(): # 若是当前是绿灯 print('\033[1;31m红灯亮\033[0m') # 先打印红灯亮 e.clear() # 再把灯改为红色 else : # 当前是红灯 print('\033[1;32m绿灯亮\033[0m') # 先打印绿灯亮 e.set() # 再把灯变绿色 # def car(e,carname): if not e.is_set(): # False print('%s正在等待经过'%carname) e.wait() #阻塞 print('%s正在经过'%carname) if __name__ == '__main__': e = Event() #建立 p = Process(target=traffic_light,args = (e,)) #建立进程 p.start() #开始 for i in range(100): #100辆车 time.sleep(random.randrange(0,3)) #随机 p = Process(target=car, args=(e,'car%s'%i)) p.start() # 太复杂了 # 在咱们进行并发操做的时候不多用到这么复杂的场景 # Event事件 # 放到进程中的代码必定不止一段 # 这两个操做之间 存在同步关系 # 一个操做去确认另外一个操做的执行条件是否完成 # 标识 控制wait是否阻塞的关键 # 如何修改这个标识 : clear set # 如何查看这个标识 : is_set
# 管道数据不安全 管道加锁就是队列 from multiprocessing import Pipe,Process def f(conn): #接收子conn conn.send('hello world') #发消息 conn.close() if __name__ == '__main__': parent_conn,child_conn = Pipe() p = Process(target=f,args=(child_conn,)) #传子conn p.start() print(parent_conn.recv()) #父conn接收 p.join()
# 进程之间 数据隔离 # 凭什么判断 子进程是否执行完毕了???? # lock对象 # a进程 acquire了 b进程在acquire的地方一直阻塞直到a release # 你在b进程 如何知道a进程release了? # 你以前学习的lock semaphore event实际上都用到了进程之间的通讯 # 只不过这些通讯都是很是简单而固定的信号 # 在你使用这些工具的过程当中并感知不到 # 对于用户来说 : 就但愿可以去进行一些更加复杂的 不固定的内容的交互 # 这种状况下使用lock semaphore event就不可行了 # 进程间通讯 IPC # IPC Inter-Process Communication # 实现进程之间通讯的两种机制: # 管道 Pipe 数据不安全 # 队列 Queue 管道+锁 # from multiprocessing import Queue,Process # # def consumer(q): # print( # '子进程 :', q.get() # ) # # # if __name__ == '__main__': # q = Queue() # p = Process(target=consumer,args=(q,)) # p.start() # q.put('hello,world') # 生产者消费者模型 import time from multiprocessing import Queue,Process def producer(name,food,num,q): '''生产者''' for i in range(num): time.sleep(0.3) foodi = food + str(i) print('%s生产了%s'%(name,foodi)) q.put(foodi) def consumer(name,q): while True: food = q.get() # 等待接收数据 if food == None:break print('%s吃了%s'%(name,food)) time.sleep(1) if __name__ == '__main__': q = Queue(maxsize=10) p1 = Process(target=producer,args = ('宝元','泔水',20,q)) p2 = Process(target=producer,args = ('战山','鱼刺',10,q)) c1 = Process(target=consumer, args=('alex', q)) c2 = Process(target=consumer, args=('wusir', q)) p1.start() # 开始生产 p2.start() # 开始生产 c1.start() c2.start() p1.join() # 生产者结束生产了 p2.join() # 生产者结束生产了 q.put(None) # put None 操做永远放在全部的生产者结束生产以后 q.put(None) # 有几个消费者 就put多少个None # 为何队列为空 为满 这件事情不够准确 # q.qsize() 队列的大小 # q.full() 是否满了 满返回True # q.empty() 是否空了 空返回True
import time from multiprocessing import JoinableQueue,Process def consumer(name,q): while True: food = q.get() time.sleep(1) print('%s消费了%s'%(name,food)) q.task_done() def producer(name,food,num,q): '''生产者''' for i in range(num): time.sleep(0.3) foodi = food + str(i) print('%s生产了%s'%(name,foodi)) q.put(foodi) q.join() # 消费者消费完毕以后会结束阻塞 if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer, args=('宝元', '泔水', 20, q)) c1 = Process(target=consumer, args=('alex', q)) c2 = Process(target=consumer, args=('wusir', q)) c1.daemon = True c2.daemon = True p1.start() c1.start() c2.start() p1.join() # 消费者每消费一个数据会给队列发送一条信息 # 当每个数据都被消费掉以后 joinablequeue的join阻塞行为就会结束 # 以上就是为何咱们要在生产完全部数据的时候发起一个q.join() # 随着生产者子进程的执行完毕,说明消费者的数据都消费完毕了 # 这个时候主进程中的p1.join结束 # 主进程的代码结束 # 守护进程也结束了
from multiprocessing import Manager,Process,Lock def work(d,lock): # with lock: #不加锁而操做共享的数据,确定会出现数据错乱 # d['count']-=1 ''' 等价于下面的代码 ''' lock.acquire() d['count'] -= 1 lock.release() if __name__ == '__main__': lock=Lock() m = Manager() dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) # Manager是一个类 内部有一些数据类型可以实现进程之间的数据共享 # dict list这样的数据 内部的数字进行自加 自减 是会引发数据不安全的,这种状况下 须要咱们手动加锁完成 # 所以 咱们通常状况下 不适用这种方式来进行进程之间的通讯 # 咱们宁肯使用Queue队列或者其余消息中间件 来实现消息的传递 保证数据的安全
import time from multiprocessing import Process def func(i): i -= 1 if __name__ == '__main__': start = time.time() #计时开始 l = [] for i in range(100): p = Process(target=func,args=(i,)) p.start() l.append(p) for p in l: p.join() print(time.time() - start) #计时结束
计算时间差 import time from multiprocessing import Pool # 池 def func(i): i -= 1 if __name__ == '__main__': start = time.time() p = Pool(5) #池 你的池中打算放多少个进程,个数cpu的个数 * 1/2 p.map(func,range(100)) # 自动带join print(time.time()-start)
from multiprocessing import Pool def f(i): i -= 1 return i**2 if __name__ == '__main__': p = Pool(5) #池 你的池中打算放多少个进程,个数cpu的个数 * 1/2 ret = p.map(f,range(100)) # 自动带join print(ret)
import time from multiprocessing import Pool # 池 def func(i): i -= 1 time.sleep(0.5) return i**2 if __name__ == '__main__': p = Pool(5) #你的池中打算放多少个进程,个数cpu的个数 * 1|2 for i in range(100): ret = p.apply(func,args=(i,)) # 自动带join 串行/同步 apply就是同步提交任务 print(ret)
import time from multiprocessing import Pool # 池 def func(i): i -= 1 time.sleep(0.3) # print(i) return i**2 if __name__ == '__main__': p = Pool(5) lst = [] for i in range(100): ret = p.apply_async(func,args=(i,)) # 自动带join 异步的 apply_async异步提交任务 lst.append(ret) # p.close() # 关闭进程池的任务提交 今后以后不能再向p这个池提交新的任务 # p.join() # 阻塞 一直到全部的任务都执行完 # print('结束') for i in lst : print(i.get())
什么是进程池? 有限的进程的池子 为何要用进程池? 任务不少 cpu个数*5个任务以上 为了节省建立和销毁进程的时间 和 操做系统的资源 通常进程池中进程的个数: cpu的1-2倍 若是是高计算,彻底没有io,那么就用cpu的个数 随着IO操做越多,可能池中的进程个数也能够相应增长 向进程池中提交任务的三种方式 map 异步提交任务 简便算法 接收的参数必须是 子进程要执行的func,可迭代的(可迭代中的每一项都会做为参数被传递给子进程) 可以传递的参数是有限的,因此比起apply_async限制性比较强 apply 同步提交任务(你删了吧) apply_async 异步提交任务 可以传递比map更丰富的参数,可是比较麻烦 首先 apply_async提交的任务和主进程彻底异步 能够经过先close进程池,再join进程池的方式,强制主进程等待进程池中任务的完成 也能够经过get获取返回值的方式,来等待任务的返回值 咱们不能在apply_async提交任务以后直接get获取返回值 for i in range(100): ret = p.apply_async(func,args=(i,)) # 自动带join 异步的 apply_async异步提交任务 l.append(ret) for ret in l: print(ret.get())
回调函数 import os import time import random from multiprocessing import Pool # 池 def func(i): # [2,1,1.5,0,0.2] i -= 1 time.sleep(random.uniform(0,2)) return i**2 def back_func(args): print(args,os.getpid()) if __name__ == '__main__': print(os.getpid()) p = Pool(5) l = [] for i in range(100): ret = p.apply_async(func,args=(i,),callback=back_func) # 5个任务 p.close() p.join() callback回调函数 主动执行func,而后在func执行完毕以后的返回值,直接传递给back_func做为参数,调用back_func 处理池中任务的返回值 回调函数是由谁执行的? 主进程
import re import json from urllib.request import urlopen #请求页面包 from multiprocessing import Pool def get_page(i): #页面 ret = urlopen('https://movie.douban.com/top250?start=%s&filter='%i).read() ret = ret.decode('utf-8') return ret def parser_page(s): #页面数据分析 com = re.compile( '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>\d+).*?<span class="title">(?P<title>.*?)</span>' '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)评价</span>', re.S) ret = com.finditer(s) with open('file','a',encoding='utf-8') as f: for i in ret: dic = { "id": i.group("id"), "title": i.group("title"), "rating_num": i.group("rating_num"), "comment_num": i.group("comment_num"), } f.write(json.dumps(dic,ensure_ascii=False)+'\n') if __name__ == '__main__': p = Pool(5) count = 0 for i in range(10): p.apply_async(get_page,args=(count,),callback=parser_page) count += 25 p.close() p.join()
import json with open('file2','w',encoding='utf-8') as f: json.dump({'你好':'alex'},f,ensure_ascii=False)
线程 - python
线程 轻量级 进程 解决并发 总体效率高于进程 在进程中数据共享 资源共享 是进程的一部分,不能独立存在的 被CPU调度的最小单位 使用场景 socketserver web的框架 django flask tornado 多线程来接收用户并发的请求 python里 同一个进程中的多个线程能不能同时使用多个cpu 在整个程序界: 若是你的程序须要数据隔离 : 多进程 若是你的程序对并发的要求很是高 : 多线程 python 初期 面向单核的 一个cpu 做为一门脚本语言 解释型语言 线程锁这件事儿是由Cpython解释器完成 对于python来讲 同一时刻只能有一个线程被cpu访问 完全的解决了多核环境下的安全问题 线程锁 : 全局解释器锁 GIL 1.这个锁是锁线程的 2.这个锁是解释器提供的 多线程仍然有它的优点 你的程序中用到cpu真的多么 若是100% 90%的时间都消耗在计算上,那么cpython解释器下的多线程对你来讲确实没用 可是你写的大部分程序 的时间实际上都消耗在了IO操做上 遇到高计算型 开进程 4个进程 每一个进程里开n个线程 换个解释器
import os import time from threading import Thread def func(): print('start',os.getpid()) time.sleep(1) print('end') if __name__ == '__main__': t = Thread(target=func) t.start() for i in range(5): print('主线程',os.getpid()) time.sleep(0.3)
import time from threading import Thread def func(): n = 1 + 2 + 3 n ** 2 if __name__ == '__main__': start = time.time() lst = [] for i in range(100): t = Thread(target=func) t.start() lst.append(t) for t in lst: t.join() print(time.time() - start) # import time from multiprocessing import Process as Thread def func(): n = 1 + 2 + 3 n**2 if __name__ == '__main__': start = time.time() lst = [] for i in range(100): t = Thread(target=func) t.start() lst.append(t) for t in lst: t.join() print(time.time() - start)
from threading import Thread n = 100 def func(): global n n -= 1 t = Thread(target=func) t.start() t.join() print(n)
from threading import Thread class Mythread(Thread): def __init__(self,arg): super().__init__() self.arg = arg def run(self): print('in son',self.arg) t = Mythread(123) t.start()
import time from threading import Thread,currentThread,activeCount,enumerate class Mythread(Thread): def __init__(self,arg): super().__init__() self.arg = arg def run(self): time.sleep(1) print('in son',self.arg,currentThread()) # t = Mythread(123) # t.start() # print('主',currentThread()) #当前线程 # for i in range(10): t = Mythread(123) t.start() # print(t.ident) #当前线程id print(activeCount()) #几个活跃的线程 11 print(enumerate()) # 一共几个线程 []
- server.py - import socket from threading import Thread def talk(conn): while True: msg = conn.recv(1024).decode() #解码 conn.send(msg.upper().encode()) #编码 sk = socket.socket() #建立 sk.bind(('127.0.0.1',9000)) #bind sk.listen() #监听 while True: conn,addr = sk.accept() #接收 Thread(target=talk,args = (conn,)).start() #建立线程并开启 - client.py - import socket sk = socket.socket() #建立 sk.connect(('127.0.0.1',9000)) #链接 while True: sk.send(b'hello') #发 print(sk.recv(1024)) #收
import time from threading import Thread def func(): while True: print('in func') time.sleep(0.5) def func2(): print('start func2') time.sleep(10) print('end func2') Thread(target=func2).start() t = Thread(target=func) t.setDaemon(True) #线程 守护操做 t.start() print('主线程') time.sleep(2) print('主线程结束') # 守护进程 只守护主进程的代码,主进程代码结束了就结束守护,守护进程在主进程以前结束 # 守护线程 随着主线程的结束才结束,守护线程是怎么结束的 直到主子线程都结束 进程结束 # 进程 terminate 强制结束一个进程的 # 线程 没有强制结束的方法 # 线程结束 : 线程内部的代码执行完毕 那么就自动结束了
import time from threading import Thread,currentThread def func(): print(currentThread()) print('开始') for i in range(10): Thread(target=func).start() # time.sleep(2) print('主线程')
锁 用来保证数据安全 有了GIL仍是会出现数据不安全的现象,因此仍是要用锁 import time from threading import Thread,Lock n = 100 def func(lock): #锁 global n #用 全局的n # n -= 1 with lock: tmp = n-1 # n-=1 # time.sleep(0.1) n = tmp if __name__ == '__main__': l = [] lock = Lock() #实例化 for i in range(100): t = Thread(target=func,args=(lock,)) #建立 t.start() #开启 l.append(t) for t in l: t.join() print(n)
import dis n = 1 #全局空间的 += -= 操做 都不是数据安全的 def func(): n = 100 #局部空间内 永远安全 n -= 1 dis.dis(func) # 会出现线程不安全的两个条件 # 1.是全局变量 # 2.出现 += -=这样的操做 #下面是解析 LOAD 仅有这个 数据安全 STORE 有load store 就会不安全 # 列表 字典 # 方法 l.append l.pop l.insert dic.update 都是线程安全的 # l[0] += 1 不安全 # d[k] += 1 不安全
# 科学家吃面问题 import time from threading import Thread,Lock # noodle_lock = Lock() # fork_lock = Lock() # 死锁不是时刻发生的,有偶然的状况整个程序都崩了 # 每个线程之中不止一把锁,而且套着使用 # 若是某一件事情须要两个资源同时出现,那么不该该将这两个资源经过两把锁控制 # 而应看作一个资源 # # def eat1(name): # noodle_lock.acquire() # print('%s拿到面条了'%name) # fork_lock.acquire() # print('%s拿到叉子了'%name) # print('%s开始吃面'%name) # time.sleep(0.2) # fork_lock.release() # print('%s放下叉子了' % name) # noodle_lock.release() # print('%s放下面了' % name) # # def eat2(name): # fork_lock.acquire() # print('%s拿到叉子了' % name) # noodle_lock.acquire() # print('%s拿到面条了' % name) # print('%s开始吃面' % name) # time.sleep(0.2) # noodle_lock.release() # print('%s放下面了' % name) # fork_lock.release() # print('%s放下叉子了' % name) # # Thread(target=eat1,args=('wei',)).start() # Thread(target=eat2,args=('hao',)).start() # Thread(target=eat1,args=('太',)).start() # Thread(target=eat2,args=('宝',)).start() lock = Lock() def eat1(name): lock.acquire() print('%s拿到面条了'%name) print('%s拿到叉子了'%name) print('%s开始吃面'%name) time.sleep(0.2) lock.release() print('%s放下叉子了' % name) print('%s放下面了' % name) def eat2(name): lock.acquire() print('%s拿到叉子了' % name) print('%s拿到面条了' % name) print('%s开始吃面' % name) time.sleep(0.2) lock.release() print('%s放下面了' % name) print('%s放下叉子了' % name) Thread(target=eat1,args=('alex',)).start() Thread(target=eat2,args=('wusir',)).start() Thread(target=eat1,args=('太白',)).start() Thread(target=eat2,args=('宝元',)).start() # 先临时解决 fork_lock=noodle_lock = Lock() # 而后再找到死锁的缘由,再去修改 终极办法一把锁
from threading import RLock,Lock,Thread # 互斥锁 # 不管在相同的线程仍是不一样的线程,都只能连续acquire一次 # 要想再acquire,必须先release # 递归锁 # 在同一个线程中,能够无限次的acquire # 可是要想在其余线程中也acquire, # 必须如今本身的线程中添加和acquire次数相同的release rlock = RLock() #每一次acquire都像进去一道门 rlock.acquire() rlock.acquire() rlock.acquire() rlock.acquire() #直到全都release 才能下我的进门 print('锁不住') lock = Lock() #普通锁/互斥锁 lock.acquire() print('1') #到这里 hang住了 lock.acquire() print('2')
from threading import RLock rlock = RLock() def func(num): rlock.acquire() #锁 print('aaaa',num) rlock.acquire() print('bbbb',num) rlock.release() #必须 还锁 rlock.release() #必须 Thread(target=func,args=(1,)).start() Thread(target=func,args=(2,)).start() # aaaa 1 # bbbb 1 # aaaa 2 # bbbb 2
import time from threading import RLock,Lock,Thread noodle_lock = fork_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s开始吃面'%name) time.sleep(0.2) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了' % name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s开始吃面' % name) time.sleep(0.2) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) Thread(target=eat1,args=('alex',)).start() Thread(target=eat2,args=('wusir',)).start() Thread(target=eat1,args=('太白',)).start() Thread(target=eat2,args=('宝元',)).start()
import time from threading import Semaphore,Thread def func(name,sem): sem.acquire() print(name,'start') time.sleep(1) print(name,'stop') sem.release() sem = Semaphore(5) for i in range(20): Thread(target=func,args=(i,sem)).start()
from threading import Event # 事件 # wait() 阻塞 到事件内部标识为True就中止阻塞 # 控制标识 # set # clear # is_set # 链接数据库 import time import random from threading import Thread,Event def connect_sql(e): count = 0 while count < 3: e.wait(0.5) if e.is_set(): print('链接数据库成功') break else: print('数据库未链接成功') count += 1 def test(e): time.sleep(random.randint(0,3)) e.set() e = Event() Thread(target=test,args=(e,)).start() #测试 Thread(target=connect_sql,args=(e,)).start() #链接
from threading import Timer def func(): print('执行我啦') t = Timer(3,func) # 如今这个时间点我不想让它执行,而是预估一下大概多久以后它执行比较合适 t.start() print('主线程的逻辑') t.join() print('ok ')
# wait 阻塞 # notify(n) 给信号 # 假如如今有20个线程 # 全部的线程都在wait这里阻塞 # notify(n) n传了多少 # 那么wait这边就能得到多少个解除阻塞的通知 # notifyall # acquire # release import threading def run(n): con.acquire() con.wait() print("run the thread: %s" % n) con.release() if __name__ == '__main__': con = threading.Condition() #条件 for i in range(10): #10个线程 t = threading.Thread(target=run, args=(i,)) t.start() #开启 while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release() print('****') # 设置某个条件 # 若是知足这个条件 就能够释放线程 # 监控测试个人网速 # 20000个任务 # 测试个人网速 /系统资源 # 发现系统资源有空闲,我就放行一部分任务
import queue # 线程队列 线程之间数据安全 q = queue.Queue() # # 普通队列 # q.put(1) # # print(q.get()) # try: # q.put_nowait(2) # except queue.Full: # print('您丢失了一个数据2') # print(q.get_nowait()) # 若是有数据我就取,若是没数据不阻塞而是报错 # 非阻塞的状况下 q.put(10) print(q.get(timeout=2)) # # # 算法里 栈 # lfq = queue.LifoQueue() # 栈 # lfq.put(1) # lfq.put(2) # lfq.put(3) # print(lfq.get()) # print(lfq.get()) # print(lfq.get()) # # # 优先级队列,是根据第一个值的大小来排定优先级的 # # ascii码越小,优先级越高 # q = queue.PriorityQueue() # q.put((2,'a')) # q.put((1,'c')) # q.put((1,'b')) # # print(q.get()) # 线程+队列 实现生产者消费者模型
concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor #线程池 # from concurrent.futures import ProcessPoolExecutor as Pool #进程池 def func(num): print('in %s func'%num,currentThread()) time.sleep(random.random()) return num**2 tp = ThreadPoolExecutor(5) #5个线程 ret_l = [] for i in range(30): ret = tp.submit(func,i) #提交 ret_l.append(ret) for ret in ret_l: #取值 print(ret.result())
import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor as Pool import os def func(num): # print('in %s func'%num,currentThread()) print('in %s func'%num,os.getpid()) time.sleep(random.random()) return num**2 if __name__ == '__main__': # tp = ThreadPoolExecutor(5) tp = Pool(5) ret = tp.map(func,range(30)) # print(list(ret)) for i in ret: print(i)
# 回调函数 add_done_callback import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor as Pool def func1(num): print('in func1 ',num,currentThread()) return num*'*' def func2(ret): print('--->',ret.result(),currentThread()) tp = Pool(5) print('主 : ',currentThread()) for i in range(10): tp.submit(func1,i).add_done_callback(func2) # 回调函数收到的参数是须要使用result()获取的 # 回调函数是由谁执行的? 主线程
import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor as Pool from urllib.request import urlopen def func(name,url): content = urlopen(url).read() return name,content def parserpage(ret): name,content = ret.result() with open(name,'wb') as f: f.write(content) urls = { # 'baidu.html':'https://www.baidu.com', # 'python.html':'https://www.python.org', # 'openstack.html':'https://www.openstack.org', 'github.html':'https://help.github.com/', 'sina.html':'http://www.sina.com.cn/' } tp = Pool(2) for k in urls: tp.submit(func,k,urls[k]).add_done_callback(parserpage)
线程 锁 为何有了GIL以后还须要锁 多个线程同时操做全局变量的时候 当出现"非原子性操做",例如+= -= *= /= l.append(1) 原子性操做 a += 1 a = a + 1 tmp = a +1 a = tmp 死锁现象 什么是死锁现象? 两个以上的线程争抢同一把锁, 其中一个线程获取到锁以后不释放 另外的其余线程就都被锁住了 比较容易出现问题的状况 : 两把锁套在一块儿用了 死锁现象的本质 :代码逻辑问题 递归锁 一把锁在同一个线程中acquire屡次而不被阻塞 若是另外的线程想要使用,必须release相同的次数, 才能释放锁给其余线程 信号量 控制几个线程同一时刻只能有n个线程执行某一段代码 锁 + 计数器 事件 两件事情 一件事情要想执行依赖于另外一个任务的结果 条件 n个线程在某处阻塞 由另外一个线程控制这n个线程中有多少个线程能继续执行 定时器 规定某一个线程在开启以后的n秒以后执行 队列\栈\优先级队列 import queue 线程之间数据安全 多个线程get不可能同时取走一个数据,致使数据的重复获取 多个线程put也不可能同时存入一个数据,致使数据的丢失 队列 先进先出 栈 先进后出 优先级 优先级高的先出 线程池 concurrent.futrues ThreadPoolExcuter ProcessPoolExcuter submit 异步提交任务 shutdown 等待池内任务完成 result 获取进程函数的返回值 map 异步提交任务的简便用法 add_done_callback 回调函数 进程 主进程执行 线程
协程 - linux
进程 计算机中最小的资源分配单位 线程 计算机中能被CPU调度的最小单位 线程是由操做系统建立的,开启和销毁仍然占用一些时间 调度 1.一条线程陷入阻塞以后,这一整条线程就不能再作其余事情了 2.开启和销毁多条线程以及cpu在多条线程之间切换仍然依赖操做系统 你了解协程 ? 了解 协程(纤程,轻型线程) 对于操做系统来讲协程是不可见的,不须要操做系统调度 协程是程序级别的操做单位 协程效率高不高 和操做系统自己没有关系,和线程也没有关系 而是看程序的调度是否合理 协程指的只是在同一条线程上可以互相切换的多个任务 遇到io就切换其实是咱们利用协程提升线程工做效率的一种方式
# 切换 + 状态保存 yield import time def consumer(res): '''任务1:接收数据,处理数据''' pass def producer(): '''任务2:生产数据''' res=[] for i in range(100000): #1亿次 res.append(i) return res # start=time.time() res=producer() consumer(res) # 写成consumer(producer())会下降执行效率 stop=time.time() print(stop-start) import time def consumer(): while True: res = yield def producer(): g = consumer() next(g) for i in range(100000): g.send(i) start =time.time() producer() print(time.time() - start) # yield这种切换 就已经在一个线程中出现了多个任务,这多个任务以前的切换 本质上就是协程,consumer是一个协程,producer也是一个协程 # 单纯的切换还会消耗时间 # 可是若是可以在阻塞的时候切换,而且多个程序的阻塞时间共享,协程可以很是大限度的提升效率
协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的 # greenlet 协程模块 在多个任务之间来回切换 # gevent 基于greenlet实现的,多个任务交给gevent管理,遇到IO就使用greenlet进行切换 import time from greenlet import greenlet def play(): print('start play') g2.switch() # 开关 time.sleep(1) print('end play') def sleep(): print('start sleep') time.sleep(1) print('end sleep') g1.switch() g1 = greenlet(play) g2 = greenlet(sleep) g1.switch() # 开关
import time import gevent def play(): # 协程1 print(time.time()) print('start play') gevent.sleep(1) print('end play') def sleep(): # 协程2 print('start sleep') print('end sleep') print(time.time()) g1 = gevent.spawn(play) g2 = gevent.spawn(sleep) # g1.join() # g2.join() # 精准的控制协程任务,必定是执行完毕以后join当即结束阻塞 gevent.joinall([g1,g2])
from gevent import monkey;monkey.patch_all() import time import gevent url_lst = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/'] def get_page(url): ret = urlopen(url).read() return ret.decode('utf-8') start = time.time() g_l = [] for url in url_lst: g = gevent.spawn(get_page,url) g_l.append(g) # gevent.joinall(g_l) print(time.time()-start)
- server.py - from gevent import monkey;monkey.patch_all() import socket import gevent def talk(conn): while True: msg = conn.recv(1024).decode() conn.send(msg.upper().encode()) sk = socket.socket() sk.bind(('127.0.0.1',9000)) sk.listen() while True: conn,addr = sk.accept() gevent.spawn(talk,conn) - client.py - import socket import threading def task(): sk = socket.socket() sk.connect(('127.0.0.1',9000)) while True: sk.send(b'hello') print(sk.recv(1024)) for i in range(500): threading.Thread(target=task).start()
协程
一条线程在多个任务之间相互切换
数据安全的
不能利用多核
可以规避一个线程上的IO阻塞
一条线程可以起500个协程
4c的机器
5个进程
每个进程20个线程
每个线程500个协程
5*20*500 = 50000