Python中主要经过 multiprocess 包来操做和管理进程。html
python 启动进程方式1:python
import time from multiprocessing import Process def fork(thread_name): time.sleep(2) print("subprocess: " + thread_name) if __name__ == '__main__': p = Process(target=fork, args=('hello_1',)) p.start() # 启动进程 print("end...") # 结果输出 end... subprocess: hello_1
Process类参数说明:安全
Process([ target [, name [, args [, kwargs]]]]]) target 表示子进程要执行的任务 args 表示调用对象的位置参数元组,args=(1,2,'hello',) kwargs 表示调用对象的字典,kwargs={'name':'baby','age':18} name 子进程的名称
python 启动进程方式2:服务器
import time from multiprocessing import Process class MyProcess(Process): def __init__(self, thread_name): super().__init__() self.thread_name = thread_name def run(self): time.sleep(2) print("subprocess: " + self.thread_name) if __name__ == '__main__': p = MyProcess('hello_1') p.start() print("end...") # 结果输出 end... subprocess: hello_1
Tip:两种启动进程的方式没有优劣之分~网络
在主进程中经过 join 方法,可让主进程等待子进程执行完毕后,再继续往下执行并发
import time from multiprocessing import Process def fork(thread_name): time.sleep(2) print("subprocess: " + thread_name) if __name__ == '__main__': p = Process(target=fork, args=('hello_1',)) p.start() p.join() # 等待子进程执行完毕 print("end...") # 结果输出 subprocess: hello_1 end...
多个子进程同时运行app
import time from multiprocessing import Process def fork(thread_name): time.sleep(2) print("subprocess: " + thread_name) if __name__ == '__main__': p_list = [] for i in range(1, 4): p = Process(target=fork, args=('hello_' + str(i),)) p.start() p_list.append(p) [p.join() for p in p_list] # 等待子进程执行完毕 print("end...") # 结果输出 subprocess: hello_1 subprocess: hello_2 subprocess: hello_3 end...
如上是经过第一种方式启动子进程,使用继承 Process 类的形式启动子进程示例以下:dom
import time from multiprocessing import Process class MyProcess(Process): def __init__(self, thread_name): super().__init__() self.thread_name = thread_name def run(self): time.sleep(2) print("subprocess: " + self.thread_name) if __name__ == '__main__': p_list = [] for i in range(1, 4): p = MyProcess('hello_' + str(i)) p.start() p_list.append(p) [p.join() for p in p_list] print("end...")
import time from multiprocessing import Process def fork(thread_name): time.sleep(2) print("subprocess: " + thread_name) if __name__ == '__main__': p = Process(target=fork, args=('hello',)) p.start() # 进程的名称 print(p.name) # 输出:Process-1 # 布尔值,True 表示该进程为守护进程,默认为 False,这个值须要在 p.start() 以前设置 print(p.daemon) # 输出:False # 进程的pid print(p.pid) # 输出:7980 # 进程的身份验证键,默认是由 os.urandom() 随机生成的32字符的字符串。 print(p.authkey) # 输出:b'\xf2M)\xc8\xf6\xae8\x0c\xbet\xbcAT\xad7%ig9zl\xe5|\xb5|\x7f\xa6\xab\x8a\x8a\x94:' # 查看进程是否还在运行,若还在运行,则返回 True print(p.is_alive()) # 输出:True # 主进程等待子进程 p 执行结束,再继续往下执行 # p.join() # 强制终止子进程 p p.terminate() print('end...')
import time from multiprocessing import Process def fork(thread_name): for i in range(5): time.sleep(1) print("subprocess: " + thread_name + "..." + str(i)) if __name__ == '__main__': p = Process(target=fork, args=('hello',)) p.start() time.sleep(2) print('end...') # 输出结果: subprocess: hello...0 subprocess: hello...1 end... subprocess: hello...2 subprocess: hello...3 subprocess: hello...4
能够看到主进程的代码先运行完毕,运行完成后,它会等待子进程执行完成后再结束。如果将子进程设置为守护进程,则子进程会随着主进程的代码执行完毕而结束。注意守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children。异步
import time from multiprocessing import Process def fork(thread_name): for i in range(5): time.sleep(1) print("subprocess: " + thread_name + "..." + str(i)) if __name__ == '__main__': p = Process(target=fork, args=('hello',)) p.daemon = True # 设置进程 p 为守护进程 p.start() time.sleep(2) print('end...') # 输出结果: subprocess: hello...0 subprocess: hello...1 end...
值得注意的是:守护进程是在主进程代码执行结束后就终止,即主进程的代码执行完毕,守护进程就终止。来看以下示例:async
import time from multiprocessing import Process def fork_1(thread_name): for i in range(5): time.sleep(1) print("subprocess: " + thread_name + "..." + str(i), end="\n") def fork_2(thread_name): for i in range(7): time.sleep(1) print("subprocess: " + thread_name + "..." + str(i), end="\n") if __name__ == '__main__': p1 = Process(target=fork_1, args=('hello',)) p2 = Process(target=fork_2, args=('hi',)) p1.daemon = True # 设置进程 p1 为守护进程 p1.start() p2.start() time.sleep(2) print('end...') # 输出结果: subprocess: hello...0 subprocess: hi...0 subprocess: hello...1 subprocess: hi...1 end... subprocess: hi...2 subprocess: hi...3 subprocess: hi...4 subprocess: hi...5 subprocess: hi...6
如上示例中,p1 为守护进程,在主进程输出 ‘end…’ 后,即主进程的代码执行完毕后,守护进程 p1 就终止了。可是此时,主进程并无终止,它须要等待 p2 执行完毕以后再终止。
进程与进程之间数据是隔离的
from multiprocessing import Process def fork(thread_name): global n print("subprocess: " + thread_name + "...n=" + str(n)) n = 1 print("subprocess: " + thread_name + "...n=" + str(n)) if __name__ == '__main__': n = 100 p = Process(target=fork, args=('hello',)) p.start() p.join() print("main...n=" + str(n)) # 输出结果: subprocess: hello...n=100 subprocess: hello...n=1 main...n=100
经过如上示例能够看出,子进程 p 中的变量 n 和主进程中的变量 n 是两个独立的变量,存放在不一样的内存空间,更改其中一个变量并不会影响另外一个变量的值。
要想在进程间共享数据,可经过 Manager 类实现。Manager 类中提供了不少能够共享数据的数据类型,包括dict,list,Queue,Pipe 等。注意:Manager 中的数据是不安全的。当多个进程同时访问共享数据的时候,就会产生数据安全问题。
多进程同时抢购余票示例:
from multiprocessing import Process, Manager def work(m_dict): if m_dict['count'] > 0: print("%s get ticket %d" % (str(os.getpid()), m_dict['count'])) m_dict['count'] -= 1 if __name__ == '__main__': m = Manager() m_dict = m.dict({'count': 20}) p_list = [] for i in range(20): p = Process(target=work, args=(m_dict, )) p.start() p_list.append(p) for i in p_list: i.join() print("end..." + str(m_dict['count'])) # 输出结果: 32940 get ticket 20 32941 get ticket 19 32942 get ticket 18 32939 get ticket 17 32943 get ticket 16 32944 get ticket 15 32946 get ticket 14 32945 get ticket 13 32947 get ticket 12 32948 get ticket 11 32953 get ticket 11 32958 get ticket 9 32957 get ticket 8 32955 get ticket 7 32956 get ticket 7 32954 get ticket 6 32950 get ticket 5 32949 get ticket 5 32951 get ticket 3 32952 get ticket 2 end...1
输出结果中 “ticket 11” 被购买了2次,能够看到当多个进程对同一份数据进行操做的时候,就会引起数据安全问题。
在如上示例中,增长进程数据还有可能出现以下这样的报错:
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
这个报错的触发缘由并无深究,极有多是 manager 内部缘由,在 manager 管理进程的同时不能够进入主进程进行某些交互。能够经过在子进程中 sleep 一下 来避免这个问题(这并非根本的解决方式)
import time, os from multiprocessing import Process, Manager def work(m_dict): time.sleep(0.5) # sleep 0.5 s,能够绕过这个问题 if m_dict['count'] > 0: print("%s get ticket %d" % (str(os.getpid()), m_dict['count'])) m_dict['count'] -= 1 ...
如上的数据安全问题,能够在子进程中加锁来解决,即在同一时刻,仅容许一个进程执行 lock.acquire() 和 lock.release() 之间的代码
import os from multiprocessing import Process, Manager, Lock def work(m_dict, lock): lock.acquire() if m_dict['count'] > 0: print("%s get ticket %d" % (str(os.getpid()), m_dict['count'])) m_dict['count'] -= 1 lock.release() if __name__ == '__main__': m = Manager() m_dict = m.dict({'count': 20}) lock = Lock() p_list = [] for i in range(20): p = Process(target=work, args=(m_dict, lock)) p.start() p_list.append(p) for i in p_list: i.join() print("end..." + str(m_dict['count'])) # 输出结果: 33240 get ticket 20 33242 get ticket 19 33241 get ticket 18 33243 get ticket 17 33244 get ticket 16 33245 get ticket 15 33247 get ticket 14 33246 get ticket 13 33249 get ticket 12 33248 get ticket 11 33250 get ticket 10 33251 get ticket 9 33252 get ticket 8 33257 get ticket 7 33258 get ticket 6 33253 get ticket 5 33254 get ticket 4 33255 get ticket 3 33259 get ticket 2 33256 get ticket 1 end...0
Manager() 是经过共享进程来实现多进程之间数据共享。Manager() 返回的对象控制了一个 server 进程,这个 server 进程容许其余进程经过 proxies 来访问。多进程之间数据共享,除了 Manager() 外,还有 Value 、 Array,Value 和 Array 是经过共享内存的方式实现数据共享,一样为了保证数据安全,常常和同步互斥锁配合使用。
关于 Value 、 Array 的具体使用方式可参阅 https://www.cnblogs.com/gengyi/p/8661235.html。
使用 Value 实现上述的抢票示例:
import os from multiprocessing import Process, Value, Lock def work(count, lock): lock.acquire() if count.value > 0: print("%s get ticket %d" % (str(os.getpid()), count.value)) count.value -= 1 lock.release() if __name__ == '__main__': count = Value('l', 50) lock = Lock() p_list = [] for i in range(50): p = Process(target=work, args=(count, lock)) p.start() p_list.append(p) for i in p_list: i.join() print("end..." + str(count.value))
from multiprocessing import Queue queue = Queue(3) # 建立队列:Queue([maxsize]),maxsize 表示队列的最大长度 queue.put('a') queue.put('b') queue.put('c') print(queue.full()) # 输出 True,表示队列已经满了 # 若队列已经满了,继续向队列中插入数据,则程序会阻塞在这里,直到队列的另外一端有数据被取出,新的数据才能插入 # put 方法有两个可选参数:block 和 timeout。 # block 默认为 True,表示会阻塞 timeout 指定的时间,若是超时,会抛出 Queue.Full 异常。若是 block 为 False,在 put 时 队列已满,则会当即抛出 Queue.Full 异常。 # timeout 默认为 None,表示会一直阻塞。 # queue.put('d') # queue.put_nowait() # 等同于 queue.put(block = False) print(queue.get()) # 'a' print(queue.get()) # 'b' print(queue.get()) # 'c' print(queue.empty()) # 输出 True,表示队列已空 # 若队列已空,继续从该队列中 get 数据,则程序会阻塞在这里,直到队列中新插入了数据。 # get 方法也有两个参数:block 和 timeout,通 put 方法 # block 默认为 True,表示会阻塞 timeout 指定的时间,若是 timeout 之间以内仍是没有获取到数据,会抛出 Queue.Empty 异常。block 为 False 时,若队列中有数据,则会当即返回数据,若是队列为空,则会当即抛出 Queue.Empty 异常. # timeout 默认为 None,表示会一直阻塞。 # queue.get(False) # queue.get_nowait() # 等同于 queue.get(block = False) # print(queue.qsize()) # 获取队列的长度,某些系统上,此方法可能引起NotImplementedError异常。 # q.close() # 关闭队列
生产者和消费者示例
from multiprocessing import Process, Queue import time def producer(name, production, queue): for i in range(2): time.sleep(0.5) queue.put(production + '_' + str(i)) print('%s produce %s' % (name, production + '_' + str(i)), end="\n") def consumer(name, queue): while True: data = queue.get() if data is None: break # None 为结束信号 time.sleep(0.3) print('%s consume %s' % (name, data), end="\n") if __name__ == '__main__': queue = Queue() p_list = [] for index, f in enumerate(['apple', 'pear', 'peach']): p = Process(target=producer, args=('producer_' + str(index), f, queue)) p_list.append(p) p.start() Process(target=consumer, args=('consumer_1', queue)).start() Process(target=consumer, args=('consumer_2', queue)).start() [p.join() for p in p_list] # 有2个消费者,则发送2次 None queue.put(None) queue.put(None) # 输出结果: producer_1 produce pear_0 producer_2 produce peach_0 producer_0 produce apple_0 consumer_2 consume peach_0 consumer_1 consume pear_0 producer_1 produce pear_1 producer_2 produce peach_1 producer_0 produce apple_1 consumer_2 consume apple_0 consumer_1 consume peach_1 consumer_2 consume pear_1 consumer_1 consume apple_1
经过向队列中插入 None,来告诉消费者生产已经结束。这是一种比较低端的实现方式。
JoinableQueue 类是 Queue 类的扩展,JoinableQueue 类中的 task_done() 方法为消费者调用方法,表示从队列中获取的项目(queue.get() 获取的数据)已经被处理;JoinableQueue 类中的 join() 方法为生产者调用的方法,生产者在调用 join() 方法后会被阻塞,直到队列中的每一个项目都被调用 queue.task_done() 方法为止。
以下示例是经过 task_done() 方法 和 join() 方法来实现相似于上述的发送结束信号机制。
from multiprocessing import Process, JoinableQueue import time def producer(name, production, queue): for i in range(2): time.sleep(0.5) queue.put(production + '_' + str(i)) print('%s produce %s' % (name, production + '_' + str(i)), end="\n") queue.join() def consumer(name, queue): while True: data = queue.get() time.sleep(0.3) print('%s consume %s' % (name, data), end="\n") queue.task_done() if __name__ == '__main__': queue = JoinableQueue() p_list = [] for index, f in enumerate(['apple', 'pear', 'peach']): p = Process(target=producer, args=('producer_' + str(index), f, queue)) p_list.append(p) p.start() c1 = Process(target=consumer, args=('consumer_1', queue)) c2 = Process(target=consumer, args=('consumer_2', queue)) c1.daemon = True c2.daemon = True c1.start() c2.start() [p.join() for p in p_list] print('end...')
输出结果与上一个示例一致。这里将 2个 consumer 设置为守护进程,在等待 producer 完成后,也随主进程的结束而结束。
管道的使用:
from multiprocessing import Process, Pipe def func(pro, con): pro.close() while True: try: print(con.recv()) except EOFError: con.close() break if __name__ == '__main__': pro, con = Pipe() # pro, con 分别表示管道的两端 Process(target=func, args=(pro, con)).start() con.close() # 这里也能够不关闭 for i in range(5): pro.send(i) pro.close() # 输出结果: 0 1 2 3 4
传给进程的 conn(管道链接)是不会相互影响的,在一个进程中关闭了管道,并不会影响这个管道在另外一个进程中的使用。如果在一个进程中,管道的一端没有被用到,那么就应该将这一端关闭。例如在生产者中,应该关闭管道的 con 端(右端),在消费者中应该关闭管道的 pro 端(左端)。
当管道全部的入口都已经关闭(上述示例中,主进程和子进程中管道的入口都为 pro),消费者继续接收数据(调用 recv() 方法),当管道中已经没有数据时,就会抛出 EOFError。
若是管道有入口没有关闭,且该入口没有在向管道发送数据,那么消费者就会阻塞在 recv() 方法上。
如上示例是经过 抛出 EOFError 错误来结束管道,还有另外一种方式,就是经过管道中的数据(例如向管道中传递None)来结束管道
from multiprocessing import Process, Pipe def func(con): while True: data = con.recv() if data is None: break print(data) if __name__ == '__main__': pro, con = Pipe() # con, pro 分别表示管道的两端 Process(target=func, args=(con,)).start() for i in range(5): pro.send(i) pro.send(None)
多个消费者消费管道中的数据示例(加锁):
from multiprocessing import Process, Pipe, Lock import time def producer(pro, con, name, production): con.close() for i in range(4): time.sleep(0.5) pro.send(production + str(i)) print('%s produce %s' % (name, production + '_' + str(i)), end="\n") pro.close() def consumer(pro, con, name, lock): pro.close() while True: lock.acquire() try: data = con.recv() time.sleep(0.3) print('%s consume %s' % (name, data), end="\n") except EOFError: con.close() break finally: lock.release() if __name__ == '__main__': pro, con = Pipe() lock = Lock() Process(target=producer, args=(pro, con, 'producer', 'apple')).start() Process(target=consumer, args=(pro, con, 'c_1', lock)).start() Process(target=consumer, args=(pro, con, 'c_2', lock)).start() pro.close() con.close()
pipe(管道)是进程数据不安全的,队列进程之间是数据安全的,由于队列的实现就是基于管道和锁实现的。因此管道极少被用到,生产环境中 pipe 通常也不多被用到,使用较多的通常会是队列服务器,例如 rabbitmq,kafka…...
信号量也是一种锁,信号量与互斥锁区别在于,互斥锁的 acquire() 方法和 release() 方法之间,仅容许一个线程(或进程)执行,而信号量可容许多个线程(或进程)执行。信号量的一种应用就是控制并发执行的线程(或进程)数。
from multiprocessing import Process, Semaphore import time def func(semaphore, name): if semaphore.acquire(): print(name) time.sleep(2) semaphore.release() if __name__ == '__main__': semaphore = Semaphore(3) for i in range(9): Process(target=func, args=(semaphore, 'process_' + str(i), )).start()
Python中的事件(Event)主要用于主线程(进程)控制其余线程(进程)的执行,其主要方法包括 set、wait、clear,is_set。
若事件(Event)的标记取值为 False,则线程(进程)会阻塞在 event.wait() 方法,event.wait() 还能够设置一个参数 timeout,在等待 timeout 指定的时间后中止阻塞,继续运行。
方法说明:
event.set():将 event 的标记设置为 True,全部 阻塞在 event.wait() 的线程(进程)都会继续执行 event.clear():将 event 的标记设置为 False。 event.is_set():判断 event 的标志是否为 True。
以下示例,在主进程中控制子进程在什么时候继续向下执行。例如在主进程的 time.sleep(3) 处能够执行一些检测工做,确保子进程的运行,若检测没有问题则继续子进程的运行。
from multiprocessing import Process, Event import time def worker(name, event): print('Process_%s is ready' % name) event.wait() print('Process_%s is running' % name) if __name__ == '__main__': event = Event() for i in range(0, 2): Process(target=worker, args=(i, event)).start() time.sleep(3) event.set() # 结果输出: Process_0 is ready Process_1 is ready Process_0 is running Process_1 is running
如上示例,若主进程一直没有容许子进程继续执行(例如检测工做没有经过),则子进程会一直阻塞在 event.wait() 这儿,咱们但愿在子进程阻塞过程当中会有持续的提示信息,这个能够经过设置 event.wait 方法的 timeout 参数实现。
from multiprocessing import Process, Event import time def worker(name, event): while not event.is_set(): print('Process_%s is ready' % name) event.wait(1) print('Process_%s is running' % name) if __name__ == '__main__': event = Event() for i in range(0, 2): Process(target=worker, args=(i, event)).start() time.sleep(3) event.set() # 结果输出: Process_0 is ready Process_1 is ready Process_0 is ready Process_1 is ready Process_0 is ready Process_1 is ready Process_0 is ready Process_1 is ready Process_1 is running Process_0 is running
进程的建立和销毁都须要消耗系统资源,且每一台服务器的 cpu 核心数有限,建立过多的进程反而会下降执行效率。这里就可使用进程池,进程池一启动就会建立固定数量的进程,有执行须要了,就从进程池中获取一个进程处理对应的任务,处理完成后,进程不会被销毁,而是放回进程池中。若是同时须要执行的任务过多,没有获取到进程的任务须要等待,等有空闲的进程了才能运行。
进程池节省了操做系统在建立和销毁进程上所花去的开销,也限制了同一时间可以运行的进程总数,在必定程度上提高了多进程的执行效率。
以下示例是使用进程池启动进程和直接启动进程的效率差距:
from multiprocessing import Process, Pool import time def m_add(a): return a ** a if __name__ == '__main__': # print(os.cpu_count()) # 调试环境的 cpu 核数为 8 # 建立进程池 pool = Pool(8) start_t1 = time.time() # 使用进程池启动进程 res = pool.map(m_add, range(500)) print(time.time() - start_t1) p_list = [] start_t2 = time.time() # 直接启动进程 for i in range(500): p = Process(target=m_add, args=(i, )) p_list.append(p) p.start() for p in p_list: p.join() print(time.time() - start_t2) # 输出结果: 0.003328084945678711 0.6395020484924316
建立进程池:
Pool([numprocess [,initializer [, initargs]]]): numprocess:进程池中的固定继承数,默认为 cpu 核心数(os.cpu_count()) initializer:每次启动进程须要执行的可调用对象 initargs:传递给 initializer 的参数
Pool 的经常使用方法:
map(func, iterable):异步提交任务。iterable 为一个可迭代对象,这个可迭代对象的长度是多少,就启动多少个子进程,且可迭代对象的每个元素会做为参数传递给 func。注意,使用 map 方法开启子进程,只能传递一个参数,若子进程须要多个参数,则这个参数可使用 元组;将全部子进程的返回结果以列表的形式返回。 apply(func [, args [, kwargs]]):同步提交任务,返回子进程的执行结果。若是须要并发地执行 func,必须从不一样线程中调用同一个进程池的 apply() 方法; apply_async(func [, args [, kwargs]]):异步提交任务,返回 AsyncResult 类的实例,从 AsyncResult 实例中获取执行结果。与 map 方法的区别是,apply_async 方法能够为所欲为地传递参数; close():结束进程池接受任务; jion():感知进程池中的任务执行结束。即全部提交进来的任务都已经执行完毕,且没有新的任务提交进来。
Tip:进程池能够有返回值,这是进程池特有的,可是直接起进程,是作不到有返回值的。
apply 方法应用:
import time, os from multiprocessing import Pool def worker(i): print('worker_%s running, pid: %s' % (i, os.getpid())) time.sleep(1) return i * i if __name__ == '__main__': pool = Pool(3) res_list = [] for i in range(7): res = pool.apply(worker, args=(i, )) # 返回的 res 便是子进程的返回结果 res_list.append(res) print(res_list) print('...end') # 输出结果: worker_0 running, pid: 20584 worker_1 running, pid: 20585 worker_2 running, pid: 20586 worker_3 running, pid: 20584 worker_4 running, pid: 20585 worker_5 running, pid: 20586 worker_6 running, pid: 20584 [0, 1, 4, 9, 16, 25, 36] ...end
在同一个线程中使用 pool.apply 方法提交任务,是提交一个,执行一个,执行完成后才能继续提交下一个任务。如上输出结果也是逐个输出。
apply_async 方法应用:
import time, os from multiprocessing import Pool def worker(i): print('worker_%s running, pid: %s' % (i, os.getpid())) time.sleep(1) return i * i if __name__ == '__main__': pool = Pool(3) res_list = [] for i in range(7): res = pool.apply_async(worker, args=(i, )) # res 为 AsyncResult 类的实例 res_list.append(res) pool.close() pool.join() for i in res_list: print(i.get()) print('...end') # 输出结果: worker_0 running, pid: 20598 worker_1 running, pid: 20599 worker_2 running, pid: 20600 worker_3 running, pid: 20598 worker_4 running, pid: 20599 worker_5 running, pid: 20600 worker_6 running, pid: 20599 0 1 4 9 16 25 36 ...end
经过 AsyncResult 对象的 get 方法获取返回值,get 方法会阻塞,即阻塞到子进程执行完毕,而后获取其返回值。
通常使用 apply_async 方法 异步提交任务,须要在主进程中感知任务结束(join方法),而且在 join 方法前面结束进程池接受任务(close方法)
map 方法应用:
import time, os from multiprocessing import Pool def worker(i): print('worker_%s running, pid: %s' % (i, os.getpid())) time.sleep(1) return i * i if __name__ == '__main__': pool = Pool(3) res_list = pool.map(worker, range(7)) for i in res_list: print(i) print('...end') # 输出结果: worker_0 running, pid: 20713 worker_1 running, pid: 20714 worker_2 running, pid: 20715 worker_3 running, pid: 20714 worker_4 running, pid: 20713 worker_5 running, pid: 20715 worker_6 running, pid: 20715 0 1 4 9 16 25 36 ...end
map 方法自带 join 方法和 close 方法,map 方法启动子进程后,就不容许再提交任务,且 map 方法会阻塞,直到子进程所有执行完毕,且将全部子进程的返回结果以列表的形式返回。
如果不想阻塞在 map 方法,则可使用 map_async,只是用了 map_async 方法,须要本身进行 close 和 join。
import time, os from multiprocessing import Pool def worker(i): print('worker_%s running, pid: %s' % (i, os.getpid())) time.sleep(1) return i * i if __name__ == '__main__': pool = Pool(3) res_list = pool.map_async(worker, range(7)) pool.close() pool.join() for i in res_list.get(): print(i) print('...end')
返回结果与上述一致。
进程池中一个进程处理完任务以后,这进程能够调用一个函数去处理该进程返回的结果,这个函数就是回调函数。回调函数的主要做用是告诉主进程,这里已经执行完毕,主进程能够针对返回结果继续后续的处理。相对于主进程轮询等待子进程的返回结果,利用回调函数能够提升程序的执行效率。
注意回调函数是由主进程执行的,能够将一些比较耗IO的操做放到进程池中执行,由主进程统一处理它们的返回结果。
回调函数简单示例:
from multiprocessing import Pool def func(info): print('...' + str(info)) def worker(i): return i * i if __name__ == '__main__': pool = Pool(3) res_list = [] for i in range(7): res = pool.apply_async(worker, args=(i, ), callback=func) res_list.append(res) pool.close() pool.join() print('~end') # 输出结果: ...0 ...4 ...9 ...1 ...16 ...36 ...25 ~end
以下示例中,能够将具体的业务放在 worker 方法中,例如从网络上爬取数据,而后统一由回调函数 func 写到一个文件中。
from multiprocessing import Pool def func(info): with open('abc.txt', 'a+') as f: f.writelines(str(info) + '\n') def worker(i): return i * i if __name__ == '__main__': pool = Pool() for i in range(10): pool.apply_async(worker, (i,), callback=func) pool.close() pool.join()
.................^_^