线程的一个关键特性是每一个线程都是独立运行且状态不可预测。若是程序中的其余线程须要经过判断某个线程的状态来肯定本身下一步的操做,这时线程同步问题就 会变得很是棘手。为了解决这些问题,咱们须要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它容许线程等待某些事件的发生。在初始状况下,Event对象中的信号标志被设置为假。若是有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程若是将一个Event对象的信号标志设置为真,它将唤醒全部等待这个Event对象的线程。若是一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行。python
event.isSet(): 返回event的状态值True或者False; event.wait(): 若是 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度; event.clear(): 恢复event的状态值为False。
能够考虑一种应用场景(仅仅做为说明),例如,咱们有多个线程从Redis队列中读取数据来处理,这些线程都要尝试去链接Redis的服务,通常状况下,若是Redis链接不成功,在各个线程的代码中,都会去尝试从新链接。若是咱们想要在启动时确保Redis服务正常,才让那些工做线程去链接Redis服务器,那么咱们就能够采用threading.Event机制来协调各个工做线程的链接操做:主线程中会去尝试链接Redis服务,若是正常的话,触发事件,各工做线程会尝试链接Redis服务。redis
import threading,time event = threading.Event() def foo(): while not event.is_set(): print('wait....') event.wait() print('Connect to redis server') print('attempt to start redis server') for i in range(5): t = threading.Thread(target=foo) t.start() time.sleep(10) event.set() ''' 运行结果: attempt to start redis server wait.... wait.... wait.... wait.... wait.... Connect to redis server Connect to redis server Connect to redis server Connect to redis server Connect to redis server '''
import threading,time,logging logging.basicConfig(level=logging.DEBUG, format='%(threadName)-10s %(message)s') def worker(event): logging.debug('Waiting for redis ready...') event.wait() logging.debug('redis ready,and connect to redis server and do some work [%s]',time.ctime()) time.sleep(1) def main(): readis_ready=threading.Event() t1=threading.Thread(target=worker,args=(readis_ready,),name='t1') t1.start() t2=threading.Thread(target=worker,args=(readis_ready,),name='t2') t2.start() logging.debug('first of all,check redis server,make sure it is OK,and then trigger the redis ready event') time.sleep(3) readis_ready.set() if __name__=='__main__': main()
threading.Event的wait方法还接受一个超时参数,默认状况下若是事件一致没有发生,wait方法会一直阻塞下去,而加入这个超时参数以后,若是阻塞时间超过这个参数设定的值以后,wait方法会返回。对应于上面的应用场景,若是Redis服务器一致没有启动,咱们但愿子线程可以打印一些日志来不断地提醒咱们当前没有一个能够链接的Redis服务,咱们就能够经过设置这个超时参数来达成这样的目的:服务器
import threading,time event = threading.Event() def foo(): while not event.is_set(): print('wait....') event.wait(2) print('Connect to redis server') print('attempt to start redis server') for i in range(2): t = threading.Thread(target=foo) t.start() time.sleep(5) event.set() ''' 运行结果: attempt to start redis server wait.... wait.... wait.... wait.... wait.... wait.... Connect to redis server Connect to redis server '''
def worker(event): while not event.is_set(): logging.debug('Waiting for redis ready...') event.wait(2) logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1)
这样,咱们就能够在等待Redis服务启动的同时,看到工做线程里正在等待的状况。多线程
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.app
''' 建立一个“队列”对象 import queue q = queue.Queue(maxsize = 10) queue.Queue类便是一个队列的同步实现。队列长度可为无限或者有限。可经过Queue的构造函数的可选参数 maxsize来设定队列长度。若是maxsize小于1就表示队列长度无限。 将一个值放入队列中 q.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。若是队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。若是block为0,put方法将引起Full异常。 将一个值从队列中取出 q.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。若是队列为空且block为True,get()就使调用线程暂停,直至有项目可用。若是队列为空且block为False,队列将引起Empty异常。 '''
import queue q = queue.Queue(3) q.put(11) q.put('hello') q.put(3.123) print(q.get()) print(q.get()) print(q.get()) ''' 运行结果: 11 hello 3.123 '''
''' join() 阻塞进程,直到全部任务完成,须要配合另外一个方法task_done。 def join(self): with self.all_tasks_done: while self.unfinished_tasks: self.all_tasks_done.wait() task_done() 表示某个任务完成。每一条get语句后须要一条task_done。 import queue q = queue.Queue(5) q.put(10) q.put(20) print(q.get()) q.task_done() print(q.get()) q.task_done() q.join() print("ending!") '''
import queue,threading q = queue.Queue(3) def foo(): q.put(11) q.put('hello') q.put(3.123) q.join() def bar(): print(q.get()) q.task_done() #注释掉本行,程序将不会结束。 t1 = threading.Thread(target=foo) t1.start() for i in range(3): t = threading.Thread(target=bar) t.start() ''' 运行结果: 11 hello 3.123 '''
''' 此包中的经常使用方法(q = queue.Queue()): q.qsize() 返回队列的大小 q.empty() 若是队列为空,返回True,反之False q.full() 若是队列满了,返回True,反之False q.full 与 maxsize 大小对应 q.get([block[, timeout]]) 获取队列,timeout等待时间 q.get_nowait() 至关q.get(False)非阻塞 q.put(item) 写入队列,timeout等待时间 q.put_nowait(item) 至关q.put(item, False) q.task_done() 在完成一项工做以后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() 实际上意味着等到队列为空,再执行别的操做 '''
Python queue模块有三种队列及构造函数: 1、Python queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) 2、LIFO相似于堆栈,即先进后出。 class queue.LifoQueue(maxsize) 3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) import queue #先进后出 q=queue.LifoQueue() q.put(34) q.put(56) q.put(12) print(q.get()) print(q.get()) print(q.get()) ''' 运行结果: 12 56 34 ''' #优先级 q=queue.PriorityQueue() q.put([5,100]) q.put([7,200]) q.put([3,"hello"]) q.put([4,{"name":"alex"}]) while 1: data=q.get() print(data) ''' 运行结果: [3, 'hello'] [4, {'name': 'alex'}] [5, 100] [7, 200] '''
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。dom
生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。异步
这就像,在餐厅,厨师作好菜,不须要直接和客户交流,而是交给前台,而客户去饭菜也不须要不找厨师,直接去前台领取便可,这也是一个解耦的过程。async
import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(3)) if not q.empty(): data = q.get() print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1 p1 = threading.Thread(target=Producer, args=('A',)) c1 = threading.Thread(target=Consumer, args=('B',)) p1.start() c1.start()
''' 运行结果: making........ Producer A has produced 0 baozi.. ok...... making........ Consumer B has eat 0 baozi... Producer A has produced 1 baozi.. ok...... making........ Producer A has produced 2 baozi.. ok...... making........ Consumer B has eat 1 baozi... Producer A has produced 3 baozi.. ok...... making........ Consumer B has eat 2 baozi... Consumer B has eat 3 baozi... Producer A has produced 4 baozi.. ok...... making........ Producer A has produced 5 baozi.. ok...... making........ Consumer B has eat 4 baozi... Consumer B has eat 5 baozi... Producer A has produced 6 baozi.. ok...... making........ Producer A has produced 7 baozi.. ok...... making........ Producer A has produced 8 baozi.. ok...... making........ Consumer B has eat 6 baozi... Consumer B has eat 7 baozi... Producer A has produced 9 baozi.. ok...... Consumer B has eat 8 baozi... Consumer B has eat 9 baozi... '''
Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.ide
因为GIL的存在,python中的多线程其实并非真正的多线程,若是想要充分地使用多核CPU的资源,在python中大部分状况须要使用多进程。函数
multiprocessing包是Python中的多进程管理包。与threading.Thread相似,它能够利用multiprocessing.Process对象来建立一个进程。该进程能够运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象能够像多线程那样,经过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。因此,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
# Process类调用 from multiprocessing import Process import time def f(name): print('hello', name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin:%s'%i,)) p_list.append(p) p.start() for i in p_list: p.join() print('end') ''' 运行结果: hello alvin:0 Wed Jul 19 16:06:40 2017 hello alvin:2 Wed Jul 19 16:06:40 2017 hello alvin:1 Wed Jul 19 16:06:40 2017 end '''
#继承Process类调用 from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() # self.name = name def run(self): print ('hello', self.name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end') ''' 运行结果: hello MyProcess-3 Wed Jul 19 16:09:39 2017 hello MyProcess-1 Wed Jul 19 16:09:39 2017 hello MyProcess-2 Wed Jul 19 16:09:39 2017 end '''
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前尚未实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,若是实例进程时未制定传入target,这star执行t默认run()方法。
terminate():无论任务是否完成,当即中止工做进程
属性:
daemon:和线程的setDeamon功能同样
name:进程名字。
pid:进程号。
from multiprocessing import Process import os import time def info(name): print("name:",name) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("------------------") time.sleep(1) if __name__ == '__main__': info('main process line') p1 = Process(target=info, args=('alvin',)) p2 = Process(target=info, args=('egon',)) p1.start() p2.start() p1.join() p2.join() print("ending") ''' 运行结果: name: main process line parent process: 3400 process id: 1712 ------------------ name: alvin parent process: 1712 process id: 8428 ------------------ name: egon parent process: 1712 process id: 8212 ------------------ ending '''
from multiprocessing import Process, Queue def f(q,n): q.put(n*n+1) print("son process",id(q)) if __name__ == '__main__': q = Queue() #若是使用线程间的队列queue.Queue则没法运行 print("main process",id(q)) for i in range(3): p = Process(target=f, args=(q,i)) p.start() print(q.get()) print(q.get()) print(q.get()) ''' 运行结果: main process 41655376 son process 45073408 1 son process 44942336 2 son process 44942392 5 '''
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).
pipe()函数返回由管道链接的一对链接对象,该管道默认是双向的(双向的)。
For example:
from multiprocessing import Process, Pipe def f(conn): conn.send([12, {"name": "yuan"}, 'hello']) response = conn.recv() print("response", response) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() #管道两个对象 p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) parent_conn.send("儿子你好!") p.join() ''' 运行结果: [12, {'name': 'yuan'}, 'hello'] response 儿子你好! '''
Pipe()返回的两个链接对象表明管道的两端。 每一个链接对象都有send()和recv()方法(等等)。 请注意,若是两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏
Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另外一个进程的数据。
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
manager()返回的manager对象控制一个保存Python对象的服务器进程,并容许其余进程使用代理来操做它们。
from multiprocessing import Process, Manager def f(d, l, n): d[n] = n d["name"] ="alvin" l.append(n) #print("l",l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() #字典 l = manager.list(range(5)) #列表 print(d,'\n',l) p_list = [] for i in range(10): p = Process(target=f, args=(d,l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l) ''' 运行结果: {} 初始化的字典 [0, 1, 2, 3, 4] 初始化的列表 {3: 3, 'name': 'alvin', 0: 0, 2: 2, 7: 7, 5: 5, 4: 4, 1: 1, 6: 6, 8: 8, 9: 9} [0, 1, 2, 3, 4, 3, 0, 2, 7, 5, 4, 1, 6, 8, 9]
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
from multiprocessing import Pool import time def foo(args): time.sleep(5) print(args) if __name__ == '__main__': p = Pool(5) for i in range(30): p.apply_async(func=foo, args= (i,)) p.close() # 等子进程执行完毕后关闭进程池 # time.sleep(2) # p.terminate() # 马上关闭进程池 p.join() # 没有join会当即结束
进程池中有如下几个主要方法:
一、设计五个线程,2个生产者3消费者:一个生产者每秒钟生产1一个产品放入队列,一个生产者每秒钟生产2个产品放入队列。
每一个消费者每秒钟从队列中消费1-5之间的一个随机数个产品。
对于生产者:
队列多于10个时,生产者等待,不然生产者继续生产;
对于消费者:
队列空时,消费者等待,队列有产品时,消费者继续消费。
每一个产品有本身独特的标记。
import threading,time,queue,random class Producer(threading.Thread): def __init__(self,name,i): super().__init__() self.name=name self.i=i def run(self): while True: time.sleep(self.i) if q.qsize()<10: a=random.choice(['baozi','jianbing','doujiang'])+str(random.randint(1,10)) q.put(a) print('%s produce %s current menu %s'%(self.name,a,q.queue)) class Consumer(threading.Thread): def __init__(self,name,q): super().__init__() self.name=name def run(self): while True: time.sleep(1) if not q.empty(): for i in range(random.randint(1,5)): a=q.get() print('%s eat %s'%(self.name,a)) if __name__ == '__main__': q = queue.Queue() p=Producer('egon0',1) p.start() p = Producer('egon1', 0.5) p.start() for i in range(3): c=Consumer('yuan%s'%i,q) c.start() 参考答案
二、设计一个关于红绿灯的线程,5个关于车的线程;
对于车线程,每隔一个随机秒数,判断红绿灯的状态,是红灯或者黄灯,打印waiting;是绿灯打印running。
对于红绿灯线程: 首先默认是绿灯,作一个计数器,十秒前,每隔一秒打印“light green”;第十秒到第十三秒,每隔一秒打印“light yellow”,13秒到20秒, ‘light red’,20秒之后计数器清零。从新循环。
知识点:event对象(提示:event对象即红绿灯,为true是即绿灯,false时为黄灯或者红灯)
import threading,random,time event=threading.Event() def traffic_lights(): count=0 lights=['green light','yellow light','red light'] current_light=lights[0] while True: while count<10: print(current_light,9-count) count+=1 time.sleep(1) else: current_light=lights[1] event.set() while count<13: print(current_light,12-count) count+=1 time.sleep(1) else: current_light=lights[2] while count<20: print(current_light,19-count) count += 1 time.sleep(1) if count == 20: count=0 current_light=lights[0] event.clear() break def car(name): print(name,'starting...') while True: time.sleep(random.randint(1,4)) if not event.is_set(): print('%s is running'%name) else: print('%s is waiting'%name) if __name__ == '__main__': t=threading.Thread(target=traffic_lights) t.start() for i in range(5): c=threading.Thread(target=car,args=('car%s'%(i+1),)) c.start() 参考答案