Q: We're done with threads, so now we move on to processes, right?
A: Yes. Earlier we compared threads to our hands; now it's time to learn about our "arms".
Why learn multiprocessing when we already have multithreading? Because of Python's GIL: for CPU-heavy computation, multiprocessing lets multiple CPU cores actually run in parallel. For I/O-bound work the bottleneck is not the CPU, so multithreading is the better fit. Also keep in mind that switching between processes is not a lightweight operation.
Let's start with a compute-intensive example, calculating Fibonacci numbers:
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
import time

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

if __name__ == '__main__':
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, num) for num in range(25, 40)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("get result:= {}".format(data))
        print("multithread last time is {}".format(time.time() - start_time))

    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, num) for num in range(25, 40)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("get result:= {}".format(data))
        print("multiprocess last time is {}".format(time.time() - start_time))

# multithread last time is 43.156678199768066
# multiprocess last time is 27.62783455848694
Clearly, multiprocessing beats multithreading for this CPU-bound task.
Now let's do the same comparison with an I/O-bound operation:
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
import time

def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, num) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("get result:= {}".format(data))
        print("multithread last time is {}".format(time.time() - start_time))

    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, num) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("get result:= {}".format(data))
        print("multiprocess last time is {}".format(time.time() - start_time))

# multithread last time is 20.035860300064087
# multiprocess last time is 20.641016483306885
This time the two timings are nearly identical: for I/O-bound work, multiprocessing gains us nothing over multithreading while costing more. With that settled, let's get into process operations proper:
import os
import time

# fork() only works on Linux/Unix
pid = os.fork()
print("bobby")
if pid == 0:
    print("child process {}, parent is {}".format(os.getpid(), os.getppid()))
else:
    print("I am the parent process: {}".format(pid))
    time.sleep(2)
This code only runs on Linux. One thing we notice: even if the main process exits, the child process keeps running.
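To see that for ourselves, here is a minimal sketch (my own illustration, Linux only): the parent exits immediately, yet the orphaned child still prints two seconds later.

import os
import time

pid = os.fork()
if pid == 0:
    # child: keeps running even though the parent has already exited
    time.sleep(2)
    print("child still alive, pid={}".format(os.getpid()))
# parent: falls straight through and exits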
Q: So how do we program with processes?
A: Since we already understand thread programming, process programming becomes very simple. We'll skip what's identical and instead walk through a few packages that differ; even their usage is much like the thread versions.
multiprocessing
import multiprocessing
import time

def get_html(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    progress = multiprocessing.Process(target=get_html, args=(2,))
    progress.start()
    progress.join()
We can also read the process's pid directly (and, from inside the child, its ppid).
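For instance, a minimal sketch (my own, reusing the get_html worker from above): the Process object exposes .pid once started, and inside the worker the standard os.getpid()/os.getppid() calls work as usual.

import multiprocessing
import os
import time

def get_html(n):
    # inside the child: its own pid and its parent's pid
    print("worker pid={}, parent pid={}".format(os.getpid(), os.getppid()))
    time.sleep(n)
    return n

if __name__ == '__main__':
    progress = multiprocessing.Process(target=get_html, args=(2,))
    progress.start()
    print(progress.pid)   # the child's pid, available after start()
    progress.join()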
Everything else is much the same as with multithreading, so we won't go over it in detail.
Using a process pool:
There are two process pools: Pool and ProcessPoolExecutor. The latter works just like its thread counterpart, so let's look at Pool on its own.
import multiprocessing
import time
from multiprocessing import Pool

def get_html(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    progress = multiprocessing.Process(target=get_html, args=(1,))
    progress.start()
    progress.join()

    pool = Pool(multiprocessing.cpu_count())
    print(multiprocessing.cpu_count())
    result = pool.apply_async(get_html, args=(3,))
    pool.close()   # no new tasks may be submitted after this
    pool.join()    # wait for the workers to finish
    print(result.get())
Note that the pool must be closed at the end, and close() has to come before join(). For more detailed process-pool code, see: https://www.cnblogs.com/noah0532/p/10938771.html
Two methods deserve special mention: imap and imap_unordered (the latter yields whichever task finishes first).
for result in pool.imap(get_html, [1, 5, 3]):
    print(result)   # imap yields results in submission order: 1, 5, 3
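For comparison, a minimal sketch of imap_unordered (my own, assuming the same pool and get_html as above, with at least three worker processes):

for result in pool.imap_unordered(get_html, [1, 5, 3]):
    print(result)   # yields in completion order: 1, 3, 5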
Inter-process communication:
Inter-process communication is partly the same as inter-thread communication and partly different; for example, a threading lock can no longer be used across processes.
A simple example first: communicating through a queue.
from multiprocessing import Process, Queue
# from queue import Queue   # the thread queue can no longer be used here
import time

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == '__main__':
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
In multiprocess programming the old queue module won't do; you have to use the Queue from multiprocessing. Keep that in mind.
Here's another example, this time with a shared variable:
from multiprocessing import Process
import time

def producer(a):
    a += 1
    time.sleep(2)

def consumer(a):
    time.sleep(2)
    print(a)

if __name__ == '__main__':
    a = 1
    my_producer = Process(target=producer, args=(a,))
    my_consumer = Process(target=consumer, args=(a,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
We find that the shared variable no longer works: the consumer prints 1, not 2. As we said earlier, each process gets its own separate copy of the data, so nothing is shared between them.
Also, the Queue from multiprocessing cannot be used with a process pool. If we want a queue inside a pool, we have to use the Queue from Manager().
from multiprocessing import Pool, Manager
import time

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == '__main__':
    queue = Manager().Queue(10)
    pool = Pool(2)
    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))
    pool.close()
    pool.join()
We can also communicate through a pipe, but a Pipe only connects two processes. Between exactly two processes, though, a pipe performs better than a queue.
from multiprocessing import Process, Pipe

def producer(pipe):
    pipe.send("bobby")

def consumer(pipe):
    print(pipe.recv())

if __name__ == '__main__':
    # a Pipe can only be used for communication between two processes
    receive_pipe, send_pipe = Pipe()
    my_producer = Process(target=producer, args=(send_pipe,))
    my_consumer = Process(target=consumer, args=(receive_pipe,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
Key point: sharing data between processes. Manager() provides dict(), list(), Array(), and the other common data types.
from multiprocessing import Process, Manager

def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == '__main__':
    progress_dict = Manager().dict()
    first_progess = Process(target=add_data, args=(progress_dict, "bobby1", 22))
    second_progess = Process(target=add_data, args=(progress_dict, "bobby1", 23))
    first_progess.start()
    second_progess.start()
    first_progess.join()
    second_progess.join()
    print(progress_dict)
    # {'bobby1': 23}
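One caveat worth adding (my own sketch, not part of the original example): both writes above hit the same key, so the last writer simply wins. Whenever processes do a read-modify-write on shared Manager data, the update is not atomic and needs a lock; Manager() also provides Lock() for exactly this.

from multiprocessing import Process, Manager

def increment(p_dict, lock, key):
    for _ in range(100):
        with lock:   # make the read-modify-write atomic
            p_dict[key] = p_dict[key] + 1

if __name__ == '__main__':
    manager = Manager()
    counter = manager.dict()
    counter["total"] = 0
    lock = manager.Lock()
    workers = [Process(target=increment, args=(counter, lock, "total"))
               for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter)   # with the lock, reliably {'total': 200}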