我的理解,并发是计算机在逻辑上能处理多任务的能力。通常分类三种类型:编程
这里还有一个概念须要注意,在使用并发的时候弄清楚须要并发的任务是计算密集仍是IO密集。
由于异步对于计算密集的任务是无效的。由于异步的本质是 IO 操做过程当中阻塞时的控制权交换。在计算密集的任务中是没有这样的阻塞的。segmentfault
前面说了异步的本质是控制权的交换,这里经过一个生产者消费者模型的例子来体会一下这么个过程。网络
def consumer(): # 定义消费者,因为有yeild关键词,此消费者为一个生成器 print("[Consumer] Init Consumer ......") r = "init ok" # 初始化返回结果,并在启动消费者时,返回给生产者 while True: n = yield r # 消费者经过yield接收生产者的消息,同时返给其结果 print("[Consumer] conusme n = %s, r = %s" % (n, r)) r = "consume %s OK" % n # 消费者消费结果,下个循环返回给生产者 def produce(c): # 定义生产者,此时的 c 为一个生成器 print("[Producer] Init Producer ......") r = c.send(None) # 启动消费者生成器,同时第一次接收返回结果 print("[Producer] Start Consumer, return %s" % r) n = 0 while n < 5: n += 1 print("[Producer] While, Producing %s ......" % n) r = c.send(n) # 向消费者发送消息并准备接收结果。此时会切换到消费者执行 print("[Producer] Consumer return: %s" % r) c.close() # 关闭消费者生成器 print("[Producer] Close Producer ......") produce(consumer())
# 异步IO例子:适配Python3.5,使用async和await关键字 async def hello(index): # 经过关键字async定义协程 print('Hello world! index=%s, thread=%s' % (index, threading.currentThread())) await asyncio.sleep(1) # 模拟IO任务 print('Hello again! index=%s, thread=%s' % (index, threading.currentThread())) loop = asyncio.get_event_loop() # 获得一个事件循环模型 tasks = [hello(1), hello(2)] # 初始化任务列表 loop.run_until_complete(asyncio.wait(tasks)) # 执行任务 loop.close() # 关闭事件循环列表
async def get(url): async with aiohttp.ClientSession() as session: async with session.get(url) as resp: print(url, resp.status) print(url, await resp.text()) loop = asyncio.get_event_loop() # 获得一个事件循环模型 tasks = [ # 初始化任务列表 get("http://zhushou.360.cn/detail/index/soft_id/3283370"), get("http://zhushou.360.cn/detail/index/soft_id/3264775"), get("http://zhushou.360.cn/detail/index/soft_id/705490") ] loop.run_until_complete(asyncio.wait(tasks)) # 执行任务 loop.close() # 关闭事件循环列表
这里以进程的multiprocessing
模块举例,线程可使用multiprocessing.dummy
,全部的API均相同。session
import multiprocessing as mp ############## 直接实例化 ############ def func(number): result = number * 2 p = mp.Process(target=func, args=(3, )) #实例化进程对象 p.start() #运行进程 ############ 类封装 ############# class MyProcess(mp.Process): def __init__(self, interval): mp.Process.__init__(self) # 须要重载的函数 def run(self): print('I'm running) p = MyProcess(1) p.start() ################################# p.terminal() # 主动结束进程 p.join() #让主进程等待子进程结束 # 一些经常使用的属性 p.pid #得到进程的id号 p.name #得到进程名 p.is_alive() #判断进程是否还存活 p.daemon = True #设置进程随主进程一块儿结束 mp.active_children() #得到当前进程的全部子进程 mp.current_process() #返回正在运行的进程 os.getpid() #得到当前进程的pid
from multiprocessing.dummy import Pool as ThreadPool tasks = list() def do_task(item): return item pool = ThreadPool(3) ################ 原始操做 ####################### for item in items: pool.apply_async(do_task, (item,)) #添加进程,非阻塞,返回执行结果 pool.apply(do_task, (item,)) #阻塞 ############## map操做 #####################3 results = pool.map(do_task, items) ################################ pool.close() #关闭进程池后不会有新的进程被建立 pool.join() #等到结束,必须在close后使用
# Lock(锁) # 限制对资源的访问 def func(lock): #使用with with lock: print('I got lock') def func(lock): #不使用with lock.acquire() #请求锁 try: print('I got lock') finally: lock.release() #释放锁 lock = mp.Lock() #申请锁 p = mp.Process(target=func, args=(lock,)) p.start() ############################################ # Semaphore(信号量) # 限制资源的最大链接数 def func(s): s.aquire() #请求链接 s.release() #断开链接 s = mp.Semaphore(2) #定义信号量的最大链接数 for i in range(5): p = mp.Process(target=func, arg=(s)) p.start ############################################ # Event(事件) # 进程间同步 def func(e): e.wait() #定义等待时间,默认等待到e.set()为止,阻塞 e.is_set() #判断消息是否被发出 print('got') e = mp.Event() p = mp.Process(target=func, args=(e,)) p.start() e.set() #发出消息 ############################################ # Queue(队列) # 多进程之间的数据传递 import Queue Queue.Queue(maxsize = 0) # 先进先出, maxsize小于等于则不限大小 Queue.LifoQueue(maxsize = 0) # 后进先出 Queue.PriorityQueue(maxsize = 0) # 构造一个优先级队列 #异常 Queue.Empty #当调用非阻塞的get()获取空队列的元素时, 引起异常 Queue.Full #当调用非阻塞的put()向满队列中添加元素时, 引起异常 # 生存者消费者模型 def produce(q): try: data = q.put(data, block=, timeout=) # 若block为False且队列已满,则当即抛出Queue.Full # 若block为True进程会阻塞timeout指定时间,直到队列有空间,不然抛出Queue.Full except: def cosume(q): try: q.get(block=, timeout=) #与上同理 except: q = mp.Queue() pro = mp.Process(target=produce, args=(q, )) cos = mp.Process(target=cosume, args=(q, )) pro.start() cos.start() pro.join() cos.join() ############################################ # Pipe(管道) # 多进程之间的数据传递 def func1(pipe): while True: pipe.send(1) def func2(pipe): while True: pipe.recv() #若是管道内无消息可接受,则会阻塞 pipe = mp.Pipe(duplex=) #参数默认为True即管道的两边都可收发 # 返回(conn1, conn2),当参数为False时conn1只能收信息,conn2只能发消息 p1 = mp.Process(target=func1, args=(pipe[0], )) p2 = mp.Process(target=func2, args=(pipe[1], )) p1.start() p2.start() p1.join() p2.join()
新的并发池模块concurrent.futures
再次封装了并发操做,能够用于量大但简单并发操做。
且进程线程通用关键字换一下就行。多线程
from concurrent.futures import ThreadPoolExecutor import time def working(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 建立一个最大可容纳2个task的线程池 worker1 = pool.submit(working, ("hello")) # 往线程池里面加入一个task worker2 = pool.submit(working, ("world")) # 往线程池里面加入一个task # submit 返回了一个future对象,即未完成的操做,咱们能够经过调用函数来查看其状态 worker1.done() # 判断task1是否结束 worker1.result() # 查看task1返回的结果 worker2.result() # 查看task2返回的结果
import concurrent.futures items = list() # 任务对象 def do_task(item): # 处理函数 return item #################### submit ######################### with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: futures = {executor.submit(do_task, item): item for item in items} for future in concurrent.futures.as_completed(futures): item = futures[future] result = future.result() print(item, result) #################### map ######################### # map跟submit的区别在于submit是无序的,而map是有序的 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: for item, result in zip(items, executor.map(do_task, items)): print(item, result) #################### wait ######################### # wait返回一个元组,包含已完成任务的集合和未完成任务的集合。 pool = ThreadPoolExecutor(5) futures = [] for item in items: futures.append(pool.submit(do_task, item)) concurrent.futures.wait(futures, timeout=None, return_when='FIRST_COMPLETED')
return_when
参数可选FIRST_COMPLETED
, FIRST_EXCEPTION
和ALL_COMPLETE
ALL_COMPLETE
会阻塞并发