每次执行程序(好比说浏览器,音乐播放器)的时候都会完成必定的功能,好比说浏览器帮咱们打开网页。 进程就是一个程序在一个数据集上的一次动态执行过程。进程通常由程序、数据集、进程控制块三部分组成。进程的建立、撤销和切换的开销比较大
python
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程当中的最小单元,线程的引入减少了程序并发执行时的开销。线程没有本身的系统资源,只拥有在运行时必不可少的资源。但线程能够与同属与同一进程的其余线程共享进程所拥有的其余资源。线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。linux
协程,又称微线程,线程是系统级别的它们由操做系统调度,而协程则是程序级别的由程序根据须要本身调度。在一个线程中会有不少函数,咱们把这些函数称为子程序,在子程序执行过程当中能够中断去执行别的子程序,而别的子程序也能够中断回来继续执行以前的子程序,这个过程就称为协程。也就是说在同一线程内一段代码在执行过程当中会中断而后跳转执行别的代码,接着在以前中断的地方继续开始执行,相似与yield操做。协程是一中多任务实现方式,它不须要多个进程或线程就能够实现多任务。算法
multiprocessing是python的多进程管理包。windows
threading 模块创建在 _thread 模块之上。_thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块经过对 thread 进行二次封装,提供了更方便的 api 来处理线程。api
greenlet、gevent(第三方模块)能够实现协程浏览器
程序是指令和数据的有序集合,其自己没有任何运行的含义,是一个静态的概念。安全
进程是一个“执行中的程序”,进程的实质是程序的一次执行过程,进程是动态产生,动态消亡的。进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位;进程由程序、数据和进程控制块三部分组成。因为进程间的相互制约,使进程具备执行的间断性,即进程按各自独立的、不可预知的速度向前推动服务器
先来先服务(FCFS)调度算法是一种最简单的调度算法,该算法既能够做业调度,也能够做用域进程调度。FCFS算法比较有利于长做业(进程),而不利于短做业(进程)。由此可知,本算法适合于CPU繁忙型做业,而不利于I/O繁忙型做业(进程)。markdown
短做业(进程)优先调度算法(SJ/PF)是指对短做业或者短进程优先调度的算法,该算法既能够用于做业调度,也可用于进程调度。但其对长做业不利;不能保证紧迫性做业(进程)被及时处理;做业的长短只是被估算出来的。网络
时间片轮转(Round Robin,RR)将CPU的处理时间分红固定大小的时间片,若是一个进程在被调度选中以后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放本身所占有的CPU而排到就绪队列的末尾,等待下一次调度。同时,进程调度程序又去调度当前就绪队列中的第一个进程。
并行:并行是指二者同时执行,好比赛跑,两我的都在不停的往前跑;(资源够用,好比三个线程,四核CPU)
并发:并行是指资源有限的状况下,二者交替轮流使用资源,好比一段路(单核CPU资源)同时只能过一我的,A走一段后,让给B,B用完继续给A,交替使用,目的是提升效率。
区别:
并行是从微观上,也就是在一个精确的时间片刻,有不一样的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上能够看出是同时执行,好比一个服务器同时处理多个session。
同步:所谓同步就是一个任务的完成须要依赖另一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列
。要么成功都成功,失败都失败,两个任务的状态能够保持一致。
异步:所谓异步是不须要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工做,依赖的任务也当即执行,只要本身完成了整个任务就算完成了
。至于被依赖的任务最终是否真正完成,依赖它的任务没法肯定,因此它是不可靠的任务序列
。
好比我去银行办理业务,可能会有两种方式: 第一种 :选择排队等候; 第二种 :选择取一个小纸条上面有个人号码,等到排到我这一号时由柜台的人通知我轮到我去办理业务了; 第一种:前者(排队等候)就是同步等待消息通知,也就是我要一直在等待银行办理业务状况; 第二种:后者(等待别人通知)就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中就是等待办理业务的人)每每注册一个回调机制,
在所等待的事件被触发时由触发机制(在这里是柜台的人)经过某种机制(在这里是写在小纸条上的号码,喊号)找到等待该事件的人。
阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来讲的
不管是排队仍是使用号码等待通知,若是在这个等待的过程当中,等待者除了等待消息通知以外不能作其它的事情,那么该机制就是阻塞的,表如今程序中,也就是该程序一直阻塞在
该函数调用处不能继续往下执行。相反,有的人喜欢在银行办理这些业务的时候一边打打电话发发短信一边等待,这样的状态就是非阻塞的,由于他(等待者)没有阻塞在这个消息
通知上,而是一边作本身的事情一边等待。 注意:同步非阻塞形式其实是效率低下的,想象一下你一边打着电话一边还须要抬头看到底队伍排到你了没有。若是把打电话和观察排队的位置当作是程序的两个操做的话,这个
程序须要在这两种不一样的行为之间来回的切换,效率可想而知是低下的;而异步非阻塞形式却没有这样的问题,由于打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)
的事情,程序没有在两种不一样的操做中来回切换。
(1)同步阻塞形式
效率最低。拿上面的举例来讲,就是你专心排队,什么别的事都不作
(2)异步阻塞形式
若是在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行去作其它的事情,那么很显然,这我的被阻塞在了这个等待的操做上面;异步操做也能够被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知书时被阻塞。
(3)同步非阻塞形式
其实是效率低下的。想象一下你一边打着电话一边还须要抬头看到底队伍排到你了没有,若是把打电话和观察排队的位置当作是程序的两个操做的话,这个程序须要在这两种不一样的行为之间来回的切换
,效率可想而知是低下的。
(4)异步非阻塞形式
效率更高,由于打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不一样的操做中来回切换。
好比说,这我的忽然发觉本身烟瘾犯了,须要出去抽根烟,因而他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操做上面,天然这个就是异步+非阻塞的方式了。
不少人会把同步和阻塞混淆,是由于不少时候同步操做会以阻塞的形式表现出来
不少人也会把异步和非阻塞混淆,由于异步操做通常都不会在真正的IO操做处被阻塞
。
一、multiprocessing模块
python中的多线程没法利用多核优点,若是想要充分的使用CPU资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。Python中提供了multiprocess模块。multiprocess中几乎包含了和进程有关的全部子模块。大体分为四个部分:建立进程部分、进程同步部分、进程池部分、进程之间数据共享。multiprocessing经常使用组件及功能:
1.一、管理进程模块:
1.二、同步子进程模块:
二、Array,Value---共享数据
若是你真有须要共享数据, multiprocessing提供了两种方式。
(1)multiprocessing,Array,Value
数据能够用Value或Array存储在一个共享内存地图里,以下:
from multiprocessing importArray,Value,Process def func(a,b): a.value = 3.333333333333333 for i in range(len(b)): b[i] = -b[i] if __name__ == "__main__": num = Value('d',0.0) arr = Array('i',range(11)) c = Process(target=func,args=(num,arr)) d= Process(target=func,args=(num,arr)) c.start() d.start() c.join() d.join() print(num.value) for i in arr: print(i)<br> #输出: #3.1415927 #[0, -1, -2,-3, -4, -5, -6, -7, -8, -9]
建立num和arr时,“d”和“i”参数由Array模块使用的typecodes建立:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。
Array(‘i’, range(10))中的‘i’参数:
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint
‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double
(2)Manager
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。
from multiprocessing importProcess,Manager def f(d,l): d["name"] = "zhangyanlin" d["age"] = 18 d["Job"] = "pythoner" l.reverse() if __name__ == "__main__": with Manager() as man: d = man.dict() l = man.list(range(10)) p = Process(target=f,args=(d,l)) p.start() p.join() print(d) print(l) #输出: #{0.25: None, 1: '1', '2': 2} #[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Process模块是一个建立进程的模块,借助这个模块,就能够完成进程的建立。
Process([group [, target [, name [, args [, kwargs]]]]]) 强调: 1. 须要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
方法介绍:
1 p.start():启动进程,并调用该子进程中的p.run() 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法 3 p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁 4 p.is_alive():若是p仍然运行,返回True 5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性介绍:
1 p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程。设定为True后,p不能建立本身的新进程,必须在p.start()以前设置 2 p.name:进程的名称 3 p.pid:进程的pid 4 p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可) 5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性(了解便可)
#方法一; import os from multiprocessing import Process def func1(name): print('hello', name) print("我是子进程: %d;个人父进程id是:%d" % (os.getpid(), os.getppid())) def func2(): print('hello') if __name__ == '__main__': p1 = Process(target=func1, args=('xiaobai',)) # 此处传参必须是元组数据类型 p1.start() print("我是父进程:%d" % os.getpid()) p2 = Process(target=func2) p2.start() ''' # 执行结果 我是父进程:12612 hello xiaobai 我是子进程: 5760; 个人父进程id是:12612 '''
# 方法二:# 经过继承Process类的形式开启进程的方式 import os from multiprocessing import Process class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): #固定名字run !!! print(os.getpid()) print('%s 正在和女神聊天' % self.name) if __name__ == '__main__': p1 = MyProcess('xiaobai') p2 = MyProcess('xiaohei') p1.start() # start会自动调用run方法 p2.start() # 说明:若是须要传参,必须写入到__init__方法里面,且必须加上super().__init__();由于父类Process里面也有__init__方法。
Process对象的join方法
import time from multiprocessing import Process def func(name): print("hello", name) time.sleep(1) print('我是子进程') if __name__ == '__main__': p = Process(target=func, args=('xiaobai',)) p.start() p.join() # 加上join方法后,父进程就会阻塞等待子进程结束而结束。 print("父进程")
Process开启多进程
多个进程同事运行(注意,子进程的执行顺序不是根据自动顺序决定的)
import time from multiprocessing import Process def func(name): print("hello 进程 %d" % name ) time.sleep(1) if __name__ == '__main__': for i in range(10): p = Process(target=func, args=(i,)) p.start()
import time from multiprocessing import Process def func(name): print("hello 进程 %d" % name ) time.sleep(0.1) if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func, args=(i,)) p.start() p_lst.append(p) p.join() print("父进程执行中")
进程之间的数据隔离问题
from multiprocessing import Process n = 100 #在windows系统中把全局变量定义在if __name__ == '__main__'之上就能够了 def work(): global n n = 0 print("子进程内:", n) if __name__ == '__main__': p = Process(target=work) p.start() print("主进程内:", n)
主进程建立守护进程,守护进程会随着主进程的结束而结束。守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children
import time from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止. #可能p1执行的打印信息任务会由于主进程打印(main----)被终止.
socket聊天并发实例
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start进程必定要写到这下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start() 使用多进程实现socket聊天并发-server
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8')) 使用多进程实现socket聊天并发-client
当多个进程使用同一份数据资源的时候,就会引起数据安全或顺序混乱问题。
# 多进程抢占输出资源 import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) if __name__ == '__main__': for i in range(3): p = Process(target=work, args=(i,)) p.start() # 执行结果 """ 0: 14316 is running 1: 9900 is running 2: 10056 is running 1: 9900 is done 2: 10056 is done 0: 14316 is done """
# 使用锁维护执行顺序 import os import time import random from multiprocessing import Process, Lock def work(lock, n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock = Lock() for i in range(3): p = Process(target=work, args=(lock, i)) p.start() # 执行结果 """ 0: 15276 is running 0: 15276 is done 1: 6360 is running 1: 6360 is done 2: 14776 is running 2: 14776 is done """
上面这种状况虽然使用加锁的形式实现了顺序的执行,可是程序又从新变成串行了,没错,加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行修改,速度是慢了,但牺牲了速度却保证了数据的安全性。所以咱们最好找寻一种解决方案可以兼顾:一、效率高(多个进程共享一块内存的数据)二、帮咱们处理好锁问题,这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道
队列和管道都是将数据存放于内存中,队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来,咱们应该尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可扩展性。
四、进程池(Using a pool of workers)
Pool类描述了一个工做进程池,他有几种不一样的方法让任务卸载工做进程。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。咱们能够用Pool类建立一个进程池, 展开提交的任务给进程池。 例:
#apply from multiprocessing import Pool import time def f1(i): time.sleep(0.5) print(i) return i + 100 if __name__ == "__main__": pool = Pool(5) for i in range(1,31): pool.apply(func=f1,args=(i,)) #apply_async def f1(i): time.sleep(0.5) print(i) return i + 100 def f2(arg): print(arg) if __name__ == "__main__": pool = Pool(5) for i in range(1,31): pool.apply_async(func=f1,args=(i,),callback=f2) pool.close() pool.join()
一个进程池对象能够控制工做进程池的哪些工做能够被提交,它支持超时和回调的异步结果,有一个相似map的实现。
注意:Pool对象的方法只能够被建立pool的进程所调用。
apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,因为这个缘由,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : 是apply()的一个变体,会返回一个结果对象。若是callback被指定,那么callback能够接收一个参数而后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被当即完成,不然处理结果的线程会被阻塞。
close() : 阻止更多的任务提交到pool,待任务完成后,工做进程会退出。
terminate() : 无论任务是否完成,当即中止工做进程。在对pool对象进程垃圾回收的时候,会当即调用terminate()。
join() : wait工做线程的退出,在调用join()前,必须调用close() or terminate()。这样是由于被终止的进程须要被父进程调用wait(join等价与wait),不然进程会成为僵尸进程。
map(func, iterable[, chunksize])
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
imap(func, iterable[, chunksize])
imap_unordered(func, iterable[, chunksize])
starmap(func, iterable[, chunksize])
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
threading 模块创建在 _thread 模块之上。thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块经过对 thread 进行二次封装,提供了更方便的 api 来处理线程。Thread方法:
t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.setDaemon() 设置为后台线程或前台线程(默认:False);经过一个布尔值设置线程是否为守护线程,必须在执行start()方法以后才可使用。若是是后台线程,主线程执行过程当中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均中止;若是是前台线程,主线程执行过程当中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序中止
t.isDaemon() : 判断是否为守护线程
t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法以后该属性才有效,不然它只返回None。
t.join() :逐个执行每一个线程,执行完毕后继续往下执行,该方法使得多线程变得无心义
t.run() :线程被cpu调度后自动执行线程对象的run方法
from threading import Thread from threading import currentThread # 获取当前线程对象的 对象 import time def task(): print('%s is runing' %currentThread().getName()) # 获取线程名 time.sleep(2) print('%s is down' % currentThread().getName()) if __name__ == '__main__': t = Thread(target=task, name='这里设置子线程初始化名') t.start() t.setName('设置线程名') # !!!! t.join() # 等待子线程运行结束 # currentThread() 等同于 线程对象t 因此获取线程名也能够t.getName() print('主线程', currentThread().getName()) # 但在主线程内(并无线程对象)要获取线程名必须用 currentThread().getName() t.isAlive() # 线程是否存活! 查看线程对象是否存活
#方法1 from threading import Thread # 建立线程的模块 def task(name): print(name) if __name__ == '__main__': # 开启线程 参数1:方法名(不要带括号) 参数2:参数(元祖) 返回对象 p = Thread(target=task, args=('线程1',)) p.start() # 只是给操做系统发送了一个就绪信号,并非执行。操做系统接收信号后安排cpu运行 print('主') #方法2 - 类的方法 from threading import Thread # 建立线程的模块 class MyThread(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): # 固定名字run !!!必须用固定名 print(self.name) if __name__ == '__main__': # 必需要这样启动 p = MyThread('子线程1') p.start() print('主)
二、线程锁threading.RLock和threading.Lock
因为线程之间是进行随机调度,而且每一个线程可能只执行n条执行以后,CPU接着执行其余线程。为了保证数据的准确性,引入了锁的概念。因此可能出现以下问题:
例:假设列表A的全部元素就为0,当一个线程从前向后打印列表的全部元素,另一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部分为0,一部分为1,这就致使了数据的不一致。锁的出现解决了这个问题。
import threading import time globals_num = 0 lock = threading.RLock() def Func(): lock.acquire() # 得到锁 global globals_num globals_num += 1 time.sleep(1) print(globals_num) lock.release() # 释放锁 for i in range(10): t =threading.Thread(target=Func) t.start()
RLock容许在同一线程中被屡次acquire。而Lock却不容许这种状况。 若是使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。
三、threading.Event
python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。
import threading def do(event): print('start') event.wait() print('execute') event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() inp = input('input:') if inp == 'true': event_obj.set()
当线程执行的时候,若是flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。
五、threading.Condition
Condition类实现了一个conditon变量。 这个conditiaon变量容许一个或多个线程等待,直到他们被另外一个线程通知。 若是lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来作底层锁。不然,会建立一个新的Rlock对象,用来作底层锁。
wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,若是调用它的线程没有获得锁,那么会抛出一个RuntimeError 异常。 wati()释放锁之后,在被调用相同条件的另外一个进程用notify() or notify_all() 叫醒以前 会一直阻塞。wait() 还能够指定一个超时时间。
若是有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒全部在等待conditon变量的线程。 notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会马上返回他们的wait() 调用。除非线程调用notify()和notify_all()以后放弃了锁的全部权。
例子: 生产者-消费者模型,
import threading import time def consumer(cond): with cond: print("consumer before wait") cond.wait() print("consumer after wait") def producer(cond): with cond: print("producer before notifyAll") cond.notifyAll() print("producer after notifyAll") condition = threading.Condition() c1 = threading.Thread(name="c1", target=consumer, args=(condition,)) c2 = threading.Thread(name="c2", target=consumer, args=(condition,)) p = threading.Thread(name="p", target=producer, args=(condition,)) c1.start() time.sleep(2) c2.start() time.sleep(2) p.start()
五、queue模块
Queue 就是对队列,它是线程安全的,,举例来讲,咱们去麦当劳吃饭。饭店里面有厨师职位,前台负责把厨房作好的饭卖给顾客,顾客则去前台领取作好的饭。这里的前台就至关于咱们的队列。造成管道样,厨师作好饭经过前台传送给顾客,所谓单向队列
这个模型也叫生产者-消费者模型。
import queue q = queue.Queue(maxsize=0) # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。 q.join() # 等到队列为kong的时候,在执行别的操做 q.qsize() # 返回队列的大小 (不可靠) q.empty() # 当队列为空的时候,返回True 不然返回False (不可靠) q.full() # 当队列满的时候,返回True,不然返回False (不可靠) q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必须存在,能够参数block默认为True,表示当队列满时,会等待队列给出可用位置, 为False时为非阻塞,此时若是队列已满,会引起queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,事后, 若是队列没法给出放入item的位置,则引起 queue.Full 异常 q.get(block=True, timeout=None) # 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,若是队列为空,则阻塞,为False时,不阻塞, 若此时队列为空,则引起 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,事后,若是队列为空,则引起Empty异常。 q.put_nowait(item) # 等效于 put(item,block=False) q.get_nowait() # 等效于 get(item,block=False)
代码以下:
#!/usr/bin/env python import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t =threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t =threading.Thread(target=consumer, args=(i,)) t.start()
协程存在的意义:对于多线程应用,CPU经过切片的方式来切换线程间的执行,线程切换时须要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。协程的适用场景:当程序中存在大量不须要CPU的操做时(IO),适用于协程;
import time def A(): while 1: print('------A-----') time.sleep(0.1) yield() def B(): while 1: print('-------B-----') time.sleep(0.1) next(a) a = A() B()
执行结果:
-------B----- ------A----- -------B----- ------A----- -------B----- ------A----- -------B----- ------A----- -------B----- ------A----- ···
yield能实现协程,不过实现过程不易于理解,greenlet是在这方面作了改进。
from greenlet import greenlet import time def A(): while 1: print('-------A-------') time.sleep(0.5) g2.switch() def B(): while 1: print('-------B-------') time.sleep(0.5) g1.switch() g1 = greenlet(A) #建立协程g1 g2 = greenlet(B) g1.switch() #跳转至协程g1
执行结果:
-------A------- -------B------- -------A------- -------B------- -------A------- ···
greenlet能够实现协程,不过每一次都要人为的去指向下一个该执行的协程,显得太过麻烦。gevent每次遇到io操做,须要耗时等待时,会自动跳到下一个协程继续执行。
gevent 是一个第三方库,能够轻松经过gevent实现协程程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。gevent会主动识别程序内部的IO操做,当子程序遇到IO后,切换到别的子程序。若是全部的子程序都进入IO,则阻塞。
import gevent def A(): while 1: print('-------A-------') gevent.sleep(1) #用来模拟一个耗时操做,注意不是time模块中的sleep def B(): while 1: print('-------B-------') gevent.sleep(0.5) #每当碰到耗时操做,会自动跳转至其余协程 g1 = gevent.spawn(A) # 建立一个协程 g2 = gevent.spawn(B) g1.join() #等待协程执行结束 g2.join()
执行结果:
-------A------- -------B------- -------B------- -------A------- -------B------- -------B------- -------A------- -------B------- -------B------- ···
import gevent from gevent import monkey,socket monkey.patch_all() #有IO才作时须要这一句 s = socket.socket(2,1) #用的都是gevent模块中的socket,但用法同样 s.setsockopt(1,2,1) s.bind(('',8080)) s.listen(1024) def func_accept(): while 1: cs,userinfo = s.accept() print('来了一个客户'+str(userinfo)) g = gevent.spawn(func_recv,cs) #每当有用户链接,增长一条协程 def func_recv(cs): while 1: recv_data = cs.recv(1024) print(recv_data) #程谁堵塞了,便会跳转至其余协程 if len(recv_data) > 0: cs.send(recv_data) else: cs.close() break g1 = gevent.spawn(func_accept) g1.join()