17.进程线程python
进程间通讯方式:管道Pipe;队列Queue;共享内存Value、Array、Manager;安全
多进程同步:锁Lock、递归锁RLock、Condition(条件变量);事件event;信号signal;信号量Semaphore;app
from multiprocessing import Process import time import os def test(sub_p): for j in range(20): print('%s----pid=%s' % (sub_p, os.getpid())) time.sleep(1) for i in range(3): p = Process(target=test, name='sub_%s' % i, args=(i,)) print('sub process %s' % p.name) p.start() p.join(5) # join表示延时时间,也就是等待子进程的时间,当10秒过了之后,则会运行主进程。 for i in range(60): print('----') time.sleep(1)
from multiprocessing import Process import time import os class ProcessClass(Process): def __init__(self, sub_p): super(ProcessClass, self).__init__() self.sub_p = sub_p def run(self): # 重写run方法,当调用start方法时,则会默认调用run方法,因此不用再填写target参数。 for j in range(20): print('%s----pid=%s' % (self.sub_p, os.getpid())) time.sleep(1) for i in range(3): p = ProcessClass(i) p.start() p.join(5) # 这里将会等待子进程单独运行5秒。 for i in range(60): # 主进程,当join等待结束收,则会父子进程一块儿运行。可是若是当父进程运行完,子进程尚未结束,那么父进程会继续等子进程。 print('--main--') time.sleep(1)
方法 | 描述 |
apply() | 以同步方式添加进程 |
apply_async() | 以异步方式添加进程 |
close() | 关闭Pool,使其不接受新任务(还可使用) |
terminate() | 无论任务是否完成,当即终止 |
join() | 主进程阻塞,等待子进程的退出,必须在close和terminate后使用 |
from multiprocessing import Pool # 导入Pool模块类 import os import time def work(num): print('进程的pid是%d,进程值是%d' % (os.getpid(), num)) time.sleep(2) p = Pool(2) # 实例化对象,参数2表示建立2个子进程,就是说每次只能执行2个进程。 for i in range(6): print('--%d--' % i) # 向实例对象添加6次任务,就是6个进程,可是实例对象的进程池只有2个,因此每次往进程池中放2个进程, # 当进程池中的2个进程执行完之后,再容许向进程池中添加进程。 p.apply_async(work, (i,)) p.close() # 关闭进程池,再也不接收进程任务。 p.join() # 当子进程工做结束后,则会运行主进程。
方法 | 描述 |
put | q.put(数据),放入数据(如队列已满,则程序进入阻塞状态,等待队列取出后再放入) |
put_nowait | q.put_nowati(数据),放入数据(如队列已满,则不等待队列取出后再放入,直接报错) |
get | q.get(数据),取出数据(如队列为空,阻塞等待队列放入数据后再取出) |
get_nowait | q.get_nowait(数据),取出数据(如队列为空,不等待队列取出以前数据,直接报错),放入数据后立马判断是否为空有时为True,缘由是放入值和判断同时进行 |
qsize | q.qsize() 消息数量 |
empty | q.empty() 返回True或False,判断是否为空 |
full | q.full() 返回值为True或False,判断是否为满 |
from multiprocessing import Process, Pipe def f(conn): conn.send([1, 'test', None]) conn.send([2, 'test', None]) print('child get: %s' % conn.recv()) # 没数据时读阻塞
conn.close() if __name__ == "__main__": # Pipe(duplex=True)返回管道的两端,duplex=True时双向管道;False时单向parent_conn只读,child_conn只写
parent_conn, child_conn = Pipe(duplex=True) p = Process(target=f, args=(child_conn,)) p.start() res = parent_conn.recv() print('parent get: %s, type=%s' % (res, type(res))) print('parent get: %s' % parent_conn.recv()) parent_conn.send('father test') p.join()
Python中进程间共享数据,处理基本的queue,pipe和value+array外,还提供了更高层次的封装。使用multiprocessing.Manager能够简单地使用这些高级接口。异步
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象能够被其余的进程经过proxies来访问。从而达到多进程间数据通讯且安全。async
Manager支持的类型有:list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。函数
from multiprocessing import Manager, Process, Lock def work(d, lock): with lock: # 不加锁而操做共享的数据,确定会出现数据错乱 d['count'] -= 1 if __name__ == '__main__': lock = Lock() with Manager() as m: dic = m.dict({'count': 100}) p_l = [] for i in range(100): p = Process(target=work, args=(dic, lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
注意:spa
使用Manager能够方便的进行多进程数据共享,但当使用Manager处理list、dict等可变数据类型时,须要很是注意一个陷阱。看下面的代码:线程
from multiprocessing import Process, Manager manager = Manager() m = manager.list() m.append({'id':1}) def test(): m[0]['id'] = 2 p = Process(target=test) p.start() p.join() print(m[0])
执行结果是:{'id': 1},
不是预期的:{'id': 2}要达到预期的结果,代码应改成:code
from multiprocessing import Process, Manager manager = Manager() m = manager.list() m.append({'id':1}) def test(): hack = m[0] hack['id'] = 2 m[0] = hack p = Process(target=test) p.start() p.join() print(m[0])
以上代码中让人困惑的操做的目的是绕过Manager的一个隐秘问题,这个问题是指:Manager对象没法监测到它引用的可变对象值的修改,须要经过触发__setitem__
方法来让它得到通知
server