Contents:
- multiprocessing.Queue()
- JoinableQueue
- Signaling between processes with Event
- Controlling access to a resource with Lock
- Synchronizing operations with Condition
- Controlling concurrent access to resources with Semaphore
- Managing shared state with Manager
- Shared namespaces with mgr.Namespace()
- Process pools with multiprocessing.Pool
As with threads, a common use pattern for multiple processes is to divide a job among several workers that run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided up and results aggregated. A simple way to communicate between processes is to pass messages back and forth with multiprocessing.Queue(). Any object that can be serialized with pickle can pass through a Queue.
import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(
            proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
Output: when the queue is empty, q.get() blocks until an item becomes available.
Doing something fancy in Process-1 for Fancy Dan!
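If blocking forever is not acceptable, get() also accepts a timeout and raises queue.Empty when nothing arrives in time. A minimal sketch (not part of the original example):

import multiprocessing
import queue

if __name__ == '__main__':
    q = multiprocessing.Queue()
    try:
        # Wait at most one second for an item instead of blocking forever.
        item = q.get(timeout=1)
    except queue.Empty:
        print('no item arrived within the timeout')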
In addition to the same methods as a Queue, a JoinableQueue instance also has task_done() and join(), which let the producing process wait until every queued item has been processed. In the example below, the special value None is used as a "poison pill" to tell each worker when to shut down.
import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            # next_task is a Task() instance; printing it invokes __str__
            print('{}: {}'.format(proc_name, next_task))
            # calling next_task() invokes __call__
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1
Output:
Creating 8 consumers
Consumer-4: 0 * 0
Consumer-1: 1 * 1
Consumer-2: 2 * 2
Consumer-4: 3 * 3
Consumer-1: 4 * 4
Consumer-2: 5 * 5
Consumer-1: 6 * 6
Consumer-6: 7 * 7
Consumer-4: 8 * 8
Consumer-2: 9 * 9
Consumer-1: Exiting
Consumer-4: Exiting
Consumer-6: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-8: Exiting
Consumer-3: Exiting
Consumer-7: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 4 * 4 = 16
Result: 3 * 3 = 9
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 7 * 7 = 49
Result: 9 * 9 = 81
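The pairing of task_done() and join() is what lets tasks.join() in the main process block until every enqueued item has been processed. A minimal standalone sketch of just that mechanism (not part of the original example):

import multiprocessing

if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    q.put('job')
    print(q.get())
    # Each get() must be matched by a task_done();
    # without this call, q.join() below would block forever.
    q.task_done()
    q.join()
    print('all tasks done')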
The Event class provides a simple way to communicate state information between processes.
When wait() times out, it returns without raising an error. The caller is responsible for checking the state of the event with is_set().
import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()

    w2 = multiprocessing.Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')
Output:
main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True
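On Python 3, wait() also returns True if the event was set and False on a timeout, so the sketch below (not part of the original example) avoids the separate is_set() call:

import multiprocessing

if __name__ == '__main__':
    e = multiprocessing.Event()
    print(e.wait(0.1))  # False: timed out, event not set
    e.set()
    print(e.wait(0.1))  # True: event already set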
In situations where a single resource needs to be shared between multiple processes, a Lock can be used to avoid conflicting accesses.
import multiprocessing
import sys


def worker_with(lock):
    with lock:
        sys.stdout.write('Lock acquired via with\n')


def worker_no_with(lock):
    lock.acquire()
    try:
        sys.stdout.write('Lock acquired directly\n')
    finally:
        lock.release()


if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(
        target=worker_with,
        args=(lock,),
    )
    nw = multiprocessing.Process(
        target=worker_no_with,
        args=(lock,),
    )
    w.start()
    nw.start()
    w.join()
    nw.join()
Output:
Lock acquired via with
Lock acquired directly
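acquire() also supports a non-blocking form, which is useful when a worker should do something else instead of waiting. A minimal single-process sketch for illustration (not part of the original article):

import multiprocessing

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    lock.acquire()
    # A plain Lock is not recursive, so a second blocking acquire()
    # here would deadlock; block=False returns False immediately instead.
    if not lock.acquire(block=False):
        print('lock is already held')
    lock.release()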
A Condition synchronizes parts of a workflow: cond.wait() blocks until cond.notify_all() signals that execution may continue.
import multiprocessing
import time


def stage_1(cond):
    """perform first stage of work,
    then notify stage_2 to continue
    """
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('{} done and ready for stage 2'.format(name))
        cond.notify_all()


def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('{} running'.format(name))


if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()
Output: in this example, two processes run the second stage of a job in parallel, but only after the first stage is done.
Starting stage_2[1]
Starting stage_2[2]
Starting s1
s1 done and ready for stage 2
stage_2[1] running
stage_2[2] running
Sometimes it is useful to allow more than one worker access to a resource at a time, while still limiting the overall number. A Semaphore does exactly that.
import multiprocessing
import time


def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n")
    s.release()


if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i * 2))
        p.start()
Output:
Process-2acquire
Process-3acquire
Process-2release

Process-4acquire
Process-3release

Process-1acquire
Process-1release

Process-5acquire
Process-4release

Process-5release
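Like Lock, a Semaphore also works as a context manager, which guarantees the release even if the body raises. A minimal sketch of the same worker rewritten this way:

import multiprocessing
import time


def worker(s, i):
    with s:  # acquire() on entry, release() on exit
        print(multiprocessing.current_process().name + " working")
        time.sleep(i)


if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        multiprocessing.Process(target=worker, args=(s, i)).start()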
Information shared through a Manager is visible to all processes.
import multiprocessing


def worker(d, key, value):
    d[key] = value


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', d)
Output: because the dictionary is created through the manager, it is shared, and updates made in any process are seen by all of them. Lists are supported in the same way.
Results: {0: 0, 2: 4, 3: 6, 1: 2, 4: 8, 6: 12, 5: 10, 7: 14, 8: 16, 9: 18}
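A list created through the manager is shared the same way; a minimal sketch of the equivalent example using mgr.list():

import multiprocessing


def worker(shared, value):
    shared.append(value)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    shared = mgr.list()
    jobs = [
        multiprocessing.Process(target=worker, args=(shared, i))
        for i in range(5)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', shared)  # order varies, e.g. [0, 2, 1, 3, 4]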
In addition to dictionaries and lists, a Manager can create a shared Namespace.
import multiprocessing


def producer(ns, event):
    ns.value = 'This is the value'
    event.set()


def consumer(ns, event):
    try:
        print('Before event: {}'.format(ns.value))
    except Exception as err:
        print('Before event, error:', str(err))
    event.wait()
    print('After event:', ns.value)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()
Output: a value assigned to the namespace in one process is visible to every other process that holds the same Namespace proxy.
Before event, error: 'Namespace' object has no attribute 'value'
After event: This is the value
It is important to know that updates to the contents of mutable values held in the namespace are not propagated automatically.
import multiprocessing


def producer(ns, event):
    # DOES NOT UPDATE GLOBAL VALUE!
    ns.my_list.append('This is the value')
    event.set()


def consumer(ns, event):
    print('Before event:', ns.my_list)
    event.wait()
    print('After event :', ns.my_list)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()
Output:
Before event: []
After event : []
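To make such a change visible, rebind the attribute so the assignment goes back through the manager (or store a mgr.list() in the first place). A minimal sketch of the rebinding approach applied to the producer above:

def producer(ns, event):
    # Fetch a copy, build a new list, and assign it back;
    # the attribute assignment propagates through the manager.
    ns.my_list = ns.my_list + ['This is the value']
    event.set()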
The Pool class can be used to manage a fixed number of workers for simple cases where the work can be broken up and distributed between workers independently.
import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()   # wrap up current tasks

    print('Pool :', pool_outputs)
Output: the return values from the jobs are collected and returned as a list. (On Python 3 the built-in map() used for comparison is lazy, which is why it prints as a map object rather than a list.)
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x000000000256A080>
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-8
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
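Pool.map() blocks until every result is ready. When the parent process has other work to do in the meantime, the asynchronous variant can be used instead; a minimal sketch, assuming Python 3's context-manager support for Pool:

import multiprocessing


def do_calculation(data):
    return data * 2


if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        # map_async() returns an AsyncResult immediately;
        # get() blocks until all results are available.
        async_result = pool.map_async(do_calculation, range(10))
        print('Pool :', async_result.get())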
By default, Pool creates a fixed number of worker processes and passes jobs to them until there are no more jobs. Setting the maxtasksperchild parameter tells the pool to restart a worker process after it has finished a few tasks, preventing long-running workers from consuming ever more system resources.
import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()   # wrap up current tasks

    print('Pool :', pool_outputs)
Output: workers are restarted when they complete their allocated tasks, even if there is no more work. Nine workers appear in this output even though there are only 10 tasks, because each worker can complete two of them at a time.
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x00000000025CA080>
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-8
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-9
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]