协程,英文名Coroutine。
前面介绍Python的多线程,以及用多线程实现并发(参见这篇文章【浅析Python多线程】),今天介绍的协程也是经常使用的并发手段。本篇主要内容包含:协程的基本概念、协程库的实现原理以及Python中常见的协程库。html
咱们知道线程的调度(线程上下文切换)是由操做系统决定的,当一个线程启动后,何时占用CPU、何时让出CPU,程序员都没法干涉。假设如今启动4个线程,CPU线程时间片为 5 毫秒,也就是说,每一个线程每隔5ms就让出CPU,让其余线程抢占CPU。可想而知,等4个线程运行结束,要进行多少次切换?python
若是咱们可以自行调度本身写的程序,让一些代码块遇到IO操做时,切换去执行另一些须要CPU操做的代码块,是否是节约了不少无畏的上下文切换呢?是的,协程就是针对这一状况而生的。咱们把写好的一个应用程序分为不少个代码块,以下图所示:程序员
把应用程序的代码分为多个代码块,正常状况代码自上而下顺序执行。若是代码块A运行过程当中,可以切换执行代码块B,又可以从代码块B再切换回去继续执行代码块A,这就实现了协程(一般是遇到IO操做时切换才有意义)。示意图以下:编程
因此,关于协程能够总结如下两点:多线程
(1)线程的调度是由操做系统负责,协程调度是程序自行负责。并发
(2)与线程相比,协程减小了无畏的操做系统切换。app
实际上当遇到IO操做时作切换才更有意义,(由于IO操做不用占用CPU),若是没遇到IO操做,按照时间片切换,无心义。socket
举个例子,你在作一顿饭你要蒸饭和炒菜:最笨的方法是先蒸饭,饭蒸好了再去炒菜。这样一顿饭得花很多时间,就跟咱们没采用并发编程同样。async
多线程至关于,你5分钟在作蒸饭的工做,到了5分钟开始炒菜,又过了5分钟,你又去忙蒸饭。分布式
协程至关于,你淘完米,放在电饭锅,按下煮饭键以后,你开始去炒菜。炒菜的时候油没热,你能够调佐料。这样,你炒两个菜出来,饭蒸好了。整个过程你没闲着,可是节约了很多时间。
如1中所述,代码块A可以中断去执行代码块B,代码块B可以中断,执行代码块A。这不是和yield功能一模一样吗?咱们先回忆一下yield的功能:
(1) 在函数中,语句执行到yield,会返回yield 后面的内容;当再回来执行时,从yield的下一句开始执行;
(2) 使用yield语法的函数是一个生成器;
(3) python3中,经过 .__next__() 或者 next() 方法获取生成器的下一个值。
来看一个yield实现协程的例子:
from collections import deque def sayHello(n): while n > 0: print("hello~", n) yield n n -= 1 print('say hello') def sayHi(n): x = 0 while x < n: print('hi~', x) yield x += 1 print("say hi") # 使用yield语句,实现简单任务调度器 class TaskScheduler(object): def __init__(self): self._task_queue = deque() def new_task(self, task): ''' 向调度队列添加新的任务 ''' self._task_queue.append(task) def run(self): ''' 不断运行,直到队列中没有任务 ''' while self._task_queue: task = self._task_queue.popleft() try: next(task) self._task_queue.append(task) except StopIteration: # 生成器结束 pass sched = TaskScheduler() sched.new_task(sayHello(10)) sched.new_task(sayHi(15)) sched.run()
上例执行时,你会看到sayHello()和sayHi() 不断交替执行,当执行sayHello()时,在yield处中断,当执行sayHi()时从yield处中断,切换回sayHello()从yield以后的一句开始执行。。。如此来回交替无缝链接。
actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。下面咱们经过yield来实现:
from collections import deque class ActorScheduler: def __init__(self): self._actors = {} self._msg_queue = deque() def new_actor(self, name, actor): self._msg_queue.append((actor, None)) self._actors[name] = actor def send(self, name, msg): actor = self._actors.get(name) if actor: self._msg_queue.append((actor, msg)) def run(self): while self._msg_queue: # print("队列:", self._msg_queue) actor, msg = self._msg_queue.popleft() # print("actor", actor) # print("msg", msg) try: actor.send(msg) except StopIteration: pass if __name__ == '__main__': def say_hello(): while True: msg = yield print("say hello", msg) def say_hi(): while True: msg = yield print("say hi", msg) def counter(sched): while True: n = yield print("counter:", n) if n == 0: break sched.send('say_hello', n) sched.send('say_hi', n) sched.send('counter', n-1) sched = ActorScheduler() # 建立初始化 actors sched.new_actor('say_hello', say_hello()) sched.new_actor('say_hi', say_hi()) sched.new_actor('counter', counter(sched)) sched.send('counter', 10) sched.run()
上例中:
(1) ActorScheduler 负责事件循环
(2) counter() 负责控制终止
(3) say_hello() / say_hi() 至关于切换的协程,当程序运行到这些函数内部的yield处,就开始切换。
因此,当执行时,咱们可以看到say_hello() / say_hi()不断交替切换执行,直到counter知足终止条件以后,协程终止。看懂上例可能须要花费一些时间。实际上咱们已经实现了一个“操做系统”的最小核心部分。 生成器函数(含有yield的函数)就是认为,而yield语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。
有了前面对协程的了解,咱们能够思考怎样去实现一个协程库?我以为能够从如下两个个方面去思考:
(1)事件循环 (event loop)。事件循环须要实现两个功能,一是顺序执行协程代码;二是完成协程的调度,即一个协程“暂停”时,决定接下来执行哪一个协程。
(2)协程上下文的切换。基本上Python 生成器的 yeild 已经能完成切换,Python3中还有特定语法支持协程切换。
咱们看一个比较复杂的例子:
from collections import deque from select import select class YieldEvent: def handle_yield(self, sched, task): pass def handle_resume(self, sched, task): pass # 任务调度(至关于EventLoop) class Scheduler: def __init__(self): self._numtasks = 0 # 任务总数量 self._ready = deque() # 等待执行的任务队列 self._read_waiting = {} # 正等待读的任务 self._write_waiting = {} # 正等待写的任务 # 利用I/O多路复用 监听读写I/0 def _iopoll(self): rset, wset, eset = select(self._read_waiting, self._write_waiting, []) for r in rset: evt, task = self._read_waiting.pop(r) evt.handle_resume(self, task) for w in wset: evt, task = self._write_waiting.pop(w) evt.handle_resume(self, task) def new(self, task): """添加一个新的任务""" self._ready.append((task, None)) self._numtasks += 1 def add_ready(self, task, msg=None): """添加到任务对列等待执行""" self._ready.append((task, msg)) def _read_wait(self, fileno, evt, task): self._read_waiting[fileno] = (evt, task) def _write_wait(self, fileno, evt, task): self._write_waiting[fileno] = (evt, task) def run(self): while self._numtasks: # 若是任务数量为空,阻塞在select处,保持监听 if not self._ready: self._iopoll() task, msg = self._ready.popleft() try: r = task.send(msg) if isinstance(r, YieldEvent): r.handle_yield(self, task) else: raise RuntimeError('unrecognized yield event') except StopIteration: self._numtasks -= 1 # 示例: 将协程抽象成YieldEvent的子类,并重写handle_yield和handle_resume方法 class ReadSocket(YieldEvent): def __init__(self, sock, nbytes): self.sock = sock self.nbytes = nbytes def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): data = self.sock.recv(self.nbytes) sched.add_ready(task, data) class WriteSocket(YieldEvent): def __init__(self, sock, data): self.sock = sock self.data = data def handle_yield(self, sched, task): sched._write_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): nsent = self.sock.send(self.data) sched.add_ready(task, nsent) class AcceptSocket(YieldEvent): def __init__(self, sock): self.sock = sock def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): r = self.sock.accept() sched.add_ready(task, r) class Socket(object): def __init__(self, sock): self._sock = sock def recv(self, maxbytes): return ReadSocket(self._sock, maxbytes) def send(self, data): return WriteSocket(self._sock, data) def accept(self): return AcceptSocket(self._sock) def __getattr__(self, name): return getattr(self._sock, name) if __name__ == '__main__': from socket import socket, AF_INET, SOCK_STREAM def readline(sock): chars = [] while True: c = yield sock.recv(1) print(c) if not c: break chars.append(c) if c == b'\n': break return b''.join(chars) # socket server 使用生成器 class EchoServer: def __init__(self, addr, sched): self.sched = sched sched.new(self.server_loop(addr)) def server_loop(self, addr): s = Socket(socket(AF_INET, SOCK_STREAM)) s.bind(addr) s.listen(5) while True: c, a = yield s.accept() print('Got connection from ', a) print("got", c) self.sched.new(self.client_handler(Socket(c))) def client_handler(self, client): while True: try: line = yield from readline(client) if not line: break print("from Client::", str(line)) except Exception: break while line: try: nsent = yield client.sendall(line) print("nsent", nsent) line = line[nsent:] except Exception: break client.close() print('Client closed') sched = Scheduler() EchoServer(('localhost', 9999), sched) sched.run()
Scheduler至关于实现事件循环并调度协程, 添加到事件循环中的事件必须继承YieldEvent, 并重写它定义的两个方法。此例比较难,看不懂能够忽略。
咱们看一下Python3中的协程库asyncio是怎么实现的:
import asyncio @asyncio.coroutine def say_hi(n): print("start:", n) r = yield from asyncio.sleep(2) print("end:", n) loop = asyncio.get_event_loop() tasks = [say_hi(0), say_hi(1)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() # start: 1 # start: 0 # 停顿两秒 # end: 1 # end: 0
(1)@asyncio.coroutine把一个generator标记为coroutine类型,而后,咱们就把这个coroutine扔到EventLoop中执行。
(2)yield from语法可让咱们方便地调用另外一个generator。因为asyncio.sleep()也是一个coroutine,因此线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就能够从yield from拿到返回值(此处是None),而后接着执行下一行语句。
(3)asyncio.sleep(1)至关于一个耗时1秒的IO操做,在此期间,主线程并未等待,而是去执行EventLoop中其余能够执行的coroutine了,所以能够实现并发执行。
asyncio中get_event_loop()就是事件循环,而装饰器@asyncio.coroutine标记了一个协程,并yield from 语法实现协程切换。在Python3.5中,新增了async
和await
的新语法,代替装饰器和yield from。上例能够用新增语法彻底代替。
async def say_hi(n): print("start:", n) r = await asyncio.sleep(2) print("end:", n) loop = asyncio.get_event_loop() tasks = [say_hi(0), say_hi(1)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() # start: 1 # start: 0 # 停顿两秒 # end: 1 # end: 0
将@asyncio.coroutine换成async, 将yield from 换成await 便可。
(1)使用协程,只能使用单线程,多线程的便利就一点都用不到。例如,I/O阻塞程序,CPU仍然会将整个任务挂起直到操做完成。
(2) 一旦使用协程,大部分ython库并不能很好的兼容,这就会致使要改写大量的标准库函数。
因此,最好别用协程,一旦用很差,协程给程序性能带来的提高,远远弥补不了其带来的灾难。