目录python
queue队列:使用import queue
,用法与进程Queue同样编程
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 结果(先进先出): first second third '''
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 结果(后进先出): third second first '''
class queue.PriorityQueue(maxsize=0)数组
import queue q=queue.PriorityQueue() #put进入一个元组,元组的第一个元素是优先级(一般是数字,也能够是非数字之间的比较),数字越小优先级越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 结果(数字越小优先级越高,优先级高的优先出队): (10, 'b') (20, 'a') (30, 'c') '''
exception queue.Empty:在Queue对象为空时调用非阻塞get()(或get_nowait())时引起异常。并发
exception queue.Full:在已满的Queue对象上调用非阻塞put()(或put_nowait())时引起异常。app
Queue.qsize()异步
Queue.empty():若是为空则返回Trueasync
Queue.full():若是已满,则返回True异步编程
Queue.put(item, block=True, timeout=None):将项目放入队列。若是可选的args块为true且timeout为None(默认值),则在必要时阻塞,直到有空闲插槽可用。若是timeout是一个正数,它会阻止最多超时秒,若是在该时间内没有可用的空闲槽,则会引起Full异常。不然(块为假),若是空闲插槽当即可用,则将项目放入队列,不然引起彻底异常(在这种状况下忽略超时)。函数
Queue.put_nowait(item):至关于put(item,False)。学习
Queue.get(block=True, timeout=None):从队列中删除并返回一个项目。若是可选的args块为true且timeout为None(默认值),则在必要时阻止,直到某个项可用为止。若是timeout是一个正数,它会阻止最多超时秒,若是在该时间内没有可用的项,则会引起Empty异常。不然(块为假),若是一个项当即可用则返回一个项,不然引起Empty异常(在这种状况下忽略超时)。
Queue.get_nowait():至关于get(False)。
提供了两种方法来支持跟踪守护进程消费者线程是否已彻底处理入队任务。
Queue.task_done():表示之前排队的任务已完成。由队列使用者线程使用。对于用于获取任务的每一个get(),对task_done()的后续调用会告知队列该任务的处理已完成。
若是join()当前正在阻塞,则它将在全部项目都已处理后恢复(这意味着已为每一个已放入队列的项目收到task_done()调用)。
若是调用的次数超过队列中放置的项目,则引起ValueError。
Queue.join(): block直到queue被消费完毕。
from threading import Timer,current_thread def task(x): print('%s run....' %x) print(current_thread().name) if __name__ == '__main__': t=Timer(3,task,args=(10,)) # 3s后执行该线程 t.start() print('主')
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor:进程池,提供异步调用
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{currentThread().name} 在执行任务 {i}') # print(f'进程 {current_process().name} 在执行任务 {i}') time.sleep(1) return i**2 if __name__ == '__main__': pool = ThreadPoolExecutor(4) # 池子里只有4个线程 # pool = ProcessPoolExecutor(4) # 池子里只有4个线程 fu_list = [] for i in range(20): # pool.submit(task,i) # task任务要作20次,4个线程负责作这个事 future = pool.submit(task,i) # task任务要作20次,4个进程负责作这个事 # print(future.result()) # 若是没有结果一直等待拿到结果,致使了全部的任务都在串行 fu_list.append(future) pool.shutdown() # 关闭了池的入口,会等待全部的任务执行完,结束阻塞. for fu in fu_list: print(fu.result())
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{currentThread().name} 在执行任务 {i}') # print(f'进程 {current_process().name} 在执行任务 {i}') time.sleep(1) return i**2 def parse(future): # 处理拿到的结果 print(future.result()) if __name__ == '__main__': pool = ThreadPoolExecutor(4) # 池子里只有4个线程 # pool = ProcessPoolExecutor(4) # 池子里只有4个线程 fu_list = [] for i in range(20): # pool.submit(task,i) # task任务要作20次,4个线程负责作这个事 future = pool.submit(task,i) # task任务要作20次,4个进程负责作这个事 future.add_done_callback(parse) # 为当前任务绑定了一个函数,在当前任务执行结束的时候会触发这个函数, # 会把future对象做为参数传给函数 # 这个称之为回调函数,处理完了回来就调用这个函数. # print(future.result()) # 若是没有结果一直等待拿到结果,致使了全部的任务都在串行 # pool.shutdown() # 关闭了池的入口,会等待全部的任务执行完,结束阻塞. # for fu in fu_list: # print(fu.result())
协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的。
须要强调的是:
对比操做系统控制线程的切换,用户在单线程内控制协程的切换。
优势以下:
缺点以下:
总结协程特色:
g1=gevent.spawn(func,1,,2,3,x=4,y=5):建立一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面能够有多个参数,能够是位置实参或关键字实参,都是传给函数eat的
g2=gevent.spawn(func2)
g.join():等待g1结束
g2.join():等待g2结束
上述两步合做一步:gevent.joinall([g1,g2])
g1.value:拿到func1的返回值
Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): # 同步 for i in range(10): task(i) def asynchronous(): # 异步 g_l=[spawn(task,i) for i in range(10)] joinall(g_l) print('DONE') if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() # 上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 # 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数, # 后者阻塞当前流程,并执行全部给定的greenlet任务。执行流程只会在 全部greenlet执行完后才会继续向下走。
from gevent import monkey;monkey.patch_all() # gevent识别不了 time.sleep等io操做 须要打monkey补丁才能识别 import gevent import time def eat(): print('eat food 1') time.sleep(2) print('eat food 2') def play(): print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print('主')