阅读本文大约须要 10 分钟。
进程html
实现方式python
fork()
示例:程序员
import os print('current_pid :%d' % os.getpid()) res = os.fork() # 子进程返回的是 0 if res == 0: print('res: %d' % res) print('sub_pid: %d' % os.getpid()) # 主进程返回的是子进程的 pid else: print('main_pid: %d' % os.getpid()) print('res:%d' % res) # 结果为 current_pid :12775 main_pid: 12775 res:12776 res: 0 sub_pid: 12776multiprocessing.Process
multiprocessing.Process
示例:算法
from multiprocessing import Process import os, time print('man_process pid : %d' % os.getpid()) class NewProcess(Process): def __init__(self): Process.__init__(self) def run(self): time.sleep(3) print('%d process was runing' % os.getpid()) np = NewProcess() np.start() # 结果为 man_process pid : 7846 7847 process was runing
multiprocessing.Pool安全
同步(apply)多线程
示例:并发
from multiprocessing import Pool import time, os, random print('main_process pid: %d' % os.getpid()) def run(): time.sleep(random.random()) # random.random() 随机生成一个小于 1 的浮点数 print('%d process was runing' % os.getpid()) p = Pool(3) for i in range(4): p.apply(run, args=()) p.close() print('waiting for sub_process') while True: # 获取 Pool 中剩余的进程数量 count = len(p._cache) if count != 0: print('there was %d sub_process' % count) time.sleep(random.random()) else: break print('sub_process has done') # 结果为 main_process pid: 4295 4297 process was runing 4296 process was runing 4298 process was runing 4297 process was runing wating for sub_process sub_process has done
异步(apply_async)
示例:app
from multiprocessing import Pool import time, os, random print('main_process pid: %d' % os.getpid()) def run(): # random.random() 随机生成一个小于 1 的浮点数 time.sleep(random.random()) print('%d process was runing' % os.getpid()) p = Pool(3) for i in range(4): p.apply_async(run, args=()) p.close() while True: # 获取 Pool 中剩余的进程数量 count = len(p._cache) if count != 0: print('there was %d sub_process' % count) time.sleep(random.random()) else: break print('wiating for sub_process..') p.join() print('sub_process has done') # 结果为 main_process pid: 4342 wiating for sub_process.. there was 4 sub_process 4344 process was runing there was 3 sub_process 4345 process was runing 4344 process was runing 4343 process was runing sub_process has done
优缺点dom
fork()
是计算机最底层的进程实现方式,一个fork()
方法建立出来的进程有两个:主进程、子进程。fork()
建立出来的进程,主进程不会等待子进程。multiprocessing
模块经过将fork
方法封装成一个Process
类,该类有一个start()
方法,当调用该方法时,会自动调用run()
方法,开启一个进程。而且由Process
建立出来的进程,可使用join()
方法,使得主进程堵塞,被迫等待子进程。multiprocess
下另外一种开启进程的方式是经过Pool
进程池来实现。进程池能够开启多个进程来执行多个任务,可是进程数最大不会超过系统 CPU 核数。一样的,由Pool
建立出来的进程,主进程也不会等待子进程,经过join()
方法能够迫使主进程等待子进程,或者使用apply()
同步的方式。进程通讯
进程之间的通讯能够经过队列(Queue)来进行,多个进程一部分向队列里写入数据,一部分从队列里读取数据,从而完成多进程之间的通讯问题。
示例:异步
from multiprocessing import Process, Queue import random, time, os def write(q): if not q.full(): for i in range(4): q.put(i) print('%d was writing data[%d] to queue' % (os.getpid(), i)) time.sleep(random.random()) else: print('queue is full') def read(q): # 等待队列被写入数据 time.sleep(random.random()) while True: if not q.empty(): data = q.get() print('%d was reading data{%d} from queue' % (os.getpid(), data)) else: print('queue is empty') break # 建立通讯队列,进程之间,全局变量不共享 q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.join() print('end') # 结果为 4640 was writing data[0] to queue 4640 was writing data[1] to queue 4640 was writing data[2] to queue 4641 was reading data{0} from queue 4641 was reading data{1} from queue 4641 was reading data{2} from queue queue is empty 4640 was writing data[3] to queue end
因为进程的执行顺序问题,形成了 pr 先于 pw 执行,因此 pr 未读取到数据,pr 进程任务结束,堵塞解开,主进程继续向下运行,最后 pw 任务结束。
进程通讯改良
示例:
from multiprocessing import Process, Queue import random, time, os def write(q): if not q.full(): for i in range(4): q.put(i) print('%d was writing data[%d] to queue' % (os.getpid(), i)) time.sleep(random.random()) else: print('queue is full') def read(q): # 等待队列被写入数据 time.sleep(random.random()) while True: data = q.get() print('%d was reading data{%d} from queue' % (os.getpid(), data)) # 建立通讯队列,进程之间,没有全局变量共享之说 q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() # pr 进程马上结束 pr.terminate() print('end') # 结果为 12898 was writing data[0] to queue 12898 was writing data[1] to queue 12898 was writing data[2] to queue 12899 was reading data{0} from queue 12899 was reading data{1} from queue 12899 was reading data{2} from queue 12898 was writing data[3] to queue 12899 was reading data{3} from queue end
线程
另外,Python 中的线程依据的是 Java 中的线程模型,若是有兴趣的同窗能够研究一下。
实现方式
示例:
import threading, time def run(): time.sleep(1) # currentThread() 返回的是当前的线程对象信息 print('%s was runing' % threading.currentThread()) print('current thread\'name: %s' % threading.currentThread().getName()) # 建立一个线程 t = threading.Thread(target=run, args=()) # 启动线程 t.start() # get_ident 返回的是当前线程对象所在的内存地址(id),该地址是惟一能够验证线程的数据 # 也可以使用 currentThread().getName() 来简单的区分线程 print('current thread\'name: %s' % threading.currentThread().getName()) print('main_thread tid: %s' % threading.get_ident()) # 结果为 current thread'name: MainThread main_thread tid: 140427132020480 <Thread(Thread-1, started 140427100555008)> was runing current thread'name: Thread-1
线程通讯
通讯队列
通讯队列做为相对来讲最为安全的线程通讯手段,其中Queue
模块自身拥有全部所需的锁,这使得通讯队列中的对象能够安全的在多线程之间共享。
这里用常见的「生产者-消费者模型」来介绍。
示例:
import threading, queue, time, random flag = object() def producter(q): for i in range(4): q.put(i) print('%s put data{%d} in queue' % (threading.currentThread().getName(), i)) time.sleep(random.random()) q.put(flag) def consumer(q): time.sleep(random.random()) while True: res = q.get() if res == flag: q.put(flag) break else: print('%s get data{%d} from queue' % (threading.currentThread().getName(), res)) # 建立队列 q = queue.Queue() # 建立线程 pro = threading.Thread(target=producter, args=(q,)) con = threading.Thread(target=consumer, args=(q,)) pro.start() con.start() # 结果为 Thread-1 put data{0} in queue Thread-1 put data{1} in queue Thread-2 get data{0} from queue Thread-2 get data{1} from queue Thread-1 put data{2} in queue Thread-2 get data{2} from queue Thread-1 put data{3} in queue Thread-2 get data{3} from queue end
这里有一个细节。在多线程下,当生产者任务完成以后,向队列queue
里添加了一个特殊对象(终止信号)flag
,这样当消费者从queue
中取出任务时,当取到flag
时,意味着全部任务被取出,并再次将flag
添加至queue
中,这样其余线程中的消费者在接收到这个终止信号后,也会得知当前生产者任务已经所有发布。
轮询
经过为数据操做添加while
循环判断,迫使线程被迫等待操做。(为了优化等待时间,应在最核心的位置添加判断条件)
示例:
import threading class NewThread(threading.Thread): flag = 0 g_num = 0 def __init__(self): super().__init__() def run(self): print('%s was runing' % threading.currentThread().getName()) if self.name == 'Thread-1': self.add_num() NewThread.flag = 1 else: # 轮询 # Thread-2 被迫等待 Thread-1 完成任务以后才能执行 while True: if NewThread.flag: self.add_num() break @classmethod def add_num(cls): global g_num for i in range(1000000): cls.g_num += 1 print('on the %s, g_num: %d' % (threading.currentThread().getName(), cls.g_num)) t1 = NewThread() t2 = NewThread() t1.start() t2.start() # 结果为 Thread-1 was runing Thread-2 was runing on the Thread-1, g_num: 1000000 on the Thread-2, g_num: 2000000
互斥锁优化
示例:
import threading class NewThread(threading.Thread): g_num = 0 # 生成锁对象 lock = threading.Lock() def __init__(self): super().__init__() def run(self): # 判断当前线程是否上锁,若未上锁,则一直尝试上锁(acquire)直至成功 with NewThread.lock: print('%s was runing' % self.name) self.add_num() @classmethod def add_num(cls): for i in range(1000000): cls.g_num += 1 print('on the %s g_num: %d' % (threading.currentThread().getName(), cls.g_num)) t1 = NewThread() t2 = NewThread() t1.start() t2.start() # 结果为 Thread-1 was runing on the Thread-1 g_num: 1000000 Thread-2 was runing on the Thread-2 g_num: 2000000
死锁问题解决
threading.Lock().acquire(timeout=3)
只要在上锁时设置超时时间timeout=
,只要超过期间,线程就会再也不等待是否解锁,而是直接运行。可是这种方式很危险,可能会带来大量的等待时间。进程与线程的区别
join()
方法使得主程序发生堵塞,来等待子进程。而主线程的任务结束后,程序会等待子线程结束才会结束。故不须要特地使用join()
方法来使主线程等待子线程。协程
实现方式
生成器(yield)
生成器相关内容可看问题 13。
这里以一个简单的「生产者-消费者模型」来解释如何使用生成器实现协程。
示例:
import threading def producter(c): next(c) n = 4 print('%s was running' % threading.currentThread().getName()) while n: print('product data: %d' % n) res = c.send(n) print(res) n -= 1 print('sale out') def consumer(): res = '' print('%s was running' % threading.currentThread().getName()) while True: n = yield res print('consume data: %d' % n) res = '200 OK' print('%s was running' % threading.currentThread().getName()) c = consumer() producter(c) # 结果为 MainThread was running MainThread was running MainThread was running product data: 4 consume data: 4 200 OK product data: 3 consume data: 3 200 OK product data: 2 consume data: 2 200 OK product data: 1 consume data: 1 200 OK sale out
能够看到,生产者事先不知道消费者具体要消费多少数据,生产者只是一直在生产。而消费者则是利用生成器的中断特性,consumer
函数中,程序每一次循环遇到yield
关键字就会停下,等待producter
函数启动生成器,再继续下一次循环。
在这中间只有一个线程在运行,任务的切换时机由程序员本身控制,避免了因为多线程之间的切换消耗,这样就简单实现了协程。
asyncio
库,该库适用于高并发。本身目前不会,就不瞎 BB 了,具体可看文档。
未写完,下次更新补上