消息队列:multiprocessing.Queue,Queue是对进程安全的队列,可使用Queue实现对进程之间的数据传输;还有一个重要做用是做为缓存使用。html
Queue(maxsize = 0) method of multiprocessing, returns a queue obiectpython
Queue(maxzize = 0)建立一个队列对象,maxsize 表示队列中最多存放消息的数量。编程
返回一个队列对象缓存
put(obj [, block=True[, timeout]])
调用队列对象的put()方法将obj插入到队列中,安全
第一个obj为必需参数,为插入项目的值;存入消息的种类不限制。多线程
第二个block为可选参数,默认为True,app
当block为True,timeout为空时,q.put([1,2,3])、q.put([1,2,3],True) 表示将序列插入到队尾,阻塞调用,若是q队列满时,一直等待(无超时限制的阻塞调用)。dom
当block为True,timeout为正整数时,q.put([1,],True,2) 表示阻塞调用进程最多timeout秒,若是超过该时间仍无空间可用,则抛出Queue.Full异常(带超时的阻塞调用)。函数
当block为False,q.put([1,], False) 表示调用进程时若是有空闲空间则将数据放入队列中,不然当即抛出Queue.Full异常。post
简而言之,timeout表示超时等待时间,当队列满时,再存入消息就会发生阻塞(True条件下有效),阻塞时间超过timeout等待时间则抛出异常。
get([block=True[, timeout]])
get方法能够将队列中读取并删除一个元素。
实际上,get()方法的使用与put()函数相似
第一个block可选参数,默认为True。
当block为True,timeout为空时,阻塞等待取值,直到取到为止。
当block为True,timeout为正整数时,在timeout时间内没有取到任何元素,则会抛出Queue.Empty异常;
当block为False时,若是能够取到至时,则会马上返回该值,若是没有取到元素则会当即抛出Queue.Empty异常。
q.full() 判断队列是否为满;若满,返回True,若不满,返回False
q.empty() 判断队列是否为空,若为空,则返回True
q.qsize() 获取队列中消息数量
示例1
from multiprocessing import Queue #建立队列对象,队列中最大消息数量为3 q = Queue(maxsize = 3) #存入消息 q.put(1) q.put('hello')#以字符串存入队列中 q.put([1,2,3,4]) q.put('hello world',True,5)
运行
Traceback (most recent call last): File "a.py", line 10, in <module> q.put('hello world',True,5) File "/usr/lib/python3.5/multiprocessing/queues.py", line 83, in put raise Full queue.Full
因最后存入时已经超过队列的最大数量,阻塞5秒后报错
示例2
from multiprocessing import Queue #建立队列对象,队列中最大消息数量为3 q = Queue(maxsize = 3) #存入消息 q.put(1) q.put('hello')#以字符串存入队列中 q.put([1,2,3,4]) # 判断队列是否为满 print(q.full())# True # 队列中消息数量 print(q.qsize())# 3 # 判断队列是否为空 print(q.empty()) # False # 取出消息 print(q.get())# 1 print(q.qsize())# 2 print(q.get()) # hello print(q.get()) # [1,2,3,4] #判断队列是否为空 print(q.empty()) #True print(q.get(True,3)) #报错queue.Empty
运行
True 3 False 1 2 hello [1, 2, 3, 4] True Traceback (most recent call last): File "a.py", line 25, in <module> print(q.get(True,3)) #报错 File "/usr/lib/python3.5/multiprocessing/queues.py", line 105, in get raise Empty queue.Empty
多个进程间的通讯 示例1
import multiprocessing def writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): try: print(q.get(block = False)) except: pass if __name__ == "__main__": q = multiprocessing.Queue() writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() reader.join() writer.join()
运行结果为 1 或 在终端上没有任何显示
实际上,这时因为多线程代码各自运行所致,若是取出先运行,同时又有报错处理设置,因此不会报错;而是没有任何输出。
为了达到目的,能够在取出函数中增长sleep()函数。
import multiprocessing import time def writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): time.sleep(1) try: print(q.get(block = False)) except: pass if __name__ == "__main__": q = multiprocessing.Queue() writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() reader.join() writer.join()
通过改良后,每一次运行代码都可达到输入输出的目的。
多个子进程间的通讯示例2
多个子进程间的通讯就要采用第一步中的队列Queue,好比,有如下需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,
# _*_ encoding:utf-8 _*_ from multiprocessing import Process,Queue,Pool,Pipe import os,time,random #写数据进程执行的代码: def write(p): for value in ['A','B','C']: print ('Write---Before Put value---Put %s to queue...' % value) p.put(value) print ('Write---After Put value') time.sleep(random.random()) print ('Write---After sleep') #读数据进程执行的代码: def read(p): while True: print ('Read---Before get value') value = p.get(True) print ('Read---After get value---Get %s from queue.' % value) if __name__ == '__main__': #父进程建立Queue,并传给各个子进程: p = Queue() pw = Process(target=write,args=(p,)) pr = Process(target=read,args=(p,)) #启动子进程pw,写入: pw.start() #启动子进程pr,读取: pr.start() #等待pw结束: pw.join() #pr进程里是死循环,没法等待其结束,只能强行终止: pr.terminate()
运行
Write---Before Put value---Put A to queue... Read---Before get value Write---After Put value Read---After get value---Get A from queue. Read---Before get value Write---After sleep Write---Before Put value---Put B to queue... Write---After Put value Read---After get value---Get B from queue. Read---Before get value Write---After sleep Write---Before Put value---Put C to queue... Write---After Put value Read---After get value---Get C from queue. Read---Before get value Write---After sleep
关于锁的应用,在不一样程序间若是有同时对同一个队列操做的时候,为了不错误,能够在某个函数操做队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操做,锁也要在manager对象中的锁
示例3
from multiprocessing import Process,Queue import time process_list = [] q = Queue() def fun(name): time.sleep(1) q.put('hello' + str(name)) for i in range(10): p = Process(target = fun,args = (i,)) p.start() process_list.append(p) for i in process_list: i.join() while not q.empty(): print(q.get())
运行
hello0
hello1
hello3
hello2
hello4
hello6
hello5
hello9
hello8
hello7
参考: