Python的并发实现有三种方法。python
串行:同时只能执行单个任务
并行:同时执行多个任务数据库
在Python中,虽然严格说来多线程与协程都是串行的,但其效率高,在遇到阻塞时会将阻塞任务交给系统执行,经过合理调度任务,使得程序高效。编程
最高效的固然是多进程了,但因为多进程依赖硬件配置,而且当任务量超过CPU核心数时,多进程会有进程上下文切换开销,而这个开销很大,因此不是最佳解决方案。安全
多线程对比单线程,因为GIL的存在,切换线程须要不断加锁、释放锁,效率反而更低;多进程至关于多个CPU同时工做,所以效率很高。网络
IO密集型能够是磁盘IO、网络IO、数据库IO等,都属于计算量小,IO等待浪费高。越是IO等待时间长,则多线程的优点相比单线程越明显,多进程效率高但依赖配置资源。多线程
单线程老是最慢的,多线程适合在IO密集型场景使用,多进程适合CPU计算要求高的场景下使用,多进程虽然老是最快的,但须要CPU资源支持。并发
Python建立多线程有两种方法。app
from threading import Thread def func(): for i in range(2): print('Hello world!') sleep(1) th1 = Thread(target=func) th1.start() th2 = Thread(target=func) th2.start()
这个类必须继承Thread,必须重载run()
方法框架
from threading import Thread class MyThread(Thread): def __init__(self): super().__init__() self.name = 'Bob' def run(self): for i in range(2): print('Hello world!') sleep(1) th1 = MyThread() th2 = MyThread() th1.start() th2.start()
import threading lock = threading.Lock() # 生成锁,全局惟一 lock.acquire() # 加锁 lock.release() # 释放锁
加锁与解锁必须成对出现,或者使用上下文管理器with
来管理锁。异步
在Redis分布式锁中提到过,用于让非阻塞线程重复得到锁来发送或读取数据,这里的可重入锁仅指让同一线程能够屡次获取锁。
import threading rlock = threading.RLock() # 生成可重入锁
死锁一般有两种。
多进程是真正的并行,而多线程是伪并行,实际是多个线程交替执行。
遇到GIL影响性能的状况,要么考虑用多进程替代多线程,要么更换Python解释器。
经常使用线程通讯方法。
threading.Event
threading.Condition
queue.Queue
import threading event = threading.Event() event.clear() # 重置event,使全部该event事件都处于待命状态 event.wait() # 等待接收event指令,决定是否阻塞程序执行 evnet.set() # 发送event指令,全部该event事件的线程开始执行
import time import threading class MyThread(threading.Thread): def __init__(self, name, event): super().__init__() self.name = name self.event = event def run(self): self.event.wait() # 等待event.set()才能执行下去 time.sleep(1) print('{} Done'.format(self.name)) threads = [] event = threading.Event() for i in range(5): threads.append(MyThread(event)) event.clear() # 重置event,使event.wait()生效 for t in threads: t.start() print('Waiting 3s') time.sleep(3) print('Awake all threads') event.set() # 发送event指令,全部绑定了event的线程开始执行
全部线程在调用start()
方法后并不会执行完,而是在event.wait()
处停住了,须要发送event.set()
指令才能继续执行。
import threading cond = threading.Condition() cond.acquire() cond.release() cond.wait() # 等待指令触发,同时临时释放锁,直到调用notify才从新占有锁 cond.notify() # 发送指令
Condition与Event很相似,不过因为wait()
与notify()
能够反复调用,所以通常做为编程人员可控调用锁来使用,放在run()
方法下。
队列是线程安全的,经过put()
和get()
方法来操做队列。
from queue import Queue q = Queue(maxsize=0) # 设置0表示无限长队列 q.get(timeout=0.5) # 阻塞程序,等待队列消息,能够设置超时时间 q.put() # 发送消息 q.join() # 等待全部消息被消费完 # 不经常使用但要了解的方法 q.qsize() # 返回消息个数 q.empty() # 返回bool值,队列是否空 q.full() # 返回bool值,队列是否满
Queue
是FIFO队列,还有queue.LifoQueue
,queue.PriorityQueue
。
两个线程的变量不能被相互访问。
一般使用threading.local
类来实现,该类的实例是一个字典型对象,直接经过key-value形式存入变量,如threading.local().name = 'bob'
。
若是想要实现一个线程内的全局变量或实现线程间的信息隔离,就使用local类。
多线程并非越多越好,由于在切换线程时会切换上下文环境(固然相比多进程的开销要小的多),在量大时依然会形成CPU的开销。
所以出现了线程池的概念,即预先建立好合适数量的线程,使任务能马上使用。
经过concurrent.futures
库的ThreadPoolExecutor
类来实现。
import time import threading from concurrent.futures import ThreadPoolExecutor def target(): for i in range(5): print('{}-{}\n'.format(threading.get_ident(), i) time.sleep(1) pool = ThreadPoolExecutor(5) # 线程池数量限制为5 for i in range(100): pool.submit(target) # 往线程中提交并运行
学习协程,要先理解生成器,由于Python的协程是从生成器中诞生并演变到如今这个样子的。
可迭代对象,其类或元类都实现了__iter__()
方法,而该方法返回一个对象支持迭代,既能够是string/list/tuple/dict等内置类型的对象,也能够是本身写的对象(这个对象的类实现了遍历元素的__iter__
方法)。
迭代器对象,可迭代对象是迭代器的基础,迭代器只是比可迭代对象多了一个__next__()
方法,这个方法让咱们能够再也不用for循环来获取元素。
生成器对象,在迭代器的基础上,实现了yield
,至关于函数中的return
,在每次for循环遍历或调用next()时,都会返回一个值并阻塞等待下一次调用。
可迭代对象、迭代器都是将全部元素放在内存里,而生成器则是须要时临时生成元素,因此生成器节省时间、空间。
两个方法。
next()
send(None)
这两个方法是等价的,但因为send方法能够传值进去,因此在协程中大有用处。
经过inspect
库的getgeneratorstate
方法获取状态信息。
StopIteration
生成器引入了函数暂停执行(yield)功能,后来又引入了向暂停的生成器发送信息的功能(send),并以此催生了协程。
协程是为非抢占式多任务产生子程序的计算机程序组件,协程容许不一样入口点在不一样位置暂停或开始执行程序。
协程和线程有类似点,多个协程之间与线程同样,只会交叉串行执行;也有不一样点,线程之间要频繁切换,加锁、解锁,协程不须要。
协程经过yield暂停生成器,将程序的执行流程交给其它子程序,从而实现不一样子程序之间的交替执行。
经过例子演示如何向生成器发送信息。
def func(n): index = 0 while index < n: num = yield index # 这里分红两部分,yield index将index return给外部程序, num = yield接受外部send的信息并赋值给num if num is None: num = 1 index += num f = func(5) print(next(f)) # 0 print(f.send(2)) # 2 print(next(f)) # 3 print(f.send(-1)) # 2
yield from
语法从Python3.3才出现的语法。
yield from
后面须要添加可迭代对象(迭代器、生成器固然知足要求)。
# 拼接一个可迭代对象 # 使用yield astr = 'ABC' alist = [1, 2, 3] adict = dict(name='kct', age=18) agen = (i for i in range(5)) def gen(*args): for item in args: for i in item: yield i new_list = gen(astr, alist, adict, agen) print("use yield:", list(new_list)) # 使用yield from def gen(*args): for item in args: yield from item new_flist = fgen(astr, alist, adict, agen) print("use yield from:", list(new_flist))
能够看出,使用yield from
能够直接从可迭代对象中yield全部元素,减小了一个for循环,代码更简洁,固然yield from
不止作了这件事。
yield from
后能够接生成器,以此造成生成器嵌套,yield from
就帮咱们处理了各类异常,让咱们只需专心于业务代码便可。
具体讲解yield from
前先了解几个概念:
yield from
表达式的生成器函数yield from
后接的生成器函数举个例子,实时计算平均值
# 子生成器 def average_gen(): total = 0 count = 0 average = 0 while True: num = yield average count += 1 total += num average = total/count # 委托生成器 def proxy_gen(): while True: yield from average_gen() # 调用函数 def main(): get_average = proxy_gen() next(get_average) # 第一次调用不传值,让子生成器开始运行 print(get_average.send(10)) # 10 print(get_average.send(20)) # 15 print(get_average.send(30)) # 20
委托生成器的做用是在调用函数与子生成器之间创建一个双向通讯通道,调用函数能够send消息给子生成器,子生成器yield值也是直接返回给调用函数。
有时会在yield from
前做赋值操做,这是用于作结束操做,改造上面的例子。
# 子生成器 def average_gen(): total = 0 count = 0 average = 0 while True: num = yield average if num is None: break count += 1 total += num average = total/count return total, count, average # 当协程结束时,调用return # 委托生成器 def proxy_gen(): while True: total, count, average = yield from average_gen() # 只有子生成器的协程结束了才会进行赋值,后面的语句才会执行 print('Count for {} times, Total is {}, Average is {}'.format(count, total, average)) # 调用函数 def main(): get_average = proxy_gen() next(get_average) # 第一次调用不传值,让子生成器开始运行 print(get_average.send(10)) # 10 print(get_average.send(20)) # 15 print(get_average.send(30)) # 20 get_average.send(None) # 结束协程,若是后面再调用send,将会另起一协程
yield from
作了全面的异常处理。直接调用子生成器,首先就要处理StopIteration异常,其次若子生成器不是协程生成器而是迭代器,则会有其它异常抛出,所以须要知道,委托生成器在这之中扮演着重要角色,不可忽略。
asyncio
asyncio
是Python3.4引入的标准库,直接内置对异步IO的支持。
虽然学了yield
和yield from
,但仍是不知如何入手去作并发,asyncio
则是为了提供这么个框架来精简复杂的代码操做。
经过前面学习,咱们知道调用函数/委托生成器/子生成器这三剑客中,子生成器就是协程,那么asyncio
如何来定义建立协程呢?
asyncio
经过在函数定义前增长async
关键字来定义协程对象,经过isinstance(obj, Coroutine)
便可判断是不是协程,这个协程类从collections.abc
包导入。
咱们也知道,生成器是协程的基础,那么有什么办法将生成器变成协程来使用?
经过@asyncio.coroutine
装饰器能够标记生成器函数为协程对象,可是经过isinstance(obj, Generator)
、isinstance(obj, Coroutine)
仍然能够看到,这个生成器函数只是被标记为协程了,但其本质依然是生成器。
async
关键字定义的函数,调用它不会当即执行函数,而是返回一个协程对象,这个协程对象须要注册到事件循环中,由事件循环调用;import asyncio async def hello(name): print('Hello, ', name) coroutine = hello('World') # 建立事件循环 loop = asyncio.get_event_loop() # 将协程转换为任务 task = loop.create_task(coroutine) # 将任务放入事件循环对象中触发 loop.run_until_complete(task)
await
和yield
这二者都能实现暂停的效果,但功能是不兼容的,在生成器中不能用await
,在async
定义的协程中不能用yield
。
而且,yield from
后可接可迭代对象、迭代器、生成器、future对象、协程对象,await
后只能接future对象、协程对象。
前面咱们知道经过async
能够定义一个协程对象,那么如何建立一个future对象呢?
答案是经过task,只须要建立一个task对象便可。
# 在前一个例子中,咱们先建立了事件循环,而后经过事件循环建立了task,咱们来测试下 import asyncio from asyncio.futures import Future async def hello(name): print('Hello, ', name) coroutine = hello('World') # 建立事件循环 loop = asyncio.get_event_loop() # 将协程转换为任务 task = loop.create_task(coroutine) print(isinstance(task, Future)) # 结果是True # 不创建事件循环的方法 task = asyncio.ensure_future(coroutine) print(isinstance(task, Future)) # 结果也是True
知道了建立future对象(也便是建立task对象)的方法,那么咱们验证下await
和yield
后接coroutine和future对象。
import sys import asyncio async def f1(): await asyncio.sleep(2) return 'Hello, {}'.format(sys._getframe().f_code.co_name) @asyncio.coroutine def f2(): yield from asyncio.sleep(2) return 'Hello, {}'.format(sys._getframe().f_code.co_name) async def f3(): await asyncio.ensure_future(asyncio.sleep(2)) return 'Hello, {}'.format(sys._getframe().f_code.co_name) @asyncio.coroutine def f4(): yield from asyncio.ensure_future(asyncio.sleep(2)) return 'Hello, {}'.format(sys._getframe().f_code.co_name) tasks = [ asyncio.ensure_future(f1()), asyncio.ensure_future(f2()), asyncio.ensure_future(f3()), asyncio.ensure_future(f4()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print(task.result()) loop.close()
异步IO都是在IO高的地方挂起,等IO操做结束后再继续执行,大多数时候,咱们后续的代码执行都是须要依赖IO的返回值的,此时就要用到回调了。
回调的实现有两种方式。
这种方法要求咱们可以取得协程的await的返回值。经过task
对象的result()
方法能够得到返回结果。
import time import asyncio async def _sleep(x): time.sleep(x) return 'Stopped {} seconds!'.format(x) coroutine = _sleep(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) loop.run_until_complete(task) # 直接经过task获取任务结果 print('Result: {}'.format(task.result()))
asyncio
自带的添加回调函数功能实现import time import asyncio async def _sleep(x): time.sleep(x) return 'Stopped {} seconds!'.format(x) def callback(future): print('Result: {}'.format(future.result())) coroutine = _sleep(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) # 添加回调函数 task.add_done_callback(callback) loop.run_until_complete(task)
asyncio
实现并发,就须要多个协程来完成任务,前面作await
和yield
的验证时就用了并发。
每当有任务阻塞的时候就await
,而后其余协程继续工做。
# 协程函数 async def worker(n): print('Waiting: {}'.format(n)) await asyncio.sleep(n) return 'Done {}'.format(n) # 协程对象 c1 = worker(1) c2 = worker(2) c3 = worker(4) # 协程转换为task tasks = [ asyncio.ensure_future(c1), asyncio.ensure_future(c2), asyncio.ensure_future(c3) ] loop = asyncio.get_event_loop()
有两种方法,这两种方法的区别后面说。
return
的结果能够经过task.result()
查看。
# asyncio.wait() loop.run_until_complete(asyncio.wait(tasks)) # asyncio.gather() loop.run_until_complete(asyncio.gather(*tasks)) # *不能省略 # 查看结果 for task in tasks: print('Result: {}'.format(task.result()))
使用async
能够定义协程,协程用于耗时的IO操做,咱们也能够封装更多的IO操做过程,实现一个协程中await
另外一个协程,实现协程的嵌套。
# 内部协程函数 async def worker(n): print('Waiting: {}'.format(n)) await asyncio.sleep(n) return 'Done {}'.format(n) # 外部协程函数 async def main(): c1 = worker(1) c2 = worker(2) c3 = worker(4) tasks = [ asyncio.ensure_future(c1), asyncio.ensure_future(c2), asyncio.ensure_future(c3) ] dones, pendings = await asyncio.wait(tasks) for task in tasks: print('Result: {}'.format(task.result())) loop = asyncio.get_event_loop() loop.run_until_complete(main())
若是外部协程使用的asyncio.gather()
,那么做以下替换。
results = await asyncio.gather(*tasks) for result in results: print('Result: {}'.format(result))
讲生成器时提到了四种状态,对协程咱们也了解一下其状态(准确地说是future/task对象的状态)。
wait接收的tasks,必须是一个list
对象,该list
对象中存放多个task
,既能够经过asyncio.ensure_future
转为task
对象也能够不转。
gather也能够接收list
对象,但*
不能省,也能够直接将多个task
做为可变长参数传入,参数能够是协程对象或future对象。
wait返回dones
和pendings
,前者表示已完成的任务,后者表示未完成的任务,须要经过task.result()
手工获取结果。
gather直接将值返回。
# FIRST_COMPLETED:完成第一个任务就返回 # FIRST_EXCEPTION:产生第一个异常就返回 # ALL_COMPLETED:全部任务完成再返回(默认选项) dones, pendings = loop.run_until_complete( asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) # 控制运行时间:1秒后返回 dones, pendings = loop.run_until_complete( asyncio.wait(tasks, timeout=1))
在asyncio
中如何动态添加协程到事件循环中?
两种方法,一种是同步的,一种是异步的。
import time import asyncio from queue import Queue from threading import Thread # 在后台永远运行的事件循环 def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() def do_sleep(x, queue, msg=""): time.sleep(x) queue.put(msg) queue = Queue() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print(time.ctime()) # 动态添加两个协程 # 这种方法在主线程是同步的 new_loop.call_soon_threadsafe(do_sleep, 6, queue, 'First') new_loop.call_soon_threadsafe(do_sleep, 3, queue, 'Second') while True: msg = queue.get() print('{} is done'.format(msg)) print(time.ctime())
import time import asyncio from queue import Queue from threading import Thread # 在后台永远运行的事件循环 def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def do_sleep(x, queue, msg=""): await asyncio.sleep(x) queue.put(msg) queue = Queue() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print(time.ctime()) # 动态添加两个协程 # 这种方法在主线程是异步的 asyncio.run_coroutine_threadsafe(do_sleep(6, queue, 'First'), new_loop) asyncio.run_coroutine_threadsafe(do_sleep(3, queue, 'Second'), new_loop) while True: msg = queue.get() print('{} is done'.format(msg)) print(time.ctime())